Java并发 - Fork/Join框架介绍
Fork/Join 框架
Fork/Join 框架是 JAVA7 中提供的用来并行执行任务的框架,和 MapReduce 的原理类似,都是通过将大任务拆分为小任务来实现并行计算,主要是利用分治法的思想来实现多任务并行计算。
Fork/Join 框架创建的任务需要通过 ForkJoinPool 来启动,ForkJoinPool 是一个线程池,比较特殊的是:其线程数量是根据 CPU 的核心数来设置的。而 ForkJoinPool 是通过工作窃取(work-stealing)算法来提高 CPU 的利用率的。
工作窃取算法介绍
每个线程中维护了一个双端队列来存储所需要执行的任务,而工作窃取算法允许从其他线程的双端队列中窃取一个最晚的任务来执行,这样可以避免和当前任务所属的线程发生竞争。
如下图所示,Thread2 从 Thread1 队列中拿出最晚的 Task1 来执行,Thread1 则拿出 Task2 来执行,这样就会避免发生竞争。
工作窃取算法优点:
- 充分利用线程进行并行计算
- 减少了线程间的竞争
工作窃取算法缺点:
- 在某些情况下会存在竞争(双端队列中只有一个任务)
- 消耗了更多的系统资源
Fork/Join 框架基础类
- ForkJoinPool:启动 Fork/Join 任务,用来执行 Task。或生成新的 ForkJoinWorkerThread,执行 ForkJoinWorkerThread 间的 work-stealing 逻辑。ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好。
- ForkJoinTask:执行具体的分支逻辑,声明以同步/异步方式进行执行
- ForkJoinWorkerThread:是 ForkJoinPool 内的 worker thread,执行 ForkJoinTask,内部有 ForkJoinPool.WorkQueue 来保存要执行的 ForkJoinTask。
- ForkJoinPool.WorkQueue:保存要执行的ForkJoinTask。
Fork/Join 框架执行流程
- ForkJoinPool 的每个工作线程都维护着一个双端工作队列(WorkQueue),队列中存放着是任务(ForkJoinTask)。
- 每个工作线程在运行中产生新的任务(调用 fork())时,放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
- 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
- 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
- 在既没有自己的任务,也没有可以窃取的任务时,进入休眠。
示例代码
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class ForkJoinExample extends RecursiveTask<Integer> {
// 任务拆分阀值
private final int threshold = 5;
private int first;
private int last;
public ForkJoinExample(int first, int last){
this.first = first;
this.last = last;
}
@Override
protected Integer compute() {
int result = 0;
if(last - first <= threshold){
// 任务足够小则直接计算
for(int i = first; i<= last; i++){
result += i;
}
}else{
// 二分拆分为小任务
int middle = first + (last-first)/2;
ForkJoinExample leftTask = new ForkJoinExample(first, middle);
ForkJoinExample rightTask = new ForkJoinExample(middle+1, last);
// 拆分进行计算
leftTask.fork();
rightTask.fork();
// 合并拆分计算的结果
result = leftTask.join()+rightTask.join();
}
return result;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinExample example = new ForkJoinExample(1, 100);
ForkJoinPool forkJoinPool = new ForkJoinPool();
Future result = forkJoinPool.submit(example);
System.out.println(result.get());
}
}
补充
1.ForkJoinPool 使用 submit 与 invoke 提交的区别?
- invoke 是同步执行,调用之后需要等待任务完成,才能执行后面的代码。
- submit 是异步执行,只有在 Future 调用 get 的时候会阻塞。
2.继承 RecursiveTask 与 RecursiveAction的区别?
- 继承 RecursiveTask:适用于有返回值的场景。
- 继承 RecursiveAction:适合于没有返回值的场景。
3.子任务调用 fork 与 invokeAll 的区别?
- fork:让子线程自己去完成任务,父线程监督子线程执行,浪费父线程。
- invokeAll:子父线程共同完成任务,可以更好的利用线程池。