A more comprehensive interface that provides many additional methods is ExecutorService. This is an enriched version of Executor. Java comes with a fully-fledged implementation of ExecutorService, named ThreadPoolExecutor. This is a thread pool whose most complete constructor takes the following arguments:
ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler)
Here is a short description of each of these arguments:
- corePoolSize: The number of threads to keep in the pool, even if they are idle (unless allowCoreThreadTimeOut is set)
- maximumPoolSize: The maximum number of threads allowed in the pool
- keepAliveTime: How long threads in excess of corePoolSize may stay idle before they are removed from the pool
- unit: The time unit for the keepAliveTime argument
- workQueue: A queue for holding the instances of Runnable (only the Runnable tasks submitted by the execute() method) before they are executed
- threadFactory: This factory is used when the executor creates a new thread
- handler: Invoked when ThreadPoolExecutor cannot execute a Runnable due to saturation, that is, when both the thread bound and the queue capacity have been reached (for example, workQueue has a fixed size and maximumPoolSize is set as well); the decision about what to do with the task is left to this handler
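To make the role of the handler more concrete, here is a minimal sketch that saturates a deliberately tiny pool and lets the built-in ThreadPoolExecutor.CallerRunsPolicy deal with the overflow. The class name SaturationSketch, the method name, and the bounds (one thread, one queue slot) are assumptions chosen for illustration only; tasks block on a latch so that the pool stays saturated while the third task is submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical class name, used only for this sketch
public class SaturationSketch {

    // Returns the name of the thread that ran the task submitted while the pool was saturated
    static String runWhenSaturated() {
        // Illustrative bounds: a single thread and a single queue slot
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.SECONDS, queue,
                new ThreadPoolExecutor.CallerRunsPolicy());

        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        executor.execute(blocker); // occupies the only thread
        executor.execute(blocker); // fills the only queue slot

        AtomicReference<String> ranOn = new AtomicReference<>();
        // Thread bound and queue capacity are both reached, so the handler is consulted;
        // CallerRunsPolicy runs the task directly in the submitting thread
        executor.execute(() -> ranOn.set(Thread.currentThread().getName()));

        release.countDown(); // let the blocked tasks finish
        executor.shutdown();
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("Saturated task ran on: " + runWhenSaturated());
    }
}
```

CallerRunsPolicy is only one of the built-in policies; AbortPolicy (the default), DiscardPolicy, and DiscardOldestPolicy are the others, and a custom RejectedExecutionHandler can be supplied instead.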
In order to optimize the pool size, we need to collect the following information:
- Number of CPUs (Runtime.getRuntime().availableProcessors())
- Target CPU utilization (in the range [0, 1])
- Wait time (W): the time a task spends waiting, for example, on I/O
- Compute time (C): the time a task spends using the CPU
The following formula helps us to determine the optimal size of the pool:
Number of threads = Number of CPUs * Target CPU utilization * (1 + W/C)
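As a rough illustration, the formula can be wrapped in a small helper. The class and method names below are hypothetical, the sample values (8 CPUs, 50% target utilization, W = 50 ms, C = 5 ms) are invented rather than measured, and rounding to the nearest integer with a minimum of one thread is an assumption of this sketch, not part of the formula.

```java
// Hypothetical helper, used only for this sketch
public class PoolSizeSketch {

    // Number of threads = Number of CPUs * Target CPU utilization * (1 + W/C)
    static int optimalThreads(int cpus, double targetUtilization,
            double waitTime, double computeTime) {
        double size = cpus * targetUtilization * (1 + waitTime / computeTime);
        // Assumption: round to the nearest integer and never go below one thread
        return Math.max(1, (int) Math.round(size));
    }

    public static void main(String[] args) {
        // Invented sample values: 8 * 0.5 * (1 + 50/5) = 44
        System.out.println(optimalThreads(8, 0.5, 50, 5));

        // The same estimate for the machine this code runs on
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println(optimalThreads(cpus, 0.5, 50, 5));
    }
}
```

W and C only need to be expressed in the same unit, since the formula uses their ratio.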
Let's see an example of ThreadPoolExecutor:
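import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SimpleThreadPoolExecutor implements Runnable {

    private final int taskId;

    public SimpleThreadPoolExecutor(int taskId) {
        this.taskId = taskId;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(2000); // simulate two seconds of work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return;
        }
        System.out.println("Executing task " + taskId
            + " via " + Thread.currentThread().getName());
    }

    public static void main(String[] args) throws InterruptedException {
        // bounded queue: at most five tasks can wait for a thread
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
        final AtomicInteger counter = new AtomicInteger();

        ThreadFactory threadFactory = (Runnable r) -> {
            // read the counter once so the log and the thread name always match
            int id = counter.incrementAndGet();
            System.out.println("Creating a new Cool-Thread-" + id);
            return new Thread(r, "Cool-Thread-" + id);
        };

        // called for every task submitted while the pool is saturated
        RejectedExecutionHandler rejectedHandler
            = (Runnable r, ThreadPoolExecutor executor) -> {
                if (r instanceof SimpleThreadPoolExecutor) {
                    SimpleThreadPoolExecutor task = (SimpleThreadPoolExecutor) r;
                    System.out.println("Rejecting task " + task.taskId);
                }
            };

        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 1,
            TimeUnit.SECONDS, queue, threadFactory, rejectedHandler);

        for (int i = 0; i < 50; i++) {
            executor.execute(new SimpleThreadPoolExecutor(i));
        }

        // stop accepting tasks and wait for the accepted ones to finish
        executor.shutdown();
        executor.awaitTermination(
            Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
    }
}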
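The main() method fires 50 instances of Runnable. Each Runnable sleeps for two seconds and prints a message. The work queue is limited to five instances of Runnable, the core threads to 10, the maximum number of threads to 20, and the idle timeout to one second. Since all 50 tasks are submitted well within the two seconds that a task takes, the pool can hold at most 25 of them at once (20 running plus five queued), and the remaining tasks are rejected. A possible output will look as follows: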
Creating a new Cool-Thread-1
...
Creating a new Cool-Thread-20
Rejecting task 25
...
Rejecting task 49
Executing task 22 via Cool-Thread-18
...
Executing task 12 via Cool-Thread-2
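The counts in this output follow from the pool bounds, and they can be checked with a small sketch. The class SaturationCountSketch and its method are hypothetical; unlike the example above, the tasks here block on a CountDownLatch instead of sleeping (an assumption made so that no task can finish while the loop is still submitting), and the sketch counts created threads and rejections instead of printing them.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class name, used only for this sketch
public class SaturationCountSketch {

    // Returns {threads created, tasks rejected} for the bounds used in the example
    static int[] countCreatedAndRejected() {
        AtomicInteger created = new AtomicInteger();
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);

        ThreadFactory factory = r -> {
            created.incrementAndGet();
            return new Thread(r);
        };
        RejectedExecutionHandler handler = (r, pool) -> rejected.incrementAndGet();

        // Same bounds as the example: 10 core threads, 20 at most, 5 queue slots
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 1,
                TimeUnit.SECONDS, queue, factory, handler);

        for (int i = 0; i < 50; i++) {
            executor.execute(() -> {
                try {
                    release.await(); // stay busy until every task has been submitted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        int[] result = {created.get(), rejected.get()};
        release.countDown(); // let the accepted tasks finish
        executor.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] counts = countCreatedAndRejected();
        System.out.println("Threads created: " + counts[0]);
        System.out.println("Tasks rejected: " + counts[1]);
    }
}
```

Under these assumptions, tasks 0 to 9 start the 10 core threads, tasks 10 to 14 fill the queue, tasks 15 to 24 push the pool to its maximum of 20 threads, and tasks 25 to 49 are rejected, so the sketch should report 20 created threads and 25 rejections.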