skip(long n);
...
```
这些都是操作流的`中间操作`,它们的返回结果必须是流对象本身。
### **关闭流操作** ###
BaseStream 实现了 `AutoCloseable` 接口,也就是 `close()` 方法会在流关闭时被调用。同时,`BaseStream` 中还给我们提供了`onClose()`方法:
```
/** * Returns an equivalent stream with an additional close handler. Close * handlers are run when the {@link #close()} method * is called on the stream, and are executed in the order they were * added. All close handlers are run, even if earlier close handlers throw * exceptions. If any close handler throws an exception, the first * exception thrown will be relayed to the caller of {@code close()}, with * any remaining exceptions added to that exception as suppressed exceptions * (unless one of the remaining exceptions is the same exception as the * first exception, since an exception cannot suppress itself.) May * return itself. * * This is an intermediate * operation. * * @param closeHandler A task to execute when the stream is closed * @return a stream with a handler that is run if the stream is closed */
S onClose(Runnable closeHandler);
```
当`AutoCloseable`的`close()`接口被调用的时候会触发调用流对象的`onClose()`方法,但有几点需要注意:
* `onClose()` 方法会返回流对象本身,也就是说可以对改对象进行多次调用
* 如果调用了多个`onClose()` 方法,它会按照调用的顺序触发,但是如果某个方法有异常则只会向上抛出第一个异常
* 前一个 `onClose()` 方法抛出了异常不会影响后续 `onClose()` 方法的使用
* 如果多个 `onClose()` 方法都抛出异常,只展示第一个异常的堆栈,而其他异常会被压缩,只展示部分信息
### **并行流和串行流** ###
`BaseStream`接口中分别提供了`并行流`和`串行流`两个方法,`这两个方法可以任意调用若干次,也可以混合调用,但最终只会以最后一次方法调用的返回结果为准`。
参考`parallel()`方法的说明:
> Returns an equivalent stream that is parallel. May return
>
> itself, either because the stream was already parallel, or because
>
> the underlying stream state was modified to be parallel.
所以多次调用同样的方法并不会生成新的流,而是直接复用当前的流对象。
下面的例子里以最后一次调用`parallel()`为准,最终是并行地计算`sum`:
```
stream.parallel()
.filter(...)
.sequential()
.map(...)
.parallel()
.sum();
```
### **ParallelStream背后的男人:ForkJoinPool** ###
ForkJoin框架是从JDK7中新特性,它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService 接口。它使用了一个「无限队列」来保存需要执行的任务,而线程的数量则是通过构造函数传入, 如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。
ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm) 来解决问题,典型的应用比如\_快速排序算法\_。这里的要点在于,ForkJoinPool需要使用相对少的线程来处理大量的任务。比如要对1000万个数据进行排序,那么会将这个任务分割成`两个500 万的排序任务`和`一个针对这两组500万数据的合并任务`。
以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概2000000+个。
> 问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行,想象一下归并排序的过程。
所以当使用ThreadPoolExecutor时,使用分治法会存在问题,因为ThreadPoolExecutor中的线程无法向 任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。而`使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行`。
那么使用ThreadPoolExecutor或者ForkJoinPool,会有什么性能的差异呢?
首先,使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有「父子关系」的任务,比如使用4个线程来完成超过200万个任务。使用ThreadPoolExecutor 时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。
Work Stealing原理:
1. 每个工作线程都有自己的工作队列WorkQueue;
2. 这是一个双端队列dequeue,它是线程私有的;
3. ForkJoinTask中fork的子任务,将放入运行该任务的工作线程的队头,工作线程将以LIFO的顺序来处理工作队列中的任务,即堆栈的方式;
4. 为了最大化地利用CPU,空闲的线程将从其它线程的队列中「窃取」任务来执行
5. 但是是从工作队列的尾部窃取任务,以减少和队列所属线程之间的竞争;
6. 双端队列的操作:push()/pop()仅在其所有者工作线程中调用,poll()是由其它线程窃取任务时调用的;
7. 当只剩下最后一个任务时,还是会存在竞争,是通过CAS来实现的;
### **用ForkJoinPool的眼光来看ParallelStream** ###
Java 8为ForkJoinPool添加了一个通用线程池,这个线程池用来处理那些没有被显式提交到任何线程池的任务。它是ForkJoinPool类型上的一个静态元素,它拥有的默认线程数量等于运行计算机上的CPU数量。当调用Arrays 类上添加的新方法时,自动并行化就会发生。比如用来排序一个数组的并行快速排序,用来对一个数组中的元素进行并行遍历。自动并行化也被运用在Java 8新添加的Stream API中。
比如下面的代码用来遍历列表中的元素并执行需要的操作:
```
List userInfoList =
DaoContainers.getUserInfoDAO().queryAllByList(new UserInfoModel());
userInfoList.parallelStream().forEach(RedisUserApi::setUserIdUserInfo);
```
对于列表中的元素的操作都会以并行的方式执行。`forEach`方法会为每个元素的计算操作创建一个任务,该任务会被前文中提到的`ForkJoinPool`中的commonPool处理。以上的并行计算逻辑当然也可以使用`ThreadPoolExecutor`完成,但是就代码的可读性和代码量而言,使用`ForkJoinPool`明显更胜一筹。
对于`ForkJoinPool`通用线程池的线程数量,通常使用默认值就可以了,即运行时计算机的处理器数量。也可以通过设置系统属性:`-Djava.util.concurrent .ForkJoinPool.common.parallelism=N` (N为线程数量),来调整`ForkJoinPool`的线程数量。
值得注意的是,当前执行的线程也会被用来执行任务,所以最终的线程个数为`N+1`,1就是当前的`主线程`。
这里就有一个问题,如果你在并行流的执行计算使用了\_阻塞操作\_,如I/O,那么很可能会导致一些问题:
```
public static String query(String question) {
List engines = new ArrayList();
engines.add("http://www.google.com/?q=");
engines.add("http://duckduckgo.com/?q=");
engines.add("http://www.bing.com/search?q=");
// get element as soon as it is available
Optional result = engines.stream().parallel().map((base) - {
String url = base + question;
// open connection and fetch the result
return WS.url(url).get();
}).findAny();
return result.get();
}
```
这个例子很典型,让我们来分析一下:
* 这个并行流计算操作将由主线程和JVM默认的`ForkJoinPool.commonPool()`来共同执行。
* `map`中是一个阻塞方法,需要通过访问`HTTP`接口并得到它的`response`,所以任何一个worker线程在执行到这里的时候都会阻塞并等待结果。
* 所以当此时再其他地方通过并行流方式调用计算方法的时候,将会受到此处阻塞等待的方法的影响。
* 目前的`ForkJoinPool`的实现并未考虑补偿等待那些阻塞在等待新生成的线程的工作worker线程,所以最终`ForkJoinPool.commonPool()`中的线程将备用光并且阻塞等待。
> 正如我们上面那个列子的情况分析得知,lambda的执行并不是瞬间完成的,所有使用parallel streams的程序都有可能成为阻塞程序的源头, 并且在执行过程中程序中的其他部分将无法访问这些workers,这意味着任何依赖parallel streams的程序在什么别的东西占用着common ForkJoinPool时将会变得不可预知并且暗藏危机。
小结:
> 1. 当需要处理递归分治算法时,考虑使用ForkJoinPool。
> 2. 仔细设置不再进行任务划分的阈值,这个阈值对性能有影响。
> 3. Java 8中的一些特性会使用到ForkJoinPool中的通用线程池。在某些场合下,需要调整该线程池的默认的线程数量
> 4. lambda应该尽量避免副作用,也就是说,避免突变基于堆的状态以及任何IO
> 5. lambda应该互不干扰,也就是说避免修改数据源(因为这可能带来线程安全的问题)
> 6. 避免访问在流操作生命周期内可能会改变的状态
### **并行流的性能** ###
并行流框架的性能受以下因素影响:
* 数据大小:数据够大,每个管道处理时间够长,并行才有意义;
* 源数据结构:每个管道操作都是基于初始数据源,通常是集合,将不同的集合数据源分割会有一定消耗;
* 装箱:处理基本类型比装箱类型要快;
* 核的数量:默认情况下,核数量越多,底层fork/join线程池启动线程就越多;
* 单元处理开销:花在流中每个元素身上的时间越长,并行操作带来的性能提升越明显;
源数据结构分为以下3组:
* 性能好:`ArrayList`、数组或`IntStream.range`(数据支持随机读取,能轻易地被任意分割)
* 性能一般:`HashSet`、`TreeSet`(数据不易公平地分解,大部分也是可以的)
* 性能差:`LinkedList`(需要遍历链表,难以对半分解)、`Stream.iterate`和`BufferedReader.lines`(长度未知,难以分解)
注意:下面几个部分节选自:Streams 的幕后原理,顺便感谢一下作者\_Brian Goetz\_,写的太通透了。
### **NQ模型** ###
要确定并行性是否会带来提速,需要考虑的最后两个因素是:可用的数据量和针对每个数据元素执行的计算量。
在我们最初的并行分解描述中,我们采用的概念是拆分来源,直到分段足够小,以致解决该分段上的问题的顺序方法更高效。分段大小必须依赖于所解决的问题,确切的讲,取决于每个元素完成的工作量。例如,计算一个字符串的长度涉及的工作比计算字符串的 `SHA-1` 哈希值要少得多。为每个元素完成的工作越多,“大到足够利用并行性” 的阈值就越低。类似地,拥有的数据越多, 拆分的分段就越多,而不会与 “太小” 阈值发生冲突。
一个简单但有用的并行性能模型是 `NQ` 模型,其中 `N` 是数据元素数量,`Q` 是为每个元素执行的工作量。乘积 `N*Q` 越大,就越有可能获得并行提速。对于具有很小的 `Q` 的问题,比如对数字求和,您通常可能希望看到 `N > 10,000` 以获得提速;随着 `Q` 增加,获得提速所需的数据大小将会减小。
并行化的许多阻碍(比如拆分成本、组合成本或遇到顺序敏感性)都可以通过 `Q` 更高的操作来缓解。尽管拆分某个 `LinkedList` 特征的结果可能很糟糕,但只要拥有足够大的 `Q`,仍然可能获得并行提速。
### **遇到顺序** ###
遇到顺序指的是来源分发元素的顺序是否对计算至关重要。一些来源(比如基于哈希的集合和映射)没有有意义的遇到顺序。流标志 `ORDERED` 描述了流是否有有意义的遇到顺序。JDK 集合的 `spliterator` 会根据集合的规范来设置此标志;一些中间操作可能注入 `ORDERED` (`sorted()`) 或清除它 (`unordered()`)。
如果流没有遇到顺序,大部分流操作都必须遵守该顺序。对于顺序执行,会「自动保留遇到顺序」,因为元素会按遇到它们的顺序自然地处理。甚至在并行执行中,许多操作(无状态中间操作和一些终止操作(比如 `reduce()`)),遵守遇到顺序不会产生任何实际成本。但对于其他操作(有状态中间操作,其语义与遇到顺序关联的终止操作,比如 `findFirst()` 或 `forEachOrdered()`), 在并行执行中遵守遇到顺序的责任可能很重大。如果流有一个已定义的遇到顺序,但该顺序对结果没有意义, 那么可以通过使用 `unordered()` 操作删除 `ORDERED` 标志,加速包含顺序敏感型操作的管道的顺序执行。
作为对遇到顺序敏感的操作的示例,可以考虑 `limit()`,它会在指定大小处截断一个流。在顺序执行中实现 `limit()` 很简单:保留一个已看到多少元素的计数器,在这之后丢弃任何元素。但是在并行执行中,实现 `limit()` 要复杂得多;您需要保留前 `N` 个元素。此要求大大限制了利用并行性的能力;如果输入划分为多个部分,您只有在某个部分之前的所有部分都已完成后,才知道该部分的结果是否将包含在最终结果中。因此,该实现一般会错误地选择不使用所有可用的核心,或者缓存整个试验性结果,直到您达到目标长度。
如果流没有遇到顺序,`limit()` 操作可以自由选择任何 `N` 个元素,这让执行效率变得高得多。知道元素后可立即将其发往下游, 无需任何缓存,而且线程之间唯一需要执行的协调是发送一个信号来确保未超出目标流长度。
遇到顺序成本的另一个不太常见的示例是排序。如果遇到顺序有意义,那么 `sorted()` 操作会实现一种稳定 排序 (相同的元素按照它们进入输入时的相同顺序出现在输出中),而对于无序的流,稳定性(具有成本)不是必需的。 `distinct()` 具有类似的情况:如果流有一个遇到顺序,那么对于多个相同的输入元素,`distinct()` 必须发出其中的第一个, 而对于无序的流,它可以发出任何元素 — 同样可以获得高效得多的并行实现。
在您使用 `collect()` 聚合时会遇到类似的情形。如果在无序流上执行 `collect(groupingBy()`) 操作, 与任何键对应的元素都必须按它们在输入中出现的顺序提供给下游收集器。此顺序对应用程序通常没有什么意义,而且任何顺序都没有意义。在这些情况下,可能最好选择一个并发 收集器(比如 `groupingByConcurrent()`),它可以忽略遇到顺序, 并让所有线程直接收集到一个共享的并发数据结构中(比如 `ConcurrentHashMap`),而不是让每个线程收集到它自己的中间映射中, 然后再合并中间映射(这可能产生很高的成本)。
### **什么时候该使用并行流** ###
谈了这么多,关于并行流`parallelStream`的使用注意事项需要格外注意,它并不是解决性能的万金油,相反,如果使用不当会严重影响性能。
>来源:elsef.com/2019/09/16/Java8中Stream的原理分析