Using CountedCompleter

CountedCompleter is a type of ForkJoinTask added in JDK 8.

The job of CountedCompleter is to remember the pending task count (nothing less, nothing more). We can set the pending count via setPendingCount() or increment it with an explicit delta via addToPendingCount​(int delta). Commonly, we call these methods right before forking (for example, if we fork twice, then we call addToPendingCount(2) or setPendingCount(2), depending on the case).

In the compute() method, we decrease the pending count via tryComplete() or propagateCompletion(). When the tryComplete() method is called, with a pending count of zero, or the unconditional complete() method is called, the onCompletion() method is called. The propagateCompletion() method is similar with tryComplete(), but it doesn't call onCompletion().

CountedCompleter can optionally return a computed value. For this, we have to override the getRawResult() method to return a value.

The following code sums up all the values of a list via CountedCompleter:

public class SumCountedCompleter extends CountedCompleter<Long> {

private static final Logger logger
= Logger.getLogger(SumCountedCompleter.class.getName());
private static final int THRESHOLD = 10;
private static final LongAdder sumAll = new LongAdder();

private final List<Integer> worklist;

public SumCountedCompleter(
CountedCompleter<Long> c, List<Integer> worklist) {
super(c);
this.worklist = worklist;
}

@Override
public void compute() {
if (worklist.size() <= THRESHOLD) {
partialSum(worklist);
} else {
int size = worklist.size();

List<Integer> worklistLeft
= worklist.subList(0, (size + 1) / 2);
List<Integer> worklistRight
= worklist.subList((size + 1) / 2, size);

addToPendingCount(2);
SumCountedCompleter leftTask
= new SumCountedCompleter(this, worklistLeft);
SumCountedCompleter rightTask
= new SumCountedCompleter(this, worklistRight);

leftTask.fork();
rightTask.fork();
}

tryComplete();
}

@Override
public void onCompletion(CountedCompleter<?> caller) {
logger.info(() -> "Thread complete: "
+ Thread.currentThread().getName());
}

@Override
public Long getRawResult() {
return sumAll.sum();
}

private Integer partialSum(List<Integer> worklist) {
int sum = worklist.stream()
.mapToInt(e -> e)
.sum();

sumAll.add(sum);

logger.info(() -> "Partial sum: " + worklist + " = "
+ sum + " Thread: " + Thread.currentThread().getName());

return sum;
}
}

Now, let's see a potential call and output:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
Random rnd = new Random();
List<Integer> list = new ArrayList<>();

for (int i = 0; i < 200; i++) {
list.add(1 + rnd.nextInt(10));
}

SumCountedCompleter sumCountedCompleter
= new SumCountedCompleter(null, list);
forkJoinPool.invoke(sumCountedCompleter);

logger.info(() -> "Done! Result: "
+ sumCountedCompleter.getRawResult());

The output will be as follows:

[11:11:07] Partial sum: [7, 7, 8, 5, 6, 10] = 43
ForkJoinPool.commonPool-worker-7
[11:11:07] Partial sum: [9, 1, 1, 6, 1, 2] = 20
ForkJoinPool.commonPool-worker-3
...
[11:11:07] Thread complete: ForkJoinPool.commonPool-worker-15
[11:11:07] Done! Result: 1159
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset