Java9 も出た昨今でいまさらですが、Stream パイプラインが裏でどのように動いているのかをコードを交えて紹介します。
以下の単純な例を元に、内部処理を追っていきましょう。
Arrays.asList("a", "b", "c") .stream() // (1) Stream の生成 .filter(String::isEmpty) // (2) パイプライン .forEach(System.out::print); // (3) 終端処理
(1) Stream の生成
最初はストリームを取得する stream()
の中身から見ていきます。
stream()
は Collection
インターフェースのデフォルトメソッドとして以下のように定義されています。
public interface Collection<E> extends Iterable<E> { default Stream<E> stream() { return StreamSupport.stream(spliterator(), false); } }
StreamSupport.stream()
で Stream を生成する処理です。第一引数には、Spliterator を受け取ります。
Spliterator は Stream の内部処理で使う分割可能なイテレータです。
Stream 処理は並列に複数スレッドで処理可能で、この際の分割と各要素のイテレートを表現します。
上記コードの spliterator()
で、ソースとなる Arrays.asList("a", "b", "c")
用の Spliterator が生成されます。具体的には ArraySpliterator
クラスのインスタンスになります。
Spliterator は、ArrayList の場合は ArrayListSpliterator
であったり、HashSet の場合は KeySpliterator
であったり、生成元のソースにより色々なインスタンスになります。
StreamSupport.stream()
の中身を見てみましょう。
public final class StreamSupport { public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) { return new ReferencePipeline.Head<>( spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel); } }
ReferencePipeline.Head
というクラスをインスタンス化しています。
このクラスは ReferencePipeline
の内部クラスで、以下の定義になっています。
static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> { Head(Spliterator<?> source, int sourceFlags, boolean parallel) { super(source, sourceFlags, parallel); } // ... }
ReferencePipeline
のサブクラスになっていますね。このクラスは Stream のメソッドチェーンの先頭を表現します。
ReferencePipeline
は AbstractPipeline
のサブクラスです。
Stream のメソッドチェーンは、この AbstractPipeline
というクラスのサブクラスで構造化されます。
各メソッドをチェーンする毎に AbstractPipeline
が連結リストのように伸びていきます。
AbstractPipeline
は以下のようなフィールドを持っています(代表的なものだけ)
sourceStage
は Stream の最上流 Pipeline を参照します。
previousStage
が一つ前の Pipeline を参照し、nextStage
が次の Pipeline を参照する双方向リストの構造になっています。
では Head
のコンストラクト処理を見てみましょう。
ReferencePipeline
のコンストラクタが呼ばれ、
abstract class ReferencePipeline<P_IN, P_OUT> extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>> implements Stream<P_OUT> { ReferencePipeline(Spliterator<?> source, int sourceFlags, boolean parallel) { super(source, sourceFlags, parallel); } }
さらに AbstractPipeline
のコンストラクタが呼ばれます。
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>> extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> { AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) { this.previousStage = null; this.sourceSpliterator = source; this.sourceStage = this; this.depth = 0; // ... } //... }
先頭の Pipeline(Head) は、previousStage が null
に、sourceStage として自身を設定しています。
そして sourceSpliterator
に先程生成した ArraySpliterator
が設定されます。
この時点では以下のようになります。
Stream の生成では、ソースから Spliterator を作成し、Pipeline に設定してReferencePipeline.Head
が戻り値となります。
ReferencePipeline.Head
は Stream
インターフェースを実装しているためメソッドチェーンを続けることができます。
(2) パイプライン
メソッドチェーンの2番めの filter()
の処理です。
先程の処理で生成された ReferencePipeline
の filter()
がコールされます。
少しややこしいので、説明に必要な部分に絞ると以下のようになります(opWrapSink
の中身は後ろで説明します)。
@Override public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) { return new StatelessOp<P_OUT, P_OUT>( this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { // ... } }; }
StatelessOp
というクラスをインスタンス化して返却しています。このクラスは抽象クラスで、これも ReferencePipeline
のサブクラスです。
abstract static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> { StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, StreamShape inputShape, int opFlags) { super(upstream, opFlags); } // ... }
StatelessOp
の super(upstream, opFlags)
で呼ばれるのは AbstractPipeline のコンストラクタで以下のようになっています。
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) { previousStage.nextStage = this; this.previousStage = previousStage; this.sourceStage = previousStage.sourceStage; this.depth = previousStage.depth + 1; }
Pipeline の上流と、自身をリンクしているのが分かります(nextStage は無いので自身)。
この段階で Pipeline は以下のような構造になります。
さて、先程省略した opWrapSink()
の中身を見てみましょう。
@Override Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { return new Sink.ChainedReference<P_OUT, P_OUT>(sink) { @Override public void begin(long size) { downstream.begin(-1); } @Override public void accept(P_OUT u) { if (predicate.test(u)) // Predicate の適用 downstream.accept(u); } }; }
opWrapSink()
では、Sink.ChainedReference
という抽象クラスをインスタンス化して返します。
Sink
については後ほど見ていくので、ここでは predicate.test(u)
という箇所に着目します。
この predicate は filter の引数として受け取ったもので、filter(String::isEmpty)
の述語部分となります。
Sink という入れ物の中に、受け取った predicate を閉じ込める処理になっています。終端処理にて opWrapSink()
が呼び出されたタイミングで、述語部分を閉じ込めた Sink というオブジェクトを生成することになります。
今回の例ではパイプラインは1段ですが、複数あった場合は以下のようにリストが伸びていきます。
(3) 終端処理
最後の forEach()
で Stream の終端処理となります。いままで連結してきた Pipeline を使ってソースの要素に処理を適用していく処理です。
この終端処理では、最初に Pipeline を上流に辿りながらSink
という形で適用する処理を連結リストとして構築し、Pipeline の先頭にある Spliterator から Sink
にソース要素を流し込むという処理を行います。
forEach()
は ReferencePipeline
で以下のように定義されています。
@Override public void forEach(Consumer<? super P_OUT> action) { evaluate(ForEachOps.makeRef(action, false)); } }
引数の Consumer (System.out::print
) から ForEachOps.makeRef()
として evaluate()
を呼び出しています。
ForEachOps.makeRef()
では終端処理として渡した Consumer を ForEachOp.OfRef
という終端処理を表す Sink
にラップして返却します。
evaluate()
の方を見てみましょう。
これは ReferencePipeline
の親クラスの AbstractPipeline
で以下のように定義されています。
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); }
引数で、終端操作を表す TerminalOp
を受け取り、evaluateSequential()
を呼んでいます(今回は並列実行については触れません)。この TerminalOp
は先程生成した ForEachOp.OfRef
ですね。
sourceSpliterator()
は、現在の Pipeline の先頭に設定されている Spliterator を取得しているだけで、今回の場合は ArraySpliterator
となります(ちなみにこの取得操作でPipelineの先頭に設定された Spliterator は null クリアされます)。
さて、evaluateSequential()
は TerminalOp
の実装クラスである ForEachOp
で以下のように定義されています。
@Override public <S> Void evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) { return helper.wrapAndCopyInto(this, spliterator).get(); }
PipelineHelper
は AbstractPipeline
の親クラスです。
AbstractPipeline
において、wrapAndCopyInto()
は以下の定義になっています。
@Override final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) { copyInto(wrapSink(sink), spliterator); return sink; }
引数の sink
は ForEachOp.OfRef
で、これを wrapSink()
とし、copyInto()
に渡しています。
wrapSink()
で Pipeline を上流方向に辿りながら Sink を生成する処理を行っています。
@Override final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) { for (AbstractPipeline p = AbstractPipeline.this; p.depth > 0; p = p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink<P_IN>) sink; }
パイプラインを上流方向にたどりながら、opWrapSink()
により Sink にラップしていき、先頭までいったら、その先頭の Sink を返却となっています。
さて、ここでコールしている opWrapSink()
はどのインスタンスのメソッドになるでしょうか。
メソッドチェーンの1つ前の filter()
では以下のように StatelessOp
を匿名クラスとしてインスタンス化して返却していました。
ここで opWrapSink()
をオーバーライドしています。
@Override public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) { return new StatelessOp<P_OUT, P_OUT>( this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { // ... } }; }
再掲すると以下です。
@Override Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { return new Sink.ChainedReference<P_OUT, P_OUT>(sink) { @Override public void begin(long size) { downstream.begin(-1); } @Override public void accept(P_OUT u) { if (predicate.test(u)) downstream.accept(u); } }; }
Sink.ChainedReference
のコンストラクタは以下のようになっています。
static abstract class ChainedReference<T, E_OUT> implements Sink<T> { protected final Sink<? super E_OUT> downstream; public ChainedReference(Sink<? super E_OUT> downstream) { this.downstream = downstream; } // ... }
つまり、ChainedReference
は上流から下流に向かう単方向リストとなっています。
Stream の終端処理にて、Pipeline を末尾からたどりながら Sink という形で操作をラップして単方向リストを生成していることがわかります。
今回の Pipeline は 1段階なので以下のような Sink の連結リストになります。
元にもどり、wrapAndCopyInto()
を再掲します。
@Override final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) { copyInto(wrapSink(sink), spliterator); return sink; }
wrapSink()
で連結した Sink の先頭要素と Spliterator を copyInto()
に渡しています。
@Override final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); }
Sink.begin()
と Sink.end()
の間に spliterator.forEachRemaining()
を呼んでいます。
今回の場合は ArraySpliterator
で、以下のようになっています。
static final class ArraySpliterator<T> implements Spliterator<T> { private final Object[] array; private int index; private final int fence; private final int characteristics; @Override public void forEachRemaining(Consumer<? super T> action) { Object[] a; int i, hi; if ((a = array).length >= (hi = fence) && (i = index) >= 0 && i < (index = hi)) { do { action.accept((T)a[i]); } while (++i < hi); } } }
対象の要素を action.accept()
しています。この action は Sink なので、
Sink.begin()
Sink.accept()
を要素数分コールSink.end()
という形になり、begin, accept, end は下流の Sink に伝搬されていきます。
まとめ
Stream パイプラインの処理の流れを簡単な例でみてきました。
中心となるのは以下の3つです。
Spliterator
: Stream を流れるソース要素のイテレータPipeline
: メソッドチェーンの構造を表し、終端操作を起点にSink
を生成Sink
: パイプライン上の操作を連結したリスト
今回は触れていませんが、ソートや並列ストリームに対応する処理などもあります。Stream の裏舞台は、きっとあなたが考えているより忙しなかったのではないでしょうか。