Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modernize legacy insert by period materialization #410

Closed
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
3493d3f
Update legacy macro to 0.21.latest incremental
HorvathDanielMarton Aug 17, 2021
051a2e4
Raise error if no filter specified
HorvathDanielMarton Aug 17, 2021
8536054
Create empty table first
HorvathDanielMarton Aug 17, 2021
96ff9ed
Add configurations and period filter
HorvathDanielMarton Aug 23, 2021
7ac2058
Make macro work on manually defined simple example
HorvathDanielMarton Aug 23, 2021
364772b
Implement for loop with predefined time boundaries
HorvathDanielMarton Aug 23, 2021
147494a
Move period boundary macros to helpers.sql
HorvathDanielMarton Aug 23, 2021
3bb17d5
Modify helper macros to support Snowflake
HorvathDanielMarton Aug 24, 2021
a8d3259
Use macros to automate the for loop
HorvathDanielMarton Aug 24, 2021
3e100c2
Refactor and add logging, comments
HorvathDanielMarton Aug 26, 2021
760a33d
Use incremental_upsert() in the for loop
HorvathDanielMarton Aug 26, 2021
ec73c80
Introduce check_for_period_filter macro
HorvathDanielMarton Aug 26, 2021
f876d1d
Dispatch check_for_period_filter() macro
HorvathDanielMarton Aug 26, 2021
a961622
Remove unnecessary code, improve logging
HorvathDanielMarton Aug 26, 2021
ed6d55c
Handle full refresh
HorvathDanielMarton Aug 31, 2021
e53a801
Remove verbose CLI logging
HorvathDanielMarton Aug 31, 2021
b360a91
Update CHANGELOG.md
HorvathDanielMarton Aug 31, 2021
53017a4
Update README.md
HorvathDanielMarton Aug 31, 2021
888d781
Alias table expression in a helper function
HorvathDanielMarton Aug 31, 2021
6fe4adb
Update integration tests
HorvathDanielMarton Aug 31, 2021
af2dd57
Set default period to week
HorvathDanielMarton Aug 31, 2021
cdba4b5
Use make_temp_relation() in for loop
HorvathDanielMarton Aug 31, 2021
7c81020
Update README.md
HorvathDanielMarton Sep 1, 2021
f82f2ee
Run test_insert_by_period for all targets
HorvathDanielMarton Sep 1, 2021
171e5df
Merge remote-tracking branch 'upstream/master' into fix-legacy-insert…
HorvathDanielMarton Sep 1, 2021
e1cdcee
Add aggressive CLI logging for debug purposes
HorvathDanielMarton Sep 3, 2021
713f8a6
Add logging to get_period_boundaries()
HorvathDanielMarton Sep 3, 2021
1ed1e58
Extend helper function logging
HorvathDanielMarton Sep 3, 2021
9c680f9
Remove debug logging
HorvathDanielMarton Sep 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Move period boundary macros to helpers.sql
  • Loading branch information
HorvathDanielMarton committed Aug 23, 2021
commit 147494a3a4bc14476c14b508d9ec81c9be7f94a4
51 changes: 51 additions & 0 deletions macros/materializations/helpers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{% macro get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%}
{{ return(adapter.dispatch('get_period_boundaries', 'dbt_utils')(target_schema, target_table, timestamp_field, start_date, stop_date, period)) }}
{% endmacro %}

{% macro default__get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%}

{% call statement('period_boundaries', fetch_result=True) -%}
with data as (
select
coalesce(max("{{timestamp_field}}"), '{{start_date}}')::timestamp as start_timestamp,
coalesce(
{{dbt_utils.dateadd('millisecond',
-1,
"nullif('" ~ stop_date ~ "','')::timestamp")}},
{{dbt_utils.current_timestamp()}}
) as stop_timestamp
from "{{target_schema}}"."{{target_table}}"
)

select
start_timestamp,
stop_timestamp,
{{dbt_utils.datediff('start_timestamp',
'stop_timestamp',
period)}} + 1 as num_periods
from data
{%- endcall %}

{%- endmacro %}

{% macro get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%}
{{ return(adapter.dispatch('get_period_sql', 'dbt_utils')(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset)) }}
{% endmacro %}

{% macro default__get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%}

{%- set period_filter -%}
("{{timestamp_field}}" > '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' and
"{{timestamp_field}}" <= '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' + interval '1 {{period}}' and

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line will not work properly if period = "month"
example:

select  '2018-12-31 00:00:00'::timestamp + interval '4 month' + interval '1 month', 
        '2018-12-31 00:00:00'::timestamp + interval '5 month'

2019-05-30 00:00:00 & 2019-05-31 00:00:00 => one day will not be processed.
Redshift can deal with the brackets, like '{{start_timestamp}}'::timestamp + (interval '{{offset}} {{period}}' + interval '1 {{period}}'), Snowflake will not like it.
Suggestion (that I cannot test right away): calculate offset_plus_one as offset + 1 and use it in setting the filter, like

  {%- set period_filter -%}
    ("{{timestamp_field}}" >  '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' and
     "{{timestamp_field}}" <= '{{start_timestamp}}'::timestamp + interval '{{offset_plus_one}} {{period}}' and
     "{{timestamp_field}}" <  '{{stop_timestamp}}'::timestamp)
  {%- endset -%}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested this block and it works ok. Feel free to use it.

{% macro get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%}

  {%- set offset_plus_one = offset + 1 -%}

  {%- set period_filter -%}
    ("{{timestamp_field}}" >  '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' and
     "{{timestamp_field}}" <= '{{start_timestamp}}'::timestamp + interval '{{offset_plus_one}} {{period}}' and
     "{{timestamp_field}}" <  '{{stop_timestamp}}'::timestamp)
  {%- endset -%}

  {%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%}

  select
    {{target_cols_csv}}
  from (
    {{filtered_sql}}
  )

{%- endmacro %}

"{{timestamp_field}}" < '{{stop_timestamp}}'::timestamp)
{%- endset -%}

{%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%}

select
{{target_cols_csv}}
from (
{{filtered_sql}}
)

{%- endmacro %}