-
Notifications
You must be signed in to change notification settings - Fork 507
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Modernize legacy insert by period materialization #410
Changes from all commits
3493d3f
051a2e4
8536054
96ff9ed
7ac2058
364772b
147494a
3bb17d5
a8d3259
3e100c2
760a33d
ec73c80
f876d1d
a961622
ed6d55c
e53a801
b360a91
53017a4
888d781
6fe4adb
af2dd57
cdba4b5
7c81020
f82f2ee
171e5df
e1cdcee
713f8a6
1ed1e58
9c680f9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,6 @@ | ||
{{ | ||
config( | ||
materialized = 'view', | ||
enabled=(target.type == 'redshift') | ||
materialized = 'view' | ||
) | ||
}} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
{% macro check_for_period_filter(model_unique_id, sql) %}
  {#- Public entry point: look up the adapter-specific implementation in the
      'dbt_utils' namespace and hand both arguments straight through to it. -#}
  {%- set impl = adapter.dispatch('check_for_period_filter', 'dbt_utils') -%}
  {%- do return(impl(model_unique_id, sql)) -%}
{% endmacro %}
|
||
{% macro default__check_for_period_filter(model_unique_id, sql) %}
  {#- Raise a compiler error when the model SQL does not contain the literal
      '__PERIOD_FILTER__' placeholder; otherwise do nothing.
        model_unique_id: identifier interpolated into the error message.
        sql:             the model's SQL text to inspect. -#}
  {%- if sql.find('__PERIOD_FILTER__') < 0 -%}
    {%- set missing_placeholder_msg = "Model '" ~ model_unique_id ~ "' does not include the required string '__PERIOD_FILTER__' in its sql" -%}
    {%- do exceptions.raise_compiler_error(missing_placeholder_msg) -%}
  {%- endif -%}
{% endmacro %}
|
||
{% macro get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%}
  {#- Public entry point: look up the adapter-specific implementation in the
      'dbt_utils' namespace and forward every argument unchanged. -#}
  {%- set impl = adapter.dispatch('get_period_boundaries', 'dbt_utils') -%}
  {%- do return(impl(target_schema, target_table, timestamp_field, start_date, stop_date, period)) -%}
{% endmacro %}
|
||
{% macro default__get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%}

  {#-
    Runs a query, stored as the 'period_boundaries' statement result
    (fetch_result=True), that returns a single row with:
      start_timestamp: max(timestamp_field) found in the target relation, or
                       start_date when that max is null.
      stop_timestamp:  stop_date minus one millisecond, or the current timestamp
                       when stop_date is the empty string.
      num_periods:     datediff(start_timestamp, stop_timestamp, period) + 1.

    Arguments:
      target_schema / target_table: relation to read the current maximum from.
        Both are used as passed in and double-quoted; the macro does not fall back
        to the global `model` context, so the query always targets the relation
        the caller asked for.
      timestamp_field: column name (double-quoted) holding the row timestamp.
      start_date:      literal used when the target has no max timestamp.
      stop_date:       upper bound literal; '' means "no explicit stop".
      period:          datepart passed to dbt_utils.datediff.
  -#}

  {% call statement('period_boundaries', fetch_result=True) -%}

    with data as (
      select
          coalesce(max("{{timestamp_field}}"), '{{start_date}}')::timestamp as start_timestamp,
          coalesce(
            {#- nullif(...) turns an empty stop_date into NULL so coalesce falls through. -#}
            {{dbt_utils.dateadd('millisecond',
                                -1,
                                "nullif('" ~ stop_date ~ "','')::timestamp")}},
            {{dbt_utils.current_timestamp()}}
          ) as stop_timestamp
      from "{{target_schema}}"."{{target_table}}"
    )

    select
      start_timestamp,
      stop_timestamp,
      {{dbt_utils.datediff('start_timestamp',
                           'stop_timestamp',
                           period)}} + 1 as num_periods
    from data
  {%- endcall %}

{%- endmacro %}
|
||
{% macro snowflake__get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%}

  {#-
    Snowflake implementation of get_period_boundaries. Runs a query, stored as
    the 'period_boundaries' statement result (fetch_result=True), that returns a
    single row with:
      start_timestamp: max(timestamp_field) found in the target relation, or
                       start_date when that max is null.
      stop_timestamp:  stop_date minus one millisecond, or the current timestamp
                       when stop_date is the empty string.
      num_periods:     datediff(start_timestamp, stop_timestamp, period) + 1.

    Identifiers are emitted unquoted here, unlike the default implementation.
    The relation is taken from the target_schema / target_table arguments rather
    than from the global `target` / `model` context, so the query reads the
    relation the caller asked for (e.g. when a custom schema is in use).

    NOTE(review): a reviewer on this change reported that load_result truncates
    these timestamp(9) values to timestamp(6), which breaks downstream timestamp
    comparisons and inserts duplicates when an incremental run is repeated after
    a completed load. The proposed fix is to cast start_timestamp and
    stop_timestamp to varchar here so load_result receives strings. Not applied
    in this block; TODO confirm, and check that the varchar format used keeps
    full fractional-second precision.
  -#}

  {% call statement('period_boundaries', fetch_result=True) -%}

    with data as (
      select
          coalesce(max({{timestamp_field}}), '{{start_date}}')::timestamp as start_timestamp,
          coalesce(
            {#- nullif(...) turns an empty stop_date into NULL so coalesce falls through. -#}
            {{dbt_utils.dateadd('millisecond',
                                -1,
                                "nullif('" ~ stop_date ~ "','')::timestamp")}},
            {{dbt_utils.current_timestamp()}}
          ) as stop_timestamp
      from {{target_schema}}.{{target_table}}
    )

    select
      start_timestamp,
      stop_timestamp,
      {{dbt_utils.datediff('start_timestamp',
                           'stop_timestamp',
                           period)}} + 1 as num_periods
    from data
  {%- endcall %}

{%- endmacro %}
|
||
|
||
{% macro get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%}
  {#- Public entry point: look up the adapter-specific implementation in the
      'dbt_utils' namespace and forward every argument unchanged. -#}
  {%- set impl = adapter.dispatch('get_period_sql', 'dbt_utils') -%}
  {%- do return(impl(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset)) -%}
{% endmacro %}
|
||
{% macro default__get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%} | ||
{#-
  Builds the select for one period slice: substitutes a timestamp predicate for
  the '__PERIOD_FILTER__' placeholder in `sql` and selects `target_cols_csv`
  from the result.

  The predicate keeps rows whose timestamp_field is
    - strictly greater than start_timestamp + offset periods,
    - less than or equal to start_timestamp + (offset + 1) periods, and
    - strictly less than stop_timestamp.

  NOTE(review): the review thread embedded in this block questions the boundary
  handling of the second condition (its example uses 2019-05-30 00:00:00 and
  2019-05-31 00:00:00). The remark is truncated in this view, so confirm the
  intended inclusive/exclusive edges before changing the predicate.
-#}
|
||
{#- Predicate text that replaces the placeholder; timestamp_field is double-quoted. -#}
{%- set period_filter -%} | ||
("{{timestamp_field}}" > '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' and | ||
"{{timestamp_field}}" <= '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' + interval '1 {{period}}' and | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line will not work properly if
2019-05-30 00:00:00 & 2019-05-31 00:00:00 => one day will not be processed.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tested this block and it works ok. Feel free to use it.
|
||
"{{timestamp_field}}" < '{{stop_timestamp}}'::timestamp) | ||
{%- endset -%} | ||
|
||
{#- Every occurrence of the placeholder in the model SQL is replaced by the predicate. -#}
{%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%} | ||
|
||
select | ||
{{target_cols_csv}} | ||
from ( | ||
{{filtered_sql}} | ||
) as t -- has to have an alias | ||
|
||
{%- endmacro %} | ||
|
||
{% macro snowflake__get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%}

  {#-
    Snowflake variant: builds the select for one period slice. The
    '__PERIOD_FILTER__' placeholder in `sql` is replaced by a predicate on the
    unquoted timestamp_field, and `target_cols_csv` is selected from the
    resulting subquery (no alias on the subquery here).
  -#}

  {#- Lower edge of the slice: start_timestamp shifted forward by `offset` periods. -#}
  {%- set slice_start = "'" ~ start_timestamp ~ "'::timestamp + interval '" ~ offset ~ " " ~ period ~ "'" -%}

  {#- Keep rows after the lower edge, up to one period later, and before stop_timestamp. -#}
  {%- set slice_predicate -%}
    ({{timestamp_field}} > {{slice_start}} and
     {{timestamp_field}} <= {{slice_start}} + interval '1 {{period}}' and
     {{timestamp_field}} < '{{stop_timestamp}}'::timestamp)
  {%- endset -%}

  select
    {{target_cols_csv}}
  from (
    {{ sql | replace("__PERIOD_FILTER__", slice_predicate) }}
  )

{%- endmacro %}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nanosecond is more accurate, i.e. it covers timestamp(9); millisecond only covers timestamp(3).