Skip to content

Commit

Permalink
feat(ingest): dbt - updating source lineage logic (#5414)
Browse files Browse the repository at this point in the history
Co-authored-by: Shirshanka Das <[email protected]>
  • Loading branch information
gabe-lyons and shirshanka authored Jul 25, 2022
1 parent 941770f commit efc5602
Show file tree
Hide file tree
Showing 8 changed files with 542 additions and 76 deletions.
14 changes: 11 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ class DBTConfig(StatefulIngestionConfigBase):
False,
description="Prior to version 0.8.38, dbt tests were represented as datasets. If you ingested dbt tests before, set this flag to True (just needed once) to soft-delete tests that were generated as datasets by previous ingestion.",
)
backcompat_skip_source_on_lineage_edge: bool = Field(
False,
description="Prior to version 0.8.41, lineage edges to sources were directed to the target platform node rather than the dbt source node. This contradicted the established pattern for other lineage edges to point to upstream dbt nodes. To revert lineage logic to this legacy approach, set this flag to true.",
)

@property
def s3_client(self):
Expand Down Expand Up @@ -544,6 +548,7 @@ def get_upstreams(
environment: str,
disable_dbt_node_creation: bool,
platform_instance: Optional[str],
legacy_skip_source_lineage: Optional[bool],
) -> List[str]:
upstream_urns = []

Expand Down Expand Up @@ -579,9 +584,8 @@ def get_upstreams(
materialized = upstream_manifest_node.get("config", {}).get("materialized")
resource_type = upstream_manifest_node["resource_type"]

if (
materialized in {"view", "table", "incremental"}
or resource_type == "source"
if materialized in {"view", "table", "incremental"} or (
resource_type == "source" and legacy_skip_source_lineage
):
# upstream urns point to the target platform
platform_value = target_platform
Expand Down Expand Up @@ -873,6 +877,7 @@ def load_test_results(
config.env,
config.disable_dbt_node_creation,
config.platform_instance,
config.backcompat_skip_source_on_lineage_edge,
)
assertion_urn = mce_builder.make_assertion_urn(
mce_builder.datahub_guid(
Expand Down Expand Up @@ -1161,6 +1166,7 @@ def string_map(input_map: Dict[str, Any]) -> Dict[str, str]:
environment=self.config.env,
disable_dbt_node_creation=self.config.disable_dbt_node_creation,
platform_instance=None,
legacy_skip_source_lineage=self.config.backcompat_skip_source_on_lineage_edge,
)

raw_node = manifest_nodes.get(node.dbt_name)
Expand Down Expand Up @@ -1756,6 +1762,7 @@ def _create_lineage_aspect_for_dbt_node(
self.config.env,
self.config.disable_dbt_node_creation,
self.config.platform_instance,
self.config.backcompat_skip_source_on_lineage_edge,
)

# if a node is of type source in dbt, its upstream lineage should have the corresponding table/view
Expand Down Expand Up @@ -1791,6 +1798,7 @@ def _create_lineage_aspect_for_platform_node(
self.config.env,
self.config.disable_dbt_node_creation,
self.config.platform_instance,
self.config.backcompat_skip_source_on_lineage_edge,
)
if upstream_urns:
return get_upstream_lineage(upstream_urns)
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.customer,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.customer,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -172,7 +172,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.address,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.address,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -181,7 +181,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.city,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.city,PROD)",
"type": "TRANSFORMED"
}
],
Expand Down Expand Up @@ -646,7 +646,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_01,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_01,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -655,7 +655,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_02,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_02,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -664,7 +664,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_02,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_02,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -673,7 +673,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_03,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_03,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -682,7 +682,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_04,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_04,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -691,7 +691,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_05,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_05,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -700,7 +700,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_06,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_06,PROD)",
"type": "TRANSFORMED"
}
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.customer,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.customer,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -130,7 +130,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.address,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.address,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -139,7 +139,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.city,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.city,PROD)",
"type": "TRANSFORMED"
}
],
Expand Down Expand Up @@ -585,7 +585,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_01,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_01,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -594,7 +594,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_02,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_02,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -603,7 +603,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_02,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_02,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -612,7 +612,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_03,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_03,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -621,7 +621,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_04,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_04,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -630,7 +630,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_05,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_05,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -639,7 +639,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_06,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_06,PROD)",
"type": "TRANSFORMED"
}
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.customer,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.customer,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -130,7 +130,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.address,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.address,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -139,7 +139,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.city,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.city,PROD)",
"type": "TRANSFORMED"
}
],
Expand Down Expand Up @@ -585,7 +585,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_01,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_01,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -594,7 +594,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_02,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_02,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -603,7 +603,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_02,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_02,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -612,7 +612,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_03,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_03,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -621,7 +621,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_04,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_04,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -630,7 +630,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_05,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_05,PROD)",
"type": "TRANSFORMED"
},
{
Expand All @@ -639,7 +639,7 @@
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,pagila.public.payment_p2020_06,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payment_p2020_06,PROD)",
"type": "TRANSFORMED"
}
],
Expand Down
Loading

0 comments on commit efc5602

Please sign in to comment.