流沙河鎮

情報技術系のこと書きます。

Current 2024「DebeziumとKafka ConnectによってSlackは如何にしてCDCへ移行したか」まとめ

Current 2024 のセッション「Change Data Capture & Kafka How Slack Transitioned to CDC with Debezium & Kafka Connect」を日本語でまとめます。
可能な限り正確に内容を拾えるようにリスニングに努めたつもりですが、もし誤りがあればご指摘ください。

Current とは?

Confluent が主催する、Kafka や Flink をはじめとする Streaming 処理に関わる知見を交換するカンファレンス.

イベントページ

current.confluent.io

各セッションは公式サイトで視聴可能

current.confluent.io

Leveraging Iceberg Puffin Files to Accelerate Queries

セッションリンクは以下.

current.confluent.io

スピーカー

  • Joseph Thaidigsman
  • Tom Thornton
    • Slack 社のソフトウェアエンジニア

セッションまとめ

本セッションはオペレーショナルDBから分析基盤へのレプリケーションを扱う.

レプリケーションの方法として一般にバッチとCDCがある.
バッチをCDCへ移行した方法、CDCのスケーラブルな運用、それらに便利なOSSの紹介がテーマ.

Slack がバッチを CDC へ移行することにした動機

  • コスト
    • 少量の更新でもデータ全量の変更が発生していた
    • 例えば Slack のメッセージを保持しているテーブルがあり、あるチャンネルへメッセージが送られる度にメッセージの履歴全量を更新することになる
    • 結果として、計算コストが莫大になる
    • (筆者注: データレイクにありがちな課題. おそらく何らかのパーティショニングはしていると思うので、本当に過去のメッセージ全体を書き換えているわけではないだろうが、非常に重いことに変わりはない.)
  • 技術負債
  • レイテンシ
    • データ取り込みのバッチ処理に非常に 24 - 48 時間以上かかる上、それぞれの実行が高コストであるため、分析基盤でデータが使えるようになるまでのレイテンシが大きい
    • 例えば新機能の AB テストを実施する際に、新機能を出してから結果がわかるまでに 2 日以上かかるといった問題が起きていた

そこで、CDC ベースのアーキテクチャへ移行した.
データソースとなるオペレーショナル DB は Vitess. Vitess は MySQL の水平スケールクラスタリングに用いる仕組み.
Vitess への変更は Kafka Connect として実行される debezium によって読み取られ、Kafka へ送信される. debezium は OSS の CDC プラットフォーム.
Kafka へ送られたトピックは Avro スキーマで Iceberg Sink Connector がコンシュームする.
Iceberg Sink Connector は Kafka を通じて流れてきた DB の変更を Iceberg として Amazon S3 へ書き込む. Iceberg Catalog は Hive Metastore
S3 に書き込まれたデータを Amazon EMR 上の Spark が処理して、改めて Iceberg として S3 へ書き戻す

Viteess から Debezium, Kafka までのアーキテクチャ

前半部ではオペレーショナルDBであるViteess から Debezium, Kafka までのアーキテクチャを説明する.

Viteess から Debezium, Kafka までのアーキテクチャ解説(スライドの画像と見比べながら読んでね)

  • Vitess
    • Vitess は Keyspace と呼ばれる論理データベースで構成される
    • Keyspace は複数の Shard にパーティショニングして水平スケールできる
    • Shard は Tablet プロセスと MySQL プロセスで構成される
    • Tablet には「Primary」や「Replica」など任意の Tablet Type がアサインされる
    • VTGate は軽量なサーバプロキシであり、クエリを正しい Shard へルートしたり、クエリ結果を Shard 横断で統合する
    • VTGate の VStream API を通じてクライアントは Shard を構成する MySQL の Binlog のストリームをサブスクライブできる
  • Debezium
    • Kafka Connect クラスタ内の Debezium が VStream をコンシュームする
    • Debezium コネクタはデータをどう読み取るかを定義する Task を定義しており、Worker がそれらを実行する
  • Kafka
    • Debezium によって CRUD 情報が Data Changes Topic として Kafka へ書き込まれる
    • Data Changes Topic に加え、Debezium は Offset Topic も書き込む. これは Binlog が何処まで書かれたかを記録している
    • Debezium がクラッシュした場合は、再起動後に Offset Topic を読んで、Binlog の進行状況を把握した上で VStream を再開できる

これらの仕組みを大規模なスケール(以下)で運用している.

Debezium 運用の課題

先述の仕組みを実現するにあたっていくつかの課題があり、OSS をカスタマイズすることで解決した. それらの工夫は OSS にコントリビュートして、みんなが使えるようにしてある

スナップショットのフォールトトレランス

最初の課題は Table Snapshot のフォールトトレランスについて. Binglog に全ての変更が保持されていない場合でも、テーブルの完全なデータを取得する方法が必要だった. 例えばユーザ ID 5, チャネル ID 6 のレコードを Kafka トピックに取り込みたいが、既に Binlog に存在しない場合に、これを Kafka トピックに取り込みたい場合にスナップショットが必要になる.
VStream API はそれを実現する VSteram Copy 機能を提供しており、テーブルからすべての行をコピーして、VStream クライアントへ送信することができる.
しかし、これを Debezium で利用する場合に問題が起きる. Debezium は Offset を保存する際に Keyspace Shard と Global Transaction ID(DTID) を保存するが、VStream コピーの進行状況を示す指標がないため、Debezium がクラッシュして再起動した場合にスナップショットを再開できない。つまり、スナップショットがフォールトトレラントではない.

そこで、Offset トピックにスナップショット中に最後に正常に送信された行の主キーを保存するようにした。これによって、Debezium がクラッシュした場合でも、このフィールドを読み取って VStream へ渡すことができ、VStream コピー操作をその地点から再開できるようになる。これによって、テーブルのスナップショットがフォールトトレラントになった.

Debezium のスケール

Viteess のトランザクション増加に伴って Debezium コネクタをスケールさせる必要があるが、Debezium の Viteess コネクタは単一タスクモードしかサポートしていなかった.
Viteess に Shard が 3つある場合、Offset トピックにはそれぞれの Shard の GTID が保持される.

そこで、Debezium の Viteess コネクタがマルチタスクモードをサポートするように改修した.
Shard をタスクにラウンドロビンで分配できるようにした上で、各タスクが 1 つの Shard を処理し、Offset もそれぞれ管理するようにした. これによって透過的なスケールが可能になった.

Kafka のパーティション数変更対応

Kafka のパーティション数を増やして、Kafka のパフォーマンスをスケールさせたい場面がある.
ここで、データ変更トピックの順序性が問題になる.
例えば、リサさんがプロジェクトギズモチャンネルに参加したとする. ユーザーIDは5、チャンネルIDは6で、チャンネルに参加したため、これは作成操作として記録される.

ここで、パーティション数を 2 つに増やしたとする. 更にその後、リサさんがチャンネルを退出したとする.
つまり、同じプライマリキーに対して削除操作が発生する. 結果的に、同じプライマリキー(ユーザー ID とチャンネル ID)が異なるパーティションに振り分けられる問題が起きる.
パーティションはメッセージのキーのハッシュ値をパーティション数で割った余りで決まるため、パーティション数を変更すると、同じキーに対して新しいパーティションが割り当てられる可能性がある.
結果として、コンシューマは 2 つのプライマリキーに対する順序を保証できず、最新のレコードが特定できなくなる. MySQL の Binlog のタイムスタンプは秒単位の精度しかなく、1 秒あたり 60 万件の書き込みがある Slack のような環境では役に立たない.

単純な対策としては以下が考えられるが、どちらも理想的ではない.

  • トピックの内容を全て削除し、新しいスナップショットを作成してトピックにストリーミング
  • すべてのコンシューマーを、パーティション数が増えた新しいトピックに切り替える

これらの解決策はいずれも、大量の手作業、ダウンタイム、チーム間の調整が必要になる.

そこで、メッセージがコンシュームされる順序に依存せずに順序付けを行える仕組み、つまり任意の 2 つのメッセージに対して、どちらが最新かを判断できるようにする仕組みを作った.
これを実現するため、3 つのフィールドを追加した.

  • tx_order
    • 同じトランザクション内の 2 つのレコードの順序特定に使用する.
    • VStream が開始イベント、複数の変更、コミットイベントを送信するため、カウントを保持して各変更に割り当てることができる.
    • tx_order が大きい方が、そのトランザクション内でより後の変更を意味する.
  • tx_rand
    • 異なるトランザクションの 2 つのレコードの順序特定に使用する. 値が大きいほど新しいレコードを示す単調増加の整数で、GTID から導くことができる.
    • GTID はホストと Binlog 位置の範囲で構成されており、Binlog 位置の範囲の上限は単調増加の整数となるので、単調増加の整数の合計を取ることで、同じ特性を保持できる.
    • スライドの例では、最初のレコードは単一のホストのみを持つため、合計は3になる. 2 番目のレコードでは、3 と 5 の合計が 8 になり、より大きな値を持つ方が、より新しいレコードであることがわかる
  • epoch
    • ランクの単調増加の特性が失われた場合に使用する. これは、後からできたホストセットが以前のホストセットのスーパーセットでない場合に発生する. 例えば、GTID とバイナリログをリセットする管理操作が行われた場合など

これらの新しいフィールドを使用して、パーティションのスケールアウトを再度試みる. 2 つのパーティションにスケールアウトし、リサさんがが再びチャンネルを退出すると、パーティション 2 に削除操作が記録される.
順序付けの確立は以下の手順で行う:

  • epoch の比較: epoch が同じであれば、tx_rand の単調増加の特性が保たれている
  • tx_rand の比較: tx_rand が高い方がより新しいレコード
  • tx_rand が同じ場合は、tx_order を比較して最新のものを決定

この方法により、Kafka のスケールアウト時に手作業、チーム間の調整、ダウンタイムが不要になる

Vitess の Reshard への対応

Vitess の Shard の処理量が増加し、Shard の数をスケールさせたい場合がある.
スライドの例では、Shard s1 があり、トピック内のレコードとしてリサさんがチャンネルに参加したことを示す create 操作がある. ここで Shard は s1 であることが示されている.

そこで、この Shard を s1.1 と s1.2 へ Reshard したとする. その後、リサさんがチャンネルを離れ、delete レコードが生成された時に問題が起きる.
新しい Shard s1.2 では、epoc にデフォルト値の 0 を使用しているため、後のレコードの方が epic 値が低くなってしまい、先に紹介した順序を確保するためのアルゴリズムが壊れる.

これを解決するためには、Shard のリネージを確立する仕組みを作った。
s1.2 は s1 がカバーしていたキーの範囲のサブセットであると言えるため、s1 は s1.2 の親と見做すことが出来る。そこで、新しい Shard の epoch を、すべての親 Shard の最大値に 1 を加えた値に設定する.

先程の例の場合、すべての親 Shard の最大値は 1 なので、それに 1 を加えて 2 とするようにした。これで、後のレコードの方が epic 値が高くなるため、順序を確立するアルゴリズムが再び機能するようになった. このようにして、透過的なスケーリングを実現している.

Kafkaからデータレイクへの取り込み、提供までのアーキテクチャ

ここからは Kafka をコンシュームしてデータレイクに取り込み、利用者へ提供するまでのアーキテクチャを説明する.

CDC プラットフォームのコンシューマ側について、1 兆レコードを超えるテーブルをサポートするための工夫と、従来のバッチパイプラインからの移行について説明する.
Slack が最終的に採用したアーキテクチャは Ryan Blue の記事 「The CDC merge pattern」 にインスピレーションを受ている.
(筆者注:Ryan Blue は Apache Iceberg のオリジナルクリエイターの一人)

前提として、CDC を実現する上で、データレイク上に 3 つの関連するテーブルが有る.

  • CDC テーブル
    • 追記専用の Iceberg テーブルで、生の変更ログイベントを含んでいる
  • ミラーテーブル
    • ソーステーブルを 1 対 1 で反映することを目的とする Iceberg テーブル
  • DS パーティションテーブル

全体のアーキテクチャ. レコードが Kafka を通過すると、k8s 上の Kafka Connect クラスタで実行されている Tabular Iceberg コネクタによって処理される.

(筆者注: Tabular Iceberg コネクタは現在では Iceberg 本体に取り込まれている)

Kafka から読み取った Avro レコードは Apicurio のスキーマレジストリに保存されているスキーマによってデシリアライズされ、そこから S3 の追記専用 CDC テーブルに永続される.

そこから、Airflow が EMR 上で実行される Spark ジョブを起動し、CDC テーブルから最新の更新を増分的にコンシュームし、それらをミラーテーブルにマージする.

さらにダウンストリームでは、CDC テーブルとミラーテーブルの両方から読み取り、DS パーティションテーブルに日次 DS パーティションを生成する日次ジョブがある.

最後に、他のチームは自分たちのジョブを実行し、これらの DS パーティションをコンシュームしてファクトテーブルやメトリクステーブルを生成する.

CDC テーブルの詳細. Kafka トピックごとに増やすことができる可変数のタスクがあり、パイプラインをスケールアップできる. また、Icebergへのコミットを処理するシングルトンコーディネーターがある. Slack が使用している主要な機能は、exactly-once セマンティクス、スキーマ進化、Hive Catalog 対応.

具体例として、リサさんがチャンネルに参加する例を考える.この場合、チャンネルへの参加を示すレコードが Debezium を通じて Avro で Kafka に生成される. Iceberg コネクタはスキーマレジストリ内のスキーマを使用してレコードがデシリアライズされ、最終的に CDC チャンネルメンバーテーブルへ追加される.
この CDC テーブルは、ソーストランザクションのタイムスタンプを使用して時間単位でパーティション化された追記専用の Iceberg テーブル.

CDC レコードがミラーテーブルにマージされる方法について.
ミラーテーブルは、ソースの Vitess テーブルの 1 対 1 のミラーである. このテーブルを生成し、上流の Vitess テーブルと同期させるために、固定の頻度で EMR 上で Spark ジョブを実行し、CDC レコードからミラーテーブルを更新する.
このジョブは Spark の Incremental Read を使用して、まだ処理されていない新しく追加されたイベントのみを読み取る. 今回の例では、Incremental Read は、リサさんがチャンネルに参加することで新しく追加された行だけを選択する.
これらのレコードを読み取った後、Spark の merge into で ミラーテーブルを更新する. 今回の場合、Lisaの新しい行が挿入される.

先ほど紹介した Vitess に関するレコード(tx_rank, tx_order, epoch)を使用して、同じプライマリキーに対して複数のレコードがある場合にどのレコードを採用するかを決定する.
重要な点として、Iceberg で Incremental Read を使用するには、読み取りを開始する Snapshot ID を提供する必要がある.
マージジョブは、CDC テーブルの特定の Snapshot から始まり、別の Snapshot に移動し、その開始と終了の境界を、生成する新しい Snapshot のミラーテーブルのメタデータに保存する. 次回実行時には、そのメタデータを読み取って開始点を知ることができる.

ここまでに説明した仕組みは、それほど規模が大きくないテーブルであればうまく機能する.
しかし、100 億行を超える大規模なテーブルを扱う上では「write amplification」問題が発生する. write amplification とは、少量の入力データが大量の処理を引き起こす現象を意味する用語である.
基本的な対策としては以下の 2 つが挙げられる.

  • ジョブの実行頻度を下げる
    • write amplification の性質上、10 分間の記録をマージするのと 1 日分の記録をマージするのとでは、処理時間にそれほど大きな差がない. したがって、厳密な遅延要件がない場合は、マージの頻度を大幅に下げることができる.
  • Merge-on-Read

これらは大きな助けになるが、1 兆行以上の本当に巨大なテーブルを扱う上では、それでも十分ではない. そこで、「バケットマージ」という手法を使用している.

バケットマージとは、巨大なテーブルに対してシャッフルを減らすために使用するマージ戦略で、Storage Partitioned Join を使用する.
Storage Partitioned Join は、結合する両テーブルのパーティションを直接結合するアプローチ. これを機能させるには、結合の両側のテーブルが同じパーティショニング戦略を持っている必要がある.
これにより、各結合キーが左右のテーブルで同じパーティションにマッピングされる. Slack の CDC パイプラインでは、CDC テーブルからミラーテーブルへプライマリキーで結合するため、プライマリキーのハッシュに基づいてミラーテーブルをバケットパーティショニングしている.

(筆者注: Storage Partitioned Join の仕組みと効果については、 Petabyte-Scale Row-Level Operations in Data Lakehouses 論文で詳しく解説されている)

以下の図版では、バケットマージを使用したマージジョブの新しいステップを追加している.
以前は CDC テーブルから増分読み取りを行い、直接ミラーテーブルにレコードをマージしていた. これを変更し、バケットマージのため、増分読み取りした CDC レコードを一時的な Iceberg テーブルに保存し、このテーブルをミラーテーブルと同じ方法でバケットパーティショニングする.
図版の例では、新しい「percolator_bucket_key」カラムがを追加している. これはユーザー ID とチャンネル ID のハッシュを選択したバケット数で割った余りとして計算されている.
このテーブルを書き込んだ後、5 つの Spark 設定を使用して Storage Partitioned Join を有効にする。その上で通常のマージ用 SQL を実行して CDC レコードをマージする.

具体的な効果を示すために、Slack における最大のテーブルであるメッセージテーブルへの影響を紹介する.
以下はバケットマージを実装する前のマージジョブのスクリーンショットで、赤く囲まれたエクスチェンジステップのブロックがシャッフルを意味しており、ジョブのボトルネックとなっていた.
9520 億のシャッフルレコードが書き込まれており、ジョブは機能せず失敗する.

しかし、バケットマージ導入後はエクスチェンジステップそのものがなくなり、ジョブのパフォーマンスを破壊する巨大なシャッフルはもうなく、約 1 時間で 1 日分のメッセージをマージできるようになった.

移行の課題

Slack を含む、多くの組織では新しい技術を大規模に採用する際の苦労がある. 下流のチームに新しいデータを採用してもらうのは非常に難しい.
今回の移行では、下流のジョブが旧システムが提供していた保証、つまり日次レプリケーションのサイクルを前提に構築されていることが課題になった. 旧システムでは、UTC の深夜までのすべてのレコードが各日次スナップショットに存在することを保証していた。しかし、CDC の世界では、マージジョブがジョブ実行時に利用可能なすべての CDC レコードをマージしていくだけなので、「UTC の深夜に実行されたこのジョブが、ソースのテストデータベースから UTC の深夜までのすべてのレコードを確実に挿入した」といった保証はしない.
そのため、下流のチームが大規模な移行をしなくても、新しいパイプラインで書き込まれたデータに簡単にアクセスできるようにする方法が必要だった。ここで DS パーティションテーブルが登場する.

DS パーティションテーブルは、既存のテーブルで、レガシーなバッチベースのデータ複製システムの下流にある Spark ジョブによって利用されている.
各 DS パーティションには、UTC 0:00 時点の完全なスナップショットが含まれており、冪等性の確保や時系列分析といった用途で使用される.

冪等性の側面では、例えば、メトリクスの定義が変更され、メトリクスチームが過去 1 年間のメトリクスを再計算したい場合、その時間範囲にわたるデータのビューを得るのに使用される.
時系列分析では、例えば、日々のアクティブユーザー数の傾向を時間とともに観察できる.

既存のユーザに影響を与えないため、基本的にこのテーブルのセマンティクスは変更しないことを保証し、裏側では、データソースをレガシーシステムから新しい CDC パイプラインに切り替えた.
これにより、下流のユーザーは変更を加えたり、気づく必要もなく、場合によっては1日以上早く依存関係を解決できるようになる.

この目標を達成するため、「Parcolator DS パーティションテーブルジョブ」を作成した.
これは、古いシステムと同じ保証を持つ同一の日次パーティションを生成するもので、冪等性と一貫性を実現するためにIceberg のタイムトラベル機能を利用している.

ある日の午後に、リサさんがチャンネル ID 7 のチャンネルに参加したとする. 彼女は UTC 真夜中直前にこのチャンネルに参加し、日付が変わった数分後に気が変わってチャンネルを退出したとする.
古いシステムとの一貫性を保つため、日次 DS パーティションには、彼女がチャンネル 7 にいたことに対応するレコードが含まれていなければならない. しかし、この例では、レコードが非常に近接しているため、CDC テーブルの同じスナップショットに含まれている可能性があり、したがってミラーテーブルのスナップショットにはそのレコードが含まれていない可能性がある.

そこで、UTC 0:00 前の最新のスナップショットにミラーテーブルをタイムトラベルさせる. 次に、このタイムトラベルしたミラーテーブルを S3 の一時テーブルとして永続化する.
その後、CDC テーブルの増分読み取りを行う. この読み取りは、先ほどタイムトラベルしたスナップショットの、ミラーテーブルスナップショットメタデータに格納されているスナップショット ID から開始する.
ただし、通常のマージジョブとは異なり、この増分読み取りはソーストランザクションのタイムスタンプによってもフィルタリングされ、UTC 0:00 以前のレコードのみが選択される.
次に、CDC レコードのこのサブセットを、タイムトラベルした一時バージョンのミラーテーブルにマージして、UTC 0:00時点の一貫性のあるスナップショットを生成する.
最後に、この一時テーブルから行を DS パーティションテーブルの新しいパーティションに挿入する.
これによって、下流のユーザにとっては、データが 1 日程度早く到着することに気づく以外は、何も変化がないことになる.

下流のユーザに提供するデータの完全性、つまりある日のすべてのデータが日次 DS パーティションに含まれていることを保証するために、私たちは「Binlog Watermarking」と呼ばれる手法を使用している.

Binlog Watermarking の背景として、ジョブ実行後に遅延して到着するレコードがないことを確認するため、その日のすべてのレコードが処理されたことを示す決定論的なシグナルが必要だった.

MySQL Binlog は Shard 内のすべてのテーブルにわたるすべてのトランザクションにグローバルな順序を課し、Debezium がそれらの Binglog イベントを順次処理する.
従って、Kafka 内の 1 つのレコードの存在を使用して、その Shard 内の他のすべてのテーブルのレコードがすでに処理されたことを推論できることを意味する.

それを実現するには、特定の時間(今回の場合は 0:00)に書き込みが行われることを事前に知っておく必要がある。そうすれば、そのレコードが現れるのを待って、他のすべてが既に処理されたと推論できる.
これを実装するため、1 秒ごとのハートビートを導入した. 各 Vitess Keyspace の各 Shard の特別なハートビートテーブルに行が書き込まれ、これらの書き込みは Binlog に含まれる. スライドの例では、リサさんがチャンネル 7 に参加したレコードが、これら 2 つのハートビートイベントの間に挟まれていることがわかる.
Debezium はこれらのイベントを順次処理し、Kafka に生成する. そこからシステムを流れ、データレイクの対応するテーブルに挿入される. 一方、Airflow はセンサータスクをオーケストレーションしており、UTC 0:00 のハートビートが到着するのを待つため、ハートビートテーブルを継続的にポーリングする.
それが到着すると、Debezium がそのシャード内の他のすべてのテーブルのレコードを既に処理したと推論できる. そして、テーブルコネクターのラグを最後にチェックして、コンシューマーも UTC 0:00 のレコードに追いついていることを確認し、その後、データが利用可能であるという保証付きでジョブを開始できる.

成果と学び

従来のバックアップやストアベースのバッチレプリケーションシステムと比較して、大幅なコスト削減を実現した.
以前はメッセージテーブルの複製だけで 1 日数千ドルかかっていたのが、約 10 %まで削減された.

また、新しいパイプラインは、要件に応じたより多くの調整可能なポイントがある点もポイントと言える.
低レイテンシーを実現するためにより多くのコストをかけたり、低レイテンシーが不要な場合により多くの節約をしたりといった調整ができる.
例えば、製品チームが AB テストを行う際、その結果をすぐに確認したいと考えるなら、マージジョブをより頻繁に実行することができる.
一方、メッセージは主にオフラインの検索インデックス作成に使用されるため、1 日 1 回の実行で十分で、そうすることで多くの費用を節約できる.
組織がバッチデータレプリケーションに多くの費用をかけている場合、これは収支に大きな影響を与えるはずだ.

パフォーマンスについては、バックアップを復元してテーブルの全履歴を毎日再処理する必要がないため、データレイクテーブルの更新にかかる平均時間が大幅に短縮された.
通常のサイズのテーブルの場合、平均で 12 〜 48 時間かかっていたものが約 5 分になった.
スライドは最も時間がかかるシナリオの例だが、旧システムではメッセージの実際の取り込みに 26.7 時間かかっていたのが、新システムでは 1 時間 7 分強になった. つまり、約 26 倍の高速化、あるいは以前の複製時間の約 4 %になった.

大きく 2 つの以下の学びがあった.

まず、複雑さについて.
新システムは、以前のバッチデータレプリケーションのよりも明らかに複雑である. 大きな理由は、バッチアプローチで得られる自動自己修正機能が失われることに依る. バッチの世界では、各実行で完全なテーブルを再処理するので、問題があれば、ジョブを再実行すればよい.
しかし、CDC ではそれほど単純ではない. 行を見逃したり、行が不正確だったりした場合、簡単な回復経路はない. 上流との乖離なしに何ヶ月もパイプラインを実行できる現在の状態に達するまでに長い時間がかかった. 組織が CDC アプローチの採用を検討している場合は、堅牢な監査とデータ品質チェックのフレームワークへの投資を強く勧める.

2 つ目の学びは、Iceberg テーブルのメンテナンスタスクを管理するオーケストレーション上の負担について.
Iceberg の運用にはスナップショットのライフサイクル、コンパクション、孤立ファイルの削除など、さまざまなメンテナンスタスクを実行する必要がある. それぞれは難しいことではないが、テーブル所有者にとっては追加の負担となる. 文字通りより多くのものをオーケストレーションしなければならないという点と、認知的なレベルでも、他のチームが考慮しなければならないことが増えるため、Icebergの採用を躊躇させる可能性がある.

自分自身、これを作り始めたとき、channels_members テーブルに対して最初は Copy on write をテストし、1 時間に 1 回マージしていた。すると、数テラバイトのテーブルが短時間で約 650 テラバイトの S3 ストレージに膨れ上がってしまった。そのとき、「しまった、snapshot_expire を実行する必要があった」と気づき、正常なサイズに戻すことができた. これは他のチームが Iceberg に取り組む際の実際のリスクになるとおもう. Iceberg テーブルを維持するために何をしなければならないかを理解してもらえなければ、データ量が簡単に膨れ上がり、他の問題に直面する可能性がある.
そのため、社内では「Iceberg 管理サービス」のようなものを作成することについて話し合ってる. これは、すべてのチームにわたるデータレイク内のすべての Iceberg テーブルのメンテナンスタスクを一元的に管理する方法になるかもしれない. まだ実装されていないが、検討中である. 組織が Iceberg の採用を検討している場合は、データユーザーが技術を簡単に採用できるようにするために、どのように管理し、すべての Iceberg メンテナンスタスクを管理するかについて慎重に考える必要がある.