Java8 Stream の裏舞台は、きっとあなたが考えているより忙しない

f:id:Naotsugu:20171014114518p:plain

Java9 も出た昨今でいまさらですが、Stream パイプラインが裏でどのように動いているのかをコードを交えて紹介します。

以下の単純な例を元に、内部処理を追っていきましょう。

Arrays.asList("a", "b", "c")
      .stream()                     // (1) Stream の生成
      .filter(String::isEmpty)      // (2) パイプライン
      .forEach(System.out::print);  // (3) 終端処理


(1) Stream の生成

最初はストリームを取得する stream() の中身から見ていきます。

stream()Collection インターフェースのデフォルトメソッドとして以下のように定義されています。

public interface Collection<E> extends Iterable<E> {

    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

}

StreamSupport.stream() で Stream を生成する処理です。第一引数には、Spliterator を受け取ります。

Spliterator は Stream の内部処理で使う分割可能なイテレータです。

Stream 処理は並列に複数スレッドで処理可能で、この際の分割と各要素のイテレートを表現します。

上記コードの spliterator() で、ソースとなる Arrays.asList("a", "b", "c") 用の Spliterator が生成されます。具体的には ArraySpliterator クラスのインスタンスになります。

Spliterator は、ArrayList の場合は ArrayListSpliterator であったり、HashSet の場合は KeySpliterator であったり、生成元のソースにより色々なインスタンスになります。


StreamSupport.stream() の中身を見てみましょう。

public final class StreamSupport {

    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {

        return new ReferencePipeline.Head<>(
                spliterator,
                StreamOpFlag.fromCharacteristics(spliterator),
                parallel);
    }
}

ReferencePipeline.Head というクラスをインスタンス化しています。 このクラスは ReferencePipeline の内部クラスで、以下の定義になっています。

static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {

    Head(Spliterator<?> source, int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }
    // ...
}

ReferencePipeline のサブクラスになっていますね。このクラスは Stream のメソッドチェーンの先頭を表現します。

ReferencePipelineAbstractPipeline のサブクラスです。

Stream のメソッドチェーンは、この AbstractPipeline というクラスのサブクラスで構造化されます。

各メソッドをチェーンする毎に AbstractPipeline が連結リストのように伸びていきます。

AbstractPipeline は以下のようなフィールドを持っています(代表的なものだけ)

f:id:Naotsugu:20171014114550p:plain

sourceStage は Stream の最上流 Pipeline を参照します。

previousStage が一つ前の Pipeline を参照し、nextStage が次の Pipeline を参照する双方向リストの構造になっています。


では Head のコンストラクト処理を見てみましょう。

ReferencePipeline のコンストラクタが呼ばれ、

abstract class ReferencePipeline<P_IN, P_OUT>
        extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
        implements Stream<P_OUT>  {

    ReferencePipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }

}

さらに AbstractPipeline のコンストラクタが呼ばれます。

abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {

    AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
        this.previousStage = null;
        this.sourceSpliterator = source;
        this.sourceStage = this;
        this.depth = 0;
        // ...
    }
    //...
}

先頭の Pipeline(Head) は、previousStage が null に、sourceStage として自身を設定しています。

そして sourceSpliterator に先程生成した ArraySpliterator が設定されます。

この時点では以下のようになります。

f:id:Naotsugu:20171014114613p:plain

Stream の生成では、ソースから Spliterator を作成し、Pipeline に設定してReferencePipeline.Head が戻り値となります。

ReferencePipeline.HeadStream インターフェースを実装しているためメソッドチェーンを続けることができます。

(2) パイプライン

メソッドチェーンの2番めの filter() の処理です。

先程の処理で生成された ReferencePipelinefilter() がコールされます。

少しややこしいので、説明に必要な部分に絞ると以下のようになります(opWrapSink の中身は後ろで説明します)。

@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {

    return new StatelessOp<P_OUT, P_OUT>(
            this, 
            StreamShape.REFERENCE,
            StreamOpFlag.NOT_SIZED) {

        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            // ...
        }
    };
}

StatelessOp というクラスをインスタンス化して返却しています。このクラスは抽象クラスで、これも ReferencePipeline のサブクラスです。

abstract static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {

    StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                StreamShape inputShape,
                int opFlags) {

        super(upstream, opFlags);

    }
    // ...
}

StatelessOpsuper(upstream, opFlags) で呼ばれるのは AbstractPipeline のコンストラクタで以下のようになっています。

AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {

    previousStage.nextStage = this;
    this.previousStage = previousStage;
    this.sourceStage = previousStage.sourceStage;
    this.depth = previousStage.depth + 1;
}

Pipeline の上流と、自身をリンクしているのが分かります(nextStage は無いので自身)。

この段階で Pipeline は以下のような構造になります。

f:id:Naotsugu:20171014114631p:plain

さて、先程省略した opWrapSink() の中身を見てみましょう。

    @Override
    Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {

        return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {

            @Override
            public void begin(long size) {
                downstream.begin(-1);
            }

            @Override
            public void accept(P_OUT u) {
                if (predicate.test(u)) // Predicate の適用
                    downstream.accept(u);
            }
        };
    }

opWrapSink() では、Sink.ChainedReference という抽象クラスをインスタンス化して返します。

Sink については後ほど見ていくので、ここでは predicate.test(u) という箇所に着目します。

この predicate は filter の引数として受け取ったもので、filter(String::isEmpty) の述語部分となります。

Sink という入れ物の中に、受け取った predicate を閉じ込める処理になっています。終端処理にて opWrapSink() が呼び出されたタイミングで、述語部分を閉じ込めた Sink というオブジェクトを生成することになります。


今回の例ではパイプラインは1段ですが、複数あった場合は以下のようにリストが伸びていきます。

f:id:Naotsugu:20171014114649p:plain

(3) 終端処理

最後の forEach() で Stream の終端処理となります。いままで連結してきた Pipeline を使ってソースの要素に処理を適用していく処理です。

この終端処理では、最初に Pipeline を上流に辿りながらSink という形で適用する処理を連結リストとして構築し、Pipeline の先頭にある Spliterator から Sinkにソース要素を流し込むという処理を行います。


forEach()ReferencePipeline で以下のように定義されています。

    @Override
    public void forEach(Consumer<? super P_OUT> action) {
        evaluate(ForEachOps.makeRef(action, false));
    }
}

引数の Consumer (System.out::print) から ForEachOps.makeRef() として evaluate() を呼び出しています。

ForEachOps.makeRef() では終端処理として渡した Consumer を ForEachOp.OfRef という終端処理を表す Sink にラップして返却します。

f:id:Naotsugu:20171014114703p:plain


evaluate() の方を見てみましょう。

これは ReferencePipeline の親クラスの AbstractPipeline で以下のように定義されています。

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {

    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

引数で、終端操作を表す TerminalOp を受け取り、evaluateSequential() を呼んでいます(今回は並列実行については触れません)。この TerminalOp は先程生成した ForEachOp.OfRef ですね。

sourceSpliterator() は、現在の Pipeline の先頭に設定されている Spliterator を取得しているだけで、今回の場合は ArraySpliterator となります(ちなみにこの取得操作でPipelineの先頭に設定された Spliterator は null クリアされます)。

さて、evaluateSequential()TerminalOp の実装クラスである ForEachOp で以下のように定義されています。

@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                   Spliterator<S> spliterator) {
    return helper.wrapAndCopyInto(this, spliterator).get();
}

PipelineHelperAbstractPipeline の親クラスです。

AbstractPipeline において、wrapAndCopyInto() は以下の定義になっています。

@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(sink), spliterator);
    return sink;
}

引数の sinkForEachOp.OfRef で、これを wrapSink() とし、copyInto() に渡しています。

wrapSink() で Pipeline を上流方向に辿りながら Sink を生成する処理を行っています。

@Override
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {

    for (AbstractPipeline p = AbstractPipeline.this; p.depth > 0; p = p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }

    return (Sink<P_IN>) sink;
}

パイプラインを上流方向にたどりながら、opWrapSink() により Sink にラップしていき、先頭までいったら、その先頭の Sink を返却となっています。


さて、ここでコールしている opWrapSink() はどのインスタンスのメソッドになるでしょうか。

メソッドチェーンの1つ前の filter() では以下のように StatelessOp を匿名クラスとしてインスタンス化して返却していました。 ここで opWrapSink() をオーバーライドしています。

@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {

    return new StatelessOp<P_OUT, P_OUT>(
            this, 
            StreamShape.REFERENCE,
            StreamOpFlag.NOT_SIZED) {

        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            // ...
        }
    };
}

再掲すると以下です。

    @Override
    Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {

        return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {

            @Override
            public void begin(long size) {
                downstream.begin(-1);
            }

            @Override
            public void accept(P_OUT u) {
                if (predicate.test(u))
                    downstream.accept(u);
            }
        };
    }

Sink.ChainedReference のコンストラクタは以下のようになっています。

static abstract class ChainedReference<T, E_OUT> implements Sink<T> {

    protected final Sink<? super E_OUT> downstream;

    public ChainedReference(Sink<? super E_OUT> downstream) {
        this.downstream = downstream;
    }
    // ...
}

つまり、ChainedReference は上流から下流に向かう単方向リストとなっています。 Stream の終端処理にて、Pipeline を末尾からたどりながら Sink という形で操作をラップして単方向リストを生成していることがわかります。

今回の Pipeline は 1段階なので以下のような Sink の連結リストになります。

f:id:Naotsugu:20171014114722p:plain

元にもどり、wrapAndCopyInto() を再掲します。

@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(sink), spliterator);
    return sink;
}

wrapSink() で連結した Sink の先頭要素と Spliterator を copyInto() に渡しています。

@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {

    wrappedSink.begin(spliterator.getExactSizeIfKnown());

    spliterator.forEachRemaining(wrappedSink);

    wrappedSink.end();
}

Sink.begin()Sink.end() の間に spliterator.forEachRemaining() を呼んでいます。

今回の場合は ArraySpliterator で、以下のようになっています。

static final class ArraySpliterator<T> implements Spliterator<T> {
    private final Object[] array;
    private int index;
    private final int fence;
    private final int characteristics;

    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        Object[] a; int i, hi;
        if ((a = array).length >= (hi = fence) &&
            (i = index) >= 0 && i < (index = hi)) {
            do { 
                action.accept((T)a[i]);
            } while (++i < hi);
        }
    }
}

対象の要素を action.accept() しています。この action は Sink なので、

  • Sink.begin()
  • Sink.accept() を要素数分コール
  • Sink.end()

という形になり、begin, accept, end は下流の Sink に伝搬されていきます。

f:id:Naotsugu:20171014114740p:plain

まとめ

Stream パイプラインの処理の流れを簡単な例でみてきました。

中心となるのは以下の3つです。

  • Spliterator : Stream を流れるソース要素のイテレータ
  • Pipeline : メソッドチェーンの構造を表し、終端操作を起点に Sink を生成
  • Sink : パイプライン上の操作を連結したリスト

今回は触れていませんが、ソートや並列ストリームに対応する処理などもあります。Stream の裏舞台は、きっとあなたが考えているより忙しなかったのではないでしょうか。