207. Invoking multiple Callable tasks

Since the producers (checkers) don't work at the same time with the consumers (packers), we can just simulate their work via a for that adds 100 checked bulbs in a queue:

private static final BlockingQueue<String> queue 
= new LinkedBlockingQueue<>();
...
private static void simulatingProducers() {

for (int i = 0; i < MAX_PROD_BULBS; i++) {
queue.offer("bulb-" + rnd.nextInt(1000));
}
}

Now, the consumers must pack each bulb and return it. This means that the Consumer is a Callable:

private static class Consumer implements Callable {

@Override
public String call() throws InterruptedException {
String bulb = queue.poll();

Thread.sleep(100);

if (bulb != null) {
logger.info(() -> "Packed: " + bulb + " by consumer: "
+ Thread.currentThread().getName());

return bulb;
}

return "";
}
}

But remember that we should submit all Callable tasks and wait for all of them to complete. This can be achieved via the ExecutorService.invokeAll() method. This method takes a collection of tasks (Collection<? extends Callable<T>>) and returns a list of instances of Future (List<Future<T>>as an argument. Any call to Future.get() will be blocked until all the instances of Future are complete.

So, first we create a list of 100 tasks:

private static final Consumer consumer = new Consumer();
...
List<Callable<String>> tasks = new ArrayList<>();
for (int i = 0; i < queue.size(); i++) {
tasks.add(consumer);
}

Further, we execute all these tasks and get the list of Future:

private static ExecutorService consumerService
= Executors.newWorkStealingPool();
...
List<Future<String>> futures = consumerService.invokeAll(tasks);

Finally, we process (in this case, display) the results:

for (Future<String> future: futures) {
String bulb = future.get();
logger.info(() -> "Future done: " + bulb);
}

Notice that the first call to the future.get() statement blocks until all the instances of Future are complete. This will lead to the following output:

[12:06:41] [INFO] Packed: bulb-595 by consumer: ForkJoinPool-1-worker-9
...
[12:06:42] [INFO] Packed: bulb-478 by consumer: ForkJoinPool-1-worker-15
[12:06:43] [INFO] Future done: bulb-595
...

Sometimes, we want to submit several tasks and wait for any one of them to complete. This can be achieved via ExecutorService.invokeAny(). Exactly like invokeAll(), this method gets as an argument a collection of tasks (Collection<? extends Callable<T>>). But it returns the result of the fastest task (not a Future) and cancels all other tasks that have not completed yet, for example:

String bulb = consumerService.invokeAny(tasks);

If you don't want to wait for all Future to finish, proceed as follows:

int queueSize = queue.size();
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < queueSize; i++) {
futures.add(consumerService.submit(consumer));
}

for (Future<String> future: futures) {
String bulb = future.get();
logger.info(() -> "Future done: " + bulb);
}

This will not block until all tasks are done. Take a look at the following output sample:

[12:08:56] [INFO ] Packed: bulb-894 by consumer: ForkJoinPool-1-worker-7
[12:08:56] [INFO ] Future done: bulb-894
[12:08:56] [INFO ] Packed: bulb-953 by consumer: ForkJoinPool-1-worker-5
...
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset