Skip to content

Commit 8faa921

Browse files
authored
fix: Fixed table_type for GOVERNED tables (dbt-labs#661)
1 parent 557df06 commit 8faa921

File tree

4 files changed

+37
-32
lines changed

4 files changed

+37
-32
lines changed

dbt/adapters/athena/impl.py

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,6 @@
4747
)
4848
from dbt.adapters.athena.python_submissions import AthenaPythonJobHelper
4949
from dbt.adapters.athena.relation import (
50-
RELATION_TYPE_MAP,
5150
AthenaRelation,
5251
AthenaSchemaSearchMap,
5352
TableType,
@@ -542,12 +541,13 @@ def _s3_path_exists(self, s3_bucket: str, s3_prefix: str) -> bool:
542541
response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)
543542
return True if "Contents" in response else False
544543

545-
def _get_one_table_for_catalog(self, table: TableTypeDef, database: str) -> List[Dict[str, Any]]:
544+
@staticmethod
545+
def _get_one_table_for_catalog(table: TableTypeDef, database: str) -> List[Dict[str, Any]]:
546546
table_catalog = {
547547
"table_database": database,
548548
"table_schema": table["DatabaseName"],
549549
"table_name": table["Name"],
550-
"table_type": RELATION_TYPE_MAP[table.get("TableType", "EXTERNAL_TABLE")].value,
550+
"table_type": get_table_type(table).value,
551551
"table_comment": table.get("Parameters", {}).get("comment", table.get("Description", "")),
552552
}
553553
return [
@@ -563,14 +563,13 @@ def _get_one_table_for_catalog(self, table: TableTypeDef, database: str) -> List
563563
for idx, col in enumerate(table["StorageDescriptor"]["Columns"] + table.get("PartitionKeys", []))
564564
]
565565

566-
def _get_one_table_for_non_glue_catalog(
567-
self, table: TableTypeDef, schema: str, database: str
568-
) -> List[Dict[str, Any]]:
566+
@staticmethod
567+
def _get_one_table_for_non_glue_catalog(table: TableTypeDef, schema: str, database: str) -> List[Dict[str, Any]]:
569568
table_catalog = {
570569
"table_database": database,
571570
"table_schema": schema,
572571
"table_name": table["Name"],
573-
"table_type": RELATION_TYPE_MAP[table.get("TableType", "EXTERNAL_TABLE")].value,
572+
"table_type": get_table_type(table).value,
574573
"table_comment": table.get("Parameters", {}).get("comment", ""),
575574
}
576575
return [
@@ -583,6 +582,7 @@ def _get_one_table_for_non_glue_catalog(
583582
"column_comment": col.get("Comment", ""),
584583
},
585584
}
585+
# TODO: review this code part as TableTypeDef class does not contain "Columns" attribute
586586
for idx, col in enumerate(table["Columns"] + table.get("PartitionKeys", []))
587587
]
588588

dbt/adapters/athena/relation.py

Lines changed: 12 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -81,6 +81,7 @@ def add(self, relation: AthenaRelation) -> None:
8181
RELATION_TYPE_MAP = {
8282
"EXTERNAL_TABLE": TableType.TABLE,
8383
"EXTERNAL": TableType.TABLE, # type returned by federated query tables
84+
"GOVERNED": TableType.TABLE,
8485
"MANAGED_TABLE": TableType.TABLE,
8586
"VIRTUAL_VIEW": TableType.VIEW,
8687
"table": TableType.TABLE,
@@ -91,16 +92,20 @@ def add(self, relation: AthenaRelation) -> None:
9192

9293

9394
def get_table_type(table: TableTypeDef) -> TableType:
94-
_type = RELATION_TYPE_MAP.get(table.get("TableType"))
95-
_specific_type = table.get("Parameters", {}).get("table_type", "")
95+
table_full_name = ".".join(filter(None, [table.get("CatalogId"), table.get("DatabaseName"), table["Name"]]))
9696

97-
if _specific_type.lower() == "iceberg":
98-
_type = TableType.ICEBERG
97+
input_table_type = table.get("TableType")
98+
if input_table_type and input_table_type not in RELATION_TYPE_MAP:
99+
raise ValueError(f"Table type {table['TableType']} is not supported for table {table_full_name}")
99100

100-
if _type is None:
101-
raise ValueError("Table type cannot be None")
101+
if table.get("Parameters", {}).get("table_type", "").lower() == "iceberg":
102+
_type = TableType.ICEBERG
103+
elif not input_table_type:
104+
raise ValueError(f"Table type cannot be None for table {table_full_name}")
105+
else:
106+
_type = RELATION_TYPE_MAP[input_table_type]
102107

103-
LOGGER.debug(f"table_name : {table.get('Name')}")
108+
LOGGER.debug(f"table_name : {table_full_name}")
104109
LOGGER.debug(f"table type : {_type}")
105110

106111
return _type

tests/unit/test_adapter.py

Lines changed: 5 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -414,16 +414,11 @@ def test__get_one_catalog(self, mock_aws_service):
414414
mock_aws_service.create_database("baz")
415415
mock_aws_service.create_table(table_name="bar", database_name="foo")
416416
mock_aws_service.create_table(table_name="bar", database_name="quux")
417-
mock_aws_service.create_table_without_type(table_name="qux", database_name="baz")
418417
mock_information_schema = mock.MagicMock()
419418
mock_information_schema.database = "awsdatacatalog"
420419

421420
self.adapter.acquire_connection("dummy")
422-
actual = self.adapter._get_one_catalog(
423-
mock_information_schema,
424-
{"foo", "quux", "baz"},
425-
self.used_schemas,
426-
)
421+
actual = self.adapter._get_one_catalog(mock_information_schema, {"foo", "quux"}, self.used_schemas)
427422

428423
expected_column_names = (
429424
"table_database",
@@ -443,14 +438,16 @@ def test__get_one_catalog(self, mock_aws_service):
443438
("awsdatacatalog", "quux", "bar", "table", None, "id", 0, "string", None),
444439
("awsdatacatalog", "quux", "bar", "table", None, "country", 1, "string", None),
445440
("awsdatacatalog", "quux", "bar", "table", None, "dt", 2, "date", None),
446-
("awsdatacatalog", "baz", "qux", "table", None, "id", 0, "string", None),
447-
("awsdatacatalog", "baz", "qux", "table", None, "country", 1, "string", None),
448441
]
449442
assert actual.column_names == expected_column_names
450443
assert len(actual.rows) == len(expected_rows)
451444
for row in actual.rows.values():
452445
assert row.values() in expected_rows
453446

447+
mock_aws_service.create_table_without_type(table_name="qux", database_name="baz")
448+
with pytest.raises(ValueError):
449+
self.adapter._get_one_catalog(mock_information_schema, {"baz"}, self.used_schemas)
450+
454451
@mock_aws
455452
def test__get_one_catalog_by_relations(self, mock_aws_service):
456453
mock_aws_service.create_data_catalog()

tests/unit/test_relation.py

Lines changed: 13 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -8,21 +8,24 @@
88

99

1010
class TestRelation:
11-
def test__get_relation_type_table(self):
12-
assert get_table_type({"Name": "name", "TableType": "table"}) == TableType.TABLE
11+
@pytest.mark.parametrize(
12+
("table", "expected"),
13+
[
14+
({"Name": "n", "TableType": "table"}, TableType.TABLE),
15+
({"Name": "n", "TableType": "VIRTUAL_VIEW"}, TableType.VIEW),
16+
({"Name": "n", "TableType": "EXTERNAL_TABLE", "Parameters": {"table_type": "ICEBERG"}}, TableType.ICEBERG),
17+
],
18+
)
19+
def test__get_relation_type(self, table, expected):
20+
assert get_table_type(table) == expected
1321

1422
def test__get_relation_type_with_no_type(self):
1523
with pytest.raises(ValueError):
1624
get_table_type({"Name": "name"})
1725

18-
def test__get_relation_type_view(self):
19-
assert get_table_type({"Name": "name", "TableType": "VIRTUAL_VIEW"}) == TableType.VIEW
20-
21-
def test__get_relation_type_iceberg(self):
22-
assert (
23-
get_table_type({"Name": "name", "TableType": "EXTERNAL_TABLE", "Parameters": {"table_type": "ICEBERG"}})
24-
== TableType.ICEBERG
25-
)
26+
def test__get_relation_type_with_unknown_type(self):
27+
with pytest.raises(ValueError):
28+
get_table_type({"Name": "name", "TableType": "test"})
2629

2730

2831
class TestAthenaRelation:

0 commit comments

Comments
 (0)