dbt microbatchがもたらす変化と実装のベストプラクティス
dbt Advent Calendar 2024 23日目の記事です。
dbtのincremental_strategy
に新たにmicrobatch
が導入され、大規模な時系列データの処理方法が革新的に変わろうとしています。
この記事では、microbatch
と従来のincremental
を比較しながら、本番環境への導入に向けた重要なポイントを解説します。
batch_size="hour"
で動かすと壊れる @2024/12/23時点
前提: 当記事では筆者の実際の運用環境を想定し、batch_size="hour"
での実装を前提に解説を進めます。
ただし、現在dbt-coreのバグにより、hourオプションは使用できない状態です。 hourでの運用を検討されている方は、以下のissueの解決をお待ちください。
microbatchの基本設定
{{
config(
materialized="incremental",
incremental_strategy="microbatch",
begin="2022-01-01",
batch_size="day",
event_time="created_at",
lookback=1,
)
}}
select *
from {{ ref("model_name") }}
設定しているパラメーターは、以下の通り。
パラメータ | 必須 | 説明 | 設定例 |
---|---|---|---|
materialized |
✅ | incrementalマテリアライゼーションを指定。 | "incremental" |
incremental_strategy |
✅ | 増分更新の戦略としてmicrobatchを使用。 | "microbatch" |
begin |
✅ | データ処理の開始日を指定。初回実行時やフルリフレッシュ時の開始点となる | "2022-01-01" |
batch_size |
✅ | バッチの時間単位を指定 - 選択可能な値: "hour", "day", "month", "year" |
"day" |
event_time |
✅ | 時系列の基準となるカラム。以下の用途で使用 - バッチ処理の時間区切り - 上流モデルの自動フィルタリング - 遅延データの処理範囲の決定 |
"created_at" |
lookback |
- | - 遅延データに対応するための遡及期間 - 指定した数のバッチ分だけ過去に遡って再処理 - 単位はbatch_sizeで指定した単位に従う - デフォルト値は1 |
1 |
詳しいことは公式ドキュメントを御覧ください。
特徴と利点
時系列データ処理には、バッチの区切り方、遅延データの処理、バックフィル実装など、多くの共通課題が存在します。
これまでは各チームが独自の実装方法を採用してきましたが、microbatch
はこれらの課題に対する標準的なアプローチを提供します。
※初歩的な挙動の説明については、他の記事でも解説されているものが多くあるので割愛します。
event_time
を軸とした時系列処理
時系列データ処理では、データがいつ作成されたか、いつ更新されたかなど、複数の時間軸が存在します。
microbatch
ではevent_time
パラメータで「このモデルが処理すべき時間」を明示的に指定します。
例えば、クリックログを日次で集計する場合はevent_time="click_at"
と指定することで、クリック時刻を基準とした日次バッチ処理が行われます。
batch_size
に基づいた処理の分割
大規模なデータセットを効率的に処理するために、microbatch
ではデータを設定された時間単位(hour/day/month/year)で分割して処理します。
これにより、長期間のデータを適切な大きさのバッチに分割でき、処理の安定性と管理のしやすさが向上します。
例えば、1年分のクリックログを処理する際に batch_size="day"
と指定すると、365個の日次バッチに分割して処理されます。各バッチは独立して処理されるます。
また、特定の日付の処理が失敗しても dbt retry
で該当日のみ再実行が簡単にできるようになります。
lookback
による遅延データの対応
データが発生した時点と到着する時点にずれが生じることが一般的です。
lookback
パラメータはこの遅延に対応するため、指定した期間分だけ過去に遡って再処理を行う機能を提供します。
例えば、batch_sizeが="day"
の時に、lookback=2
と指定すると、今日のデータを処理する際に、今日と昨日のデータが再処理されます。こうすることで遅れたデータもまとめて再処理することができます。
incremental
vs microbatch
標準のじゃあ、実際どう変わるの?ってところをリアルワールドユースケースで比較していきます。
標準的な時系列データの処理
「dbtとデータパーティショニングで、大量データを扱う」で紹介している時系列データの処理パターンをベースに説明します。
ざっくりどういうことやるか説明すると、S3のディレクトリ構造(例:/y=2022/m=11/d=01/h=08/
)をDWH側のパーティションカラム(_partition_hourly="2022-11-01T08:00:00"
)として利用します。
時間を基準とした明示的なデータの区切りにより、大規模データの効率的な処理を実現します。詳しい内容は記事を御覧ください。
図: S3のパスとDWHのテーブルのパーティションの関係性
データの処理の流れとしては以下の図の通りです。
単一のパーティションを指定して、そこに対してのみ変換処理を行うといった具合です。
図: 特定のパーティションを処理する
incremental
での実装
標準の以下は_partition_hourly = 2024-12-13T02:00:00
のデータを処理する例です。
{{
config(
materialized="incremental",
incremental_strategy="delete+insert",
unique_key="_partition_hourly",
)
}}
select *
from {{ ref("model_name") }}
where {{ filter_partition() }}
処理範囲の指定にはfilter_partition
マクロを作って対応しています。
{%- macro filter_partition() -%}
{%- set transform_start_datehour = var("transform_start_datehour", default=none) -%}
{%- set transform_end_datehour = var("transform_end_datehour", default=none) -%}
{%- set start_partition = dbt.safe_cast(
"'" ~ transform_start_datehour ~ "'",
api.Column.translate_type("timestamp"),
) -%}
{%- set end_partition = dbt.safe_cast(
"'" ~ transform_end_datehour ~ "'",
api.Column.translate_type("timestamp"),
) -%}
{{ start_partition }} <= _partition_hourly
and _partition_hourly <= {{ end_partition }}
{%- endmacro -%}
次のようなコマンドを叩いて動かすイメージです。
dbt run --select hoge --vars '{ "transform_start_datehour": 2024-12-17T00:00:00, "transform_end_datehour": 2024-12-19T00:00:00, }'
microbatch
での実装
microbatch
を使用すると、より簡潔な実装が可能です。
遅延データへの対応もlookback
パラメータの調整だけで実現できます。
{{
config(
materialized="incremental",
incremental_strategy="microbatch",
begin="2022-01-01",
batch_size="hour",
event_time="_partition_hourly",
lookback=1,
)
}}
select *
from {{ ref("model_name") }}
これだけでも結構便利なのですが、本当に便利なのは、これらをbackfillする時に真価を発揮します。
Backfill処理
単一パーティションの処理ではmicrobatch
と標準のincrementalに大きな違いはありませんが、複数パーティションのbackfill処理では、microbatchが大きなアドバンテージを持ちます。
2024-12-13 00:00:00
から 2024-12-13 02:00:00
までのデータをbackfillする例で比較してみましょう。
incremental
でのbackfill
標準の標準的なincrementalモデルは以下のようになります。
{{
config(
materialized="incremental",
incremental_strategy="delete+insert",
unique_key="_partition_hourly",
)
}}
select *
from {{ ref("model_name") }}
where {{ filter_partition() }}
このモデルを実行すると、以下のようなクエリが生成されます。
create or replace temporary table db_name.schema_name.model_name__dbt_tmp as
(
select *
from db_name.schema_name.model_name
where
try_cast('2024-12-13 00:00:00' as timestamp) <= _partition_hourly
and _partition_hourly <= try_cast('2024-12-13 02:00:00' as timestamp)
)
処理のイメージはこちらです。
図: 従来のincrementalを使った複数パーティション処理
標準のincremental
では、処理対象期間の全データを一度に中間テーブルに格納します。
データ量が大きい場合、この処理がボトルネックとなる可能性があります。
microbatchでのbackfill
同じモデルをmicrobatchで実装すると。
{{
config(
materialized="incremental",
incremental_strategy="microbatch",
begin="2022-01-01",
batch_size="hour",
event_time="_partition_hourly",
lookback=1,
)
}}
select *
from {{ ref("model_name") }}
実行時には、各時間帯ごとに別々のクエリが生成されます。
create or replace temporary table db_name.schema_name.model_name__dbt_tmp_20241213_00_01
as (
select * from (select * from db_name.schema_name.model_name
where _partition_hourly >= '2024-12-13 00:00:00+00:00'
and _partition_hourly < '2024-12-13 01:00:00+00:00')
)
create or replace temporary table db_name.schema_name.model_name__dbt_tmp_20241213_01_02
as (
select * from (select * from db_name.schema_name.model_name
where _partition_hourly >= '2024-12-13 01:00:00+00:00'
and _partition_hourly < '2024-12-13 02:00:00+00:00')
)
create or replace temporary table db_name.schema_name.model_name__dbt_tmp_20241213_02_03
as (
select * from (select * from db_name.schema_name.model_name
where _partition_hourly >= '2024-12-13 02:00:00+00:00'
and _partition_hourly < '2024-12-13 03:00:00+00:00')
)
これらの処理を図解すると、以下のようになります。
図: microbatchを使った複数パーティション処理(色の違いは別トランザクションを表す)
microbatch
の利点としては、以下の通りです。
- 処理負荷の安定性
- パーティション単位で処理を行うため、単一パーティションでも複数パーティションでも各バッチの処理負荷が一定に保たれます。
- 運用の簡素化
- バックフィル範囲の指定が標準のコマンドラインオプションで可能になり、独自の変数管理が不要になりました。以下のように簡潔なコマンドで実行できます
dbt run --select model_name --event-time-start "2024-01-01T00:00:00" --event-time-end "2024-01-31T00:00:00"
重複排除処理
時系列データにおける重複排除処理は、一見単純に見えて実は複雑な課題です。
特にパーティション化されたデータを扱う場合、単一パーティション内の重複だけでなく、パーティションを跨いだ重複にも対処する必要があります。
時系列データにおける重複排除の課題
例えば、広告のクリックイベントデータでは、同一のIDが異なる時間帯に複数回記録されることがあります。
このような状況では、一般的に「最初に到着したイベントを正とする」という先勝ルールを採用することが多くなります。
しかし、以下のような単純な重複排除の実装では、重要な問題が発生します。
{{
config(
materialized="incremental",
incremental_strategy="delete+insert",
unique_key="_partition_hourly",
)
}}
with source as (
select *
from {{ ref("model_name") }}
where {{ filter_partition() }}
),
deduplicated as (
{{
dbt_utils.deduplicate(
relation="source",
partition_by="click_id",
order_by="_partition_hourly asc, click_at asc",
)
}}
)
select * from deduplicated
この実装の問題点は、パーティションを跨いだ重複の考慮が欠けていることです。
例えば
- click_id: A が
2024-12-13T00:00:00
のパーティションで処理される - 同じ click_id: A が
2024-12-13T01:00:00
のパーティションにも存在する - パーティションごとに独立して処理するため、両方のパーティションで重複したレコードが残ってしまう
図: パーティションを跨いだ同一IDが考慮されていない処理
incremental
での解決策
標準のこの問題に対処するため、処理時に一定時間遡って重複チェックを行う実装が必要です。
{{
config(
materialized="incremental",
incremental_strategy="delete+insert",
unique_key="_partition_hourly",
)
}}
with source as (
select *
from {{ ref("model_name") }}
where {{ filter_partition_for_cumulative_type(1) }}
),
deduplicated as (
{{
dbt_utils.deduplicate(
relation="source",
partition_by="click_id",
order_by="_partition_hourly asc, click_at asc",
)
}}
)
select * from deduplicated
ここで使用する filter_partition_for_cumulative_type
マクロは、指定した時間数だけ遡ってデータを取得します。
{%- macro filter_partition_for_cumulative_type(hours_back) -%}
{%- set transform_start_datehour = var("transform_start_datehour", default=none) -%}
{%- set transform_end_datehour = var("transform_end_datehour", default=none) -%}
{%- set start_datehour = dbt.dateadd(
datepart="hour",
interval=-hours_back,
from_date_or_timestamp="'" ~ transform_start_datehour ~ "'",
) -%}
{%- set end_datehour = dbt.safe_cast(
"'" ~ transform_end_datehour ~ "'",
api.Column.translate_type("timestamp"),
) -%}
{{ start_datehour }} <= _partition_hourly
and _partition_hourly <= {{ end_datehour }}
{%- endmacro -%}
処理の流れとしては、以下のようなイメージです。
図: 複数パーティションを横断して重複排除をする
microbatch
での実装上の注意点
microbatch
では、このような累積することで実現する重複排除に対する標準的な解決策は提供されていません。
そのため以下のような実装で対処する必要があります。
{{
config(
materialized="incremental",
incremental_strategy="microbatch",
begin="2022-01-01",
batch_size="hour",
event_time="_partition_hourly",
lookback=1,
)
}}
with source as (
select *
from {{ ref("model_name").render() }}
where {{ filter_microbatch_for_cumulative_type(model.batch, 1) }}
),
deduplicated as (
{{
dbt_utils.deduplicate(
relation="source",
partition_by="click_id",
order_by="_partition_hourly asc, click_at asc",
)
}}
)
select * from deduplicated
-
.render()
の使用について- microbatchの自動フィルタリングを無効化し、カスタムフィルターを適用するために必要
- これにより、モデル固有の重複排除ロジックを実装可能
ここで使用する filter_microbatch_for_cumulative_type
マクロはmicrobatch
で実行した時に取得できるプロパティを使って処理範囲を決定しています。
※もっといい方法があればアイデア求む
{%- macro filter_microbatch_for_cumulative_type(batch_context, hours_back) -%}
{%- set start_datehour = dbt.dateadd(
datepart="hour",
interval=-hours_back,
from_date_or_timestamp="'" ~ batch_context.event_time_start ~ "'",
) -%}
{%- set end_datehour = dbt.safe_cast(
"'" ~ batch_context.event_time_start ~ "'",
api.Column.translate_type("timestamp"),
) -%}
{{ start_datehour }} <= _partition_hourly
and _partition_hourly <= {{ end_datehour }}
{%- endmacro -%}
lookback
で重複排除は実現できないのか?
microbatch
のlookback
を使用して、パーティションを跨いだ重複排除を実装できないかのか?
結論から言うとlookback
では出来ないです。
lookback
パラメータの内部実装を見ると、以下のようになっています。
lookback
が使われる行はこちら。
その手前の処理で、--event-time-start
が入ってる場合はlookback
を考慮せずにreturnしている状態になっていることがわかります。
つまり、lookback
を設定していたとしても、backfill処理時は考慮されないということです。
lookback
はあくまで遅延データに対するパラメーターなので、累積メトリクスへの対応は想定されていない。
という前提に立つと、backfill時に指定されたパーティションを超えて参照範囲広げるには独自のマクロが必要になるとなります。
その他細かい話
実行コンテキストを表すプロパティ
microbatch
には、バッチ処理の情報にアクセスするためのプロパティが用意されています。
これらのプロパティを使うことで、microbatch
ではカバーしきれない処理に対応することができます。
{% if model.batch %}
{{ log(model.batch.id) }} # バッチIDをログ出力
{{ log(model.batch.event_time_start) }} # バッチの開始時刻をログ出力
{{ log(model.batch.event_time_end) }} # バッチの終了時刻をログ出力
{% endif %}
詳細は公式ドキュメントを参照してください。
注意点: これらのプロパティを過度に使用すると、dbtのバージョンアップ時に互換性の問題が発生する可能性があります。必要最小限の使用にとどめることを推奨します。
event_timeの設定
microbatch
のフィルタリングにはevent_time
の設定が必要不可欠です。
子モデルでmicrobatch
を使用する場合、親モデルがview
であってもevent_time
の設定が必要です。
設定がない場合は、自動的にフルスキャンとなりますのでご注意ください。
SQLファイルでの設定例
{{
config(
materialized="view",
event_time="_partition_hourly",
)
}}
yamlファイルでの設定例
models:
- name: hoge
config:
event_time: _partition_hourly
既存のincrementalモデルからの移行
既存のincremental
モデルからmicrobatch
への移行は、設定の変更だけで完了します。
テーブルの再作成は発生しませんので、安全に移行を進めることができます。
ただし、本記事で説明したように、そのまま移行できないパターンもありますので、事前の確認が必要です。
初期実行の動作
begin
パラメータで指定した日時から現在までのデータを一括処理します。
例えば、begin="2022-04-01"
と設定した場合、2022年4月1日から現在までのデータが処理されます。
2回目以降の動作
lookback
パラメータを指定している場合、現在時刻のパーティションと指定した数だけ遡ったパーティションが処理されます。
例えば、最新パーティションが2024-04-01T12:00:00
でlookback=2
の場合
2024-04-01T12:00:00
2024-04-01T11:00:00
上記の2つのパーティションが処理対象となります。
実際に叩くコマンドのイメージ
microbatch
を前提としたモデルを叩く場合は、以下のような2パターンになりそうです。
# 通常実行(lookback分遡って処理)
dbt run --select hoge
# バックフィル実行(期間指定で処理)
dbt run --select hoge --event-time-start "2024-01-01T00:00:00" --event-time-end "2024-01-31T00:00:00"
並列処理の適用
concurrent_batches=true
の使用は、処理内容によって適切かどうかが変わります。
- 適している場合
- パーティション間の処理順序が結果に影響しないケース
- 各パーティションが独立して処理可能なケース
- 適していない場合
- 累積メトリクスなど、過去のパーティションの処理結果が必要なケース
- パーティションの処理順序が重要なケース
concurrent_batches=true
のイメージ
concurrent_batches=false
のイメージ
lookback=0だと、遡らない?
lookback=1だと、その時間だけを処理するという意味になるっぽいです。
一つ前まで遡るなら、2としましょう。ややこしいね。
まとめ
microbatch
の導入により、時系列データ処理の実装が大幅にシンプル化されました。全てのユースケースに対応できるわけではありませんが、これまで各チームが独自に実装していた処理パターンが標準化されたことは、dbtコミュニティにとって大きな前進と言えます。
microbatch
の実装を見ると、時系列データ処理に関するベストプラクティスが適切に組み込まれていることがわかります。
基本的なユースケースにおいては、microbatch
を採用すること自体が最適な選択となるでしょう。
標準機能として組み込まれたことで、処理パフォーマンスの効率性の向上に加えて、今後は運用監視機能の強化なども期待できます。
これまでは職人芸なところがあった時系列データの処理における技術的なハードルが下がることで、より多くのチームがデータ活用に取り組めるようになるはずです。
Discussion