概念 JDK1.7引入了一种新的并行编程模式”fork-join”,它是实现了”分而治之”思想的Java并发编程框架。它对问题的解决思路是分而治之,先将一个问题fork(分为)几个子问题,然后子问题又分为孙子问题,直至细分为一个容易计算的问题,然后再将结果依次join(结合)为最终的答案。对程序员来说,叫递归思想更加合适。只不过普通的递归是在单线程中完成的,而这里的递归则把递归任务通过invokeAll()方法丢进了线程池中,让线程池来调度执行。
ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好
特点 在运行线程时,它使用“work-steal”(任务偷取)算法。一般来说,fork-join会启动多个线程(由参数指定,若不指定则默认为CPU核心数量),每个线程负责一个任务队列,并依次从队列头部获得任务并执行。当某个线程空闲时,它会从其他线程的任务队列尾部偷取一个任务来执行,这样就保证了线程的运行效率达到最高
它面向的问题域是可以大量并行执行的计算任务,其计算对象最好是一些独立的元素,不会被其他线程访问,也没有同步、互斥要求,更不要涉及IO或者无限循环。当然此框架也可以执行普通的并发编程任务,但是这时就失去了性能优势
工具类
ForkJoinTask:fork-join的任务抽象类,同时也是Future接口,并提供了fork和join方法
fork() 在当前线程运行的线程池中安排一个异步执行。简单的理解就是再创建一个子任务
join() 当任务完成的时候返回计算结果。
invoke() 开始执行任务,如果必要,等待计算完成
invokeAll() 提交多个forkJoinTasks到ForkJoinPool的便捷方式
ForkJoinPool: fork-join的线程池,所有的ForkJoinTask任务都必须在其中运行,主要使用invoke()、invokeAll()等方法来执行任务, 当然也可以使用原有的execute()和submit()方法
RecursiveAction: ForkJoinTask的具体实现类,用于没有返回值的任务
RecursiveTask: ForkJoinTask的具体实现类,用于有返回值的任务
使用 常用方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 //实例化 ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); //任务提交 //方式1: submit()或execute() forkJoinPool.execute(ForkJoinTask<?> task); int result = customRecursiveTask.join(); //方式2: invoke()或者invokeAll() //invoke()方法拆分任务并等待结果,并且不需要任何手动join //invokeAll()方法是提交多个forkJoinTasks到ForkJoinPool的便捷方式。 //它将任务作为参数,forks它们将按照生成它们的顺序返回Future对象的集合 int result = forkJoinPool.invoke(ForkJoinTask<?> task); //方式3:使用单独的fork()和join() //fork()方法将任务提交到一个线程池中,但它不会触发它的执行。 //join()方法被用于触发执行。在RecursiveAction的情况下,join()只返回null ; 对于RecursiveTask <V>,它返回任务执行的结果 forkJoinTask.fork(); result = forkJoinTask.join();
为避免混淆,使用invokeAll()方法向ForkJoinPool提交多个任务通常是个好主意
示例 计算1至1000的正整数之和
fork-join的效率跟CPU的核数有直接关系,不同性能机器,测试结果会不一样
1 2 3 4 5 6 7 public interface Calculator { long sumUp (long [] numbers) ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public class ForkJoinCalculator implements Calculator { private ForkJoinPool pool; private static class SumTask extends RecursiveTask <Long> { private long [] numbers; private int from; private int to; public SumTask (long [] numbers, int from, int to) { this .numbers = numbers; this .from = from; this .to = to; } @Override protected Long compute () { if (to - from < 6 ) { long total = 0 ; for (int i = from; i <= to; i++) { total += numbers[i]; } return total; } else { int middle = (from + to) / 2 ; SumTask taskLeft = new SumTask (numbers, from, middle); SumTask taskRight = new SumTask (numbers, middle+1 , to); taskLeft.fork(); taskRight.fork(); return taskLeft.join() + taskRight.join(); } } } public ForkJoinCalculator () { pool = new ForkJoinPool (); } @Override public long sumUp (long [] numbers) { return pool.invoke(new SumTask (numbers, 0 , numbers.length-1 )); } }
测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class Test { public static void main (String[] args) { long [] numbers = LongStream.rangeClosed(1 , 1000 ).toArray(); Calculator forkJoinCalculator = new ExecutorServiceCalculator (); System.out.println(forkJoinCalculator.sumUp(numbers)); } }
注意要点
需使用合理的阈值将ForkJoinTask拆分为子任务
避免在 ForkJoinTask中出现任何阻塞
ForkJoinTask在执行的时候可能会抛出异常,在主线程中是无法直接获取的,但是可以通过ForkJoinTask提供的isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了
Fork/Join线程池在Java标准库中就有应用。Java标准库提供的java.util.Arrays.parallelSort(array)可以进行并行排序,它的原理就是内部通过Fork/Join对大数组分拆进行并行排序,在多核CPU上就可以大大提高排序的速度。