Skip to content

Commit

Permalink
fix: allow switch from hive to iceberg table (dbt-labs#625)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicor88 authored Apr 18, 2024
1 parent 0d32c0c commit 107437c
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 3 deletions.
17 changes: 14 additions & 3 deletions dbt/include/athena/macros/materializations/models/table/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@
{%- do drop_relation(old_tmp_relation) -%}
{%- endif -%}

{%- set old_relation_bkp = make_temp_relation(old_relation, '__bkp') -%}
-- If we have this, it means that at least the first renaming occurred but there was an issue
-- afterwards, therefore we are in a weird state. The easiest and cleanest fix is to remove
-- the backup relation. It won't have an impact because, since we are in the else condition,
Expand All @@ -132,10 +131,22 @@
{{ query_result }}
{% endcall %}
{%- endif -%}
{{ rename_relation(old_relation, old_relation_bkp) }}

{%- set old_relation_table_type = adapter.get_glue_table_type(old_relation) -%}

{%- if old_relation_table_type == 'iceberg' -%}
{{ rename_relation(old_relation, old_bkp_relation) }}
{%- else -%}
{%- do drop_relation_glue(old_relation) -%}
{%- endif -%}

{{ rename_relation(tmp_relation, target_relation) }}

{{ drop_relation(old_relation_bkp) }}
-- old_bkp_relation might not exist when we switch from hive to iceberg, so we avoid
-- dropping something that doesn't exist, even though drop_relation can handle missing tables
{%- if old_bkp_relation is not none -%}
{%- do drop_relation(old_bkp_relation) -%}
{%- endif -%}
{%- endif -%}
{%- endif -%}

Expand Down
67 changes: 67 additions & 0 deletions tests/functional/adapter/test_hive_iceberg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import pytest

from dbt.contracts.results import RunStatus
from dbt.tests.util import run_dbt

# dbt model source shared by the test classes in this file.
# The model is materialized as a table; its table_type is read from the
# `table_type` project var at run time, and s3_data_naming is 'table_unique'.
# The query is a union of two literal rows (id 1 and id 2), each with a
# created_at column produced by the cast_timestamp macro.
models__table_base_model = """
{{ config(
materialized='table',
table_type=var("table_type"),
s3_data_naming='table_unique'
)
}}
select
1 as id,
'test 1' as name,
{{ cast_timestamp('current_timestamp') }} as created_at
union all
select
2 as id,
'test 2' as name,
{{ cast_timestamp('current_timestamp') }} as created_at
"""


class TestTableFromHiveToIceberg:
    """Build one table model with table_type hive, then rebuild it with table_type iceberg."""

    @pytest.fixture(scope="class")
    def models(self):
        """Return the mapping of model file name to model SQL used by this class."""
        return {"table_hive_to_iceberg.sql": models__table_base_model}

    def test__table_creation(self, project):
        """Run the model as hive and then as iceberg; each run must succeed and yield 2 rows."""
        relation_name = "table_hive_to_iceberg"
        count_sql = f"select count(*) as records from {project.test_schema}.{relation_name}"

        # Order matters: the iceberg run selects the same relation the hive run just built.
        for type_vars in ('{"table_type":"hive"}', '{"table_type":"iceberg"}'):
            run_outcome = run_dbt(["run", "--select", relation_name, "--vars", type_vars])
            assert run_outcome.results[0].status == RunStatus.Success

            row_count = project.run_sql(count_sql, fetch="all")[0][0]
            assert row_count == 2


class TestTableFromIcebergToHive:
    """Build one table model with table_type iceberg, then rebuild it with table_type hive."""

    @pytest.fixture(scope="class")
    def models(self):
        """Return the mapping of model file name to model SQL used by this class."""
        return {"table_iceberg_to_hive.sql": models__table_base_model}

    def test__table_creation(self, project):
        """Run the model as iceberg and then as hive; each run must succeed and yield 2 rows."""
        relation_name = "table_iceberg_to_hive"
        count_sql = f"select count(*) as records from {project.test_schema}.{relation_name}"

        # Order matters: the hive run selects the same relation the iceberg run just built.
        for type_vars in ('{"table_type":"iceberg"}', '{"table_type":"hive"}'):
            run_outcome = run_dbt(["run", "--select", relation_name, "--vars", type_vars])
            assert run_outcome.results[0].status == RunStatus.Success

            row_count = project.run_sql(count_sql, fetch="all")[0][0]
            assert row_count == 2

0 comments on commit 107437c

Please sign in to comment.