Kafka Streams はレコードをどのように処理しているのか

Platform Team/Repro Core Unit の村上です。

Repro では Kafka を基盤としたストリーム処理のアプリケーションを構築する際に、Kafka Streams を積極的に活用しています。
Kafka Streams は、フォールトトレラントなステートフル処理を簡潔に実装でき、データパイプラインを Topology という表現で抽象化することで、複雑な処理でも管理しやすい形で組み立てていくことが可能です。
また、Apache Kafka 以外の外部依存がないことや Streams DSL によるシンプルな記述でストリーム処理を実装できることなども、ストリーム処理のアプリケーションをスムーズに構築する上で助かっています。

一方で、
なにかしらの問題が発生したときのトラブルシューティングや影響範囲調査の際には、Kafka Streams の内部処理を把握していないと適切な対応をすることが難しいこともあります。 内部処理を把握することは、Kafka Streams が提供するメトリクスの正確な理解やワークロードに応じたパフォーマンスチューニングなどにも役立ちます。

そこで本稿では、Kafka Streams の内部挙動を理解する第一歩として、
「Kafka Topic から取得したレコードはどのようなプロセスを経て処理されていくのか」という視点から内部の処理を追っていき、Kafka Streams で行われているレコード処理のおおよその流れを理解するところまでを目指します。

Kafka Streams の処理構造

Kafka Streams 処理モデル 引用: https://kafka.apache.org/38/documentation/streams/architecture

コードの詳細を見ていく前に、まずは全体像を掴むため Kafka Streams の処理を示した論理イメージを元に説明します。

図中の「Input Kafka Streams」と「Output Kafka Streams」は、入力と出力の Topic を表しています。どちらも複数の Topic を指定可能で、入力 Topic は必須です。

「Consumer」と「Producer」の部分は、Kafka Clients という Kafka Producer/Consumer のクライアントライブラリであり、実際に Kafka Broker とやりとりをするのは、Kafka Clients ライブラリです。
enable.auto.commit など、Kafka Streams 側で Kafka Clients の Config Property をオーバーライドしている箇所もあったりするので、Kafka Streams と Kafka Clients の担当領域とそれらの関係を抑えておくことは内部を理解する上で重要です。
Kafka Streams は、この Kafka Clients をラップし、Streams DSL によるストリーム処理のシンプルな記述や StateStore によるステートフル処理などを提供することで、Kafka Clients の一段階上の抽象レイヤーを提供しています。
Streams DSL とは、レコードのファルタリング、集約、結合などのデータ操作処理を数行で記述できる DSL です。Processor API という低レベル API を基盤としていて、Streams DSL では実現が難しい場面は Processor API を使うことができます。
StateStore とは、ストリーム処理における状態を保持するための機能です。レコードの結合や複数レコードの集約など、一時的にレコードの情報を格納しておきたいケースで活用でき、StateStore を使うことでステートフルな処理を実現できます。Processor API では直接操作できますが、Streams DSL では StateStore の存在は隠蔽されています。

Kafka Streams では、Streams DSLProcessor API を使い、レコードの処理ロジックを Topology というグラフ構造で定義しますが、その Topology に従ってレコードを処理しているところが図中の「Task」です。
Topology は、Sub Topology という独立した 1 つのデータフローの集まりで構成され、Task はその Sub Topology と Input Topic の Partition 番号の組み合わせの数だけ作成されます。 例えば、ある Kafka Streams アプリケーションが 2 つの Sub Topology を持ち、どちらも Input Topic の Partition が 10 の場合、Task は合計 20 個つくられます。

Task は「Stream Thread」に割り当てられて実行されます。上図でも 1 つの Stream Thread で複数の Task が割り当てられている様子が示されています。

Kafka Streams の処理の流れは簡単にまとめると、以下のようになります。

  1. Kafka Clients の Consumer で Input Topic からレコードを取得
  2. Kafka Streams 上で定義した Topology に従い Stream Thread にて Task 単位でレコードを処理する
  3. 処理結果を Kafka Clients の Producer で Output Topic に書き込む(Output Topic がなければ Producer での書き込みはない)

Kafka Streams のレコード処理を追う

Kafka Streams の実際のコードを参照しつつ、Topic から取得したレコードを処理する一連の流れを見ていきましょう。
なお、今回参照する Kafka Streams の version は 3.8.0 です。

Kafka Streams アプリケーションを起動する際は、Streams DSLProcessor API を使って書き出した Topology と、Config もしくは Config Property を KafkaStreams に渡し、KafkaStreams#start を実行します。

KafkaStreams#start を実行するまでのシンプルなコード例として WordCount の一部を抽出して若干書き換えたものを上げておきます。

final Properties props = new Properties();
props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> source = builder.stream("streams-plaintext-input");
final KTable<String, Long> counts = source
    .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
    .groupBy((key, value) -> value)
    .count();
counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
final Topology topology = builder.build();

final KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

上記 WordCount の例でいうと、streams.start() の部分です。
ここの KafkaStreams#start から処理を追っていきます。


KafkaStreams#start を実行すると StreamThread が起動します
StreamThreadJava の Thread であり、num.stream.thread で指定した数 (デフォルトは 1)、StreamThread が起動します。

StreamThread が起動すると runLoop というメソッドが実行されます。 この runLoop こそが Kafka Streams のメインイベントループを実行する箇所で、主要な処理になります。
runLoop では Kafka Consumer の subscribe を行ったあと、ループ内で Input Topic をポーリングして shutdown するまでレコードを処理を続けます。

ループ内で実行している 1 イテレーションを理解するときは、runOnce を見ます。
runOnce は Kafka Streams 3.7 から runOnceWithProcessingThreadsrunOnceWithoutProcessingThreads に分かれていて、デフォルトでは runOnceWithoutProcessingThreads が呼ばれるようになっているので、今回はそちらを確認していきます。

runOnce の各ステップは以下です(java doc に記載されているものをそのまま転記)

  1. poll records from main consumer and add to buffer
  2. restore from restore consumer and update standby tasks if necessary
  3. process active tasks from the buffers
  4. punctuate active tasks if necessary
  5. commit all tasks if necessary

runOnce の中身を把握することで「Kafka Streams のレコード処理の流れ」の理解に繋がります。
それぞれのステップを解説していきます。

poll records from main consumer and add to buffer

Consumer で取得したレコードをバッファリングします。

ここでは Consumer の resume を試みた後の StreamThread#pollPhase の処理を見ると把握できます。

void runOnceWithoutProcessingThreads() {
  // ---snip---

  final long pollLatency;
  taskManager.resumePollingForPartitionsWithAvailableSpace();
  pollLatency = pollPhase();

  // ---snip---
}

pollPhase では、main consumer でレコードを取得して、TaskManager を経由して StreamsTask が保持する PartitionGroup に追加しています。
レコードの取得部分は、pollPhaseStreamThread#pollRequestsPartitionGroup への追加は TaskManager#addRecordsToTasks で行っています。

main consumer は、Topology の Input Topic からレコードを取得する Kafka Clients の Kafka Consumer です。TaskManager は、その名の通り Task を管理するクラスで StreamThread からの Task 操作は、この TaskManager を介して行われます。

PartitionGroup は、Task 毎に存在していて、Consumer から取得したレコードを保管しています。
複数の Topic を Join するときなど、1つの Task で複数 Topic(Partition 番号は同じ)のレコードを保管することがあるので、Partition"Group" という命名になっているのでしょう。

レコードを処理するときは、この PartitionGroup からレコードを引っ張ってきます。

restore from restore consumer and update standby tasks if necessary

StateStore では、耐久性を向上させるために、変更ログを保存しておく Changelog Topic を作成し、そこに StateStore のデータのバックアップを書き込むことができます。
restore とは、その Changelog Topic から StateStore の復元する処理です。

restore consumer は、StateStoreChangelog Topic からレコードを取得するための Kafka Clients の Kafka Consumer です。
main consumer とは別であり、それぞれで異なる Config Property を指定できるので、restore に最適化したチューニングも可能になっています。

restore のステップでは runOnce 内の initializeAndRestorePhase を見ます。

void runOnceWithoutProcessingThreads() {
  // ---snip---
  
  if (!stateUpdaterEnabled) {
    initializeAndRestorePhase();
  }
  
  // ---snip---
}

initializeAndRestorePhase では、Task の初期化と StoreChangelogReader による StateStore の復元が行われます。
復元が完了した Task のステータスは RESTORING から RUNNING に変わり、 すべての Task のステータスが RUNNING になると、StreamThread のステータスも RUNNING に変わります
ChangeLog が有効な StateStore が存在しない場合でも Task のステータスは一度 RESTORING になりますが、すぐに RUNNING へと変わります。

ただし、runOnce で restore が行われるのは、stateUpdaterEnabled が false のときです。 Kafka Streams 3.8.0 から stateUpdaterEnabled は true です(Private property __state.updater.enabled__ で変更可能です)。 stateUpdaterEnabled が true の場合は、復元処理は runOnce の中で行われず、StateUpdaterThread という別の Thread にて StoreChangelogReader による restore が実行されます。別 Thread で restore をすることで、main consumer の poll 間隔を気にせず1、大量のレコードを一度に復元できます。(see: KAFKA-10199) そして、runOnce では checkStateUpdater で各 Task の復元完了を都度チェックすることになります。

void runOnceWithoutProcessingThreads() {
  // ---snip---

  if (stateUpdaterEnabled) {
    checkStateUpdater();
  }
  
  // ---snip---
}

具体的な StateStore の復元処理は、StoreChangelogReader#restore から読み進めると良いです。
そこでは restore consumer から取得したレコードを StateStore に書き込む処理も含みますが、StateStore の実装によって Changelog Topic のレコードから StateStore のレコードに変換する方法が異なるので、復元のための Callback を呼びだすようにしています。実際の Callback の中身はそれぞれの StateStore 側に実装されています。

復元処理が完了したあと、Standby Task が割り当てられていたらその Task を実行します
今回は詳しく説明しませんが、Standby Task は ChangeLog から StateStore のシャドウコピーを更新するタスクで、restore consumer が使われます。

process active tasks from the buffers

restore が完了した Task は、PartitionGroup にバッファリングされたレコードを逐次処理していきます。 レコードの実体は、PartitionGroup が Partition 単位で持っている RecordQueue にあります。 RecordQueue はレコードを FIFO で保持するためのキューで、次に処理するレコードは headRecord に格納しています。Consumer で Offset Commit をするときは、処理したレコードの次のレコードの Offset を使うので、Commit 時には headRecord の Offset が使われます。

PartitionGroup から取ってきたレコードは、Kafka Streams で定義した Topology に従ってレコードの処理されます。

以下が Topology のイメージ図です。
Topology とは、Processor Node(レコードに対してなんらかの処理を行う最小単位)を組み合わせた有向非巡回グラフ(DAG)です。

Kafka Streams プロセッサートポロジー 引用: https://kafka.apache.org/38/documentation/streams/core-concepts#streams_topology

Processor Node の中でも Source Node と Sink Node は特殊な Node になります。

  • Source Node: Kafka Topic からレコードを取得して、ダウンストリームの Processor Node に渡す
  • Sink Node: アップストリームから受信したレコードを Kafka topic に書き込む

Kafka Streams の Processor API では、AddSourceAddProcessor などのメソッドで Topology を構築するので、Processor API を使っているとイメージしやすいのではないでしょうか。

この Topology に従ったレコード処理は、runOnce 内の taskManager.process のコードを追っていくと良いです。

void runOnceWithoutProcessingThreads() {
  // ---snip---

  final int processed = taskManager.process(numIterations, time);
  final long processLatency = advanceNowAndComputeLatency();
  totalProcessLatency += processLatency;

  // ---snip---
}

Topology の処理は、taskManager.process を辿っていった StreamTask#doProcess で実行されます。 currNode.process が開始点です。

private void doProcess(final long wallClockTime) {
  // ---snip---
  
  final Record<Object, Object> toProcess = new Record<>(
      record.key(), 
      record.value(), 
      processorContext.timestamp(), 
      processorContext.headers()
  );
  maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySensor);
    
  // ---snip---
}

ここの currNode は Source Node にあたりますが、Source Node の process でやっていることは、次の Processor Node にレコードを渡すことだけです。

@Override
public void process(final Record<KIn, VIn> record) {
  context.forward(record);
  processAtSourceSensor.record(1.0d, context.currentSystemTimeMs());
}

Source Node はレコードの Deserializer を持っていますが、デシリアライズPartitionGroup へ追加するときに行われるので、SourceNode では、context.forward(record) を実行するだけです。
ProcessorContext#forward を行うと、ダウンストリームの Processor Node の process メソッドが実行されます。

Source Node の後続の Processor Node で一通り処理を行った後、Topic に書き込んで終了する場合は Sink Node の process が呼ばれます。 Sink Node は、Output Topic のための Serializer を保持していて、process では RecordCollector#send を実行しています。

@Override
public void process(final Record<KIn, VIn> record) {
  final RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();

  final KIn key = record.key();
  final VIn value = record.value();

  final long timestamp = record.timestamp();

  final ProcessorRecordContext contextForExtraction =
    new ProcessorRecordContext(
        timestamp,
        context.offset(),
        context.partition(),
        context.topic(),
        record.headers()
      );

  final String topic = topicExtractor.extract(key, value, contextForExtraction);

  collector.send(
      topic,
      key,
      value,
      record.headers(),
      timestamp,
      keySerializer,
      valSerializer,
      name(),
      context,
      partitioner);
}

RecordCollector は Kafka Topic に produce するための Kafka Streams のインターフェースで、実体は RecordCollectorImpl で Stream Task 毎に存在します。
RecordCollectorImpl は Kafka Producer を持っているので、Sink Node は RecordCollector を介して Kafka Producer にレコードを送ります。Sink Node だけではなく、State Store の ChangeLog も同様に RecordCollector 経由で Topic に書き込まれます。
Kafka Producer に送られたレコードは linger.ms の間隔2で Kafka Broker にリクエストされるので、後述する Commit のフェーズよりも前に Kafka Broker へ書き込まれることがあります。

runOnce では、PartitionGroup にバッファされたレコードが空になるか、処理レコード数が一定数に達するまで、上記の処理を繰り返します。

punctuate active tasks if necessary

レコードの処理が終わった後、Punctuator が登録されている場合は Punctuator の処理を行います。

runOnce のコードは taskManager.punctuate の部分になります。

void runOnceWithoutProcessingThreads() {
  // ---snip---

  final int punctuated = taskManager.punctuate();
  totalPunctuatorsSinceLastSummary += punctuated;
  final long punctuateLatency = advanceNowAndComputeLatency();
  totalPunctuateLatency += punctuateLatency;

  // ---snip---
}

ProcessorProcessorContext#schedule を使って登録することで、Punctuator とその実行タイミングを設定できます。

taskManager.punctuate が呼ばれたときに、スケジュールされたタイムスタンプを超えている場合に Punctuator が実行されます。
PunctuatorrunOnce 内で直列に実行されるので、重い処理が行われると、メインのストリーム処理が遅延するので注意が必要です。

commit all tasks if necessary

Offset Commit や StateStore の checkpoint ファイルの更新を試みます。

コードは runOncemaybeCommit です。

void runOnceWithoutProcessingThreads() {
  // ---snip---

  final long beforeCommitMs = now;
  final int committed = maybeCommit();
  final long commitLatency = Math.max(now - beforeCommitMs, 0);
  totalCommitLatency += commitLatency;

  // ---snip---
}

maybeCommit では、直近の Commit から commit.interval.ms が経過しているか、ProcessorProcessorContext#commit によるリクエストがされた場合、Commit 処理を行います。

Commit 処理の詳細は、TaskManager#commit から辿っていった TaskExecutor#commitTasksAndMaybeUpdateCommittableOffsets を確認するのが良いです。

int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCommit,
                                                final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadata) {
  int committed = 0;
  for (final Task task : tasksToCommit) {
    // we need to call commitNeeded first since we need to update committable offsets
    if (task.commitNeeded()) {
      final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();
      if (!offsetAndMetadata.isEmpty()) {
        consumedOffsetsAndMetadata.put(task, offsetAndMetadata);
      }
    }
  }

  commitOffsetsOrTransaction(consumedOffsetsAndMetadata);

  for (final Task task : tasksToCommit) {
    if (task.commitNeeded()) {
      task.clearTaskTimeout();
      ++committed;
      task.postCommit(false);
    }
  }

  return committed;
}

task.prepareCommit は、RecordCollector の flush を行い、コミット可能な Offset 情報と Topic Partition のメタデータを返します。
RecordCollector の flush とは、Kafka Producer の flush です。Producer にバッファされているレコードを直ちに Broker にリクエストします。(linger.ms による待機時間はありません)

commitOffsetsOrTransaction は、prepareCommit で返却された Offset 情報を元に、Kafka Consumer の commitSync を実行します。(processing.guaranteeat_least_once のとき)

task.postCommit では、StateStore の flush をして、checkpoint ファイルに Topic、Partition、Offset の情報を書き込みます。StateStore の flush はバックエンドによりますが、仮に RocksDB だったときは memtable の flush がトリガーされます。

まとめ

Kafka Streams がレコードを処理する際の一連の流れを簡単に解説しました。

今回は、あまり細部には触れませんでしたが、実際のトラブルシューティングではもう少し深い理解が求められる場面も少なくありません。
それでも、大まかな流れを把握しておくことは、問題発生時に素早く原因を特定する際に役に立つのではないでしょうか。また、日常的な開発や運用業務においても、より深く処理を読み解く際の手がかりや理解の土台として役立てていただけたら幸いです。


  1. 同じ Thread で restore を行う場合、max.poll.interval.ms を超過して consumer group の rebalance が発生しないよう、1イテレーション内の restore 処理時間が長くなりすぎないようにする必要があります。
  2. Kafka Streams では、デフォルト 100ms でオーバーライドしています。