Java-并发.06c.JUC-执行器-Fork/Join框架

Fork/Join 框架

Java 在 JDK 7 之后加入了并行计算的框架 Fork/Join,可以解决我们系统中大数据计算的性能问题。Fork/Join 采用的是分治法,Fork 是将一个大任务拆分成若干个子任务,子任务分别去计算,而 Join 是获取到子任务的计算结果,然后合并,这个是递归的过程。子任务被分配到不同的核上执行时,效率最高。

Fork/Join 框架的核心是 ForkJoinPool (类似 ExecuteService ) 会给线程池中的线程分发任务,不同之处在于,ForkJoinPool 将一个大任务拆分为若干互不依赖的子任务,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务。

同时,为了最大限度地提高并行处理能力,采用了工作窃取算法来运行任务,也就是说当某个线程处理完自己工作队列中的任务后,尝试当其他线程的工作队列中窃取一个任务来执行,直到所有任务处理完毕。

ForkJoinTask 是一个抽象类,有两个实现子类,RecursiveTask(有返回值)和 RecursiveAction(无返回结果),我们自己定义任务时,只需选择这两个类继承即可。

java.lang.Object
java.util.concurrent.ForkJoinTask<V>
java.util.concurrent.RecursiveTask<V>

继承 RecursiveTaskRecursiveAction 类必须实现 compute() 方法,在这个方法里要实现递归控制条件。

compute() 的实现通常为:

if (任务足够小){
直接执行该任务;
}else{
将任务一分为二;
Fork执行这两个任务;
Join等待结果;
}

下面是一个计算数组之和的 Fork/Join 例子:

public class CJForkJoinTask extends RecursiveTask<Integer>
{
// 要计算和的数组
private long[] array;
private int low;
private int high;

@Override
protected Integer compute() {
int sum = 0;
if (high - low <= THRESHOLD) {
// 小于阈值则直接计算
} else {
// 一个大任务分割成两个子任务
int mid = (low + high) >>> 1;
CJForkJoinTask left = new CJForkJoinTask(array, low, mid);
CJForkJoinTask right = new CJForkJoinTask(array, mid + 1, high);

left.fork();
right.fork();

sum = left.join() + right.join();
}
return sum;
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
// 生成大数组:
long[] array = genArray(1000000);

// 创建Fork/Join任务:
CJForkJoinTask CJForkJoinTask = new CJForkJoinTask(array, 0, array.length - 1);

// 创建Fork/Join线程池:
ForkJoinPool forkJoinPool = new ForkJoinPool();

// 提交任务到线程池:
forkJoinPool.submit(CJForkJoinTask);

// 获取结果:
Integer result = CJForkJoinTask.get();
}
}