Yappli Tech Blog

株式会社ヤプリの開発メンバーによるブログです。最新の技術情報からチーム・働き方に関するテーマまで、日々の熱い想いを持って発信していきます。

BigQuery上にあるUDFをdbt macroで管理するようにした話

この記事は dbt Advent Calendar 4日目&Yappli Advent Calendar 4日目の記事です。

こんにちは! データサイエンス室の山本(@__Y4M4MOTO__)です。

ヤプリではdbtを用いて分析用データ基盤を構築しています。過去の記事で、UDFはdbtで管理せずBigQuery上に置く運用を採っているとの旨を書きました。

BigQueryのUDFはdbtプロジェクトでは管理せず、UDF用のデータセットを用意することで管理していました。この管理方法を採っていた理由は、UDFとして実装した処理をデータ基盤だけでなくアドホックな分析でも利用できるようにするためです。

tech.yappli.io

しかし今回、dbt macroを使ってdbtで管理するよう変更しました。この記事ではその理由や方法などについて記します。

なぜやろうと思ったのか?

UDFを新たに追加したり既存のUDFに変更を加えたりした際に本番環境への反映を手動で行なう必要があり、事故リスクが高かったからです。

当時の開発フローではUDFを追加/改修したい場合、以下のような手順で行なっていました。

  1. dev環境でUDFを追加/改修し、dbtモデルを開発
  2. Pull Request(以下、PR)にて開発したdbtモデルをレビュー
  3. 開発したdbtモデルを本番環境へリリース
  4. リリース後、その日の実行が始まるまでに追加/改修したUDFを本番環境へ反映

課題があったのは手順2.と手順4.です。

まず手順2.について。追加/改修したUDFの内容はFile Changed等からは確認できないため、PR説明で別途レビュアーへ伝達する必要がありました。これがレビューイ、レビュアー双方にとって負担となっていました。

次に手順4.について。UDFはdbtで管理していないため、dbt側のリリースに合わせて別途、本番反映作業が必要でした。これが漏れると本番環境でエラーが発生してしまいます。ヤプリのdbt基盤はアプリプラットフォーム『Yappli』の CMSダッシュボードYappli Analytics など各種サービスに利用されているため、エラーが発生すると顧客に影響が出てしまいます。

これらの課題を解消するため、次の2点を満たせる方法が必要でした。

  • dbtプロジェクト外(=アドホック分析)でもUDFを利用できるようにする
  • dbtプロジェクト内でUDFを管理できるようにする

実装した方法

方法の説明

次の手順を行なう方法を実装しました。

  1. UDF1つにつき、次の2つのdbt macro(以下、あわせてUDF用dbt macro)を作成
    • CREATE FUNCTIONを実行するためのdbt macro(以下、CREATE FUNCTION用dbt macro)
    • BigQuery上にあるUDFを呼び出すためのdbt macro(以下、UDF呼び出し用dbt macro)
  2. dbtモデルではUDF呼び出し用dbt macroを利用

例えば today() というUDFを作成したい場合、 次のような流れで作成します。

まず、CREATE FUNCTION用dbt macroとして create_today() を作成します。実装コードを以下に示します。このmacroを実行すると、BigQuery上に today() というUDFが作成されます。

{%- macro create_today() -%}
{%- set query -%}
CREATE OR REPLACE FUNCTION 
  YOUR_GC_PROJECT_ID.YOUR_GC_DATASET.today() #}
RETURNS STRING {# 返り値の型を書く #}
AS (
  {#- ここに処理を書く -#}
  CURRENT_DATE()
);
{%- endset -%}
{% do run_query(query) %}
{%- endmacro -%}

続いて、 BigQuery上にあるUDF today() を呼び出すためのmacro today() を作成します。実装コードを以下に示します。

{%- macro today() -%}
YOUR_GC_PROJECT_ID.YOUR_GC_DATASET.today()
{%- endmacro -%}

dbtモデルでUDF today() を呼び出したい場合、macro today() 経由で呼び出すようにします。実装例を以下に示します。

SELECT
    *
FROM
    {{ ref('YOUR_MODEL') }}
WHERE
    dt > {{ today() }}

上記を図にすると次のようになります。

graph LR
    subgraph dbt macro
        macro___create_today[create_today]
        macro___today[today]
    end

    subgraph BigQuery
        bq___today[today]
    end

    subgraph dbt model
        YOUR_MODEL
    end

    macro___create_today
    -- CREATE FUNCTION --> bq___today
    -- 呼び出し --> macro___today
    -- 呼び出し --> YOUR_MODEL

この方法により、前述の2点を満たすことができました。

  • dbtプロジェクト外(=アドホック分析)でもUDFを利用できるようにする
  • dbtプロジェクト内でUDFを管理できるようにする

留意点

UDFに引数が与えられている場合、当然、UDF呼び出し用dbt macroにも同じように引数を与える必要があります。その際、引数の書き方を次のようにする必要があります。

引数に与えるもの 書き方 UDFの場合の書き方 dbt macroの場合の書き方
数値 そのままでOK `hoge.bar`.foo(1) {{ foo(1) }}
カラム "..." で囲む `hoge.bar`.foo(col) {{ foo("col") }}
サブクエリ "..." で囲む `hoge.bar`.foo((SELECT 1)) {{ foo("(SELECT 1)") }}
文字列 "\"...\"" で囲む `hoge.bar`.foo("str") {{ foo("\"str\"") }}

このようにする理由はdbt macroのcompile結果にあります。引数を "..." で囲わなかった場合と囲った場合のcompile結果を以下に示します。

compile前 compile後 補足
{{ foo(abc) }} `hoge.bar`.foo() 引数が消えていえる
{{ foo("abc") }} `hoge.bar`.foo(abc) 引数を囲っていた "..." が消えている

この挙動から、上記のような書き方をする必要があります。

実運用のための工夫点

前述の方法を実運用に載せるにあたって、次の工夫を行ないました。

本番環境とdev環境でUDFの場所を切り替えるためのdbt macroを作成

UDFのあるデータセットは本番環境とdev環境で当然異なるため、UDF用dbt macroを作る際にはその切り替えをうまくやる必要があります。

シンプルな方法はdbtの target変数 を利用して {{ target.project }}.{{ target.dataset }} とすることです。この方法をとった場合の各macroの実装例を以下に示します。

  • CREATE FUNCTION用dbt macro
{%- macro create_today() -%}
{%- set query -%}
CREATE OR REPLACE FUNCTION 
  `{{ target.project }}.{{ target.dataset }}`.today() #}
RETURNS STRING {# 返り値の型を書く #}

〜〜(以下略)〜〜
  • UDF呼び出し用macro
{%- macro today() -%}
`{{ target.project }}.{{ target.dataset }}`.today()
{%- endmacro -%}

しかし、弊社のdbt基盤は下記記事の方法を使ってdbtの出力先となるGoogle Cloudプロジェクト(以下、GCプロジェクト)とBigQueryの請求先となるGCプロジェクトを分けています、そのため、前述の方法が採れませんでした。

tech.yappli.io

そこで、環境ごとにUDFの場所を切り替えるためのdbt macroを用意して対処しました。サンプル実装を以下に示します。

{%- macro get_destination_dataset() -%}
{%- if target.name == "dev" -%}
`YOUR_DEV_GC_PROJECT_ID.{{ target.dataset }}`
{%- else -%}
`YOUR_PROD_GC_PROJECT_ID.{{ target.dataset }}`
{%- endif -%}
{%- endmacro -%}

このmacroを使った場合、各macroの実装は次のようになります。

  • CREATE FUNCTION用dbt macro
{%- macro create_today() -%}
{%- set query -%}
CREATE OR REPLACE FUNCTION 
  {{ get_destination_dataset() }}.today() #}
RETURNS STRING {# 返り値の型を書く #}

〜〜(以下略)〜〜
  • UDF呼び出し用amcro
{%- macro today() -%}
{{ get_destination_dataset() }}.today()
{%- endmacro -%}

CREATE FUNCTION用dbt macroとUDF呼び出し用dbt macroは同じファイル内に定義

dbt macroは1ファイルに複数定義できます。そこで、CREATE FUNCTION用dbt macroとUDF呼び出し用dbt macroを同じファイルに定義しました。これにより、同じUDFのmacroが1ファイルにまとまり、管理しやすくなりました。

UDF用dbt macroを定義するためのテンプレートファイルを用意

UDF用dbt macroを簡単に定義できるよう、次のようなテンプレートファイルを用意しました。定義にあたって変えるべき箇所や留意点などをコメントに記載して、なるべくつまづかずに定義作業を行なえるよう工夫しました。

{# -------------------------------- #}
{#   CREATE FUNCTION 用 dbt macro   #}
{# -------------------------------- #}

{# macro名は "create_UDF名" (例: create_today)の形式にすること #}
{# ファイル名はmacro名と同じにすること #}
{%- macro create_YOUR_UDF_NAME() -%}
{%- set query -%}
CREATE OR REPLACE FUNCTION 
  {{ get_destination_dataset() }}.YOUR_UDF_NAME() {# UDF名と引数を設定する(例: today(arg1 STRING) ) #}
RETURNS STRING {# 返り値の型を書く #}
AS (
  {#- ここに処理を書く -#}
  'Hello, World!'
);
{%- endset -%}
{% do run_query(query) %}
{%- endmacro -%}

{# -------------------------------- #}
{#   UDF 呼び出し用 dbt macro         #}
{# -------------------------------- #}

{# macro名、引数はUDFと合わせること(例: today(arg1) ) #}
{%- macro YOUR_UDF_NAME() -%}
{{ get_destination_dataset() }}.YOUR_UDF_NAME() {# UDF名と引数を設定する(例: today({{ arg1 }}) ) #}
{%- endmacro -%}

本番リリース時にCREATE FUNCTION用dbt macroを一括実行

本番リリース前に本番環境へUDFを作成しておく必要があります。そのため、リリース時にすべてのUDFのCREATE FUNCTION用dbt macroを一括実行するワークフローをGitHub Actionsで用意しました。

一括実行をするにあたって、UDF用dbt macroが増減した際にdbt macroの作成/削除以外の操作(例: dbt_project.ymlの vars: に追記する)が不要になるよう工夫しました。理由は、そのような操作があるとエラーの元になるからです。

工夫したのはUDF用dbt macroの配置先とファイル名です。具体的には次のように工夫しました。

  • 配置先 ... macros/ ディレクトリ配下に create_udfs/ というディレクトリを作成しその中に配置
  • ファイル名 ... CREATE FUNCTION用dbt macroのmacro名と同じに設定

イメージとしては次のようなディレクトリ構成になります。

macros/
|-- create_udfs/
|   |-- create_today.sql
|   |-- create_your_udf_name.sql
|   |-- ...
|-- (その他のmacroファイル)

これにより、次のコマンドで一括実行が行なえるようにしました。

find ./macros/create_udfs/ -maxdepth 1 -type f -exec basename {} .sql \; | xargs -n1 -I {} sh -c 'echo {}; dbt run-operation {}'

このコマンドでは次の操作を行なっています。

  1. ./macros/create_udfs/ ディレクトリ直下にあるファイルのbasename(ファイル名からディレクトリ名と拡張子を取り除いたもの)を取得
  2. 取得したbasenameを1つずつ dbt run-operation で実行

実装完了までの流れ

実装完了までの流れは次のとおりです。

  1. 各UDFのdbt macroを作成
  2. dbtモデル内でUDFが利用されている箇所をUDF呼び出し用dbt macroに置き換え
  3. 差分をチェックし、置き換えの漏れやミスを確認
  4. 検証環境で実際に動かして動作確認
  5. 本番環境へリリース

苦労したのは手順3.の差分チェックです。UDFの利用箇所が多く大量の差分が発生したため、それらを1つ1つ確認するのが大変でした…。

追記: 2つのdbt macroを1つにまとめる

dbt macroのデフォルト引数、可変引数を使うと、こんな感じで2つのdbt macroを1つにまとめることができます。

  • UDF用dbt macroのテンプレート
{# ----------------------------------- #}
{#   UDF 用 dbt macro 実装用テンプレート   #}
{# ----------------------------------- #}

{# STEP 1: macro名を定義してください #}
{# -- ファイル名はmacro名と同じにしてください(例: macro名が "_YOUR_UDF_NAME" の場合、 "_YOUR_UDF_NAME.sql" ) #}
{%- macro _YOUR_UDF_NAME(args, create_udf=false) -%}

{# -------------------------------- #}
{#   CREATE FUNCTION 処理            #}
{# -------------------------------- #}
{%- if create_udf -%}
{%- set query -%}

{# STEP 2: UDF名、引数、返り値を定義してください #}
{# -- UDF名は STEP 1 で定義したものに合わせてください #}
{# -- 引数は STEP 1 には従わず自由に設定して構いません #}
CREATE OR REPLACE FUNCTION {{ get_destination_dataset() }}._YOUR_UDF_NAME(int_value INT64, str_value STRING, date_value DATE)
RETURNS DATE
AS (
{# STEP 3: UDF を実装してください -#}

{# INT64 の実装例 -#}
-- int_value + 1

{# STRING の実装例 -#}
-- CONCAT(str_value, '_suffix')

{# DATE の実装例 -#}
DATE_ADD(date_value, INTERVAL 1 DAY)
);
{%- endset -%}

{# {%- do log('finish:'~query, info=True) -%} #}
{%- do run_query(query) %}

{# -------------------------------- #}
{#   UDF 呼び出し用処理                #}
{# -------------------------------- #}
{%- else -%}

{# STEP 4: UDF 呼び出し処理を実装してください #}
{# -- 呼び出す UDF は STEP 2 で定義したものを呼び出してください #}
{# -- 引数は {{ kwargs['UDFに定義した引数名'] }} のように設定してください #}
{{ get_destination_dataset() }}._YOUR_UDF_NAME({{ kwargs['int_value'] }}, {{ kwargs['str_value'] }}, {{ kwargs['date_value'] }})

{%- endif -%}
{%- endmacro -%}

{# STEP 5: 次のコマンドを実行して、 BigQuery 上に UDF が作成してください #}
{# dbt run-operation _YOUR_UDF_NAME --args '{create_udf: true}' #}

{# STEP 6: この dbt macro は次のようにして dbt model 内で呼び出せます #}
{# 例: SELECT {{ _YOUR_UDF_NAME(int_value=1, str_value='"hoge"', date_value='"2024-10-31"') }} #}
  • 一括実行用コマンド
find ./macros/create_udfs/ -maxdepth 1 -type f -exec basename {} .sql \; | xargs -n1 -I {} sh -c 'echo {}; dbt run-operation {} --args "{create_udf: true}"'

結び

この記事ではdbt macroを使ってBigQuery上にあるUDFを管理する方法について記しました。

なお、データモデリングが完璧ならそもそもアドホック分析でdbt macroに実装した処理を使う必要はないはず(すでに処理が施されたdbtモデルを使えば良いはずなので)、という話もあります。データモデリングは奥が深いですね…。

とはいえ、すでにあるデータ基盤をdbtへ移行する場合など、この記事の方法が必要になる状況はあるかと思いますので、その際の参考になれば幸いです。

ここまでお読みいただきありがとうございました!

(2024/12/04追記)別の管理方法:Custom Materializationを使う

この記事を公開したところ、XにてdbtのCustom Materializationを使う方法もあるというのを教えていただきました!(ありがとうございます…!🙇)

zenn.dev