Modernize legacy insert by period materialization #410

Closed
29 commits
3493d3f
Update legacy macro to 0.21.latest incremental
HorvathDanielMarton Aug 17, 2021
051a2e4
Raise error if no filter specified
HorvathDanielMarton Aug 17, 2021
8536054
Create empty table first
HorvathDanielMarton Aug 17, 2021
96ff9ed
Add configurations and period filter
HorvathDanielMarton Aug 23, 2021
7ac2058
Make macro work on manually defined simple example
HorvathDanielMarton Aug 23, 2021
364772b
Implement for loop with predefined time boundaries
HorvathDanielMarton Aug 23, 2021
147494a
Move period boundary macros to helpers.sql
HorvathDanielMarton Aug 23, 2021
3bb17d5
Modify helper macros to support Snowflake
HorvathDanielMarton Aug 24, 2021
a8d3259
Use macros to automate the for loop
HorvathDanielMarton Aug 24, 2021
3e100c2
Refactor and add logging, comments
HorvathDanielMarton Aug 26, 2021
760a33d
Use incremental_upsert() in the for loop
HorvathDanielMarton Aug 26, 2021
ec73c80
Introduce check_for_period_filter macro
HorvathDanielMarton Aug 26, 2021
f876d1d
Dispatch check_for_period_filter() macro
HorvathDanielMarton Aug 26, 2021
a961622
Remove unnecessary code, improve logging
HorvathDanielMarton Aug 26, 2021
ed6d55c
Handle full refresh
HorvathDanielMarton Aug 31, 2021
e53a801
Remove verbose CLI logging
HorvathDanielMarton Aug 31, 2021
b360a91
Update CHANGELOG.md
HorvathDanielMarton Aug 31, 2021
53017a4
Update README.md
HorvathDanielMarton Aug 31, 2021
888d781
Alias table expression in a helper function
HorvathDanielMarton Aug 31, 2021
6fe4adb
Update integration tests
HorvathDanielMarton Aug 31, 2021
af2dd57
Set default period to week
HorvathDanielMarton Aug 31, 2021
cdba4b5
Use make_temp_relation() in for loop
HorvathDanielMarton Aug 31, 2021
7c81020
Update README.md
HorvathDanielMarton Sep 1, 2021
f82f2ee
Run test_insert_by_period for all targets
HorvathDanielMarton Sep 1, 2021
171e5df
Merge remote-tracking branch 'upstream/master' into fix-legacy-insert…
HorvathDanielMarton Sep 1, 2021
e1cdcee
Add aggressive CLI logging for debug purposes
HorvathDanielMarton Sep 3, 2021
713f8a6
Add logging to get_period_boundaries()
HorvathDanielMarton Sep 3, 2021
1ed1e58
Extend helper function logging
HorvathDanielMarton Sep 3, 2021
9c680f9
Remove debug logging
HorvathDanielMarton Sep 3, 2021
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -5,7 +5,7 @@
- Declare compatibility with dbt v0.21.0, which has no breaking changes for this package ([#398](https://github.com/fishtown-analytics/dbt-utils/pull/398))

## Features

- Modernize the `insert_by_period` materialization and make it Snowflake-compatible.
- Allow user to provide any case type when defining the `exclude` argument in `dbt_utils.star()` ([#403](https://github.com/dbt-labs/dbt-utils/pull/403))


14 changes: 2 additions & 12 deletions README.md
@@ -1079,19 +1079,9 @@ with events as (
* `stop_date`: literal date or timestamp (default=current_timestamp)

**Caveats:**
-* This materialization is compatible with dbt 0.10.1.
-* This materialization has been written for Redshift.
+* This materialization is compatible with dbt 0.21.0.
+* This materialization has been written primarily for Snowflake.
 * This materialization can only be used for a model where records are not expected to change after they are created.
-* Any model post-hooks that use `{{ this }}` will fail using this materialization. For example:
-```yaml
-models:
-  project-name:
-    post-hook: "grant select on {{ this }} to db_reader"
-```
-A useful workaround is to change the above post-hook to:
-```yaml
-    post-hook: "grant select on {{ this.schema }}.{{ this.name }} to db_reader"
-```

----

@@ -1,7 +1,6 @@
{{
config(
-materialized = 'view',
-enabled=(target.type == 'redshift')
+materialized = 'view'
)
}}

@@ -4,8 +4,7 @@
period = 'month',
timestamp_field = 'created_at',
start_date = '2018-01-01',
-stop_date = '2018-06-01',
-enabled=(target.type == 'redshift')
+stop_date = '2018-06-01'
)
}}

113 changes: 113 additions & 0 deletions macros/materializations/helpers.sql
@@ -0,0 +1,113 @@
{% macro check_for_period_filter(model_unique_id, sql) %}
{{ return(adapter.dispatch('check_for_period_filter', 'dbt_utils')(model_unique_id, sql)) }}
{% endmacro %}

{% macro default__check_for_period_filter(model_unique_id, sql) %}
{%- if sql.find('__PERIOD_FILTER__') == -1 -%}
{%- set error_message -%}
Model '{{ model_unique_id }}' does not include the required string '__PERIOD_FILTER__' in its sql
{%- endset -%}
{{ exceptions.raise_compiler_error(error_message) }}
{%- endif -%}
{% endmacro %}

{% macro get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%}
{{ return(adapter.dispatch('get_period_boundaries', 'dbt_utils')(target_schema, target_table, timestamp_field, start_date, stop_date, period)) }}
{% endmacro %}

{% macro default__get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%}

{% call statement('period_boundaries', fetch_result=True) -%}
{%- set model_name = model['name'] -%}

with data as (
select
coalesce(max("{{timestamp_field}}"), '{{start_date}}')::timestamp as start_timestamp,
coalesce(
{{dbt_utils.dateadd('millisecond',
-1,
"nullif('" ~ stop_date ~ "','')::timestamp")}},
{{dbt_utils.current_timestamp()}}
) as stop_timestamp
from "{{target_schema}}"."{{model_name}}"
)

select
start_timestamp,
stop_timestamp,
{{dbt_utils.datediff('start_timestamp',
'stop_timestamp',
period)}} + 1 as num_periods
from data
{%- endcall %}

{%- endmacro %}

{% macro snowflake__get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%}

{% call statement('period_boundaries', fetch_result=True) -%}
{%- set model_name = model['name'] -%}

with data as (
select
coalesce(max({{timestamp_field}}), '{{start_date}}')::timestamp as start_timestamp,
coalesce(
{{dbt_utils.dateadd('millisecond',
Review comment:

Use `nanosecond` here instead: it matches timestamp(9) precision, whereas `millisecond` only covers timestamp(3).

-1,
"nullif('" ~ stop_date ~ "','')::timestamp")}},
{{dbt_utils.current_timestamp()}}
) as stop_timestamp
from {{target.schema}}.{{model_name}}
)

select
start_timestamp,
stop_timestamp,
Review comment:

Downstream, `load_result` truncates these timestamp(9) values to timestamp(6). That breaks the later timestamp comparisons, so rerunning the incremental load after a completed load inserts duplicates.

The fix is to cast both `start_timestamp` and `stop_timestamp` to varchar here, so that `load_result` receives strings rather than timestamps.
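
The effect of that truncation can be sketched outside the warehouse. The snippet below is only an illustration under stated assumptions: timestamps are modelled as integer nanoseconds, and `truncate_to_micros` and `rows_after` are hypothetical helpers that stand in for the `load_result` round trip and for the lower bound of the period filter. It is not dbt code.

```python
# Timestamps are modelled as integer nanoseconds since an arbitrary epoch.
# This is an assumption made for illustration, not how dbt stores them.

def truncate_to_micros(ts_ns):
    """Drop sub-microsecond digits, mimicking a timestamp(9) -> timestamp(6) round trip."""
    return ts_ns - ts_ns % 1_000


def rows_after(rows_ns, start_ns):
    """Mimic the lower bound of the period filter: timestamp_field > start_timestamp."""
    return [ts for ts in rows_ns if ts > start_ns]


# A fully loaded target table whose newest row carries nanosecond digits.
loaded_rows = [1_000_000_123, 2_000_000_456, 3_000_000_789]
max_loaded = max(loaded_rows)

# Rerun with the exact boundary: no row is selected again.
exact_rerun = rows_after(loaded_rows, max_loaded)

# Rerun with the truncated boundary: the newest row passes the filter a second time.
truncated_rerun = rows_after(loaded_rows, truncate_to_micros(max_loaded))

# Carrying the boundary as text keeps every digit, so the rerun is a no-op again.
boundary_as_text = str(max_loaded)
text_rerun = rows_after(loaded_rows, int(boundary_as_text))

print(exact_rerun, truncated_rerun, text_rerun)
```

The row that reappears in `truncated_rerun` corresponds to the duplicate described above; passing the boundary as a string avoids it in this model.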

{{dbt_utils.datediff('start_timestamp',
'stop_timestamp',
period)}} + 1 as num_periods
from data
{%- endcall %}

{%- endmacro %}


{% macro get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%}
{{ return(adapter.dispatch('get_period_sql', 'dbt_utils')(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset)) }}
{% endmacro %}

{% macro default__get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%}

{%- set period_filter -%}
("{{timestamp_field}}" > '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' and
"{{timestamp_field}}" <= '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' + interval '1 {{period}}' and

Review comment:

This line will not work properly when period = "month". Example:

select  '2018-12-31 00:00:00'::timestamp + interval '4 month' + interval '1 month', 
        '2018-12-31 00:00:00'::timestamp + interval '5 month'

This returns 2019-05-30 00:00:00 and 2019-05-31 00:00:00, so one day will not be processed.
Redshift accepts parentheses, as in '{{start_timestamp}}'::timestamp + (interval '{{offset}} {{period}}' + interval '1 {{period}}'), but Snowflake does not.
Suggestion (which I cannot test right away): compute offset_plus_one as offset + 1 and use it when setting the filter, like

  {%- set period_filter -%}
    ("{{timestamp_field}}" >  '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' and
     "{{timestamp_field}}" <= '{{start_timestamp}}'::timestamp + interval '{{offset_plus_one}} {{period}}' and
     "{{timestamp_field}}" <  '{{stop_timestamp}}'::timestamp)
  {%- endset -%}
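
The month-end drift in the example above can be reproduced in plain Python. This is a minimal sketch, assuming the warehouse clamps the day to the last day of the target month when adding a month interval; `add_months` is a hypothetical helper written for this illustration and is not part of dbt or dbt_utils.

```python
import calendar
from datetime import datetime


def add_months(ts, months):
    """Add calendar months, clamping the day to the end of the target month."""
    total = ts.year * 12 + (ts.month - 1) + months
    year, month_index = divmod(total, 12)
    last_day = calendar.monthrange(year, month_index + 1)[1]
    return ts.replace(year=year, month=month_index + 1, day=min(ts.day, last_day))


start = datetime(2018, 12, 31)

# offset = 4 months, then one more month: the day is clamped to April 30 first.
chained = add_months(add_months(start, 4), 1)

# offset + 1 = 5 months applied in a single step: no intermediate clamping.
single = add_months(start, 5)

print(chained.date(), single.date())
```

Adding the interval in one step, as the `offset_plus_one` suggestion does, avoids the intermediate clamp, so the upper bound of one period matches the lower bound of the next.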

Review comment:

I tested this block and it works. Feel free to use it.

{% macro get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%}

  {%- set offset_plus_one = offset + 1 -%}

  {%- set period_filter -%}
    ("{{timestamp_field}}" >  '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' and
     "{{timestamp_field}}" <= '{{start_timestamp}}'::timestamp + interval '{{offset_plus_one}} {{period}}' and
     "{{timestamp_field}}" <  '{{stop_timestamp}}'::timestamp)
  {%- endset -%}

  {%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%}

  select
    {{target_cols_csv}}
  from (
    {{filtered_sql}}
  )

{%- endmacro %}

"{{timestamp_field}}" < '{{stop_timestamp}}'::timestamp)
{%- endset -%}

{%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%}

select
{{target_cols_csv}}
from (
{{filtered_sql}}
) as t -- has to have an alias

{%- endmacro %}

{% macro snowflake__get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%}

{%- set period_filter -%}
({{timestamp_field}} > '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' and
{{timestamp_field}} <= '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' + interval '1 {{period}}' and
{{timestamp_field}} < '{{stop_timestamp}}'::timestamp)
{%- endset -%}

{%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%}

select
{{target_cols_csv}}
from (
{{filtered_sql}}
)

{%- endmacro %}
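
For readers who want to see what the placeholder substitution produces, the following is a plain-Python stand-in for the macros above, not the package's implementation. It assumes the single-step `offset + 1` upper bound suggested in the review; the model SQL, the `events` table, and the model id are hypothetical.

```python
PLACEHOLDER = "__PERIOD_FILTER__"


def check_for_period_filter(model_unique_id, sql):
    """Raise if the model SQL lacks the placeholder, like the compiler error in the macro."""
    if sql.find(PLACEHOLDER) == -1:
        raise ValueError(
            f"Model '{model_unique_id}' does not include the required "
            f"string '{PLACEHOLDER}' in its sql"
        )


def get_period_sql(target_cols_csv, sql, timestamp_field, period,
                   start_timestamp, stop_timestamp, offset):
    """Build the period filter for one offset and substitute it into the model SQL."""
    period_filter = (
        f"({timestamp_field} > '{start_timestamp}'::timestamp + interval '{offset} {period}' and\n"
        f" {timestamp_field} <= '{start_timestamp}'::timestamp + interval '{offset + 1} {period}' and\n"
        f" {timestamp_field} < '{stop_timestamp}'::timestamp)"
    )
    filtered_sql = sql.replace(PLACEHOLDER, period_filter)
    return f"select\n  {target_cols_csv}\nfrom (\n{filtered_sql}\n) as t"


# Hypothetical model SQL containing the required placeholder.
model_sql = "select id, created_at from events where __PERIOD_FILTER__"
check_for_period_filter("model.demo.events_by_period", model_sql)

# Render the statement for the second period (offset = 1).
rendered = get_period_sql(
    "id, created_at", model_sql, "created_at", "month",
    "2018-01-01 00:00:00", "2018-06-01 00:00:00", 1,
)
print(rendered)
```

Calling `get_period_sql` once per offset from 0 to `num_periods - 1` would yield one insert statement per period, which is the loop the materialization automates.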