LayerX エンジニアブログ

LayerX の エンジニアブログです。

dbt-snowflakeでPython modelのCustom Materializationを実装する ~Stored Procedure編~

こんにちは。バクラク事業部 機械学習・データ部 データグループの@civitaspoです。もう年の瀬ですが、皆さんはこの一年どんな年だったでしょうか?私はSnowflake一色の一年でした。データ基盤の主要技術をBigQueryからSnowflakeへ移管するプロジェクトの推進を行い、その間Snowflakeを学び、発信し、Snowfalke九州ユーザーグループの立ち上げも行いました。

findy-tools.io

techplay.jp

また、Snowflake Squadへ選出され、Snowflakeチョットワカル人材認定もされました。

developerbadges.snowflake.com

総じて、最高な一年でした。来年はSnowflakeを中心に、より高度なデータ活用を推進していくぞ 💪


さて、今回はタイトルのとおり、 dbt-snowflake の Python Model で Custom Materialization を実装する方法を書きます。今回はStored Procedureにフォーカスして書きますが、UDFなど他のオブジェクトを実装するときも同様の手法で書けるので、Python Model で Custom Materialization を実装したいと思っている方は読んでもらえると嬉しいです。

はじめに、本記事に登場する用語をいくつか説明したあと、本題となるdbt Python Modelを使ってCustom Materializationを実装する方法を書きます。

dbt Python Modelとは

docs.getdbt.com

dbt Python Modelは、dbtのSQLベースのモデリング機能を拡張し、Pythonを使用してデータ変換を行える機能です。Snowflakeの場合、Snowpark Python APIを利用して、データフレーム操作やPythonの豊富なライブラリ(Pandasなど)を活用した複雑なデータ処理を実現できます。従来のSQLでは難しかった機械学習モデルの適用や高度な統計計算なども、Python Modelを使用することで簡単に実装できるようになりました。

現状、dbt Python Modelは、dbtがサポートしているmaterializationのうち、table materializationとincremental materializationのみをサポートしています。

docs.getdbt.com

dbt Custom Materializationとは

dbt Custom Materializationは、独自のmaterialization戦略を実装できる機能です。デフォルトのmaterialization(view, table, incremental, ephemeral)以外にも、要件に合わせた独自のデータ保存・更新方法を定義することができます。このCustom Materialization機能は、dbtの拡張性の高さを象徴するような機能で、高度なデータパイプラインの要件に柔軟に対応できる強力なツールとなっています。例えば、特定のテーブル構造の自動生成や、複雑なマージ戦略の実装、あるいはカスタムのバックフィル処理など、組織固有のニーズに合わせた実装が可能です。

本記事では、このCustom Materialization機能を使って、SnowflakeのStored Procedureを定義する方法を書きます。なお、Custom Materialization実装に関する基礎的な理解は本記事では割愛します。Custom Materializationに関する基礎を理解したい方は、公式ドキュメントに記載されているガイドをご一読ください。

docs.getdbt.com

Snowflake Stored Procedureとは

docs.snowflake.com

Snowflake Stored Procedureは、Snowflake上で手続き的な処理を定義・実行できる機能です。SQLだけでなく、JavaScriptやPython、JavaやScalaといったプログラム言語を使用して、複雑なビジネスロジックやデータ処理を実装することができます。

本記事では、Snowflake Stored ProcedureをPythonで実装することを想定したCustom Materializationを実装します。

dbt Python Modelを使ったCustom Materialization実装

ここから本題となる実装内容について書いていきます。

実装したコード

まず、最初に実装したコードを載せます。以下のコードをコピーして使用すれば、SnowflakeのStored ProcedureをdbtのPython Modelで実装することができます。

/* ./macros/materializations/procedure/snowflake__procedure.sql */
{%- materialization procedure, adapter='snowflake', supported_languages=['sql', 'python'] -%}
  {%- set identifier = model['alias'] -%}
  {%- set language = model['language'] -%}
  {%- set compiled_code = model['compiled_code'] -%}

  {%- set arguments = config.get('arguments', default=[]) -%} -- name, type, default
  {%- set return_type = config.get('return_type', default='varchar' ) -%}
  {%- set execute_as = config.get('execute_as', default='caller' ) -%}
  {%- set runtime_version = config.get('runtime_version', default='3.11' ) -%}
  {%- set packages = config.get('packages', default=[]) -%}
  {%- set imports = config.get('imports', default=[]) -%}
  {%- set external_access_integrations = config.get('external_access_integrations', default=[]) -%}
  {%- set secret_mapping = config.get('secret_mapping', default={}) -%}
  {%- set comment = config.get('comment', default='') -%}

  {%- for arg in arguments -%}
    {%- if not arg.get('name', None) -%}
      {{ exceptions.raise_compiler_error("Argument 'name' is required for Python procedures") }}
    {%- endif -%}
    {%- if not arg.get('type', None) -%}
      {{ exceptions.raise_compiler_error("Argument 'type' is required for Python procedures") }}
    {%- endif -%}
  {%- endfor -%}
    {%- if language == 'python' -%}
    {%- if 'snowflake-snowpark-python' not in packages -%}
      {{ exceptions.raise_compiler_error("Package 'snowflake-snowpark-python' is required for Python procedures") }}
    {%- endif -%}
    -- https://docs.snowflake.com/en/developer-guide/stored-procedure/stored-procedures-python#prerequisites-for-writing-stored-procedures-locally
    {%- set supported_python_runtime_versions = ['3.8', '3.9', '3.10', '3.11'] -%}
    {%- if runtime_version not in supported_python_runtime_versions -%}
      {{ exceptions.raise_compiler_error("Runtime version '" ~ runtime_version ~ "' is not supported for Python procedures. Supported versions: " ~ supported_python_runtime_versions | join(', ')) }}
    {%- endif -%}
  {%- endif -%}

  {%- set target_relation = api.Relation.create( identifier=identifier, schema=schema, database=database) -%}
  {%- set has_transactional_hooks = (hooks | selectattr('transaction', 'equalto', True) | list | length) > 0 %}

  {{ run_hooks(pre_hooks, inside_transaction=False) }}

  -- transaction start
  {{ run_hooks(pre_hooks, inside_transaction=True) }}

  {% call statement('main') -%}
    {{ snowflake_create_stored_procedure_statement(
        relation=target_relation,
        language=language,
        arguments=arguments,
        return_type=return_type,
        execute_as=execute_as,
        runtime_version=runtime_version,
        packages=packages,
        imports=imports,
        external_access_integrations=external_access_integrations,
        secret_mapping=secret_mapping,
        comment=comment,
        compiled_code=compiled_code
      ) }}
  {%- endcall %}

  {{ run_hooks(post_hooks, inside_transaction=True) }}
  {{ adapter.commit() }}
  -- transaction end
  {{ run_hooks(post_hooks, inside_transaction=False) }}

  {{ return({'relations': [target_relation]}) }}

{%- endmaterialization -%}
/* ./macros/materializations/procedure/snowflake_create_procedure_statement.sql */
{%- macro snowflake_create_stored_procedure_statement(
        relation, language, arguments, return_type, execute_as, compiled_code,
        runtime_version, packages, imports, external_access_integrations, secret_mapping, comment
) -%}

{{ log("Creating Stored Procedure " ~ relation) }}
CREATE OR REPLACE PROCEDURE {{ relation.include(database=(not temporary), schema=(not temporary)) }}(
{%- for arg in arguments %}
  {{ arg['name'] }} {{ arg['type'] }} {% if arg.get('default', none) is not none%}default {{ arg['default'] }}{% endif %}
  {%- if not loop.last %},{% endif %}
{%- endfor %}
)
returns {{ return_type }}
language {{ language }}
{%- if language == 'python' %}
runtime_version = '{{ runtime_version }}'
  {%- if packages | length > 0 %}
packages = (
    {%- for p in packages %}
  '{{ p }}'{% if not loop.last %},{% endif %}
    {%- endfor %}
)
  {%- endif %}
  {%- if imports | length > 0 %}
imports = (
    {%- for i in imports %}
  '{{ i }}'{% if not loop.last %},{% endif %}
    {%- endfor %}
)
  {%- endif %}
handler = 'main'
  {%- if external_access_integrations | length > 0 %}
external_access_integrations = (
    {%- for e in external_access_integrations %}
  {{ e }}{% if not loop.last %},{% endif %}
    {%- endfor %}
)
  {%- endif %}
  {%- if secret_mapping | length > 0 %}
secrets = (
    {%- for k, v in secret_mapping.items() %}
  '{{ k }}' = {{ v }}{% if not loop.last %},{% endif %}
    {%- endfor %}
)
  {%- endif %}
{%- endif %}
comment = $$
{{ comment }}
$$
execute as {{ execute_as }}
AS
$$

{{ compiled_code }}

{% if language == 'python' %}
def main(session, *args, **kwargs):
    dbt = dbtObj(session.table)
    model(dbt, session)
    return procedure(session, *args, **kwargs)

{% endif %}

$$
;

{%- endmacro -%}

実装したコードの使用例

以下のように、 materialized="procedure" を指定することで Stored Procedure を定義できるようになります。Stored Procedureでは procedure メソッドがCallされます。この例では第一引数に与えられた文字列のPrefixに hello を加えた文字列を返すだけのStored Procedureが定義されます。

def model(dbt, session):
    dbt.config(
        enabled=True,
        materialized="procedure",
        database="example_db",
        schema="procedures",
        alias="example_procedure",
        arguments=[ # Stored Procedureの引数定義
            {"name": "arg1", "type": "varchar"},
            {"name": "arg2", "type": "number", "default": "5"},
        ],
        runtime_version="3.11",
        packages=[ # 使用したいライブラリの定義
            "snowflake-snowpark-python",
            "pandas",
        ],
        external_access_integrations=["example_api"], # 外部APIをCallする場合、External Access Integrationの指定が必要
        secret_mapping={ # 秘匿情報を扱う場合の指定
            "example_token": "example_db.secrets.example_token",
        },
    )
    return None

from snowflake.snowpark import Session
import pandas as pd

def procedure(
    session: Session,
    arg1 str,
    arg2: int,
) -> str:
    return "hello " + arg1

dbt Python Modelを使ったCustom Materialization実装の解説

ここから実装したコードを詳しく解説していきます。 snowflake__procedure.sql と snowflake_create_procedure_statement.sql という2つのファイルで構成されています。Custom Materialization の起点となるコードは snowflake__procedure.sql に実装されていて、snowflake_create_procedure_statement.sql はsnowflake__procedure.sql から呼ばれる構成となっています。

snowflake__procedure.sql

snowflake__procedure.sqlについて詳しく説明していきます。このファイルはdbtのCustom Materializationを実装する上で必須の処理とdbt Python Modelを定義する際に必要なconifgパラメータの定義を行っています。

{%- materialization procedure, adapter='snowflake', supported_languages=['sql', 'python'] -%}

この記述では、 procedure という名前の materialization を Snowflake 向けに定義しています。supported_languages には sql と python を指定しています。現状のdbtではこの2つのみ使用可能です。

github.com

  {%- set identifier = model['alias'] -%}
  {%- set language = model['language'] -%}
  {%- set compiled_code = model['compiled_code'] -%}

ここでは model オブジェクトから処理に必要な情報を取得しています。language パラメータはファイルの拡張子によって sql または python が格納されています。

github.com

以降の記述は抜粋する量が多くなるので、抜粋は割愛して説明を進めます。run_hooks の処理までの記述では、materialization に必要なパラメータの取得を行っています。SnowflakeのStored Procedureを定義するために必要なパラメータをconfigで指定してもらう想定です。

その後、snowflake_create_procedure_statement.sqlのcallに続きます。

snowflake_create_procedure_statement.sql

このファイルはjinja2テンプレートが大量に記載されていて複雑に見えますが、SnowflakeのStored Procedureを定義するためのSQLを組み立てています。

docs.snowflake.com

特筆すべきは {% if language == 'python' %} から始まる以下の記述です。

{% if language == 'python' %}
def main(session, *args, **kwargs):
    dbt = dbtObj(session.table)
    model(dbt, session)
    return procedure(session, *args, **kwargs)

{% endif %}

この記述は、dbt-snowflakeのPython Modelによるtable materializationの実装を模倣しています。

github.com

dbt Python Modelでは、compile時にdbtプロジェクト内のリソースへアクセスするためのアダブターが挿入されます。上記の実装は、このアダプターを使用するための実装です。

github.com

これら2つのファイルによってSnowflake Stored Procedureをdbt Python Modelを使ったCustom Materializationで実装できるようになります。

おわりに

本記事では、dbt Python ModelにおけるCustom Materializationの実装方法について、Snowflake Stored Procedureを例に解説しました。dbt Python Modelを使ったCustom Materializationの例はまだあまり世の中に多くないので、ぜひ本記事を参考にしてCustom Materializationの実装にトライしていただけると嬉しいです。

LayerXでは一緒にデータ基盤を作ってくれる仲間を募集しています。ちょっとでも興味のある方は一度ぜひお話しましょう!

jobs.layerx.co.jp