Skip to content

Commit

Permalink
review comments + queries for operation without lineage
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate committed Aug 13, 2024
1 parent d997bb6 commit 6d14175
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 49 deletions.
3 changes: 1 addition & 2 deletions metadata-ingestion/docs/sources/bigquery/bigquery_recipe.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
source:
type: bigquery
config:
# `schema_pattern` for BQ Datasets
schema_pattern:
dataset_pattern:
allow:
- finance_bq_dataset
table_pattern:
Expand Down
22 changes: 18 additions & 4 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from avrogen.dict_wrapper import DictWrapper

from datahub.configuration.source_common import DEFAULT_ENV as DEFAULT_ENV_CONFIGURATION
from datahub.metadata._urns.urn_defs import CorpGroupUrn, CorpUserUrn
from datahub.metadata.schema_classes import (
AssertionKeyClass,
AuditStampClass,
Expand Down Expand Up @@ -86,13 +87,11 @@ def get_sys_time() -> int:


@overload
def make_ts_millis(ts: None) -> None:
...
def make_ts_millis(ts: None) -> None: ...


@overload
def make_ts_millis(ts: datetime) -> int:
...
def make_ts_millis(ts: datetime) -> int: ...


def make_ts_millis(ts: Optional[datetime]) -> Optional[int]:
Expand Down Expand Up @@ -224,6 +223,21 @@ def make_user_urn(username: str) -> str:
)


def make_actor_urn(actor: str) -> Union[CorpUserUrn, CorpGroupUrn]:
"""
Makes a user urn if the input is not a user or group urn already
"""
return (
CorpUserUrn(actor)
if not actor.startswith(("urn:li:corpuser:", "urn:li:corpGroup:"))
else (
CorpUserUrn.create_from_string(actor)
if actor.startswith("urn:li:corpuser:")
else CorpGroupUrn.create_from_string(actor)
)
)


def make_group_urn(groupname: str) -> str:
"""
Makes a group urn if the input is not a user or group urn already
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ def sharded_table_pattern_is_a_valid_regexp(cls, v):
re.compile(v)
except Exception as e:
raise ValueError(
f"sharded_table_pattern configuration pattern is invalid. The exception was: {e}"
)
"sharded_table_pattern configuration pattern is invalid."
) from e
return v

@root_validator(pre=True, skip_on_failure=True)
Expand Down Expand Up @@ -229,6 +229,12 @@ class BigQueryFilterConfig(SQLFilterConfig):
description="Regex patterns for table snapshots to filter in ingestion. Specify regex to match the entire snapshot name in database.schema.snapshot format. e.g. to match all snapshots starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
)

# NOTE: `schema_pattern` is added here only to hide it from docs.
schema_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
hidden_from_docs=True,
)

@root_validator(pre=False, skip_on_failure=True)
def backward_compatibility_configs_set(cls, values: Dict) -> Dict:
dataset_pattern: Optional[AllowDenyPattern] = values.get("dataset_pattern")
Expand Down Expand Up @@ -301,6 +307,7 @@ class BigQueryV2Config(
BigQueryConnectionConfig,
BigQueryBaseConfig,
BigQueryFilterConfig,
# BigQueryFilterConfig must come before (higher precedence) the SQLCommon config, so that the documentation overrides are applied.
BigQueryIdentifierConfig,
SQLCommonConfig,
StatefulUsageConfigMixin,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def __init__(
# self.filters = filters
self.discovered_tables = discovered_tables

self._structured_report = structured_report
self.structured_report = structured_report

self.aggregator = SqlParsingAggregator(
platform=self.identifiers.platform,
Expand All @@ -175,10 +175,6 @@ def __init__(
)
self.report.sql_aggregator = self.aggregator.report

@property
def structured_report(self) -> SourceReport:
return self._structured_report

@functools.cached_property
def local_temp_path(self) -> pathlib.Path:
if self.config.local_temp_path:
Expand Down Expand Up @@ -340,7 +336,10 @@ def _parse_audit_log_row(self, row: BigQueryJob) -> ObservedQuery:
timestamp: datetime = row["creation_time"]
timestamp = timestamp.astimezone(timezone.utc)

# https://cloud.google.com/bigquery/docs/multi-statement-queries
# Usually bigquery identifiers are always referred as <dataset>.<table> and only
# temporary tables are referred as <table> alone without project or dataset name.
# Note that temporary tables can also be referenced using _SESSION.<table>
# More details here - https://cloud.google.com/bigquery/docs/multi-statement-queries
# Also _ at start considers this as temp dataset as per `temp_table_dataset_prefix` config
TEMP_TABLE_QUALIFIER = "_SESSION"

Expand Down Expand Up @@ -373,27 +372,26 @@ def _build_enriched_query_log_query(

# List of all statement types
# https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/BigQueryAuditMetadata.QueryStatementType
UNSUPPORTED_STATEMENT_TYPES = [
# procedure
"CREATE_PROCEDURE",
"DROP_PROCEDURE",
"CALL",
"SCRIPT", # individual statements in executed procedure are present as separate jobs
# schema
"CREATE_SCHEMA",
"DROP_SCHEMA",
# function
"CREATE_FUNCTION",
"CREATE_TABLE_FUNCTION",
"DROP_FUNCTION",
# policies
"CREATE_ROW_ACCESS_POLICY",
"DROP_ROW_ACCESS_POLICY",
]

unsupported_statement_types = ",".join(
[
f"'{statement_type}'"
for statement_type in [
# procedure
"CREATE_PROCEDURE",
"DROP_PROCEDURE",
"CALL",
"SCRIPT", # individual statements in executed procedure are present as separate jobs
# schema
"CREATE_SCHEMA",
"DROP_SCHEMA",
# function
"CREATE_FUNCTION",
"CREATE_TABLE_FUNCTION",
"DROP_FUNCTION",
# policies
"CREATE_ROW_ACCESS_POLICY",
"DROP_ROW_ACCESS_POLICY",
]
]
[f"'{statement_type}'" for statement_type in UNSUPPORTED_STATEMENT_TYPES]
)

# NOTE the use of partition column creation_time as timestamp here.
Expand All @@ -402,7 +400,7 @@ def _build_enriched_query_log_query(
# total_bytes_billed, dml_statistics(inserted_row_count, etc) that may be fetched
# as required in future. Refer below link for list of all columns
# https://cloud.google.com/bigquery/docs/information-schema-jobs#schema
return f"""
return f"""\
SELECT
job_id,
project_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import datahub.emitter.mce_builder as builder
import datahub.metadata.schema_classes as models
from datahub.configuration.time_window_config import get_time_bucket
from datahub.emitter.mce_builder import get_sys_time, make_ts_millis
from datahub.emitter.mce_builder import get_sys_time, make_actor_urn, make_ts_millis
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.sql_parsing_builder import compute_upstream_fields
from datahub.ingestion.api.closeable import Closeable
Expand All @@ -25,6 +25,7 @@
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig, UsageAggregator
from datahub.metadata.urns import (
CorpGroupUrn,
CorpUserUrn,
DataPlatformUrn,
DatasetUrn,
Expand Down Expand Up @@ -107,7 +108,7 @@ class QueryMetadata:
query_type: QueryType
lineage_type: str # from models.DatasetLineageTypeClass
latest_timestamp: Optional[datetime]
actor: Optional[CorpUserUrn]
actor: Optional[Union[CorpUserUrn, CorpGroupUrn]]

upstreams: List[UrnStr] # this is direct upstreams, which may be temp tables
column_lineage: List[ColumnLineageInfo]
Expand Down Expand Up @@ -166,7 +167,7 @@ class PreparsedQuery:
confidence_score: float = 1.0

query_count: int = 1
user: Optional[CorpUserUrn] = None
user: Optional[Union[CorpUserUrn, CorpGroupUrn]] = None
timestamp: Optional[datetime] = None
session_id: str = _MISSING_SESSION_ID
query_type: QueryType = QueryType.UNKNOWN
Expand Down Expand Up @@ -494,7 +495,7 @@ def add(
session_id=item.session_id,
usage_multiplier=item.usage_multiplier,
query_timestamp=item.timestamp,
user=CorpUserUrn.from_string(item.user) if item.user else None,
user=make_actor_urn(item.user) if item.user else None,
query_hash=item.query_hash,
)
else:
Expand Down Expand Up @@ -632,7 +633,7 @@ def add_observed_query(
default_db: Optional[str] = None,
default_schema: Optional[str] = None,
query_timestamp: Optional[datetime] = None,
user: Optional[CorpUserUrn] = None,
user: Optional[Union[CorpUserUrn, CorpGroupUrn]] = None,
session_id: Optional[
str
] = None, # can only see temp tables with the same session
Expand Down Expand Up @@ -929,7 +930,7 @@ def _run_sql_parser(
schema_resolver: SchemaResolverInterface,
session_id: str = _MISSING_SESSION_ID,
timestamp: Optional[datetime] = None,
user: Optional[CorpUserUrn] = None,
user: Optional[Union[CorpUserUrn, CorpGroupUrn]] = None,
) -> SqlParsingResult:
with self.report.sql_parsing_timer:
parsed = sqlglot_lineage(
Expand Down Expand Up @@ -1015,7 +1016,7 @@ def gen_metadata(self) -> Iterable[MetadataChangeProposalWrapper]:
yield from self._gen_lineage_mcps(queries_generated)
yield from self._gen_remaining_queries(queries_generated)
yield from self._gen_usage_statistics_mcps()
yield from self._gen_operation_mcps()
yield from self._gen_operation_mcps(queries_generated)

def _gen_lineage_mcps(
self, queries_generated: Set[QueryId]
Expand Down Expand Up @@ -1455,14 +1456,22 @@ def _gen_usage_statistics_mcps(self) -> Iterable[MetadataChangeProposalWrapper]:
# TODO: We should change the usage aggregator to return MCPWs directly.
yield cast(MetadataChangeProposalWrapper, wu.metadata)

def _gen_operation_mcps(self) -> Iterable[MetadataChangeProposalWrapper]:
def _gen_operation_mcps(
self, queries_generated: Set[QueryId]
) -> Iterable[MetadataChangeProposalWrapper]:
if not self.generate_operations:
return

for downstream_urn, query_ids in self._lineage_map.items():
for query_id in query_ids:
yield from self._gen_operation_for_downstream(downstream_urn, query_id)

# Avoid generating the same query twice.
if query_id in queries_generated:
continue
queries_generated.add(query_id)
yield from self._gen_query(self._query_map[query_id], downstream_urn)

def _gen_operation_for_downstream(
self, downstream_urn: UrnStr, query_id: QueryId
) -> Iterable[MetadataChangeProposalWrapper]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
"json": {
"timestampMillis": 1707182625000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"operationType": "CREATE",
"customProperties": {
Expand All @@ -18,5 +18,60 @@
"lastUpdatedTimestamp": 1707182625000
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:f2e61c641cf14eae74147b6280ae40648516c4b7b58cfca6c4f7fb14ab255ce2",
"changeType": "UPSERT",
"aspectName": "queryProperties",
"aspect": {
"json": {
"statement": {
"value": "CREATE OR REPLACE TABLE `dataset.foo` (\n date_utc TIMESTAMP,\n revenue INT64\n)",
"language": "SQL"
},
"source": "SYSTEM",
"created": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
}
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:f2e61c641cf14eae74147b6280ae40648516c4b7b58cfca6c4f7fb14ab255ce2",
"changeType": "UPSERT",
"aspectName": "querySubjects",
"aspect": {
"json": {
"subjects": [
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.dataset.foo,PROD)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.dataset.foo,PROD),date_utc)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.dataset.foo,PROD),revenue)"
}
]
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:f2e61c641cf14eae74147b6280ae40648516c4b7b58cfca6c4f7fb14ab255ce2",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:bigquery"
}
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -535,10 +535,6 @@ def test_lineage_via_temp_table_disordered_add(pytestconfig: pytest.Config) -> N
generate_operations=False,
)

aggregator._schema_resolver.add_raw_schema_info(
DatasetUrn("redshift", "dev.public.bar").urn(),
{"a": "int", "b": "int", "c": "int"},
)
aggregator.add_observed_query(
query="create table derived_from_foo as select * from foo",
default_db="dev",
Expand Down
26 changes: 26 additions & 0 deletions metadata-ingestion/tests/unit/test_bigquery_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -1212,3 +1212,29 @@ def get_datasets_for_project_id_side_effect(
config = BigQueryV2Config.parse_obj({**base_config, "exclude_empty_projects": True})
source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test-2"))
assert len({wu.metadata.entityUrn for wu in source.get_workunits()}) == 1 # type: ignore


def test_bigquery_config_deprecated_schema_pattern():
base_config = {
"include_usage_statistics": False,
"include_table_lineage": False,
}

config = BigQueryV2Config.parse_obj(base_config)
assert config.dataset_pattern == AllowDenyPattern(allow=[".*"]) # default

config_with_schema_pattern = {
**base_config,
"schema_pattern": AllowDenyPattern(deny=[".*"]),
}
config = BigQueryV2Config.parse_obj(config_with_schema_pattern)
assert config.dataset_pattern == AllowDenyPattern(deny=[".*"]) # schema_pattern

config_with_dataset_pattern = {
**base_config,
"dataset_pattern": AllowDenyPattern(deny=["temp.*"]),
}
config = BigQueryV2Config.parse_obj(config_with_dataset_pattern)
assert config.dataset_pattern == AllowDenyPattern(
deny=["temp.*"]
) # dataset_pattern

0 comments on commit 6d14175

Please sign in to comment.