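Hello, I'm @civitaspo from the Data Group in the Machine Learning & Data Department of the Bakuraku Business Division. The year is almost over. What kind of year was it for you? For me, it was a year of nothing but Snowflake. I led the project to migrate the core technology of our data platform from BigQuery to Snowflake, and along the way I studied Snowflake, wrote and spoke about it, and helped launch the Snowflake Kyushu User Group.
I was also selected for the Snowflake Squad and recognized as a "Snowflake ﾁｮｯﾄﾜｶﾙ" person (roughly, someone who "knows Snowflake a little").
All in all, it was a fantastic year. Next year I'm going to push more advanced data use, with Snowflake at the center 💪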
As the title says, this post describes how to implement a Custom Materialization with a dbt-snowflake Python Model. I focus on Stored Procedures here, but the same approach works for other objects such as UDFs, so I hope this is useful to anyone who wants to implement a Custom Materialization with a Python Model.
I first explain a few terms used in this post, and then move on to the main topic: implementing a Custom Materialization with a dbt Python Model.
What is a dbt Python Model?
A dbt Python Model extends dbt's SQL-based modeling so that data transformations can be written in Python. On Snowflake, it uses the Snowpark Python API, which lets you combine DataFrame operations with Python's rich library ecosystem (Pandas and others) for complex data processing. Work that is hard to express in SQL alone, such as applying machine learning models or running advanced statistical calculations, becomes much easier to implement with a Python Model.
At present, of the materializations that dbt supports, dbt Python Models support only the table materialization and the incremental materialization.
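To make the shape of a Python Model concrete, here is a minimal sketch. The `model(dbt, session)` signature, `dbt.config`, and `dbt.ref` are the real entry points; `FakeDbt`, the `orders` data, and the list-based filtering are hypothetical stand-ins so the sketch runs without dbt or Snowflake. In a real dbt-snowflake project, `dbt.ref` returns a Snowpark DataFrame and the model returns a DataFrame.

```python
class FakeDbt:
    """Hypothetical stand-in that mimics the two dbt calls used below."""

    def __init__(self, tables):
        self.tables = tables
        self.settings = {}

    def config(self, **kwargs):
        # Record the config values the model asks for.
        self.settings.update(kwargs)

    def ref(self, name):
        # Real dbt would return a DataFrame for the referenced model.
        return self.tables[name]


def model(dbt, session):
    # A dbt Python Model is a function named `model` that takes (dbt, session).
    dbt.config(materialized="table")
    orders = dbt.ref("orders")
    # Whatever is returned is what dbt materializes as the table.
    return [row for row in orders if row["amount"] > 100]


fake_dbt = FakeDbt({"orders": [{"id": 1, "amount": 50}, {"id": 2, "amount": 150}]})
result = model(fake_dbt, session=None)
print(result)
```

The stub only exists to show the calling convention; the filtering logic itself is arbitrary.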
What is a dbt Custom Materialization?
A dbt Custom Materialization lets you implement your own materialization strategy. Beyond the default materializations (view, table, incremental, ephemeral), you can define your own way of storing and updating data to fit your requirements. This feature is a good illustration of how extensible dbt is, and it is a powerful tool for handling demanding data pipeline requirements flexibly. For example, you can automatically generate tables with a specific structure, implement complex merge strategies, or build custom backfill processing to match the needs of your organization.
This post describes how to use the Custom Materialization feature to define a Snowflake Stored Procedure. It does not cover the basics of implementing a Custom Materialization. If you want to learn those basics, please read the guide in the official documentation.
What is a Snowflake Stored Procedure?
A Snowflake Stored Procedure lets you define and run procedural logic on Snowflake. In addition to SQL, you can use programming languages such as JavaScript, Python, Java, and Scala to implement complex business logic and data processing.
In this post, I implement a Custom Materialization that assumes the Snowflake Stored Procedure is written in Python.
Implementing a Custom Materialization with a dbt Python Model
From here on, I describe the actual implementation.
The implemented code
First, here is the code I implemented. If you copy and use the code below, you can implement Snowflake Stored Procedures as dbt Python Models.
    /* ./macros/materializations/procedure/snowflake__procedure.sql */
    {%- materialization procedure, adapter='snowflake', supported_languages=['sql', 'python'] -%}
      {%- set identifier = model['alias'] -%}
      {%- set language = model['language'] -%}
      {%- set compiled_code = model['compiled_code'] -%}

      {%- set arguments = config.get('arguments', default=[]) -%} -- name, type, default
      {%- set return_type = config.get('return_type', default='varchar' ) -%}
      {%- set execute_as = config.get('execute_as', default='caller' ) -%}
      {%- set runtime_version = config.get('runtime_version', default='3.11' ) -%}
      {%- set packages = config.get('packages', default=[]) -%}
      {%- set imports = config.get('imports', default=[]) -%}
      {%- set external_access_integrations = config.get('external_access_integrations', default=[]) -%}
      {%- set secret_mapping = config.get('secret_mapping', default={}) -%}
      {%- set comment = config.get('comment', default='') -%}

      {%- for arg in arguments -%}
        {%- if not arg.get('name', None) -%}
          {{ exceptions.raise_compiler_error("Argument 'name' is required for Python procedures") }}
        {%- endif -%}
        {%- if not arg.get('type', None) -%}
          {{ exceptions.raise_compiler_error("Argument 'type' is required for Python procedures") }}
        {%- endif -%}
      {%- endfor -%}

      {%- if language == 'python' -%}
        {%- if 'snowflake-snowpark-python' not in packages -%}
          {{ exceptions.raise_compiler_error("Package 'snowflake-snowpark-python' is required for Python procedures") }}
        {%- endif -%}
        -- https://docs.snowflake.com/en/developer-guide/stored-procedure/stored-procedures-python#prerequisites-for-writing-stored-procedures-locally
        {%- set supported_python_runtime_versions = ['3.8', '3.9', '3.10', '3.11'] -%}
        {%- if runtime_version not in supported_python_runtime_versions -%}
          {{ exceptions.raise_compiler_error("Runtime version '" ~ runtime_version ~ "' is not supported for Python procedures. Supported versions: " ~ supported_python_runtime_versions | join(', ')) }}
        {%- endif -%}
      {%- endif -%}

      {%- set target_relation = api.Relation.create(
            identifier=identifier, schema=schema, database=database) -%}

      {%- set has_transactional_hooks = (hooks | selectattr('transaction', 'equalto', True) | list | length) > 0 %}

      {{ run_hooks(pre_hooks, inside_transaction=False) }}

      -- transaction start
      {{ run_hooks(pre_hooks, inside_transaction=True) }}

      {% call statement('main') -%}
        {{ snowflake_create_stored_procedure_statement(
          relation=target_relation,
          language=language,
          arguments=arguments,
          return_type=return_type,
          execute_as=execute_as,
          runtime_version=runtime_version,
          packages=packages,
          imports=imports,
          external_access_integrations=external_access_integrations,
          secret_mapping=secret_mapping,
          comment=comment,
          compiled_code=compiled_code
        ) }}
      {%- endcall %}

      {{ run_hooks(post_hooks, inside_transaction=True) }}

      {{ adapter.commit() }}
      -- transaction end

      {{ run_hooks(post_hooks, inside_transaction=False) }}

      {{ return({'relations': [target_relation]}) }}
    {%- endmaterialization -%}
    /* ./macros/materializations/procedure/snowflake_create_procedure_statement.sql */
    {%- macro snowflake_create_stored_procedure_statement(
      relation,
      language,
      arguments,
      return_type,
      execute_as,
      compiled_code,
      runtime_version,
      packages,
      imports,
      external_access_integrations,
      secret_mapping,
      comment
    ) -%}
    {{ log("Creating Stored Procedure " ~ relation) }}
    CREATE OR REPLACE PROCEDURE {{ relation.include(database=(not temporary), schema=(not temporary)) }}(
      {%- for arg in arguments %}
      {{ arg['name'] }} {{ arg['type'] }} {% if arg.get('default', none) is not none%}default {{ arg['default'] }}{% endif %}
      {%- if not loop.last %},{% endif %}
      {%- endfor %}
    )
    returns {{ return_type }}
    language {{ language }}
    {%- if language == 'python' %}
    runtime_version = '{{ runtime_version }}'
    {%- if packages | length > 0 %}
    packages = (
      {%- for p in packages %}
      '{{ p }}'{% if not loop.last %},{% endif %}
      {%- endfor %}
    )
    {%- endif %}
    {%- if imports | length > 0 %}
    imports = (
      {%- for i in imports %}
      '{{ i }}'{% if not loop.last %},{% endif %}
      {%- endfor %}
    )
    {%- endif %}
    handler = 'main'
    {%- if external_access_integrations | length > 0 %}
    external_access_integrations = (
      {%- for e in external_access_integrations %}
      {{ e }}{% if not loop.last %},{% endif %}
      {%- endfor %}
    )
    {%- endif %}
    {%- if secret_mapping | length > 0 %}
    secrets = (
      {%- for k, v in secret_mapping.items() %}
      '{{ k }}' = {{ v }}{% if not loop.last %},{% endif %}
      {%- endfor %}
    )
    {%- endif %}
    {%- endif %}
    comment = $$ {{ comment }} $$
    execute as {{ execute_as }}
    AS
    $$
    {{ compiled_code }}
    {% if language == 'python' %}
    def main(session, *args, **kwargs):
        dbt = dbtObj(session.table)
        model(dbt, session)
        return procedure(session, *args, **kwargs)
    {% endif %}
    $$
    ;
    {%- endmacro -%}
Example usage of the implemented code
As shown below, specifying materialized="procedure" lets you define a Stored Procedure. When the Stored Procedure runs, the procedure function is called. This example defines a Stored Procedure that simply returns the string passed as its first argument with hello prepended.
    from snowflake.snowpark import Session
    import pandas as pd  # listed in `packages` below; not used by this tiny example


    def model(dbt, session):
        dbt.config(
            enabled=True,
            materialized="procedure",
            database="example_db",
            schema="procedures",
            alias="example_procedure",
            arguments=[  # Argument definitions for the Stored Procedure
                {"name": "arg1", "type": "varchar"},
                {"name": "arg2", "type": "number", "default": "5"},
            ],
            runtime_version="3.11",
            packages=[  # Libraries used by the procedure
                "snowflake-snowpark-python",
                "pandas",
            ],
            # Calling an external API requires an External Access Integration
            external_access_integrations=["example_api"],
            secret_mapping={  # Needed when the procedure handles secrets
                "example_token": "example_db.secrets.example_token",
            },
        )
        return None


    def procedure(
        session: Session,
        arg1: str,
        arg2: int,
    ) -> str:
        return "hello " + arg1
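Because the handler is an ordinary Python function, its logic can be checked locally before deploying. The following is a minimal sketch under that assumption: it repeats the `procedure` function from the example without the Snowpark type hint so that it runs without any Snowflake packages, and passes `None` as the session because this particular handler never touches it. The `CALL` statement is only an illustration built from the database, schema, and alias in the example config.

```python
def procedure(session, arg1: str, arg2: int) -> str:
    # Same logic as the example handler: prepend "hello " to the first argument.
    return "hello " + arg1


# The session is unused by this handler, so None is enough for a local check.
greeting = procedure(None, "world", 5)
print(greeting)

# Illustrative SQL for invoking the deployed procedure from Snowflake.
call_sql = "CALL example_db.procedures.example_procedure('world', 5)"
print(call_sql)
```

A handler that actually uses the session would need a real or mocked Snowpark session instead of `None`.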
Walkthrough of the Custom Materialization implementation with a dbt Python Model
From here on, I explain the implemented code in detail. It consists of two files: snowflake__procedure.sql and snowflake_create_procedure_statement.sql. The entry point of the Custom Materialization is implemented in snowflake__procedure.sql, and snowflake_create_procedure_statement.sql is called from snowflake__procedure.sql.
snowflake__procedure.sql
Let's look at snowflake__procedure.sql in detail. This file contains the processing that every dbt Custom Materialization needs, and it defines the config parameters required when writing the dbt Python Model.
    {%- materialization procedure, adapter='snowflake', supported_languages=['sql', 'python'] -%}
This line defines a materialization named procedure for Snowflake. supported_languages is set to sql and python. dbt currently allows only these two values.
    {%- set identifier = model['alias'] -%}
    {%- set language = model['language'] -%}
    {%- set compiled_code = model['compiled_code'] -%}
Here, the information needed for processing is read from the model object. The language value is sql or python, depending on the file extension of the model.
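The extension-to-language relationship described above can be pictured with a small sketch. This is not dbt's internal code; `LANGUAGE_BY_SUFFIX` and `detect_language` are hypothetical names used only to illustrate the behavior, assuming `.sql` and `.py` are the two model file extensions.

```python
from pathlib import Path

# Hypothetical mapping that mirrors the behavior described above.
LANGUAGE_BY_SUFFIX = {".sql": "sql", ".py": "python"}


def detect_language(model_path: str) -> str:
    """Return the value that model['language'] would hold for this file."""
    suffix = Path(model_path).suffix
    if suffix not in LANGUAGE_BY_SUFFIX:
        raise ValueError(f"Unsupported model file extension: {suffix!r}")
    return LANGUAGE_BY_SUFFIX[suffix]


print(detect_language("models/procedures/example_procedure.py"))
print(detect_language("models/staging/stg_orders.sql"))
```

In the materialization, this value decides whether the Python-only validation and the Python-specific clauses of the CREATE PROCEDURE statement are applied.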
以éã®è¨è¿°ã¯æç²ããéãå¤ããªãã®ã§ãæç²ã¯å²æãã¦èª¬æãé²ãã¾ããrun_hooks
ã®å¦çã¾ã§ã®è¨è¿°ã§ã¯ãmaterialization ã«å¿
è¦ãªãã©ã¡ã¼ã¿ã®åå¾ãè¡ã£ã¦ãã¾ããSnowflakeã®Stored Procedureãå®ç¾©ããããã«å¿
è¦ãªãã©ã¡ã¼ã¿ãconfigã§æå®ãã¦ãããæ³å®ã§ãã
ãã®å¾ãsnowflake_create_procedure_statement.sql
ã®callã«ç¶ãã¾ãã
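The validation that the materialization performs on those config parameters can be paraphrased in Python as follows. This is only an illustrative sketch: `validate_procedure_config` is a hypothetical helper, and unlike the Jinja code, which raises a compiler error at the first problem, it collects all problems into a list so that the checks are easy to see side by side.

```python
SUPPORTED_PYTHON_RUNTIME_VERSIONS = ["3.8", "3.9", "3.10", "3.11"]


def validate_procedure_config(language, arguments, packages, runtime_version):
    """Return a list of problems found in the procedure config (empty if none)."""
    errors = []
    for arg in arguments:
        # Every argument needs both a name and a type.
        if not arg.get("name"):
            errors.append("Argument 'name' is required")
        if not arg.get("type"):
            errors.append("Argument 'type' is required")
    if language == "python":
        # Python procedures need Snowpark and a supported runtime version.
        if "snowflake-snowpark-python" not in packages:
            errors.append("Package 'snowflake-snowpark-python' is required")
        if runtime_version not in SUPPORTED_PYTHON_RUNTIME_VERSIONS:
            errors.append(f"Runtime version '{runtime_version}' is not supported")
    return errors


ok = validate_procedure_config(
    "python",
    [{"name": "arg1", "type": "varchar"}],
    ["snowflake-snowpark-python"],
    "3.11",
)
bad = validate_procedure_config("python", [{"name": "arg1"}], ["pandas"], "3.7")
print(ok)
print(bad)
```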
snowflake_create_procedure_statement.sql
This file contains a lot of Jinja2 templating and looks complicated, but all it does is assemble the SQL that defines a Snowflake Stored Procedure.
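To make the shape of the generated SQL easier to see, here is a rough Python paraphrase of what the template assembles for a Python procedure. It is a simplified sketch, not the macro itself: `render_argument` and `render_create_procedure` are hypothetical helpers, and the imports, external access integrations, secrets, and comment clauses are left out.

```python
def render_argument(arg: dict) -> str:
    """Render one argument as 'name type [default value]'."""
    rendered = f"{arg['name']} {arg['type']}"
    if arg.get("default") is not None:
        rendered += f" default {arg['default']}"
    return rendered


def render_create_procedure(name, arguments, return_type, runtime_version,
                            packages, execute_as, body):
    """Assemble a CREATE PROCEDURE statement for a Python handler named 'main'."""
    args_sql = ", ".join(render_argument(a) for a in arguments)
    lines = [
        f"CREATE OR REPLACE PROCEDURE {name}({args_sql})",
        f"returns {return_type}",
        "language python",
        f"runtime_version = '{runtime_version}'",
    ]
    if packages:
        lines.append("packages = (" + ", ".join(f"'{p}'" for p in packages) + ")")
    lines += ["handler = 'main'", f"execute as {execute_as}", "AS", "$$", body, "$$;"]
    return "\n".join(lines)


sql = render_create_procedure(
    name="example_db.procedures.example_procedure",
    arguments=[
        {"name": "arg1", "type": "varchar"},
        {"name": "arg2", "type": "number", "default": "5"},
    ],
    return_type="varchar",
    runtime_version="3.11",
    packages=["snowflake-snowpark-python", "pandas"],
    execute_as="caller",
    body="def main(session, arg1, arg2):\n    return 'hello ' + arg1",
)
print(sql)
```

The real macro produces the same kind of statement, with the compiled model code placed between the `$$` delimiters.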
Of particular note is the following block, which starts with {% if language == 'python' %}.
    {% if language == 'python' %}
    def main(session, *args, **kwargs):
        dbt = dbtObj(session.table)
        model(dbt, session)
        return procedure(session, *args, **kwargs)
    {% endif %}
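This block mimics the implementation of the table materialization for Python Models in dbt-snowflake.
When a dbt Python Model is compiled, dbt inserts an adapter object for accessing resources in the dbt project. The block above exists so that this adapter can be used: main builds it with dbtObj(session.table), passes it to model, and then hands control to procedure.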
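The call order inside that wrapper can be reproduced locally with stand-ins. In the sketch below, `dbtObj` and `FakeSession` are hypothetical, heavily simplified stubs; the class that dbt actually injects into the compiled code does more than this. Only the flow of `main` matches the block above.

```python
class dbtObj:
    """Simplified stand-in for the adapter object dbt injects at compile time."""

    def __init__(self, load_df_function):
        self.load_df_function = load_df_function
        self.settings = {}

    def config(self, **kwargs):
        # This stub just records the config values.
        self.settings.update(kwargs)


class FakeSession:
    """Stand-in for a Snowpark session; only `table` is needed here."""

    def table(self, name):
        return f"<table {name}>"


def model(dbt, session):
    dbt.config(materialized="procedure")
    return None


def procedure(session, arg1):
    return "hello " + arg1


def main(session, *args, **kwargs):
    # Same flow as the wrapper appended by the macro.
    dbt = dbtObj(session.table)
    model(dbt, session)
    return procedure(session, *args, **kwargs)


output = main(FakeSession(), "world")
print(output)
```

Because `main` forwards `*args` and `**kwargs` unchanged, the arguments declared in the model config reach `procedure` in the same order.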
With these two files, a Snowflake Stored Procedure can be implemented as a Custom Materialization backed by a dbt Python Model.
Conclusion
This post explained how to implement a Custom Materialization for dbt Python Models, using a Snowflake Stored Procedure as the example. There are still few public examples of Custom Materializations built on dbt Python Models, so I hope this post helps you try implementing one yourself.
LayerX is looking for people to build our data platform with us. If you are even a little interested, let's talk!