dbt microbatchがもたらす変化と実装のベストプラクティス

2024/12/23に公開

dbt Advent Calendar 2024 23日目の記事です。


dbtのincremental_strategyに新たにmicrobatchが導入され、大規模な時系列データの処理方法が革新的に変わろうとしています。
この記事では、microbatchと従来のincrementalを比較しながら、本番環境への導入に向けた重要なポイントを解説します。

前提: batch_size="hour" で動かすと壊れる @2024/12/23時点

当記事では筆者の実際の運用環境を想定し、batch_size="hour"での実装を前提に解説を進めます。
ただし、現在dbt-coreのバグにより、hourオプションは使用できない状態です。 hourでの運用を検討されている方は、以下のissueの解決をお待ちください。

https://github.com/dbt-labs/dbt-core/issues/11165

microbatchの基本設定

{{
    config(
        materialized="incremental",
        incremental_strategy="microbatch",
        begin="2022-01-01",
        batch_size="day",
        event_time="created_at",
        lookback=1,
    )
}}
select * 
from {{ ref("model_name") }}

設定しているパラメーターは、以下の通り。

パラメータ 必須 説明 設定例
materialized incrementalマテリアライゼーションを指定。 "incremental"
incremental_strategy 増分更新の戦略としてmicrobatchを使用。 "microbatch"
begin データ処理の開始日を指定。初回実行時やフルリフレッシュ時の開始点となる "2022-01-01"
batch_size バッチの時間単位を指定
- 選択可能な値: "hour", "day", "month", "year"
"day"
event_time 時系列の基準となるカラム。以下の用途で使用
- バッチ処理の時間区切り
- 上流モデルの自動フィルタリング
- 遅延データの処理範囲の決定
"created_at"
lookback - - 遅延データに対応するための遡及期間
- 指定した数のバッチ分だけ過去に遡って再処理
- 単位はbatch_sizeで指定した単位に従う
- デフォルト値は1
1

詳しいことは公式ドキュメントを御覧ください。

https://docs.getdbt.com/docs/build/incremental-microbatch

特徴と利点

時系列データ処理には、バッチの区切り方、遅延データの処理、バックフィル実装など、多くの共通課題が存在します。
これまでは各チームが独自の実装方法を採用してきましたが、microbatchはこれらの課題に対する標準的なアプローチを提供します。
※初歩的な挙動の説明については、他の記事でも解説されているものが多くあるので割愛します。

event_time を軸とした時系列処理

時系列データ処理では、データがいつ作成されたか、いつ更新されたかなど、複数の時間軸が存在します。
microbatchではevent_timeパラメータで「このモデルが処理すべき時間」を明示的に指定します。

例えば、クリックログを日次で集計する場合はevent_time="click_at"と指定することで、クリック時刻を基準とした日次バッチ処理が行われます。

batch_size に基づいた処理の分割

大規模なデータセットを効率的に処理するために、microbatchではデータを設定された時間単位(hour/day/month/year)で分割して処理します。
これにより、長期間のデータを適切な大きさのバッチに分割でき、処理の安定性と管理のしやすさが向上します。

例えば、1年分のクリックログを処理する際に batch_size="day" と指定すると、365個の日次バッチに分割して処理されます。各バッチは独立して処理されるます。
また、特定の日付の処理が失敗しても dbt retry で該当日のみ再実行が簡単にできるようになります。

lookback による遅延データの対応

データが発生した時点と到着する時点にずれが生じることが一般的です。
lookbackパラメータはこの遅延に対応するため、指定した期間分だけ過去に遡って再処理を行う機能を提供します。

例えば、batch_sizeが="day"の時に、lookback=2 と指定すると、今日のデータを処理する際に、今日と昨日のデータが再処理されます。こうすることで遅れたデータもまとめて再処理することができます。

標準のincremental vs microbatch

じゃあ、実際どう変わるの?ってところをリアルワールドユースケースで比較していきます。

標準的な時系列データの処理

「dbtとデータパーティショニングで、大量データを扱う」で紹介している時系列データの処理パターンをベースに説明します。

ざっくりどういうことやるか説明すると、S3のディレクトリ構造(例:/y=2022/m=11/d=01/h=08/)をDWH側のパーティションカラム(_partition_hourly="2022-11-01T08:00:00")として利用します。

時間を基準とした明示的なデータの区切りにより、大規模データの効率的な処理を実現します。詳しい内容は記事を御覧ください。


図: S3のパスとDWHのテーブルのパーティションの関係性

データの処理の流れとしては以下の図の通りです。
単一のパーティションを指定して、そこに対してのみ変換処理を行うといった具合です。


図: 特定のパーティションを処理する

標準のincrementalでの実装

以下は_partition_hourly = 2024-12-13T02:00:00のデータを処理する例です。

{{
    config(
        materialized="incremental",
        incremental_strategy="delete+insert",
        unique_key="_partition_hourly",
    )
}}
select *
from {{ ref("model_name") }}
where {{ filter_partition() }}

処理範囲の指定にはfilter_partitionマクロを作って対応しています。

{%- macro filter_partition() -%}
    {%- set transform_start_datehour = var("transform_start_datehour", default=none) -%}
    {%- set transform_end_datehour = var("transform_end_datehour", default=none) -%}
    {%- set start_partition = dbt.safe_cast(
        "'" ~ transform_start_datehour ~ "'",
        api.Column.translate_type("timestamp"),
    ) -%}
    {%- set end_partition = dbt.safe_cast(
        "'" ~ transform_end_datehour ~ "'",
        api.Column.translate_type("timestamp"),
    ) -%}
    {{ start_partition }} <= _partition_hourly
    and _partition_hourly <= {{ end_partition }}
{%- endmacro -%}

次のようなコマンドを叩いて動かすイメージです。

dbt run --select hoge --vars '{ "transform_start_datehour": 2024-12-17T00:00:00, "transform_end_datehour": 2024-12-19T00:00:00, }'

microbatchでの実装

microbatchを使用すると、より簡潔な実装が可能です。

遅延データへの対応もlookbackパラメータの調整だけで実現できます。

{{
    config(
        materialized="incremental",
        incremental_strategy="microbatch",
        begin="2022-01-01",
        batch_size="hour",
        event_time="_partition_hourly",
        lookback=1,
    )
}}
select * 
from {{ ref("model_name") }}

これだけでも結構便利なのですが、本当に便利なのは、これらをbackfillする時に真価を発揮します。

Backfill処理

単一パーティションの処理ではmicrobatchと標準のincrementalに大きな違いはありませんが、複数パーティションのbackfill処理では、microbatchが大きなアドバンテージを持ちます。

2024-12-13 00:00:00 から 2024-12-13 02:00:00 までのデータをbackfillする例で比較してみましょう。

標準のincrementalでのbackfill

標準的なincrementalモデルは以下のようになります。

{{
    config(
        materialized="incremental",
        incremental_strategy="delete+insert",
        unique_key="_partition_hourly",
    )
}}
select *
from {{ ref("model_name") }}
where {{ filter_partition() }}

このモデルを実行すると、以下のようなクエリが生成されます。

create or replace temporary table db_name.schema_name.model_name__dbt_tmp as
(
    select *
    from db_name.schema_name.model_name
    where
        try_cast('2024-12-13 00:00:00' as timestamp) <= _partition_hourly
        and _partition_hourly <= try_cast('2024-12-13 02:00:00' as timestamp)
)

処理のイメージはこちらです。


図: 従来のincrementalを使った複数パーティション処理

標準のincrementalでは、処理対象期間の全データを一度に中間テーブルに格納します。
データ量が大きい場合、この処理がボトルネックとなる可能性があります。

microbatchでのbackfill

同じモデルをmicrobatchで実装すると。

{{
    config(
        materialized="incremental",
        incremental_strategy="microbatch",
        begin="2022-01-01",
        batch_size="hour",
        event_time="_partition_hourly",
        lookback=1,
    )
}}
select * 
from {{ ref("model_name") }}

実行時には、各時間帯ごとに別々のクエリが生成されます。

create or replace temporary table db_name.schema_name.model_name__dbt_tmp_20241213_00_01
         as (
    select * from (select * from db_name.schema_name.model_name 
    where _partition_hourly >= '2024-12-13 00:00:00+00:00' 
    and _partition_hourly < '2024-12-13 01:00:00+00:00')
)
create or replace temporary table db_name.schema_name.model_name__dbt_tmp_20241213_01_02
         as (
    select * from (select * from db_name.schema_name.model_name 
    where _partition_hourly >= '2024-12-13 01:00:00+00:00' 
    and _partition_hourly < '2024-12-13 02:00:00+00:00')
)
create or replace temporary table db_name.schema_name.model_name__dbt_tmp_20241213_02_03
         as (
    select * from (select * from db_name.schema_name.model_name 
    where _partition_hourly >= '2024-12-13 02:00:00+00:00' 
    and _partition_hourly < '2024-12-13 03:00:00+00:00')
)

これらの処理を図解すると、以下のようになります。


図: microbatchを使った複数パーティション処理(色の違いは別トランザクションを表す)

microbatchの利点としては、以下の通りです。

  • 処理負荷の安定性
    • パーティション単位で処理を行うため、単一パーティションでも複数パーティションでも各バッチの処理負荷が一定に保たれます。
  • 運用の簡素化
    • バックフィル範囲の指定が標準のコマンドラインオプションで可能になり、独自の変数管理が不要になりました。以下のように簡潔なコマンドで実行できます
    dbt run --select model_name --event-time-start "2024-01-01T00:00:00" --event-time-end "2024-01-31T00:00:00"
    

重複排除処理

時系列データにおける重複排除処理は、一見単純に見えて実は複雑な課題です。
特にパーティション化されたデータを扱う場合、単一パーティション内の重複だけでなく、パーティションを跨いだ重複にも対処する必要があります。

時系列データにおける重複排除の課題

例えば、広告のクリックイベントデータでは、同一のIDが異なる時間帯に複数回記録されることがあります。
このような状況では、一般的に「最初に到着したイベントを正とする」という先勝ルールを採用することが多くなります。

しかし、以下のような単純な重複排除の実装では、重要な問題が発生します。

{{
    config(
        materialized="incremental",
        incremental_strategy="delete+insert",
        unique_key="_partition_hourly",
    )
}}
with source as (
    select *
    from {{ ref("model_name") }}
    where {{ filter_partition() }}
),
deduplicated as (
    {{
        dbt_utils.deduplicate(
            relation="source",
            partition_by="click_id",
            order_by="_partition_hourly asc, click_at asc",
        )
    }}
)
select * from deduplicated

この実装の問題点は、パーティションを跨いだ重複の考慮が欠けていることです。

例えば

  1. click_id: A が 2024-12-13T00:00:00 のパーティションで処理される
  2. 同じ click_id: A が 2024-12-13T01:00:00 のパーティションにも存在する
  3. パーティションごとに独立して処理するため、両方のパーティションで重複したレコードが残ってしまう


図: パーティションを跨いだ同一IDが考慮されていない処理

標準のincrementalでの解決策

この問題に対処するため、処理時に一定時間遡って重複チェックを行う実装が必要です。

{{
    config(
        materialized="incremental",
        incremental_strategy="delete+insert",
        unique_key="_partition_hourly",
    )
}}
with source as (
    select *
    from {{ ref("model_name") }}
    where {{ filter_partition_for_cumulative_type(1) }}
),
deduplicated as (
    {{
        dbt_utils.deduplicate(
            relation="source",
            partition_by="click_id",
            order_by="_partition_hourly asc, click_at asc",
        )
    }}
)
select * from deduplicated

ここで使用する filter_partition_for_cumulative_type マクロは、指定した時間数だけ遡ってデータを取得します。

{%- macro filter_partition_for_cumulative_type(hours_back) -%}
    {%- set transform_start_datehour = var("transform_start_datehour", default=none) -%}
    {%- set transform_end_datehour = var("transform_end_datehour", default=none) -%}
    {%- set start_datehour = dbt.dateadd(
        datepart="hour",
        interval=-hours_back,
        from_date_or_timestamp="'" ~ transform_start_datehour ~ "'",
    ) -%}
    {%- set end_datehour = dbt.safe_cast(
        "'" ~ transform_end_datehour ~ "'",
        api.Column.translate_type("timestamp"),
    ) -%}
    {{ start_datehour }} <= _partition_hourly
    and _partition_hourly <= {{ end_datehour }}
{%- endmacro -%}

処理の流れとしては、以下のようなイメージです。


図: 複数パーティションを横断して重複排除をする

microbatchでの実装上の注意点

microbatchでは、このような累積することで実現する重複排除に対する標準的な解決策は提供されていません。

そのため以下のような実装で対処する必要があります。

{{
    config(
        materialized="incremental",
        incremental_strategy="microbatch",
        begin="2022-01-01",
        batch_size="hour",
        event_time="_partition_hourly",
        lookback=1,
    )
}}
with source as (
    select *
    from {{ ref("model_name").render() }}
    where {{ filter_microbatch_for_cumulative_type(model.batch, 1) }}
),
deduplicated as (
    {{
        dbt_utils.deduplicate(
            relation="source",
            partition_by="click_id",
            order_by="_partition_hourly asc, click_at asc",
        )
    }}
)
select * from deduplicated
  • .render() の使用について
    • microbatchの自動フィルタリングを無効化し、カスタムフィルターを適用するために必要
    • これにより、モデル固有の重複排除ロジックを実装可能

ここで使用する filter_microbatch_for_cumulative_type マクロはmicrobatchで実行した時に取得できるプロパティを使って処理範囲を決定しています。
※もっといい方法があればアイデア求む

{%- macro filter_microbatch_for_cumulative_type(batch_context, hours_back) -%}
    {%- set start_datehour = dbt.dateadd(
        datepart="hour",
        interval=-hours_back,
        from_date_or_timestamp="'" ~ batch_context.event_time_start ~ "'",
    ) -%}
    {%- set end_datehour = dbt.safe_cast(
        "'" ~ batch_context.event_time_start ~ "'",
        api.Column.translate_type("timestamp"),
    ) -%}
    {{ start_datehour }} <= _partition_hourly
    and _partition_hourly <= {{ end_datehour }}
{%- endmacro -%}

lookbackで重複排除は実現できないのか?

microbatchlookbackを使用して、パーティションを跨いだ重複排除を実装できないかのか?

結論から言うとlookbackでは出来ないです。

lookbackパラメータの内部実装を見ると、以下のようになっています。

https://github.com/dbt-labs/dbt-core/blob/459d156e85089961359815a2e9938265a4c67c81/core/dbt/materializations/incremental/microbatch.py#L46-L79

lookback が使われる行はこちら。

https://github.com/dbt-labs/dbt-core/blob/459d156e85089961359815a2e9938265a4c67c81/core/dbt/materializations/incremental/microbatch.py#L70

その手前の処理で、--event-time-startが入ってる場合はlookbackを考慮せずにreturnしている状態になっていることがわかります。

https://github.com/dbt-labs/dbt-core/blob/459d156e85089961359815a2e9938265a4c67c81/core/dbt/materializations/incremental/microbatch.py#L57-L59

つまり、lookbackを設定していたとしても、backfill処理時は考慮されないということです。

lookbackはあくまで遅延データに対するパラメーターなので、累積メトリクスへの対応は想定されていない。
という前提に立つと、backfill時に指定されたパーティションを超えて参照範囲広げるには独自のマクロが必要になるとなります。

その他細かい話

実行コンテキストを表すプロパティ

microbatchには、バッチ処理の情報にアクセスするためのプロパティが用意されています。
これらのプロパティを使うことで、microbatchではカバーしきれない処理に対応することができます。

{% if model.batch %}
  {{ log(model.batch.id) }}  # バッチIDをログ出力
  {{ log(model.batch.event_time_start) }}  # バッチの開始時刻をログ出力
  {{ log(model.batch.event_time_end) }}  # バッチの終了時刻をログ出力
{% endif %}

詳細は公式ドキュメントを参照してください。

注意点: これらのプロパティを過度に使用すると、dbtのバージョンアップ時に互換性の問題が発生する可能性があります。必要最小限の使用にとどめることを推奨します。

event_timeの設定

microbatchのフィルタリングにはevent_timeの設定が必要不可欠です。
子モデルでmicrobatchを使用する場合、親モデルがviewであってもevent_timeの設定が必要です。
設定がない場合は、自動的にフルスキャンとなりますのでご注意ください。

SQLファイルでの設定例

{{
    config(
        materialized="view",
        event_time="_partition_hourly",
    )
}}

yamlファイルでの設定例

models:
  - name: hoge
    config:
      event_time: _partition_hourly

既存のincrementalモデルからの移行

既存のincrementalモデルからmicrobatchへの移行は、設定の変更だけで完了します。
テーブルの再作成は発生しませんので、安全に移行を進めることができます。
ただし、本記事で説明したように、そのまま移行できないパターンもありますので、事前の確認が必要です。

初期実行の動作

beginパラメータで指定した日時から現在までのデータを一括処理します。
例えば、begin="2022-04-01"と設定した場合、2022年4月1日から現在までのデータが処理されます。

2回目以降の動作

lookbackパラメータを指定している場合、現在時刻のパーティションと指定した数だけ遡ったパーティションが処理されます。

例えば、最新パーティションが2024-04-01T12:00:00lookback=2の場合

  • 2024-04-01T12:00:00
  • 2024-04-01T11:00:00

上記の2つのパーティションが処理対象となります。

実際に叩くコマンドのイメージ

microbatchを前提としたモデルを叩く場合は、以下のような2パターンになりそうです。

# 通常実行(lookback分遡って処理)
dbt run --select hoge

# バックフィル実行(期間指定で処理)
dbt run --select hoge --event-time-start "2024-01-01T00:00:00" --event-time-end "2024-01-31T00:00:00"

並列処理の適用

concurrent_batches=trueの使用は、処理内容によって適切かどうかが変わります。

  • 適している場合
    • パーティション間の処理順序が結果に影響しないケース
    • 各パーティションが独立して処理可能なケース
  • 適していない場合
    • 累積メトリクスなど、過去のパーティションの処理結果が必要なケース
    • パーティションの処理順序が重要なケース

concurrent_batches=trueのイメージ

concurrent_batches=falseのイメージ

lookback=0だと、遡らない?

lookback=1だと、その時間だけを処理するという意味になるっぽいです。
一つ前まで遡るなら、2としましょう。ややこしいね。

まとめ

microbatchの導入により、時系列データ処理の実装が大幅にシンプル化されました。全てのユースケースに対応できるわけではありませんが、これまで各チームが独自に実装していた処理パターンが標準化されたことは、dbtコミュニティにとって大きな前進と言えます。

microbatchの実装を見ると、時系列データ処理に関するベストプラクティスが適切に組み込まれていることがわかります。
基本的なユースケースにおいては、microbatchを採用すること自体が最適な選択となるでしょう。

標準機能として組み込まれたことで、処理パフォーマンスの効率性の向上に加えて、今後は運用監視機能の強化なども期待できます。
これまでは職人芸なところがあった時系列データの処理における技術的なハードルが下がることで、より多くのチームがデータ活用に取り組めるようになるはずです。

Discussion