Java Stream 源码深入解析 您所在的位置:网站首页 api解析源码 Java Stream 源码深入解析

Java Stream 源码深入解析

2024-03-04 16:10| 来源: 网络整理| 查看: 265

类图

image.png

概念解释 Pipline和Stage

Pipline是流水线,表示一整个流程。Stage表示流水线的其中一个阶段。是一个比较抽象层面的描述,因为stage主要表示一种逻辑上的顺序关系,而具体每一个阶段要干嘛、怎么干,使用Sink来进行描述。

new stream //stage 0 .filter() //stage 1 .sort() //stage 2

image.png

Sink

直译为水槽,生活中水槽的作用无非

打开水龙头,知道有水要来 水在水槽里, 可以进行一些操作 打开水闸,放水 Java中的Sink核心功能为: begin(): 告诉该水槽水流要来了,可以进行一些初始化操作 accept():接受水流,然后进行操作 end():水流全部处理完了。 看一个sort()的示例,sort这个stage的目的就是对所有水流进行排序,然后再流到下游。 private static final class SizedRefSortingSink extends AbstractRefSortingSink { private T[] array; //要进行排序,需要一个数组进行缓存 private int offset; SizedRefSortingSink(Sink source, int sourceFlags, boolean parallel) { this.previousStage = null; //使用一个字段指向数据集合的Spliterator,后续终结操作的时候,引用的方式操作数据 this.sourceSpliterator = source; this.sourceStage = this; this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK; // The following is an optimization of: // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE); this.combinedFlags = (~(sourceOrOpFlags e=1).distinct().sort(); //等同于 Stream afterFilter = headStream.filter(e -> e != 2); Stream afterDistinct = afterFilter.distinct(); Stream afterSort = afterDistinct.sort(); Filter

执行filter(op)会发生什么?

Stream afterFilter = head.filter(e -> e = 1);

filter()方法定义在Stream类,实现在ReferencePipeline类。

//ReferencePipeline.class @Override public final Stream filter(Predicate upstream, StreamShape inputShape, int opFlags) { super(upstream, opFlags); assert upstream.getOutputShape() == inputShape; } @Override final boolean opIsStateful() { return false; } }

中间super()会执行AbstractPipeline类的构造方法, 连接stage之间的关系

//AbstractPipeline.class /** * Constructor for appending an intermediate operation stage onto an * existing pipeline. * * @param previousStage the upstream pipeline stage * @param opFlags the operation flags for the new stage, described in * {@link StreamOpFlag} */ AbstractPipeline(AbstractPipeline previousStage, int opFlags) { if (previousStage.linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); previousStage.linkedOrConsumed = true; previousStage.nextStage = this; this.previousStage = previousStage; this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK; this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags); this.sourceStage = previousStage.sourceStage; if (opIsStateful()) sourceStage.sourceAnyStateful = true; this.depth = previousStage.depth + 1; }

image.png

Distinct

示例

Stream afterDistinct = afterFilter.distinct();

distinct的方法实现在ReferencePipeline类下

@Override public final Stream distinct() { return DistinctOps.makeRef(this); }

调用DistinctOps类的makeRef()方法,返回一个StatefulOp类,并重写了4个方法,实现逻辑在opWrapSink()中:

/** * Appends a "distinct" operation to the provided stream, and returns the * new stream. * * @param the type of both input and output elements * @param upstream a reference stream with element type T * @return the new stream */ static ReferencePipeline makeRef(AbstractPipeline upstream) { // 返回一个StatefulOp类 return new ReferencePipeline.StatefulOp(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) { // 重写了以下几个方法,内容省略... Node reduce(PipelineHelper helper, Spliterator spliterator) {...} @Override Node opEvaluateParallel(PipelineHelper helper, Spliterator spliterator, IntFunction generator) {...} @Override Spliterator opEvaluateParallelLazy(PipelineHelper helper, Spliterator spliterator) {...} @Override Sink opWrapSink(int flags, Sink sink) { Objects.requireNonNull(sink); if (StreamOpFlag.DISTINCT.isKnown(flags)) { ... } else if (StreamOpFlag.SORTED.isKnown(flags)) { ... } else { // 返回一个SinkChainedReference类 return new Sink.ChainedReference(sink) { //使用一个Set缓存数据,进行去重 Set seen; //当上游通知begin的时候,初始化Set @Override public void begin(long size) { seen = new HashSet(); downstream.begin(-1); } //略 @Override public void end() { seen = null; downstream.end(); } //如果已经存在,之间抛弃 @Override public void accept(T t) { if (!seen.contains(t)) { seen.add(t); downstream.accept(t); } } }; } } }; }

StatefulOp类与StatelessOp类相似,都是继承了ReferencePipeline类,然后中间super()页会执行AbstractPipeline类的构造方法, 连接stage之间的关系

/** * Base class for a stateful intermediate stage of a Stream. * * @param type of elements in the upstream source * @param type of elements in produced by this stage * @since 1.8 */ abstract static class StatefulOp extends ReferencePipeline { //省略 }

image.png

至于其他的中间操作,套路都是类似的,操作逻辑封装在opWrapSink()方法里, 可以慢慢的看。

疑问解答 各个中间操作是如何进行关联的? 一个个的操作封装成了一个个的statelessOp或stateFulOp对象,以双向链表的方法串起来。 如何执行完一个中间操作,然后执行下一个? Sink类负责流水线操作的承接上下游和执行操作的任务,核心方法为begain()、accept()、end()。 有状态的中间操作是怎么保存状态的? 有状态的中间操作封装成stateFulOp对象,各自都有独立的逻辑,具体的参考sort()的实现逻辑。 懒加载如何实现的 每个中间操作调用后,只是append在流程的尾部,保存了关联关系而已。 流水线操作的启动,要交给wrapAndCopyInto()方法调用Head的Sink()操作,而wrapAndCopyInto()方法都需要由终结操作进行触发。 终结操作 几个疑问 终结方法是如何进行操作的? 如何实现由终结操作触发流的运作的? 如何保证一个流一次只能执行一个终结方法? 使用方式

列举四种终结操作,在Stream提供的API中,也是四类:

image.png

// 计算流中元素数量,FindOP long count = afterLimit.count(); // 遍历所有元素,ForEachOp afterLimit.forEach(System.out::printl); // 获取第一个元素,MatchOp Optional any = afterLimit.findFirst(); // 判断是否,ReduceOp boolean flag = afterLimit.anyMatch(i -> i == 1); count()

在ReferencePipeline类中实现

@Override public final long count() { // 调用mapToLong将所有元素变成1,然后计算sum return mapToLong(e -> 1L).sum(); } maoToLong()

mapToLong()方法,是一个中间操作

@Override public final LongStream mapToLong(ToLongFunction


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有