夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Apache Kafka 概要(Design)和訳まとめ

こんにちは。ようやく1ページ全てまとめ終わったので、
最後にまとめ投稿として投稿しておきます。

尚、ページは下記です。
http://kafka.apache.org/07/design.html

==========

1.何故Kafkaは作られたのか?

元々はLinkedInのActivity StreamとData Processingをパイプライン式に繋ぐために開発されたプロダクト。
最近はTumblr、DataSiftといった企業でも使用されている。
 → SNSや、複数のサービスの情報を統合するようなシステムで使われているようです。

ここでいうActivity Streamとは
Webページで閲覧、検索、リンク設定などを行う活動全般を指す。
これらのデータは通常のシステムならば、ログファイルとして出力し、後で別途解析に用いられる。

もう一つ言葉を定義する。
Operational Dataはサーバのパフォーマンスデータ(CPU、IO等)を指す。
これらのデータの統合には様々なアプローチがある。

Activity DataやOperational DataはWebサイト運営の上で非常に重要なデータとなる。
だからこそ、これらを収集解析するにはこれまでより洗練されたインフラが必要。
Kafkaは上記を達成する『洗練するインフラ』として開発された。

2.Activity Data/Operational Dataのユースケース例

友人内で共有されたニュースフィード
投票結果によって最適解をしぼるランキングコンテスト
APIやメール受信数から攻撃やスパムを検知するセキュリティ対策
運用監視:あるシステムやサイトのレスポンスが悪化していないかを検知
レポーティングやバッチ実行:データ解析に必要なDWHやHadoop等へのデータロードを実行

3.Activity Stream Dataの特徴

高負荷なシステムのストリームデータ解析を行う際、
データ量は既存システムの数十倍、数百倍のオーダーに達する。

上記の前提にの上で、
これまで行われてきたログファイル収集は
オフラインの状況においては厳密かつスケーラブルだった。

だが、リアルタイムプロセッシングを行うには遅すぎる上に、
運用も複雑になる傾向がある。

対して、既存のメッセージング/キューイングシステムは
リアルタイムプロセッシングを行うには適しているが、
永続化層が遅い関係上非常に大きい処理待ちのキューを抱えることとなっていた。

Hadooopのようなオフラインシステムにデータを投入する際にも同様の問題が発生し、
これまでは1時間単位や、1日単位の処理しか提供できなかった。

Kafkaはこれらの遅延、処理待ちキュー肥大化の問題を解決するために
開発されたキューイングプラットフォームである。

Hadoopのようなオフラインのシステム、
リアルタイムプロセッシングのようなオンラインシステムの
両方に対して単一のキューを用いて情報の収集を行う。

4.適用例

下記の画像がLinkedInで使用されている構成の概要となる。

単一のKafkaクラスタが複数の異なるデータソースから収集される
Activityデータを扱っている。
Kafkaクラスタはオンライン/オフラインのデータ利用者に対して単一のデータパイプラインを提供する。

上記のような構成において、Kafkaは実Activityと非同期処理を仲立ちするバッファとして働いている。

また、Kafkaはこれらの全データを別データセンターで使用するために同期する機能も提供している。
Kafkaは1クラスタでデータセンターをまたぐような構成ではないが、
複数のクラスタを構築することでデータセンター間のフローも実現する。

実際の構成として、コピー先のKafkaクラスタはコピー元のKafkaクラスタの
Consumerとして扱うのみであり、非常にシンプル。

また、上記の特徴は複数のデータセンターから
1か所にデータを集約可能であることも示す。

下の図は複数データセンター間でバッチロードを補助するための構成例。

上記の構成において、2つのLocal Kafka Cluster間の通信は行われない。
2つのLocal Kafka Clusterは異なるサイズ、異なる数のノードを保持することが出来る。
また、1つのKafka Clusterは任意の数のSource Kafka Clusterの
データをコピーすることができる。

ミラーリングについての詳細はこちら(別ページ)参照。

5.基本設計方針

Apache Kafkaの持つ設計方針は下記の通り。

1.Kafkaは永続化する必要があるメッセージを共通的に扱うために設計
2.スループットより、1の特徴を満たすことを優先して設計
3.状態はプロデューサ/ブローカではなく、コンシューマが更新
4.Kafkaは完全分散型。プロデューサ/ブローカ/コンシューマは複数マシンに分散することを前提とする。

6.基本コンセプト

KafkaではMessageを基本の通信単位とする。

Messageはトピックという扱いでプロデューサからサーバ(ブローカとして動作)に送信される。
複数のコンシューマがトピックを取得設定した場合、
全コンシューマに対して各々のメッセージが送付される。

これらのプロデューサ/ブローカ/コンシューマは別マシンに分散配置することができ、
論理的なグルーピングを構成して協調動作する。

上記の動作はプロデューサ/ブローカにとっては自然だが、
コンシューマはいくつか設定が必要となる。

各コンシューマはコンシューマグループに所属することでメッセージの受信が可能となる。
『コンシューマグル―プに所属しているコンシューマ群』に対して全メッセージが配信される形となる。
#コンシューマグループに所属するコンシューマのうち1つに配信。

つまりは、コンシューマグループとは、
複数のプロセス/マシンを論理的に1つのコンシューマとして見せる仕組みとなる。

このコンシューマグループによってキューやJMSトピックのサポートが可能。

キューイングをサポートする際には、複数のコンシューマを1コンシューマグループに所属させ、
各メッセージを1つのコンシューマに送付する。

JMSトピックをサポートする際には、
各コンシューマが各々コンシューマグループを保持するようにすれば、
全コンシューマに対して全メッセージが配信される。

一般的なKafka利用形態として、
複数のコンシューマグループを持ち、各々が全メッセージを受信するケースとなる。

Kafkaはコンシューマの数がいくつあっても問題ないことから、
かつ1つのメッセージを1回しか処理してはいけないという制約がない場合に適する。

7.メッセージの永続化/保持

ファイルシステムを恐れるな!

Kafkaはメッセージの保存と保持にあたってファイルシステムに密接に依存している。
ディスクは遅いという一般見解があるため、
こう書いてしまうと性能を確保できるか疑われるかもしれない。

ただ、実際のところディスクはどう使うかで大きくパフォーマンスが変わる。
ディスクの構造に併せた設定を行えば、ネットワークと同等の性能を出すことも可能である。

ディスク性能に関するポイントは、
ディスク性能はディスクのシークによって遅延が発生するということ。

実際のところ、シーケンシャル領域に対する書き込み性能は
6台構成の7200rpmのSATA-RAID5ディスクで300MB/secにも達する。
だが、ランダム書き込みは50KB/secまで落ち込んでしまう。
実に、10000倍近い差が出てくる。

連続的に読み書きは利用方法に応じて予測可能。
ゆえに、事前読込/遅延書込を駆使して論理的な小さい読込/書込に
マージすれば、ディスクの性能を引き出すことができる。

上記の関連情報として、下記の情報がある。

つまりは、シーケンシャルなディスクに対するスループットは、
ランダムなメモリアクセスを上回る。

上記にあるような性能の差分を補うために、
最近のOSはメモリをディスクのキャッシュ領域として積極的に用いている。

最近はどんなOSでもメモリ解放の(若干の)性能負荷を見込んだ上で
空きメモリ領域を全てディスクキャッシュとして転用している。

ディスク読込/書込は上記のディスクキャッシュを通して行われる。

そのため、プロセスが保持しているプロセス内キャッシュが
OSの保持しているページキャッシュと重複してくる。
結果、重複した書き込みが行われる状況を
ダイレクトIOを用いずに解消することは容易ではない。

更に忘れてはならない内容として、JVM上の
Javaのメモリ利用として下記の2つを押さえておく必要がある。

1.オブジェクトのメモリオーバヘッドが大きく、しばしば重複して保持される
2.JavaのGCはヒープ上のデータが増えるにつれて、大雑把かつ負荷は増大する

上記の要素の結果として、
ページキャッシュに依存したファイルシステムは
メモリキャッシュなど他の領域に保持するより優れたものとなってくる。

全空きメモリの少なくとも倍のキャッシュを保持することで、
個々のオブジェクトに着目するより最終的に保持するバイト数は小さくなる。
結果、メモリが32GBのマシン上で28-30GBのキャッシュをGCのペナルティなしで持つことが可能となる。
#この辺うまく訳せていません。
#とりあえずプロセス内キャッシュを使うより素直にOSの機構を使え、ということなのは確かですが。

その上、これらのキャッシュはプロセスが再起動しても保持されます。
対して、プロセス内キャッシュの場合プロセス再起動時に再度初期化するか、
初期化用データを破棄してコールド起動する必要があります。
(キャッシュ初期化には10GBで10分かかるということもあり得ます。
 また、コールド起動の場合初期のパフォーマンスが低下します。)

加えて、キャッシュとファイルシステムの同期を
プロセスで1回行うという実装と比べて確実に実施されるOSにゆだねる事によって、
コードが簡略化されます。
また、ディスク使用が線形アクセスが多い場合、
ページキャッシュによる先読み機構がより有効に作用します。


これまで記述した設計は出来る限りメモリ上にデータを保持し、
必要な場合のみディスクに出力する・・・という設計と比して非常にシンプルです。
全てのデータは特に書き込み命令の実施無しに常時永続化されます。
→ 実際にはOSが後で永続化するためのファイルキャッシュに書き込むのみです。

ハードクラッシュによって消えるリスクがある状態のメッセージ増大を防ぐため、
一定時間ごと/一定数のメッセージを保持するたびに物理ディスクへの書き込みを行う
設定の追加を可能としている。

ページキャッシュ中心のこの設計はこのページを参考にしてほしい。
理解の手助けとなるはずだ。

一定の時間内に満たせるよう収める

メッセージング·ã‚·ã‚¹ãƒ†ãƒ ã®ãƒ¡ã‚¿ãƒ‡ãƒ¼ã‚¿ã«ä½¿ç”¨ã•ã‚Œã‚‹æ°¸ç¶šçš„なデータ構造は、多くの場合、Btreeである。
Btree構成は最も汎用性の高いデータ構造が利用可能であり、
メッセージングシステムにおけるトランザクションと非トランザクションの処理を幅広くサポートすることが可能。

但し、BtreeのアクセスにはO(logn)のコストがかかる。
通常であればO(logn)の時間は定数として扱われるが、ディスクを使用するというアクセス形態の場合これは真では無い。
ディスクからデータを取得するには10ミリ秒程の時間がかかり、
かつ1つのファイルをシークしている最中に別のファイルをシークすることはできない。
それを考えると、Btreeへのアクセスを行う場合それだけでディスクに高い負荷をかけることにつながる。

ストレージシステムは、物理ディスク操作とキャッシュ操作を混在しているため見える性能は大きく変動する。
さらにBtreeでは各操作にツリー全体をロックを避けるために非常に洗練されたページまたは行ロックの実装を必要とする。
実際に実現するには行ロックにかなり高いコストかけるか、または他の全ての読み取りを直列化する必要がある。

ディスクに依存して上記のロック構造を実現する場合ディスクの性能向上/密度向上を享受することができない。
細切れのアクセスが大量に発生するためである。

Kafkaの場合、ロギングソリューションと同様に永続的キューはシンプルなリード上に構築されており、後からデータを追加することが可能。
この構造はBtreeが保持するような豊富な機能を持たない代わりに、
ディスクにアクセスする際の速度をデータのサイズに依存しないO(1)にすることができ、お互いにブロックも発生させない。
O(1)にしたことで、性能を実データサイズと分離することができる。
結果、一般的なSATAの1TB+のディスク容量を最大限に活用することが可能。

8.効率を最大化

Kafka開発にあたっておいた仮定はメッセージの量が非常に高いということ。
また、全てのメッセージが1回、または複数回読まれると仮定している。
そのため、Kafkaでは「メッセージの消費」ではなく、「メッセージの生成」に最適化した構造を取る。

上記を実現するために立ちはだかる障害として、下記2点がある。

  1. 大量のネットワークIO
  2. 過度のメモリ上のバイトコピー

これらの問題を前提としたうえで効率化を図るため、
APIはGroup Messageに "MessageSet"として抽象化を加えて構築されている。
このAPI構成によって、複数のメッセージを同時にネットワーク転送することが可能で、
1メッセージごとに通信が発生するよりもネットワークのコストを抑えることができる。

尚、このMessageSetの実装においてはバイト配列またはファイルをラップした非常に薄いAPIを提供する。
メッセージのシリアライズ/デシリアライズは必要な場合のみ行われ、かつ遅延実行される。
(必要がない場合デシリアライズされないようにしている)

Brokerプロセスによって永続化されるメッセージログは単純にディスクに出力されたMessageSetのディレクトリのみ。
この単純な構造によって、BrokerプロセスとConsumerプロセスで同一のファイル形式を共有することが可能。
(また、Producerプロセスから受信したメッセージを検証した上で追記することが可能←?)

これらの永続化メッセージとネットワーク転送を共通フォーマット化することで各操作の最適化が可能。
UNIX系オペレーティング·ã‚·ã‚¹ãƒ†ãƒ ã§ã¯ã‚½ã‚±ãƒƒãƒˆã«ãƒšãƒ¼ã‚¸ã‚­ãƒ£ãƒƒã‚·ãƒ¥ã‹ã‚‰
データを転送するための高度に最適化されたコードパスを提供している。
Linuxではsendfileシステムコールを使用して行われる。
JavaはFileChannel.transferTo APIでこのシステムコールを利用可能。

sendfileを利用することによる利点を理解するためには、
ソケットからファイルへのデータの転送のための共通のデータパスを理解することが重要である。
通常のアプリケーションにおいてはファイル→ソケットへの転送は下記のフローで行われる。

  1. オペレーティングシステムは、カーネル空間でページキャッシュにディスクからデータを読み出す
  2. アプリケーションは、ユーザ空間バッファにカーネル空間からデータを読み出す
  3. アプリケーションは、ユーザ空間バッファからカーネル空間に対してデータを書き込み
  4. オペレーティングシステムはカーネル空間中のソケットバッファからNICバッファにデータをコピーする

これは明らかに非効率的である。
4つのコピー、2つのシステムコールを挟んでしまうため。
sendfileを用いればOSが直接ネットワークにページキャッシュからデータを送信することができ、複数回のコピーが回避される。
この最適化されたパスに必要となるのはNICバッファに対するコピーのみとなる。

Kafkaは一般的な使用例は、トピック上に対して複数のConsumerがいると仮定している。
上記のコピー最適化を使用して、データは一度だけページキャッシュにコピーしている。
通常のアプリケーションのようにメモリに格納し、要求があるたびにカーネル空間にコピーがされることはない。
このことによって、メッセージがネットワーク転送速度の限界に近い速度で消費されることを可能にする。

Javaでsendfileとゼロコピーサポートの詳細背景については、以下の記事を参照。

https://www.ibm.com/developerworks/linux/library/j-zerocopy/

9.エンドツーエンドのバッチ圧縮

多くの場合、ボトルネックはCPUではなくネットワークになる。
特にデータセンター間でメッセージを転送する必要があるネットワークトポロジを組んでいる場合は。

もちろん、ユーザはKafkaの介在なしでもいつでも圧縮されたメッセージを送信することはできるが、
えてしてその圧縮効率はメッセージの中のフィールドの繰り返し等の事情があり、悪い。
(JSONのフィールド名称、User Agentの値など)

効率的に圧縮を行うためには個々のメッセージ単位で圧縮を行うのではなく、
複数のメッセージをまとめて圧縮させる必要がある。

理想的にはエンドツーエンド圧縮方式・・・
メッセージはProducerプロセスが送信する前に圧縮され、サーバに圧縮状態で保持される。
その上で各Consumerプロセスで解凍する方式を取るのが望ましい。

Kafkaは再帰的なMessageSetによってこの理想的な圧縮方式を実現している。
メッセージは送信前にメッセージ群をまとめて圧縮し、その後用いるときまで圧縮状態で維持される。
KafkaではGZIPとSnappy圧縮方式をサポートしている。

10.メッセージ消費側駆動の状態管理方式

どのメッセージまでが消費されたかを管理する機能はメッセージングシステムの重要機能の1つ。
また、その状態管理方式は直感的ではないかもしれないが、メッセージングシステムの性能に大きく影響する。

状態管理は永続的な状態値(つまりはファイルに同期)を更新する必要があり、ランダムアクセスが発生する。
結果、ストレージシステムのスループットではなく、シーク時間に影響を受ける可能性がある。

ほとんどのメッセージングシステムにおいては、「状態管理」はBrokerプロセス(メッセージを提供する側)が管理する。
メッセージ提供側はConsumerに対してメッセージを送信した後、ローカルでも状態を更新する。
これは非常に分かりやすい方式だが、「実際にどこにメッセージが送信されたか」についてはわからない。

多くのメッセージングシステムはこの状態管理方式のためにあまりスケールしない。
だが、メッセージ提供側は処理が完了したメッセージをすぐ削除できるために保持するデータは小さく保つことができる関係上、
非常に実用的な選択であるのも確か。

実は、メッセージ提供側とメッセージ消費側のメッセージ消費状態の同意は重要な問題、というのは正しくないのかもしれない。(?)
もしメッセージ提供側がネットワークを通じてメッセージを送信するごとにローカルの状態を更新した場合、
メッセージ消費側がメッセージの処理に失敗した場合、メッセージは消失してしまう。

この問題を解決するために多くのメッセージングシステムにおいてはAck機能を搭載している。
これはメッセージ消費側から「このメッセージについては処理が完了した」と通知を受けることで、
最終的なメッセージの削除を行うもの。
それまでは送信したメッセージについてもフラグを設定するだけで削除は行わない。

この方式はメッセージが消失する問題については対処できるが、また新たな問題を生む。

第1の問題としてメッセージ消費側がメッセージを処理するが、Ackを送信する前に失敗した場合、2回メッセージは消費される。
第2の問題としてパフォーマンス周りの問題がある。
現在メッセージ提供側は1メッセージに対して複数の状態を保持してしまっている。
(未処理、送信済未完了=同じメッセージを違う消費者に送信しないために必要、完了)
また、「送信したが完了していない」が継続するという状態もメッセージ消費側の状態によっては発生するため、
それらの状態への対処も行う必要がある。

メッセージ配信セマンティクス

これまでからわかるように、メッセージ配信の保証方式として下記の3パターンが考えられる。

  1. 1回以下:前述した第1のケース。メッセージは提供後すぐ削除される。結果、消費側の障害によっては処理されないメッセージも発生する。
  2. 少なくとも1回:前述した第2のケース。各メッセージについて1回処理されることが保証されるが、失敗の場合は2回以上配信される。
  3. 1度だけ:何をしたいかによるが、各メッセージに対して「1回だけ」配信されることが保証されるケース

これらはつまりは「トランザクションの問題」のバリエーション。
2または3相コミット、Paxosのバリエーションのアルゴリズムによって「厳密に1回だけ」処理させることも可能。
ただし、いくつかの欠点も存在する。

上記のアルゴリズムを用いた場合複数回の往復通信が必要となり、
かつ特に時間制限も存在しないアルゴリズムのため状況次第では完了しないまま放置されるようなこともあり得る。

分散同意アルゴリズムには上記のような制約がどうしても発生してくる。

Kafkaではこれらのメタデータに対して2つ珍しい対処を行っている。
1つ目として、ストリーム(データの流れ)はBrokerプロセスごとに全く異なるパーティションに分割している。
これらのパーティションの意味と、メッセージをどのパーティションに送信するかはProducerプロセスに委ねられている。
同一パーティション内のメッセージはBrokerプロセス内で受信順にソートされ、同じ順でConsumerプロセスに提供される。

これはつまりは各メッセージごとに状態管理(消費されたか、まだか)をするのではなく、
Consumer、トピック、パーティションごとにのみ状態管理をすればいいことを示している。

そのため、メッセージの消費状態は個々のメッセージに対してではなく
Consumer、トピック、パーティションについて保存される。
これをオフセットと呼び、この機構のおかげでKafkaはメッセージの消費状態について非常に小さいデータで管理可能となる。
詳細については実装セクション参照。

メッセージの消費状態管理

KafkaにおいてはConsumer側でメッセージをどこまで消費したかの状態(オフセット)を維持する必要がある。
KafkaのConsumerライブラリは基本自分がどこまでメッセージを処理したかの「状態値」をZookeeperに書き込む。
しかしながら、状態値をConsumerが処理結果を書き込むデータストアと同じデータストアに書き込むことが有益かもしれない。

例えば、Consumerはいくつかの集計値を集中型のトランザクショナルOLTPデータベースに入力することもできる。
このケースにおいてはConsumerはどこまでメッセージを処理したかの状態をデータベース更新と同トランザクションとして反映することもできる。
同トランザクションで実施することにより分散部分がなくなり、分散同意問題をそもそも発生させなくすることにつながる。
類似の対処はトランザクショナルでは無いデータストアにおいても良好に動作する。

検索システムはメッセージの処理状態を検索インデックスと共に保存することができる。
それは耐久性の保証を提供しない場合があるけれども「メッセージのオフセットと検索インデックスを同期させている」
そのため、検索インデックスを出力する場合にクラッシュして消えた場合、
「検索インデックス出力前のオフセット」から開始可能なため、保存した最新の状態から再開することが可能。

同様に、HadoopでKafkaからのパラレルロードを実行している当社のシステムにおいては
似たような状態同期と復旧を行うことが可能。
個々のマッパーはマップタスクの終了時にHDFSへの最後の消費メッセージのオフセットを書き込む。
HDFSに格納されたオフセットからジョブが失敗して再起動される場合、各マッパーは単に再起動すればよい。

この設計方針を取ったことにより、副次的な利点も生じている。
Consumerはオフセットを古い値に巻き戻すことにより、前の状態から再度データを消費することが可能。
これにより、一般的なキューの概念とは異なるが、消費者サイドから見ると都合がいい動作が実現可能。
例えば、消費する側のコードにバグがあり、メッセージを処理した後にバグが修正された場合、
バグが発現する前のオフセットから再スタートすることによりメッセージを再処理させることが可能。

プッシュ対プル

これまでのBroker、Consumer側の話に関連する質問として、
Broker側がデータをpushするのか、Consumer側がメッセージをpullするのか、がある。

Kafkaはデータの流れとして下記のようなデザインを取っている。

  1. Producer→Brokerにデータをpush
  2. Broker←Consumerにデータをpull

最近のScribeやflume等のシステムにおいてはログ収集に焦点を当て、
各ノードをBrokerとして動作させデータをpushするアプローチを取っている。
しかしながら、PushベースのシステムにおいてはBroker側はConsumer側で転送速度が制限されるような
多彩な消費モデルに対応するのは困難になる。

Kafkaの置くゴールとしては、消費者側で可能な限り最大のレートでメッセージ消費を可能にするというもの。
だが、残念ながらpushモデルのシステムにおいては提供者側が消費者側での最大効率を達成しようとした結果、
消費者側で処理可能な量以上のメッセージを送信してしまい、メッセージが過剰となる傾向がある。
消費者側でサービス停止などの問題が発生した場合、それはより容易に発生する。

pullベースのシステムにおいてはそのペースを消費者側で制御できるため、
一時的に処理速度が低下した場合であっても後から追い付くことが可能。
それはpushベースのシステムにおいて消費者側が状況を通知するよりシンプルなモデルに仕上がるため
Kafkaにおいてはpullベースのシステムモデルを取ることとしている。

11.システムの分散

Kafkaは基本的にはクラスタ全体に分散実行されるモデルを取っており、マスターノードと呼ばれる存在はない。
Brokerプロセスもお互いに1対1で対応しており、設定の手動更新なしにノードの追加削除を行うことができる。

同様に、ProducerプロセスとConsumerプロセスも動的に起動することが可能。
各BrokerプロセスはZookeeperにメタデータ(利用可能なトピック情報など)を登録する。
ProducerプロセスとConsumerプロセスはZookeeperに登録されたメタデータを元に
トピックを発見したりメッセージの生成/消費の協調を行うことが可能。

12.Producer

Producerプロセスの自動ロードバランシング

Kafkaは、ProducerのTCP接続のバランスをとるための専用ロードバランサを使用するために
クライアントサイドのロードバランシングをサポートしている。
専用のレイヤ4ロードバランサはBrokerプロセス経由でTCP接続のバランスを取ることによって動作する。

この構成の場合、Producerプロセスが生産したメッセージは全て単一のBrokerプロセスに集約される。
レイヤ4ロードバランサを使う利点は各Producerプロセスは単一のTCPコネクションのみを必要とし、
Zookeeperへの接続を行う必要がないことである。

欠点はロードバランシングがTCP接続レベルで行われており、結果として上手くバランシングが行われないかもしれない点。
もしいくつかのProducerプロセスが他のものより多くのメッセージを生成する場合、
コネクションを均等にはったとしても均等にメッセージが割り振られるわけではなくなる。

クライアントサイドのZookeeperベースのロードバランシングは、これらの問題のいくつかを解決する。
ZookeeperベースのロードバランシングはProducerプロセスに対して下記2機能を提供する。

  1. 動的なBrokerプロセス発見
  2. リクエスト毎のロードバランシング

同様に、ZookeeperベースのロードバランシングはProducerプロセスに対して
単なるランダムでは無いメッセージのIDや利用するユーザIDに応じたバランシングを可能にする。

この機能を「セマンティックパーティショニング」と呼ばれる。
以下でより詳細に説明する。

Zookeeperベースのロードバランシングの動作は下記の通り。
ZookeeperWatherは下記のイベントに対してコールバックを受け取ることができる。

  1. 新規Brokerプロセス追加
  2. Brokerプロセスがダウン
  3. 新規トピックの追加
  4. Brokerプロセスが既存トピックに対して登録

内部的にはProducerプロセスはBrokerプロセスへの接続プールをBrokerプロセス毎に保持する。
この接続プールはZookeeperからのコールバックを受け、
全ての生存Brokerプロセスへの接続の確立/維持を行っている。

特定のトピックのプロデューサー要求が来たとき​​に、Brokerパーティションはパーティショナで選択される。
(詳細はセマンティックパーティショニングのセクションを参照)
プールから取得したBrokerへの接続オブジェクトを用いて選択したBrokerパーティションへのデータ送信を行っている。

非同期送信

非同期ノンブロッキング操作はメッセージングシステムをスケールさせるための基本の要素。
KafkaではProducerプロセスがメッセージを生産する際に非同期リクエストディスパッチオプションを指定可能。
これは一定時間ごと、または一定サイズごとを事前に設定し、Producerプロセス内のメモリキューにバッファリングした要求をまとめて送付するもの。
データの生成レートは通常ノードごとに異なるため、非同期オプションを指定することで
ネットワーク内のトラフィックを軽減/効率的な利用を可能にし、Brokerプロセスへのリクエスト負荷を一定にするのに役立つ。

セマンティックパーティショニング

各メンバーのプロファイルに対する訪問者数を維持したいアプリケーションを考えるとする。
この場合、あるメンバーのプロファイルに対する訪問履歴イベントは出来れば同じパーティション、同じConsumerスレッドにおいて処理したいと考える。
KafkaのProducerプロセスは生存中のKafkaノード、およびパーティションへメッセージをマッピングさせる機能を持っている。
メッセージ中のいくつかの値を元にBrokerプロセス達に対してストリームを分割する機能・・という形で実現される。

このパーティショニング機能はkafka.producer.Partitionerインタフェースを継承したパーティショナーを作成することで
カスタマイズ可能。デフォルトではランダムでパーティショニングが行われる。
尚、上記の例においてはパーティショニングを行うキーはMEMBER_IDになり、
パーティショナーは Hash(MEMBER_ID) % num_partitions で配信先のBrokerプロセスを算出するだろう。

13.Hadoopその他のバッチデータロードのサポート

Kafkaはスケーラブルな永続性を保持しているため、バッチシステムのために
定期的にデータをロードすることも可能。
LinkedIn(?)ではHadoopクラスタとDWHにデータをロードするために使用している。

バッチ処理はデータロードフェーズと、非循環グラフ的な処理フローを実行してデータを出力するフェーズと
段階的に実行される。
このモデルをサポートする際にKafkaのある時点からのデータロードを再度実施できるという特長は
障害が発生した際に都合がいい要素となっている。

Hadoopのケースにおいて、Kafkaは各ノード/トピック/パーティション単位でデータを分割配分可能なため、
各Mapperタスクに対してデータロード負荷を均等に配分可能。
HadoopはTaskの実行管理を提供するが、Kafkaを利用することで重複データのロードの危険なしに再起動が可能。

14.ProducerのAPI

Producer側のAPIは以下の2つの低レベルAPIをラップしている。

  1. kafka.producer.SyncProducer
  2. kafka.producer.async.AsyncProducer
class Producer {
  /* Sends the data, partitioned by key to the topic using either the */
  /* synchronous or the asynchronous producer */
  public void send(kafka.javaapi.producer.ProducerData producerData);

  /* Sends a list of data, partitioned by key to the topic using either */
  /* the synchronous or the asynchronous producer */
  public void send(java.util.List<kafka.javaapi.producer.ProducerData> producerData);

  /* Closes the producer and cleans up */	
  public void close();
}

このAPIとした目的は、クライアントへの単一のAPIを介してすべての生産機能を公開すること。
新しいProducerを作成する際には下記のAPIを記述する必要がある。

1.複数のプロデューサーの要求をバッファリングして非同期ディスパッチ/キューイングを行うAPI

kafka.producer.Producerは(producer.type=async)と設定することで
シリアライズ/Brokerプロセスに対して配分する前に複数のリクエストをバッチ化する機能を提供する。
バッチのサイズは、いくつかの設定パラメータによって制御することができる。
イベントがキューに入った場合、「queue.time」「batch.size」のどちらかに達するまでキューでバッファリングされる。

バッファリングしたデータを処理する場合、
バックグラウンドスレッド(kafka.producer.async.ProducerSendThread)がキューからデータを取得し、
イベントハンドラ(kafka.producer.EventHandler)がデータのシリアライズと適切なBrokerプロセスへの送信を行う。

カスタムのイベントハンドラもevent.handlerの設定値にてプラグイン方式で追加することが可能。
この処理の流れをProduerパイプラインと呼び、各フェーズにコールバックやログ出力、
トレース用のメトリクス情報収集などの処理をプラグイン方式で追加する追加することが可能。

これらの処理は kafka.producer.async.CallbackHandler を継承したクラスを
設定値「callback.handler」に設定することで追加可能。

2.エンコーダの指定

ユーザ側で下記のインタフェースを継承したエンコーダを指定する。

interface Encoder<T> {
  public Message toMessage(T data);
}

デフォルトでは何も実施しない「kafka.serializer.DefaultEncoder」クラスが使用される。

3.Zookeeperベースの自動Brokerプロセス検知

Zookeeperベースの自動Brokerプロセス検知、ロードバランシングはZookeeperへの接続URLを
設定値「zk.connect」に設定することで使用可能。
ただ、いくつかのユースケースにおいてはZookeeperへの依存が不適切となるケースもある。

その場合はProducer側の「broker.list」に静的なBrokerリストを定義する形で対応可能。
この設定を行った場合、Producer要求はランダムBrokerパーティションに配分される。
もし送信時に対象のBrokerがダウンしていた場合、そのProducerリクエストは失敗する。

4.必要に応じてパーティショナーを定義

ルーティングは下記のインタフェースを継承したクラスを定義することで実現可能。

interface Partitioner<T> {
   int partition(T key, int numPartitions);
}

このAPIはキーとパーティション数を渡して配分先のパーティションIDを取得する構成になっている。
このIDはProducerリクエストに対してBrokerパーティションを選択するbroker_idsと
パーティションリストへのインデックスとして使用される。

デフォルトのパーティショナーはキーのハッシュ値をnumPartitionsで剰余を取ることで配分を行う。
キーがnullの場合ランダムBrokerパーティションで配分を行う。
カスタムパーティショナーは設定値「partitioner.class」で定義が可能。

15.ConsumerのAPI

KafkaではConsumerのAPIとして2レベルのAPIを保持している。
低レベルの "Simple" APIは、単一のBrokerプロセスへの接続を維持し、
サーバに送信されるネットワーク要求に密接な対応を有している。

高レベルのAPIはConsumerからBrokerプロセスの詳細を隠し、背後のネットワークトポロジ等も気にすることなく
メッセージを取得することが可能となる。
また、自動的にConsumer側のオフセットを設定する。
加えて、高レベルAPIはブラックリスト/ホワイトリスト方式(正規表現を使用可能)でメッセージをフィルタリングして購読することが可能。

低レベルAPIの構成は下記の通り。

class SimpleConsumer {
	
  /* Send fetch request to a broker and get back a set of messages. */ 
  public ByteBufferMessageSet fetch(FetchRequest request);

  /* Send a list of fetch requests to a broker and get back a response set. */ 
  public MultiFetchResponse multifetch(List<FetchRequest> fetches);

  /**
   * Get a list of valid offsets (up to maxSize) before the given time.
   * The result is a list of offsets, in descending order.
   * @param time: time in millisecs,
   *              if set to OffsetRequest$.MODULE$.LATIEST_TIME(), get from the latest offset available.
   *              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.
   */
  public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}

低レベルAPIは高レベルAPIを構成するために使用されるだけでなく、
Hadoop向けConsumer等のように状態管理が一部特殊となるConsumerの構成にも使用されている。

高レベルAPIの構成は下記の通り。

/* create a connection to the cluster */ 
ConsumerConnector connector = Consumer.create(consumerConfig);

interface ConsumerConnector {
	
  /**
   * This method is used to get a list of KafkaStreams, which are iterators over
   * MessageAndMetadata objects from which you can obtain messages and their
   * associated metadata (currently only topic).
   *  Input: a map of <topic, #streams>
   *  Output: a map of <topic, list of message streams>
   */
  public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap); 

  /**
   * You can also obtain a list of KafkaStreams, that iterate over messages
   * from topics that match a TopicFilter. (A TopicFilter encapsulates a
   * whitelist or a blacklist which is a standard Java regex.)
   */
  public List<KafkaStream> createMessageStreamsByFilter(
      TopicFilter topicFilter, int numStreams);

  /* Commit the offsets of all messages consumed so far. */
  public commitOffsets()
  
  /* Shut down the connector */
  public shutdown()
}

このAPIはKafkaStreamクラスをイテレータで束ねる形で実現している。
各KafkaStreamは、1〜n個のサーバ上の1〜n個のパーティションからのメッセージストリームを表す。
したがって、各KafkaStreamは複数パーティションからのデータを束ねて受信することができるが、
1パーティションから送信されるデータは1Streamオブジェクトにのみ送られる。
各KafkaStreamはシングルスレッド処理のために使用される。
クライアントは任意にストリームを作成することが可能。

createMessageStreamsメソッドを呼び出すことにより、Consumerをトピックに対して登録する。
結果、Consumer/Broker間でのリバランスが走ることになる。
API側ではリバランスを最小限にするためにcreateMessageStreamsメソッド実行時に
複数のトピックに対するStreamを生成することを推奨している。

createMessageStreamsByFilterメソッドはフィルタに一致するトピックを発見するWatcherオブジェクトを登録する。
フィルタで複数トピックを許容した場合、createMessageStreamsByFilterが返す複数トピックが返すメッセージは
「反復するケースがある」ということに注意すること。

16.ネットワーク層

ネットワーク層の構成は単純なNIOサーバであるため詳細な説明は省略する。
sendfileの実装はMessageSet#writeToメソッドを用いて行われる。
FileChannel#transferToメソッドにおいてFile-backedメッセージ群を用いることで
プロセス内でバッファリングするより効率的なバッファリングを実現している。

スレッドモデルとしては1つのacceptorスレッドとスレッドごとに特定の数のコネクションを処理する
N個のプロセッサスレッドで構成される。
このモデルはVoldemortで用いているNIO SocketServerにおいて実装が簡単かつ高速であることが示されている。
プロトコル自体は将来的に他の言語からでも用いれるようシンプルに保っている。

17.メッセージ

メッセージは、固定サイズのヘッダと可変長の不確定バイト配列ペイロードで構成されている。
ヘッダには、フォーマットバージョンと破損や切り捨てを検出するCRC32チェックサムを含んでいる。
不確定の要素は現状シリアライズライブラリの進歩が激しく、1つに絞ることは得策ではないため、あえて残している。
ただ、当然ながらKafkaを使用して特定のアプリケーションを作成する際にはシリアライズ方式を選択する必要が出てくる。
MessageSetインタフェースはメッセージのイテレータセットを通したNIOチャネルへのバルク書込/バルク読込に特化した構成になっている。

18.メッセージフォーマット

NByteのメッセージは下記のフォーマットとなっている。

  • 1.1Byte MagicByte(フォーマット変更を可能とするための識別バイト)

MagicByte == 0

MagicByte == 1

  • 2.1Byte attributes識別子(圧縮状態、コーデック状態などを示す)
  • 3.4Byte CRC32チェックサム
  • 4.N-5Byte メッセージペイロード

19.ログ

"my_topic"というトピック名称を持つ2パーティションを持つトピックは"my_topic_0"と"my_topic_1"という
2つのディレクトリと、メッセージを保持するファイル群から構成される。
ログファイルのフォーマットは"ログエントリ"の配列となっている。
ログエントリは、メッセージ長(NByte)を4Byteの整数で保持している。
各メッセージは対象トピック/パーティション上の全メッセージ中の開始位置であるオフセットを64ビット整数で保持しており、
そのオフセットによって一意に識別される。

ディスク上に保存されている際のメッセージフォーマットは以下の通り。
各ログファイルはファイル中に保持する最初のメッセージのオフセット値を名称に保持する。
そのため、最初に作成されるログファイル名称は「00000000000.kafka」となる。
以後の各ファイルの名称は大体S毎の間隔を持つログファイルとなる。
(ここでのSは設定ファイルで設定された最大ログファイルサイズの値)

このバイナリフォーマットを標準インタフェースとして厳密に維持することで、
Producer、Broker、Consumer間で通信する際に再コピーや変換を最小限に抑えている。

ディスク上のフォーマットは下記。

message length : 4 bytes (value: 1+4+n) 
"magic" value  : 1 byte
crc            : 4 bytes
payload        : n bytes

通常、メッセージのIDとしてオフセットは使用されない。
私たちの元々のアイディアはProducerによって生成されたGUIDを用い、
各BrokerでGUID→オフセットのマッピングを保持するというものだった。

しかし、上記の方式を採用した場合Consumerは各サーバ(Broker?)のIDを維持しなければならない関係上、
GUIDの一意性を確保しても意味がなくなる。
さらに、GUID→オフセットのマッピングを維持するための複雑さは
ディスクに常時同期させる必要があり、かつランダムアクセスが必要となる重いインデックス構造を必要とする。

そこで私たちはこのルックアップ構造を単純化するために
パーティション/ノードIDとペアで用いることでメッセージを一意に識別可能なアトミックなカウンタを用いることにした。
この方針変更によってConsumerから複数のシークが並列で走る状況であってもルックアップ構造は単純化された。

単調増加し、パーティション毎にユニークに割り振られるオフセットを用いることにより、
オフセットを用いてメッセージに直接飛ぶ構造が単純化され、固定化された処理となった。

尚、オフセットはConsumerAPIから隠されているため、実装の詳細は後から変更でき、
私たちはより効率的なアプローチにするため対応を続けている。

書込

ログにデータを追加する際、常に最後のファイルに直接追加することが可能。
ログファイルサイズが1GBのような設定可能な最大サイズに達した場合、次のファイルが生成される。
ログは2つのパラメータを取る。
M:OSに対して強制的にフラッシュするまでのメッセージ数
S:OSに対して強制的にフラッシュするまでの時間

上記の設定によって、システムのクラッシュが発生した際に失われる最大のメッセージ数/メッセージの配信期間を設定することが可能。

読込

読み込みはオフセットと最大読み込みチャンクサイズSを渡すことで行われている。
読み込みの結果、SByteのメッセージを含むメッセージのイテレータが返される。
Sの値はどのメッセージよりも大きくなることを想定しているが、イベントが通常でない大きなサイズとなっていた場合、
読込はメッセージの読み込みが成功するまで複数回実施され、実行された回数だけSByteのメッセージ領域が合計して確保される。
最大メッセージ数/最大メッセージサイズはサーバが一定サイズを超えたメッセージを受信した際に拒否する形で実現される。
これはConsumerが受信可能な最大サイズをオーバーしないために設定されている。
もし受信側が1メッセージの途中で受信が終了してしまった場合、ログエントリのデリメータが発生しないためすぐに検知できる。

オフセットを用いた実際の読み込みプロセスはまずはじめにどのファイルに該当のメッセージが含まれるかを特定する。
その上でファイル名に記述されたオフセットと読込対象オフセットの差分を取って読込場所を特定する。
この検索処理はファイルごとに管理されるメモリに対して単純なバイナリサーチのバリエーションアルゴリズムによる検索を実施することで実現している。

このログの方式によってKafkaは「今書き込まれた」ファイルも含めてConsumerの取得対象にすることができる。
この方式はSLAとして設定したパラメータ内でCosumer側がログの処理に失敗した場合にも有用。
尚、この方式は存在しないオフセットを指定した場合にはOutOfRangeException を発生させるためその例外を持ってハンドリングを行うこと。

Consumerに送信されるメッセージのフォーマットは下記。

MessageSetSend (fetch result)

total length     : 4 bytes
error code       : 2 bytes
message 1        : x bytes
...
message n        : x bytes


MultiMessageSetSend (multiFetch result)

total length     : 4 bytes
error code       : 2 bytes
messageSetSend 1
...
messageSetSend n
削除

ログを削除する際、同時に1つのログセグメントを削除する。
ログマネージャには各ファイルが削除対象かどうかを判断するためのポリシーをプラグイン方式で追加することが可能。
現状では「最終更新時刻がN日以上前」か、「保持するログの量が新しい方からNGB保持し、それ以外を削除」という方式が有用だと判断している。
削除対象となったファイルに対して書込み/読み込みが競合するようなパターンを回避するため、セグメントリストの実装にコピーオンライト方式を取っている。
ログマネージャでは削除処理が走っている間はイミュータブルなセグメントリストのスナップショットを作成し、
バイナリサーチアルゴリズムはそちらを参照するようにしている。

保証

ログ出力時、ファイルに強制的にフラッシュをかける保持メッセージ数Mを設定項目として設定可能。
Kafkaのプロセス起動時、ログのリカバリプロセスが走り、
最新のログセグメント中に保持されているメッセージエントリが有効であることを確認している。

メッセージエントリは下記の条件を満たす場合正常と判断する。

  1. メッセージサイズとオフセット値の合計値がファイルサイズより小さい
  2. 保存しているCRC32チェックサム値が保持しているメッセージペイロードのチェックサム結果と一致している

メッセージの破棄はファイル中で一番最後の正常メッセージがあったオフセット以降を
削除することで行われる。

尚、2種類の問題に対処する必要があることに留意すること。

  1. システムがクラッシュしたため不正なメッセージが末尾に残っている
  2. 不正なメッセージ部が記述されている

このような問題に対処する必要がある理由として、一般的にOSはファイルのinodeと実体データの書き込み順同期は保証していないため。
結果、inodeを更新したが実体データの更新に失敗した場合、サイズは大きいが実際には中身がないといった不整合が発生する。
CRCチェックサムはこういった不整合パターンへの対処のために搭載し、ログ破壊の波及を防いでいる。

20.分散方式

以後、BrokerとConsumerの協調に用いるZookeeperのディレクトリ構成とアルゴリズムについて記述する。

Zookeeper上のディレクトリ構成判例

[xyz]という形で[]で囲った個所はトピック名やナンバーによって変動する値を示す。
そのまま"xyz"というZnodeが存在しているとは限らない。

例として、「/topics/[topic]」は/topicというディレクトリの配下にトピック名の名前を持つZnodeが存在することを示す。
同様に「[0...5]」と記述した場合は0、1、2、3、4・・・という形でZnodeが存在することを示す。

また、「->」は実際のZnodeの値を示すことに使用される。
例えば、「/hello -> world」と記述した場合、「/hello」というZnodeが存在し、内容に「world」を保持することを示す。

BrokerIDレジストリ
/brokers/ids/[0...N] --> host:port (ephemeral node)

BrokerIDレジストリにはConsumer側からBrokerを論理的に一意に識別できるIDを名称として持つZnodeを保持する。
(論理的に一意に識別できるID=論理IDは設定項目として指定する必要がある)
Brokerは起動時に「/brokers/ids/」配下に自分の論理IDを持つZnodeを登録する。
このように論理IDを用いる目的は、Brokerが物理的に別のノードに移動せざるを得ない状況になった場合に
Consumer側でそのことをハンドリングする必要がないようにしたいため。

論理ID登録時に既にそのIDが使用されていた場合はエラーとなる。
「ephemeral node」のため、これらのZnodeはBrokerプロセスが起動している間のみ維持され、終了すると消える。
これによってConsumer側はBrokerが落ちたことを検知可能。

Brokerトピックディレクトリ
/brokers/topics/[topic]/[0...N] --> nPartions (ephemeral node)

各Brokerは[topic]ディレクトリ配下にトピックに対して自分が保持しているパーティション数を登録する。

Consumer/Consumerグループ

Consumer側もZookeeper上に自分の状態について登録を行う。
登録を行う理由は、消費量の計算とトピック/パーティションごとに
どこまでメッセージを消費したかを示すオフセットを共有するため。

複数のConsumerはConsumerグループを構成し、グループであるトピックに対するデータを共同で使用することができる。
あるConsumerグループに所属するConsumerに対してIDである「group_id」を共有する。
例えば、"foobar"という名称のConsumerプロセスが3つのノードにまたがって実行される場合、
"foobar"というIDをこれらのConsumerプロセス群に対して与える。

このグループIDはConsumerプロセスの設定値として指定する。
この設定によってConsumerプロセスは自分の所属するグループを認識する。

Consumerグループに所属するConsumerプロセス達はあるトピックに対するパーティションを
出来るだけ均等になるように分け合う。
1パーティションは1Consumerプロセスによって消費される。

ConsumerIDレジストリ

「group_id」の他にConsumerにはConsumerの識別のために「consumer_id」(UUID形式のホスト名)が一時的に割り当てられる。
「consumer_id」は下記のディレクトリ配下に保存される。

/consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)

同一グループ内のConsumerは同一の「group_id」ディレクトリ配下に「consumer_id」のZnodeを作成することで登録する。
Znodeは のマップを保持する。
このIDは単純にグループ内でどれだけのConsumerが生きているかどうかの判別に用いられる。
BrokerIDと同様に「ephemeral node」のため、Consumerが落ちたらこのZnodeも消える。

Consumerのオフセットトレース

Consumerプロセスはオフセットの最大値を各パーティションごとにどこまでメッセージを消費したかを判別するために記録する。

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value ((persistent node)
パーティションオーナーのレジストリ

BrokerのパーティションはConsumerグループ毎にある1Consumerによって消費される。
Consumerはメッセージの消費を開始する前に該当のパーティションに対してオーナーであることを確定させる必要がある。
オーナーであることを確定させるためにConsumerは下記のディレクトリに「ephemeral node」を作成する。

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)
Broker起動時処理

Brokerは基本独立しているため、Brokerが保持している情報についてのみ登録を行う。
Brokerが起動するとBrokerのレジストリディレクトリ配下にZnodeを作成し、ホストとポートの情報を登録する。
Brokerはあわせて自らが保持しているトピックとパーティション一覧をBrokerトピックレジストリ配下に登録する。
新しいトピックが作成された場合はその都度レジストリ配下にトピックZnodeが追加される。

Consumer起動時処理

Consumer起動時、下記の処理が行われる

  1. 「consumer_id」を所属グループのディレクトリ配下に登録する
  2. ConsumerIDレジストリ配下に対してZookeeperWatcherを設定し、新しいConsumerの追加や削除を検知できるようにする。(Consumerグループ配下のConsumerの状態が変更された場合、グループ内の全Consumer内でリバランスが起動される)
  3. BrokerIDレジストリ配下に対してZookeeperWatcherを設定し、新しいBrokerの追加や削除を検知できるようにする。(BrokerIDレジストリ配下のBrokerの状態が変更された場合、グループ内の全Consumer内でリバランスが起動される)
  4. トピックフィルタを使用している場合、Brokerトピックディレクトリ配下に対してZookeeperWatcherを設定し、新しいトピックの追加を検知できるようにする。(トピックが追加された場合、それがトピックフィルタに合致するかの確認が行われ、合致する場合はグループ内の全Consumer内でリバランスが起動される)
  5. 起動Consumerが所属するConsumerグループに対してリバランスを強制実行する
Consumerリバランスアルゴリズム

ConsumerリバランスはあるConsumerグループ内でどのConsumerがどのパーティションを消費するかを同意するために行われる。
Consumerリバランスはグループ内のConsumerの追加削除、Brokerの追加削除をトリガとして行われる。
1トピック、1Consumerグループを見た場合、Brokerパーティションはグループ内のConsumer毎に均等に割り振られる。
1パーティションは常に1つのConsumerによって消費されるという実装によってこれらのリバランスはシンプルになっている。
こうした理由として、1パーティションに対して複数のConsumerが紐づけられた場合、ロックアルゴリズムが必要となり、競合も発生するためである。
尚、パーティションの数よりConsumerの数の方が多い場合、いくつかのConsumerはデータを一切受信することができないためその点は注意。

Consumerリバランスの際、以下のアルゴリズムを用いてリバランスを行うことにより、1Consumerが接続しに行くBrokerの数を抑えている。
Consumerリバランスの際行われる処理は下記。

前提:

  1. PTはあるトピックに対するパーティションの全体集合をさす
  2. PTに含まれるパーティションはPi(iの値は1〜Tまで変動)と記述する
  3. CGはあるConsumerグループに含まれるConsumer全体集合をさす
  4. CGに所属するConsumerはCi(iの値は1〜Gまで変動)と記述する

リバランス処理:

  1. PTをソートする(結果、同一Broker上のパーティションは固まる)
  2. CGをソートする
  3. パーティション数(PT)/Consumer数(CG)を実行して値Nを算出
  4. パーティション i * N 〜 (i + 1) * N - 1 をConsumer iに対して割り振る
  5. Consumerが元々保持していたパーティションオーナレジストリを削除する
  6. Consumerがリバランス後に読み込むパーティションオーナレジストリを登録する

リバランスがあるConsumerで起動された場合、同一Consumerグループ内のConsumerも同時にリバランスを実行する必要がある
=========
改めて振り返るとそれなりの量を読んできたことがわかります。
Consumer側がオフセットを管理するというがKafkaの設計方針で一番特徴的な個所ですね。
それによって出来ることも多いわけでして。
このあたりは自分が何か作るときにも参考にさせていただきます。