Java8之并行数据处理与性能
第7章 并行数据处理与性能
1、将顺序流转换为并行流
1)parallel方法:
public static long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel()
.reduce(0L, Long::sum);
}
- 转换成并行流之后的图解
2) 当然也可以转回来-sequential方法
stream.parallel()
.filter(...)
.sequential()
.map(...)
.parallel()
.reduce();
并行流内部使用了默认的ForkJoinPool(7.2节会进一步讲到分支/合并框架),它默认的 线程数量就是你的处理器数量,这个值是由 Runtime.getRuntime().availableProcessors()得到的。 但是你可以通过系统属性 java.util.concurrent.ForkJoinPool.common. parallelism来改变线程池大小,如下所示: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12"); 这是一个全局设置,因此它将影响代码中所有的并行流。反过来说,目前还无法专为某个 并行流指定这个值。一般而言,让ForkJoinPool的大小等于处理器数量是个不错的默认值。
* 并行处理的性能-流的数据源和可分解性
按照可分解性总结了一些流数据源适不适于并行。
- 调并行流背后使用的基础架构是Java 7中引入的分支/合并框架。
2、分支/合并框架-Java7的框架
> 分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任 务的结果合并起来生成整体结果。它是ExecutorService接口的一个实现,它把子任务分配给 线程池(称为ForkJoinPool)中的工作线程。
1) 使用 RecursiveTask
要把任务提交到这个池,必须创建RecursiveTask的一个子类,其中R是并行化任务(以 及所有子任务)产生的结果类型,或者如果任务不返回结果,则是RecursiveAction类型(当 然它可能会更新其他非局部机构)。
- 要定义RecursiveTask,只需实现它唯一的抽象方法 compute:
protected abstract R compute();
上面的方法实现大概如下:
if (任务足够小或不可分) {
顺序计算该任务
} else {
将任务分成两个子任务
递归调用本方法,拆分每个子任务,等待所有子任务完成
合并每个子任务的结果
}
- Java7的框架暂不做记录,本篇将专注Java8
3、Spliterator接口
Spliterator是Java 8中加入的另一个新接口;这个名字代表“可分迭代器”(splitable iterator)。和Iterator一样,Spliterator也用于遍历数据源中的元素,但它是为了并行执行 而设计的。
Java 8已经为集合框架中包含的所有数据结构提供了一个 默认的Spliterator实现。集合实现了Spliterator接口,接口提供了一个spliterator方法。 这个接口定义了若干方法,如下面的代码清单所示。
- Spliterator接口
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
}
1) 拆分过程
将Stream拆分成多个部分的算法是一个递归过程,第一步是对第一个 Spliterator调用trySplit,生成第二个Spliterator。第二步对这两个Spliterator调用 trysplit,这样总共就有了四个Spliterator。这个框架不断对Spliterator调用trySplit 直到它返回null,表明它处理的数据结构不能再分割,如第三步所示。最后,这个递归拆分过 程到第四步就终止了,这时所有的Spliterator在调用trySplit时都返回了null。