Stream Api 首先回顾下 Stream Api 的操作分类:
操作分类
子分类
操作
无状态
unordered filter map mapTo…
中间操作
flatMap flatMapTo… peek
有状态
distinct sorted limit skip
非短路操作
forEach forEachOrdered toArray
终结操作
reduce collect max min count
短路操作
anyMatch allMatch noneMatch
findFirst findAny
前文有提到 Stream操作分为中间操作和终结操作,其中中间操作不会触发实际动作,仅标记状态;终结操作才会触发实际计算动作。 中间操作又分为无状态(Stateless)和有状态(Stateful),有无状态是指元素间是否相互影响,如 sort 是有状态操作, 读取完全部元素前不能确定结果;终结操作又分为非短路操作和短路操作,短路操作是指不需要处理全部元素即可返回结果,如 findFirst。
简单的实现方式 接下来看个示例:
1 2 3 4 Stream.of("one", "two", "three", "four") .filter(e -> e.length() > 3) .map(String::toUpperCase) .collect(Collectors.toList());
该代码将字符串序列过滤,找出其中长度大于三的字符串转为大写,并收集为List。我们很容易想到一种实现方式,即每个函数都执行一次迭代,并将处理的数据 保存在中间容器中。用代码描述如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 // do filter List<String> filterList = new ArrayList<>(); for (String s: source){ if (s.length > 3){ filterList.add(source); } } // do map List<String> mapList = new ArrayList<>(); for (String s: filterList){ mapList.add(s.toUpperCase()); } // do collect List<String> collectList = new ArrayList<>(); collectList.addAll(mapList);
显然,这种实现虽然简单,但多次迭代既浪费时间,存储中间结果又浪费空间,效率极低。在我们自己实现时,自然会采用以下方法:
1 2 3 4 5 6 7 List<String> collectList = new ArrayList<>(); for (String s: source){ if (s.length > 3){ // filter(), 获取长度大于3的字符串 String upperStr = s.toUpperCase(); // map(), 转为大写字符串 collectList.add(upperStr); // collect(), 收集结果 } }
采用这种方式我们既减少了迭代次数,也避免存储中间结果。这就是pipeline的目标结果了。当知道操作目标时,就可以将操作进行叠加到单次迭代来实现 Stream Api了。这也是为什么对操作的分类多且详细。
Stream Pipeline 从上面我们可以得到解决方案的思路,我们需要在调用终结操作前,先将依序中间操作记录下来,然后在调用终结操作时将中间操作叠加在一起并在一次迭代中将所有操作执行。 即:
记录中间操作
叠加操作
执行操作
保存结果
记录操作 Stream中使用 Stage 这个术语来描述一组完整的操作,并使用示例化后的 PipelineHelper 来代表 Stage。将这些 Stage 按序串联起来,就够成了整个流水线。
与之相关的接口与类如下图所示。
图中 DoublePipeline LongPipeline 与 IntPipeline 是专门处理对应基本类型的流水线,这里主要关注 ReferencePipeline。
Head, StatefulOp, StatelessOp 是 ReferencePipeline 的子类。其中 Head 用于表示第一个 Stage,如调用产生流的一些方法如Collections.stream()方法所产生的 Stage,因此该 Stage不会包含任何操作; StatefulOp 和 StatelessOp 分别代表有无状态的 Stage,对应有无状态的中间操作。
1 2 3 4 5 ____________ ______ ----> ____________ ----> ___________ |Collection| |Head| |StatelessOp| |StatefulOp| ------------ ------ <--- ------------ <--- ----------- data source ————> stage0 ————> stage1 ————> stage2 stream map sort
上图所示 Pipeline,由Collection.stream() 产生 Head,然后调用一系列的中间操作不断产生 Stream。这些 Stream 对象以双向链表的形式组织在一起,构成整个流水线,由于每个 Stage 都记录了前一个 Stage 和本次的操作以及回调函数,依靠这种结构就能建立起对数据源的所有操作。这就是 Stream 记录操作的方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 // 将 stage 添加到已有 pipeline 建立链表 AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) { if (previousStage.linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); previousStage.linkedOrConsumed = true; previousStage.nextStage = this; this.previousStage = previousStage; this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK; this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags); this.sourceStage = previousStage.sourceStage; if (opIsStateful()) sourceStage.sourceAnyStateful = true; this.depth = previousStage.depth + 1; }
叠加操作 直觉上,操作叠加非常简单,就是从 Head 开始依次执行每一次操作。但事实上,Stage 之间并不知道互相执行了什么样的操作,这使得需要有方法来协调相邻 Stage 之间的调用关系。
Sink 接口用于解决这个问题,
begin 开始遍历元素之前调用该方法,通知Sink做好准备。
end 所有元素遍历完成之后调用,通知Sink没有更多的元素了。
cancellationRequested 是否可以结束操作,可以让短路操作尽早结束。
accept 遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept方法就行了。
经过 Sink 封装后,Stage之间就可以方便调用,仅需调用 accept 方法而不必知道其内部实现。对于 StatefulOp 来说,需要实现 begin 和 end 方法。对于短路操作来说,还需实现 cancellationRequested 方法来尽快返回。Sink 的四个接口方法常常相互协作,共同完成计算任务。实际上 Stream API 内部实现的的本质,就是如何重载 Sink 的这四个接口方法。在经过包装后,执行时只需要从流水线的 Head 开始对数据源依次调用每个 Stage 对应的 Sink 的 begin, accept, cancellationRequested, end 方法就可以了。
Sink.accept 方法通常首先使用当前 Sink 包装的回调函数处理元素,然后再将结果传递给流水线下游的 Sink。来看下 ReferencePipeline 对 filter 方法的实现观察这一流程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 // Stream.filter(predicate)会产生新的Stream public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) { Objects.requireNonNull(predicate); return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) { @Override //opWripSink()方法返回由回调函数包装而成Sink Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { return new Sink.ChainedReference<P_OUT, P_OUT>(sink) { @Override public void begin(long size) { downstream.begin(-1); // 因不知道推到下流的元素数量,使用-1代表未知或无限 } @Override public void accept(P_OUT u) { if (predicate.test(u))// 1. 使用当前Sink包装的回调函数predicate处理u downstream.accept(u);// 2. 将处理结果传递给流水线下游的Sink } }; } }; }
上面将回调函数predicate包装到一个Sink当中。由于 filter 是一个无状态的中间操作,所以 filter 方法返回了一个 StatelessOp 内部类对象(一个新的Stream),调用这个新 Stream 的 opWripSink() 方法将得到一个包装了当前回调函数的Sink。
再来看下 SortedOps 对 refences sorted 的实现,因 sorted 在遍历完全部元素不知道结果,所以会复杂一些。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> { private ArrayList<T> list;// 存放用于排序的元素 RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) { super(sink, comparator); } @Override public void begin(long size) { // MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8 是最大可收集元素个数 if (size >= Nodes.MAX_ARRAY_SIZE) throw new IllegalArgumentException(Nodes.BAD_SIZE); // 1.初始化 list list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>(); } @Override public void end() { // 3.只有元素全部接收之后才能开始排序 list.sort(comparator); // 传递给下游元素个数 downstream.begin(list.size()); // 如果下游 Sink 不包含短路操作,则传递给下游 if (!cancellationWasRequested) { list.forEach(downstream::accept); // 传递元素 } else { // 如果下游包含短路操作,则每次先询问是否短路,再传递元素 for (T t : list) { if (downstream.cancellationRequested()) break; downstream.accept(t); // 传递元素 } } downstream.end(); list = null; } @Override public void accept(T t) { list.add(t); // 2. 使用当前Sink包装动作处理t,只是简单的将元素添加到中间列表当中 } }
上述代码完美的展现了Sink的四个接口方法是如何协同工作的:
首先 begin 方法告诉 Sink 参与排序的元素个数,方便确定中间结果容器的的大小;
之后通过 accept 方法将元素添加到中间结果当中,最终执行时调用者会不断调用该方法,直到遍历所有元素;
最后 end 方法告诉 Sink 所有元素遍历完毕,启动排序,排序完成后将结果传递给下游的 Sink ;
如果下游的 Sink 是短路操作,则将结果传递给下游时会不断询问下游cancellationRequested 是否可以结束处理。
执行操作 叠加操作完成后,我们只需要执行操作了。如之前所说,中间操作不会触发执行,只有终结操作会触发执行。一旦调用终结操作,就会触发整个 pipeline 的执行。
终结操作不会创建新的 stream ,亦即不会创建 stage 。结束操作会创建一个包装本身的 Sink ,这也是最后一个 Sink ,不会继续将元素传递给下游,仅对元素进行处理。
AbstractPipeline.opWrapSink(int flags, Sink sink) 方法返回一个新的包含了当前 Stage 代表的操作以及能够将结果传递给 downstream 的 Sink 对象,示例可见叠加操作一节中 filter 的源码。使用 opWrapSink 可以将当前操作与下游 Sink 结合成新 Sink 。如果从终结操作的 Sink 开始至 Head 调用该方法,则会形成一个包含了所有操作的 Sink, 这就是 AbstractPipeline.wrapSink 方法,如下:
1 2 3 4 5 6 7 8 9 10 // 从下游回溯不断调用 opWrapSink // 如果传入参数 sink 是终结操作的 Sink, 则会得到一个含所有操作的 sink final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) { Objects.requireNonNull(sink); for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink<P_IN>) sink; }
现在流水线上所有操作都被包含到该 Sink 中,仅需执行该 Sink
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 // AbstractPipeline // 对spliterator代表的数据执行wrappedSink代表的操作 // Spliterator 可认为是可拆分,可批量迭代的Iterator final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { // 通知开始遍历 wrappedSink.begin(spliterator.getExactSizeIfKnown()); // 执行迭代 spliterator.forEachRemaining(wrappedSink); // 通知结束遍历 wrappedSink.end(); } else { // 带短路操作的使用该方法 copyIntoWithCancel(wrappedSink, spliterator); } }
保存结果 所有操作执行完成后,我们需要知道执行结果保存在哪。除开无返回结果的终结操作(如forEach,我们需要的是它的Side effects),将带返回结果的操作列出如下:
返回类型
终结操作
boolean
anyMatch allMatch noneMatch
Optional
findFirst findAny
归约结果
reduce collect max min count
数组
toArray
对于表中返回boolean或者Optional的操作的操作,由于只返回一个值,只需要在对应的Sink中记录这个值,等到执行结束时返回就可以了。
对于归约操作,最终结果放在用户调用时指定的容器中(容器类型通过收集器指定)。collect, reduce, max, min都是归约操作,虽然 max 和 min 也是返回一个Optional,但事实上底层是通过调用reduce方法实现的。同理 count 同样是调用的reduce。
对于返回是数组的情况,毫无疑问的结果会放在数组当中。这么说当然是对的,但在最终返回数组之前,结果其实是存储在一种叫做Node的数据结构中的。Node是一种多叉树结构,元素存储在树的叶子当中,并且一个叶子节点可以存放多个元素。这样做是为了并行执行方便。
并行计算 Stream 还有一大优势,即可简单的利用多核优势进行并行计算。只使用steam.parallel方法或Collection.parallelStream即可创建一条并行流。
为了找出整个流程,首先我们找到执行终结操作的方法:
1 2 3 4 5 6 7 8 9 10 11 12 // AbstractPipeline // 执行终结操作 final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); }
这里判断了是否为并行,并行则使用并行执行方法。evaluateParallel是TerminalOp接口的方法,有多个实现,可到 FindOps 查看下该方法的一种实现。
1 2 3 4 5 @Override public <P_IN> O evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { return new FindTask<>(this, helper, spliterator).invoke(); }
这里生成了一个FindTask并执行。
其他ReduceOps,MatchOps等类似,都是执行了对应Task,现对相关Task整理为图列出。
所有的Task都是ForkJoinTask的子类,从此也可以看出Stream的并行是基于ForkJoin实现的。
查看AbstractTask的compute方法,看是如何进行并行计算的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public void compute() { Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators //预估这个分片中的数据量 long sizeEstimate = rs.estimateSize(); //获取阈值 long sizeThreshold = getTargetSize(sizeEstimate); boolean forkRight = false; @SuppressWarnings("unchecked") K task = (K) this; //根据预估的数据量获取最小处理单元的大小阈值,即当数据量已经小于这个阈值的时候进行计算,否则进行fork 将任务划分成更小的数据块 while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) { K leftChild, rightChild, taskToFork; //分别将左右分片的任务创建为新的Task,并且将当前的任务关联为两个新任务的父级任务 task.leftChild = leftChild = task.makeChild(ls); task.rightChild = rightChild = task.makeChild(rs); //先后对左右子节点的任务进行fork,对另外的分区进行分解。同时设定pending 为1,这代表一个task 实际上只会有一个等待的子节点(被fork) task.setPendingCount(1); if (forkRight) { forkRight = false; rs = ls; task = leftChild; taskToFork = rightChild; } else { forkRight = true; task = rightChild; taskToFork = leftChild; } taskToFork.fork(); sizeEstimate = rs.estimateSize(); } //当任务已经分解到足够小的时候退出循环,尝试进行结束。调用子类实现的doLeaf方法,完成最小计算单元的计算任务,并设置到当前任务的localResult中 task.setLocalResult(task.doLeaf()); //调用tryComplete 方法进行最终任务的扫尾工作,如果该任务pending 值不等于0,则原子的减1,如果已经等于0,说明任务都已经完成,则调用onCompletion 回调,如果该任务是叶子任务,则直接销毁中间数据结束;如果是中间节点会将左右子节点的结果进行合并 //检查如果这个任务已经没有父级任务了,则将该任务置为正常结束,如果还有则尝试递归的去调用父级节点的onCompletion回调,逐级进行任务的合并。 task.tryComplete(); }