在执行一个大任务时，可以将其 fork 为多个小任务，最后 join 得到结果。fork 任务时主要是要设置一个合适的阈值 THRESHOLD：不宜过小（子任务过多，调度开销变大），也不宜过大（并行度不足），否则起不到提升性能的作用。
package com.learn.corejava.threading;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.function.DoublePredicate;
/**
 * Demo entry point: fills a large array with random numbers and uses a
 * fork/join task to count how many of them are greater than 0.5.
 */
public class ForkJoinMain {
    /**
     * Builds the random input, runs the counting task on a dedicated
     * {@link ForkJoinPool}, and prints the resulting count to standard output.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final int SIZE = 10000000;
        double[] numbers = new double[SIZE];
        for (int i = 0; i < SIZE; i++) {
            numbers[i] = Math.random();
        }

        Countr countr = new Countr(numbers, 0, numbers.length, x -> x > 0.5);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        try {
            // invoke() blocks until the task completes and returns its result,
            // so a separate join() on the task is not needed.
            Integer matches = forkJoinPool.invoke(countr);
            System.out.println(matches);
        } finally {
            // Release the pool's worker threads once the work is finished.
            forkJoinPool.shutdown();
        }
    }
}
/**
 * Fork/join task that counts the elements in the half-open index range
 * {@code [from, to)} of a {@code double[]} that satisfy a predicate.
 *
 * <p>Ranges shorter than {@link #THRESHOLD} are counted with a plain loop;
 * longer ranges are split in half and the two halves are processed as
 * sub-tasks whose results are added together.
 */
class Countr extends RecursiveTask<Integer> {
    /** Ranges with fewer elements than this are counted sequentially instead of being split. */
    public static final int THRESHOLD = 1000;

    private final double[] values;
    private final int from;
    private final int to;
    private final DoublePredicate filter;

    /**
     * Creates a counting task over {@code values[from .. to)}.
     *
     * @param values array to scan
     * @param from   index of the first element to examine (inclusive)
     * @param to     index just past the last element to examine (exclusive)
     * @param filter predicate an element must satisfy to be counted
     */
    public Countr(double[] values, int from, int to, DoublePredicate filter) {
        this.values = values;
        this.from = from;
        this.to = to;
        this.filter = filter;
    }

    /**
     * Counts the matching elements in this task's range.
     *
     * @return number of elements in {@code [from, to)} for which {@code filter} returns true
     */
    @Override
    protected Integer compute() {
        if (to - from < THRESHOLD) {
            // Small enough: count directly in the current thread.
            int count = 0;
            for (int i = from; i < to; i++) {
                if (filter.test(values[i])) {
                    count++;
                }
            }
            return count;
        }

        // Written as from + (to - from) / 2 rather than (from + to) / 2 so the
        // midpoint cannot overflow int when both indices are large.
        int mid = from + (to - from) / 2;
        Countr first = new Countr(values, from, mid, filter);
        Countr second = new Countr(values, mid, to, filter);
        invokeAll(first, second); // run both halves as sub-tasks and wait for them
        return first.join() + second.join();
    }
}