Platform Team/Repro Core Unit の村上です。
Repro では Kafka を基盤としたストリーム処理のアプリケーションを構築する際に、Kafka Streams を積極的に活用しています。
Kafka Streams は、フォールトトレラントなステートフル処理を簡潔に実装でき、データパイプラインを Topology という表現で抽象化することで、複雑な処理でも管理しやすい形で組み立てていくことが可能です。
また、Apache Kafka 以外の外部依存がないことや Streams DSL によるシンプルな記述でストリーム処理を実装できることなども、ストリーム処理のアプリケーションをスムーズに構築する上で助かっています。
一方で、
なにかしらの問題が発生したときのトラブルシューティングや影響範囲調査の際には、Kafka Streams の内部処理を把握していないと適切な対応をすることが難しいこともあります。
内部処理を把握することは、Kafka Streams が提供するメトリクスの正確な理解やワークロードに応じたパフォーマンスチューニングなどにも役立ちます。
そこで本稿では、Kafka Streams の内部挙動を理解する第一歩として、
「Kafka Topic から取得したレコードはどのようなプロセスを経て処理されていくのか」という視点から内部の処理を追っていき、Kafka Streams で行われているレコード処理のおおよその流れを理解するところまでを目指します。
Kafka Streams の処理構造
引用: https://kafka.apache.org/38/documentation/streams/architecture
コードの詳細を見ていく前に、まずは全体像を掴むため Kafka Streams の処理を示した論理イメージを元に説明します。
図中の「Input Kafka Streams」と「Output Kafka Streams」は、入力と出力の Topic を表しています。どちらも複数の Topic を指定可能で、入力 Topic は必須です。
「Consumer」と「Producer」の部分は、Kafka Clients という Kafka Producer/Consumer のクライアントライブラリであり、実際に Kafka Broker とやりとりをするのは、Kafka Clients ライブラリです。
enable.auto.commit
など、Kafka Streams 側で Kafka Clients の Config Property をオーバーライドしている箇所もあったりするので、Kafka Streams と Kafka Clients の担当領域とそれらの関係を抑えておくことは内部を理解する上で重要です。
Kafka Streams は、この Kafka Clients をラップし、Streams DSL によるストリーム処理のシンプルな記述や StateStore によるステートフル処理などを提供することで、Kafka Clients の一段階上の抽象レイヤーを提供しています。
Streams DSL とは、レコードのファルタリング、集約、結合などのデータ操作処理を数行で記述できる DSL です。Processor API という低レベル API を基盤としていて、Streams DSL では実現が難しい場面は Processor API を使うことができます。
StateStore とは、ストリーム処理における状態を保持するための機能です。レコードの結合や複数レコードの集約など、一時的にレコードの情報を格納しておきたいケースで活用でき、StateStore を使うことでステートフルな処理を実現できます。Processor API では直接操作できますが、Streams DSL では StateStore の存在は隠蔽されています。
Kafka Streams では、Streams DSL や Processor API を使い、レコードの処理ロジックを Topology というグラフ構造で定義しますが、その Topology に従ってレコードを処理しているところが図中の「Task」です。
Topology は、Sub Topology という独立した 1 つのデータフローの集まりで構成され、Task はその Sub Topology と Input Topic の Partition 番号の組み合わせの数だけ作成されます。
例えば、ある Kafka Streams アプリケーションが 2 つの Sub Topology を持ち、どちらも Input Topic の Partition が 10 の場合、Task は合計 20 個つくられます。
Task は「Stream Thread」に割り当てられて実行されます。上図でも 1 つの Stream Thread で複数の Task が割り当てられている様子が示されています。
Kafka Streams の処理の流れは簡単にまとめると、以下のようになります。
- Kafka Clients の Consumer で Input Topic からレコードを取得
- Kafka Streams 上で定義した Topology に従い Stream Thread にて Task 単位でレコードを処理する
- 処理結果を Kafka Clients の Producer で Output Topic に書き込む(Output Topic がなければ Producer での書き込みはない)
Kafka Streams のレコード処理を追う
Kafka Streams の実際のコードを参照しつつ、Topic から取得したレコードを処理する一連の流れを見ていきましょう。
なお、今回参照する Kafka Streams の version は 3.8.0
です。
Kafka Streams アプリケーションを起動する際は、Streams DSL や Processor API を使って書き出した Topology と、Config もしくは Config Property を KafkaStreams
に渡し、KafkaStreams#start を実行します。
KafkaStreams#start を実行するまでのシンプルなコード例として WordCount の一部を抽出して若干書き換えたものを上げておきます。
final Properties props = new Properties(); props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); final StreamsBuilder builder = new StreamsBuilder(); final KStream<String, String> source = builder.stream("streams-plaintext-input"); final KTable<String, Long> counts = source .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) .groupBy((key, value) -> value) .count(); counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); streams.start();
上記 WordCount の例でいうと、streams.start()
の部分です。
ここの KafkaStreams#start から処理を追っていきます。
KafkaStreams#start を実行すると StreamThread が起動します。
StreamThread は Java の Thread であり、num.stream.thread
で指定した数 (デフォルトは 1)、StreamThread
が起動します。
StreamThread
が起動すると runLoop というメソッドが実行されます。 この runLoop
こそが Kafka Streams のメインイベントループを実行する箇所で、主要な処理になります。
runLoop
では Kafka Consumer の subscribe を行ったあと、ループ内で Input Topic をポーリングして shutdown するまでレコードを処理を続けます。
ループ内で実行している 1 イテレーションを理解するときは、runOnce を見ます。
runOnce
は Kafka Streams 3.7 から runOnceWithProcessingThreads
と runOnceWithoutProcessingThreads
に分かれていて、デフォルトでは runOnceWithoutProcessingThreads
が呼ばれるようになっているので、今回はそちらを確認していきます。
runOnce
の各ステップは以下です(java doc に記載されているものをそのまま転記)
- poll records from main consumer and add to buffer
- restore from restore consumer and update standby tasks if necessary
- process active tasks from the buffers
- punctuate active tasks if necessary
- commit all tasks if necessary
runOnce
の中身を把握することで「Kafka Streams のレコード処理の流れ」の理解に繋がります。
それぞれのステップを解説していきます。
poll records from main consumer and add to buffer
Consumer で取得したレコードをバッファリングします。
ここでは Consumer の resume を試みた後の StreamThread#pollPhase の処理を見ると把握できます。
void runOnceWithoutProcessingThreads() { // ---snip--- final long pollLatency; taskManager.resumePollingForPartitionsWithAvailableSpace(); pollLatency = pollPhase(); // ---snip--- }
pollPhase
では、main consumer でレコードを取得して、TaskManager
を経由して StreamsTask
が保持する PartitionGroup
に追加しています。
レコードの取得部分は、pollPhase
の StreamThread#pollRequests、PartitionGroup
への追加は TaskManager#addRecordsToTasks で行っています。
main consumer は、Topology の Input Topic からレコードを取得する Kafka Clients の Kafka Consumer です。TaskManager は、その名の通り Task を管理するクラスで StreamThread
からの Task 操作は、この TaskManager
を介して行われます。
PartitionGroup は、Task 毎に存在していて、Consumer から取得したレコードを保管しています。
複数の Topic を Join するときなど、1つの Task で複数 Topic(Partition 番号は同じ)のレコードを保管することがあるので、Partition"Group" という命名になっているのでしょう。
レコードを処理するときは、この PartitionGroup
からレコードを引っ張ってきます。
restore from restore consumer and update standby tasks if necessary
StateStore
では、耐久性を向上させるために、変更ログを保存しておく Changelog Topic を作成し、そこに StateStore
のデータのバックアップを書き込むことができます。
restore とは、その Changelog Topic から StateStore
の復元する処理です。
restore consumer は、StateStore
の Changelog Topic からレコードを取得するための Kafka Clients の Kafka Consumer です。
main consumer とは別であり、それぞれで異なる Config Property を指定できるので、restore に最適化したチューニングも可能になっています。
restore のステップでは runOnce
内の initializeAndRestorePhase を見ます。
void runOnceWithoutProcessingThreads() { // ---snip--- if (!stateUpdaterEnabled) { initializeAndRestorePhase(); } // ---snip--- }
initializeAndRestorePhase
では、Task の初期化と StoreChangelogReader による StateStore
の復元が行われます。
復元が完了した Task のステータスは RESTORING から RUNNING に変わり、 すべての Task のステータスが RUNNING になると、StreamThread のステータスも RUNNING に変わります。
ChangeLog が有効な StateStore
が存在しない場合でも Task のステータスは一度 RESTORING になりますが、すぐに RUNNING へと変わります。
ただし、runOnce
で restore が行われるのは、stateUpdaterEnabled が false のときです。
Kafka Streams 3.8.0 から stateUpdaterEnabled
は true です(Private property __state.updater.enabled__
で変更可能です)。
stateUpdaterEnabled
が true の場合は、復元処理は runOnce の中で行われず、StateUpdaterThread という別の Thread にて StoreChangelogReader
による restore が実行されます。別 Thread で restore をすることで、main consumer の poll 間隔を気にせず1、大量のレコードを一度に復元できます。(see: KAFKA-10199)
そして、runOnce
では checkStateUpdater
で各 Task の復元完了を都度チェックすることになります。
void runOnceWithoutProcessingThreads() { // ---snip--- if (stateUpdaterEnabled) { checkStateUpdater(); } // ---snip--- }
具体的な StateStore
の復元処理は、StoreChangelogReader#restore から読み進めると良いです。
そこでは restore consumer から取得したレコードを StateStore
に書き込む処理も含みますが、StateStore
の実装によって Changelog Topic のレコードから StateStore のレコードに変換する方法が異なるので、復元のための Callback を呼びだすようにしています。実際の Callback の中身はそれぞれの StateStore 側に実装されています。
復元処理が完了したあと、Standby Task が割り当てられていたらその Task を実行します。
今回は詳しく説明しませんが、Standby Task は ChangeLog から StateStore
のシャドウコピーを更新するタスクで、restore consumer が使われます。
process active tasks from the buffers
restore が完了した Task は、PartitionGroup
にバッファリングされたレコードを逐次処理していきます。
レコードの実体は、PartitionGroup
が Partition 単位で持っている RecordQueue にあります。
RecordQueue
はレコードを FIFO で保持するためのキューで、次に処理するレコードは headRecord に格納しています。Consumer で Offset Commit をするときは、処理したレコードの次のレコードの Offset を使うので、Commit 時には headRecord の Offset が使われます。
PartitionGroup
から取ってきたレコードは、Kafka Streams で定義した Topology に従ってレコードの処理されます。
以下が Topology のイメージ図です。
Topology とは、Processor Node(レコードに対してなんらかの処理を行う最小単位)を組み合わせた有向非巡回グラフ(DAG)です。
引用: https://kafka.apache.org/38/documentation/streams/core-concepts#streams_topology
Processor Node の中でも Source Node と Sink Node は特殊な Node になります。
- Source Node: Kafka Topic からレコードを取得して、ダウンストリームの Processor Node に渡す
- Sink Node: アップストリームから受信したレコードを Kafka topic に書き込む
Kafka Streams の Processor API では、AddSource
やAddProcessor
などのメソッドで Topology を構築するので、Processor API を使っているとイメージしやすいのではないでしょうか。
この Topology に従ったレコード処理は、runOnce 内の taskManager.process のコードを追っていくと良いです。
void runOnceWithoutProcessingThreads() { // ---snip--- final int processed = taskManager.process(numIterations, time); final long processLatency = advanceNowAndComputeLatency(); totalProcessLatency += processLatency; // ---snip--- }
Topology の処理は、taskManager.process
を辿っていった StreamTask#doProcess で実行されます。
currNode.process
が開始点です。
private void doProcess(final long wallClockTime) { // ---snip--- final Record<Object, Object> toProcess = new Record<>( record.key(), record.value(), processorContext.timestamp(), processorContext.headers() ); maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySensor); // ---snip--- }
ここの currNode
は Source Node にあたりますが、Source Node の process でやっていることは、次の Processor Node にレコードを渡すことだけです。
@Override public void process(final Record<KIn, VIn> record) { context.forward(record); processAtSourceSensor.record(1.0d, context.currentSystemTimeMs()); }
Source Node はレコードの Deserializer を持っていますが、デシリアライズは PartitionGroup
へ追加するときに行われるので、SourceNode では、context.forward(record)
を実行するだけです。
ProcessorContext#forward
を行うと、ダウンストリームの Processor Node の process
メソッドが実行されます。
Source Node の後続の Processor Node で一通り処理を行った後、Topic に書き込んで終了する場合は Sink Node の process
が呼ばれます。
Sink Node は、Output Topic のための Serializer を保持していて、process では RecordCollector#send を実行しています。
@Override public void process(final Record<KIn, VIn> record) { final RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector(); final KIn key = record.key(); final VIn value = record.value(); final long timestamp = record.timestamp(); final ProcessorRecordContext contextForExtraction = new ProcessorRecordContext( timestamp, context.offset(), context.partition(), context.topic(), record.headers() ); final String topic = topicExtractor.extract(key, value, contextForExtraction); collector.send( topic, key, value, record.headers(), timestamp, keySerializer, valSerializer, name(), context, partitioner); }
RecordCollector
は Kafka Topic に produce するための Kafka Streams のインターフェースで、実体は RecordCollectorImpl で Stream Task 毎に存在します。
RecordCollectorImpl
は Kafka Producer を持っているので、Sink Node は RecordCollector
を介して Kafka Producer にレコードを送ります。Sink Node だけではなく、State Store の ChangeLog も同様に RecordCollector
経由で Topic に書き込まれます。
Kafka Producer に送られたレコードは linger.ms の間隔2で Kafka Broker にリクエストされるので、後述する Commit のフェーズよりも前に Kafka Broker へ書き込まれることがあります。
runOnce
では、PartitionGroup
にバッファされたレコードが空になるか、処理レコード数が一定数に達するまで、上記の処理を繰り返します。
punctuate active tasks if necessary
レコードの処理が終わった後、Punctuator が登録されている場合は Punctuator
の処理を行います。
runOnce
のコードは taskManager.punctuate の部分になります。
void runOnceWithoutProcessingThreads() { // ---snip--- final int punctuated = taskManager.punctuate(); totalPunctuatorsSinceLastSummary += punctuated; final long punctuateLatency = advanceNowAndComputeLatency(); totalPunctuateLatency += punctuateLatency; // ---snip--- }
Processor で ProcessorContext#schedule を使って登録することで、Punctuator
とその実行タイミングを設定できます。
taskManager.punctuate
が呼ばれたときに、スケジュールされたタイムスタンプを超えている場合に Punctuator
が実行されます。
Punctuator
も runOnce
内で直列に実行されるので、重い処理が行われると、メインのストリーム処理が遅延するので注意が必要です。
commit all tasks if necessary
Offset Commit や StateStore
の checkpoint ファイルの更新を試みます。
コードは runOnce
の maybeCommit です。
void runOnceWithoutProcessingThreads() { // ---snip--- final long beforeCommitMs = now; final int committed = maybeCommit(); final long commitLatency = Math.max(now - beforeCommitMs, 0); totalCommitLatency += commitLatency; // ---snip--- }
maybeCommit
では、直近の Commit から commit.interval.ms
が経過しているか、Processor で ProcessorContext#commit によるリクエストがされた場合、Commit 処理を行います。
Commit 処理の詳細は、TaskManager#commit から辿っていった TaskExecutor#commitTasksAndMaybeUpdateCommittableOffsets を確認するのが良いです。
int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCommit, final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadata) { int committed = 0; for (final Task task : tasksToCommit) { // we need to call commitNeeded first since we need to update committable offsets if (task.commitNeeded()) { final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit(); if (!offsetAndMetadata.isEmpty()) { consumedOffsetsAndMetadata.put(task, offsetAndMetadata); } } } commitOffsetsOrTransaction(consumedOffsetsAndMetadata); for (final Task task : tasksToCommit) { if (task.commitNeeded()) { task.clearTaskTimeout(); ++committed; task.postCommit(false); } } return committed; }
task.prepareCommit は、RecordCollector
の flush を行い、コミット可能な Offset 情報と Topic Partition のメタデータを返します。
RecordCollector
の flush とは、Kafka Producer の flush です。Producer にバッファされているレコードを直ちに Broker にリクエストします。(linger.ms による待機時間はありません)
commitOffsetsOrTransaction は、prepareCommit
で返却された Offset 情報を元に、Kafka Consumer の commitSync
を実行します。(processing.guarantee
が at_least_once
のとき)
task.postCommit では、StateStore
の flush をして、checkpoint ファイルに Topic、Partition、Offset の情報を書き込みます。StateStore
の flush はバックエンドによりますが、仮に RocksDB だったときは memtable の flush がトリガーされます。
まとめ
Kafka Streams がレコードを処理する際の一連の流れを簡単に解説しました。
今回は、あまり細部には触れませんでしたが、実際のトラブルシューティングではもう少し深い理解が求められる場面も少なくありません。
それでも、大まかな流れを把握しておくことは、問題発生時に素早く原因を特定する際に役に立つのではないでしょうか。また、日常的な開発や運用業務においても、より深く処理を読み解く際の手がかりや理解の土台として役立てていただけたら幸いです。
- 同じ Thread で restore を行う場合、max.poll.interval.ms を超過して consumer group の rebalance が発生しないよう、1イテレーション内の restore 処理時間が長くなりすぎないようにする必要があります。↩
- Kafka Streams では、デフォルト 100ms でオーバーライドしています。↩