Skip to content

Commit

Permalink
feat(mode/ingest): Correct lineage handling for imported datasets by …
Browse files Browse the repository at this point in the history
…ensuring proper URN usage and establishing correct downstream relationships
  • Loading branch information
sagar-salvi-apptware committed Sep 5, 2024
1 parent 1f3688a commit 5286728
Show file tree
Hide file tree
Showing 9 changed files with 591 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class BIAssetSubTypes(str, Enum):

# Mode
MODE_REPORT = "Report"
MODE_DATASET = "Dataset"
MODE_QUERY = "Query"
MODE_CHART = "Chart"

Expand Down
109 changes: 88 additions & 21 deletions metadata-ingestion/src/datahub/ingestion/source/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,25 @@ def construct_dashboard(
# Last refreshed ts.
last_refreshed_ts = self._parse_last_run_at(report_info)

# Datasets
datasets = []
for imported_dataset_name in report_info.get("imported_datasets", {}):
mode_dataset = self._get_request_json(
f"{self.workspace_uri}/reports/{imported_dataset_name.get('token')}"
)
dataset_urn = builder.make_dataset_urn_with_platform_instance(
self.platform,
str(mode_dataset.get("id")),
platform_instance=None,
env=self.config.env,
)
datasets.append(dataset_urn)

dashboard_info_class = DashboardInfoClass(
description=description if description else "",
title=title if title else "",
charts=self._get_chart_urns(report_token),
datasets=datasets if datasets else None,
lastModified=last_modified,
lastRefreshed=last_refreshed_ts,
dashboardUrl=f"{self.config.connect_uri}/{self.config.workspace}/reports/{report_token}",
Expand Down Expand Up @@ -725,6 +740,10 @@ def _get_platform_and_dbname(
data_source.get("adapter", ""), data_source.get("name", "")
)
database = data_source.get("database", "")
# This is hacky but on bigquery we want to change the database if its default
# For lineage we need project_id.db.table
if platform == "bigquery" and database == "default":
database = data_source.get("host", "")
return platform, database
else:
self.report.report_warning(
Expand Down Expand Up @@ -900,18 +919,24 @@ def normalize_mode_query(self, query: str) -> str:

return rendered_query

def construct_query_from_api_data(
def construct_query_or_dataset(
self,
report_token: str,
query_data: dict,
space_token: str,
report_info: dict,
is_mode_dataset: bool = False,
) -> Iterable[MetadataWorkUnit]:
query_urn = self.get_dataset_urn_from_query(query_data)
query_urn = (
self.get_dataset_urn_from_query(query_data)
if not is_mode_dataset
else self.get_dataset_urn_from_query(report_info)
)

query_token = query_data.get("token")

dataset_props = DatasetPropertiesClass(
name=query_data.get("name"),
name=report_info.get("name") if is_mode_dataset else query_data.get("name"),
description=f"""### Source Code
``` sql
{query_data.get("raw_query")}
Expand Down Expand Up @@ -939,7 +964,15 @@ def construct_query_from_api_data(
).as_workunit()
)

subtypes = SubTypesClass(typeNames=([BIAssetSubTypes.MODE_QUERY]))
subtypes = SubTypesClass(
typeNames=(
[
BIAssetSubTypes.MODE_DATASET
if is_mode_dataset
else BIAssetSubTypes.MODE_QUERY
]
)
)
yield (
MetadataChangeProposalWrapper(
entityUrn=query_urn,
Expand All @@ -958,7 +991,6 @@ def construct_query_from_api_data(
upstream_warehouse_platform,
upstream_warehouse_db_name,
) = self._get_platform_and_dbname(query_data.get("data_source_id"))

if upstream_warehouse_platform is None:
# this means we can't infer the platform
return
Expand Down Expand Up @@ -1022,7 +1054,7 @@ def construct_query_from_api_data(
schema_fields = infer_output_schema(parsed_query_object)
if schema_fields:
schema_metadata = SchemaMetadataClass(
schemaName="mode_query",
schemaName="mode_dataset" if is_mode_dataset else "mode_query",
platform=f"urn:li:dataPlatform:{self.platform}",
version=0,
fields=schema_fields,
Expand All @@ -1040,7 +1072,7 @@ def construct_query_from_api_data(
)

yield from self.get_upstream_lineage_for_parsed_sql(
query_data, parsed_query_object
query_urn, query_data, parsed_query_object
)

operation = OperationClass(
Expand Down Expand Up @@ -1089,10 +1121,9 @@ def construct_query_from_api_data(
).as_workunit()

def get_upstream_lineage_for_parsed_sql(
self, query_data: dict, parsed_query_object: SqlParsingResult
self, query_urn: str, query_data: dict, parsed_query_object: SqlParsingResult
) -> List[MetadataWorkUnit]:
wu = []
query_urn = self.get_dataset_urn_from_query(query_data)

if parsed_query_object is None:
logger.info(
Expand Down Expand Up @@ -1350,6 +1381,24 @@ def _get_reports(self, space_token: str) -> List[dict]:
)
return reports

@lru_cache(maxsize=None)
def _get_datasets(self, space_token: str) -> List[dict]:
"""
Retrieves datasets for a given space token.
"""
datasets = []
try:
url = f"{self.workspace_uri}/spaces/{space_token}/datasets"
datasets_json = self._get_request_json(url)
datasets = datasets_json.get("_embedded", {}).get("reports", [])
except HTTPError as http_error:
self.report.report_failure(
title="Failed to Retrieve Datasets for Space",
message=f"Unable to retrieve datasets for space token {space_token}.",
context=f"Error: {str(http_error)}",
)
return datasets

@lru_cache(maxsize=None)
def _get_queries(self, report_token: str) -> list:
queries = []
Expand Down Expand Up @@ -1523,20 +1572,9 @@ def emit_chart_mces(self) -> Iterable[MetadataWorkUnit]:
for report in reports:
report_token = report.get("token", "")

if report.get("imported_datasets"):
# The connector doesn't support imported datasets yet.
# For now, we just keep this in the report to track what we're missing.
imported_datasets = [
imported_dataset.get("name") or str(imported_dataset)
for imported_dataset in report["imported_datasets"]
]
self.report.dropped_imported_datasets.setdefault(
report_token, LossyList()
).extend(imported_datasets)

queries = self._get_queries(report_token)
for query in queries:
query_mcps = self.construct_query_from_api_data(
query_mcps = self.construct_query_or_dataset(
report_token,
query,
space_token=space_token,
Expand Down Expand Up @@ -1566,6 +1604,34 @@ def emit_chart_mces(self) -> Iterable[MetadataWorkUnit]:
query_name=query["name"],
)

def emit_dataset_mces(self):
"""
Emits MetadataChangeEvents (MCEs) for datasets within each space.
"""
for space_token, _ in self.space_tokens.items():
datasets = self._get_datasets(space_token)

for report in datasets:
report_token = report.get("token", "")
queries = self._get_queries(report_token)
for query in queries:
query_mcps = self.construct_query_or_dataset(
report_token,
query,
space_token=space_token,
report_info=report,
is_mode_dataset=True,
)
chart_fields: Dict[str, SchemaFieldClass] = {}
for wu in query_mcps:
if isinstance(
wu.metadata, MetadataChangeProposalWrapper
) and isinstance(wu.metadata.aspect, SchemaMetadataClass):
schema_metadata = wu.metadata.aspect
for field in schema_metadata.fields:
chart_fields.setdefault(field.fieldPath, field)
yield wu

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "ModeSource":
config: ModeConfig = ModeConfig.parse_obj(config_dict)
Expand All @@ -1581,6 +1647,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
yield from self.emit_dashboard_mces()
yield from self.emit_dataset_mces()
yield from self.emit_chart_mces()

def get_report(self) -> SourceReport:
Expand Down
107 changes: 98 additions & 9 deletions metadata-ingestion/tests/integration/mode/mode_mces_golden.json
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@
"json": {
"timestampMillis": 1638860400000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"viewsCount": 6
}
Expand Down Expand Up @@ -173,7 +173,9 @@
"charts": [
"urn:li:chart:(mode,f622b9ee725b)"
],
"datasets": [],
"datasets": [
"urn:li:dataset:(urn:li:dataPlatform:mode,5450544,PROD)"
],
"lastModified": {
"created": {
"time": 1639169724316,
Expand Down Expand Up @@ -243,6 +245,77 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mode,5450544,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"customProperties": {
"updated_at": "2024-09-02T07:40:44.046Z",
"last_run_id": "3535709679",
"data_source_id": "44763",
"report_imports_count": "2"
},
"externalUrl": "https://app.mode.com/acryl/reports/24f66e1701b6/details/queries/9b2f34343531",
"name": "Dataset 1",
"description": "### Source Code\n``` sql\n-- Returns first 100 rows from DATAHUB_COMMUNITY.POSTGRES_PUBLIC.COMPANY\n SELECT \n\t\tAGE,\n\t\tID,\n\t\tNAME,\n\t\t_FIVETRAN_DELETED,\n\t\t_FIVETRAN_SYNCED\n FROM DATAHUB_COMMUNITY.POSTGRES_PUBLIC.COMPANY LIMIT 100;\n\n-- Returns first 100 rows from ETHAN_TEST_DB.PUBLIC.ACCOUNT_PHONE_NUMBER\n SELECT \n\t\tCOMMUNICATION_ACCOUNT_ID,\n\t\tID,\n\t\tMMS_CAPABLE,\n\t\tPHONE_NUMBER,\n\t\tSMS_CAPABLE,\n\t\tSTATUS,\n\t\tSTATUS_TLM,\n\t\tTLM,\n\t\tVOICE_CAPABLE,\n\t\tWHEN_CREATED\n FROM ETHAN_TEST_DB.PUBLIC.ACCOUNT_PHONE_NUMBER LIMIT 100;\n \n \n```\n ",
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1638860400000,
"runId": "mode-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mode,5450544,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Dataset"
]
}
},
"systemMetadata": {
"lastObserved": 1638860400000,
"runId": "mode-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mode,5450544,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "acryl"
},
{
"id": "urn:li:container:800cfcb4cec6ad587cafde11a0b0bb4a",
"urn": "urn:li:container:800cfcb4cec6ad587cafde11a0b0bb4a"
},
{
"id": "urn:li:dashboard:(mode,5450544)",
"urn": "urn:li:dashboard:(mode,5450544)"
}
]
}
},
"systemMetadata": {
"lastObserved": 1638860400000,
"runId": "mode-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD)",
Expand Down Expand Up @@ -643,8 +716,8 @@
"json": {
"timestampMillis": 1638860400000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"operationType": "UPDATE",
"lastUpdatedTimestamp": 1639177973273
Expand Down Expand Up @@ -721,9 +794,9 @@
"json": {
"fields": [
{
"schemaFieldUrn": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD),payment_date)",
"schemaFieldUrn": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD),amount)",
"schemaField": {
"fieldPath": "payment_date",
"fieldPath": "amount",
"nullable": false,
"type": {
"type": {
Expand All @@ -743,9 +816,9 @@
}
},
{
"schemaFieldUrn": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD),amount)",
"schemaFieldUrn": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD),payment_date)",
"schemaField": {
"fieldPath": "amount",
"fieldPath": "payment_date",
"nullable": false,
"type": {
"type": {
Expand Down Expand Up @@ -943,6 +1016,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mode,5450544,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1638860400000,
"runId": "mode-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:10149707.34499.1897576958",
Expand Down
Loading

0 comments on commit 5286728

Please sign in to comment.