Skip to content

Commit 299a7f8

Browse files
authored
Merge pull request #2 from datahub-project/feature/iceberg_source
Iceberg: Use logical type in the contained types for ArrayTypeClass & MapTypeClass + fix unit-tests.
2 parents f408559 + fade9d7 commit 299a7f8

File tree

2 files changed

+19
-8
lines changed

2 files changed

+19
-8
lines changed

metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,17 @@ def __init__(self, is_key_schema: bool, default_nullable: bool = False) -> None:
129129
}
130130

131131
@staticmethod
132-
def _get_type_name(avro_schema: avro.schema.Schema) -> str:
133-
return str(getattr(avro_schema.type, "type", avro_schema.type))
132+
def _get_type_name(
133+
avro_schema: avro.schema.Schema, logical_if_present: bool = False
134+
) -> str:
135+
logical_type_name: Optional[str] = None
136+
if logical_if_present:
137+
logical_type_name = getattr(
138+
avro_schema, "logical_type", None
139+
) or avro_schema.props.get("logicalType")
140+
return logical_type_name or str(
141+
getattr(avro_schema.type, "type", avro_schema.type)
142+
)
134143

135144
@staticmethod
136145
def _get_column_type(
@@ -151,15 +160,17 @@ def _get_column_type(
151160
avro_schema, avro.schema.ArraySchema
152161
):
153162
dt.type.nestedType = [
154-
AvroToMceSchemaConverter._get_type_name(avro_schema.items)
163+
AvroToMceSchemaConverter._get_type_name(
164+
avro_schema.items, logical_if_present=True
165+
)
155166
]
156167
elif isinstance(dt.type, MapTypeClass) and isinstance(
157168
avro_schema, avro.schema.MapSchema
158169
):
159170
# Avro map's key is always a string. See: https://avro.apache.org/docs/current/spec.html#Maps
160171
dt.type.keyType = "string"
161172
dt.type.valueType = AvroToMceSchemaConverter._get_type_name(
162-
avro_schema.values
173+
avro_schema.values, logical_if_present=True
163174
)
164175
return dt
165176

metadata-ingestion/tests/unit/test_iceberg.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,10 +192,10 @@ def test_iceberg_primitive_type_to_schema_field(
192192
IcebergTypes.TimestampType.without_timezone(),
193193
"timestamp-micros",
194194
),
195-
(IcebergTypes.TimeType.get(), "timestamp-micros"),
195+
(IcebergTypes.TimeType.get(), "time-micros"),
196196
(
197197
IcebergTypes.UUIDType.get(),
198-
"string",
198+
"uuid",
199199
),
200200
],
201201
)
@@ -250,10 +250,10 @@ def test_iceberg_list_to_schema_field(
250250
IcebergTypes.TimestampType.without_timezone(),
251251
"timestamp-micros",
252252
),
253-
(IcebergTypes.TimeType.get(), "timestamp-micros"),
253+
(IcebergTypes.TimeType.get(), "time-micros"),
254254
(
255255
IcebergTypes.UUIDType.get(),
256-
"string",
256+
"uuid",
257257
),
258258
],
259259
)

0 commit comments

Comments
 (0)