
Commit

add golden file, refactors
mayurinehate committed Aug 8, 2024
1 parent 8f8b7f5 commit d997bb6
Showing 4 changed files with 240 additions and 72 deletions.
@@ -4,7 +4,7 @@
import tempfile
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, Iterable, List, MutableMapping, Optional, TypedDict
from typing import Dict, Iterable, List, Optional, TypedDict

from google.cloud.bigquery import Client
from pydantic import Field
@@ -109,7 +109,8 @@ class BigQueryQueriesExtractorConfig(BigQueryBaseConfig):

region_qualifiers: List[str] = Field(
default=["region-us", "region-eu"],
description="BigQuery regions to be scanned for bigquery jobs. See [this](https://cloud.google.com/bigquery/docs/information-schema-jobs) for details.",
description="BigQuery regions to be scanned for bigquery jobs. "
"See [this](https://cloud.google.com/bigquery/docs/information-schema-jobs#scope_and_syntax) for details.",
)
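As context for the changed `region_qualifiers` description, here is a minimal sketch of how such a pydantic field behaves when the config is instantiated. `QueriesExtractorRegionConfig` is a hypothetical, stripped-down stand-in for `BigQueryQueriesExtractorConfig`, and the extra region value is an invented example:

from typing import List

from pydantic import BaseModel, Field


class QueriesExtractorRegionConfig(BaseModel):
    # Hypothetical stand-in; only the field shown in this diff is modeled here.
    region_qualifiers: List[str] = Field(
        default=["region-us", "region-eu"],
        description="BigQuery regions to be scanned for bigquery jobs.",
    )


# Defaults apply unless overridden, e.g. to scan an additional region:
config = QueriesExtractorRegionConfig(
    region_qualifiers=["region-us", "region-eu", "region-asia-east1"]
)
print(config.region_qualifiers)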


@@ -234,55 +235,64 @@ def get_workunits_internal(
for entry in self.fetch_query_log(project):
self.report.num_queries_by_project[project.id] += 1
queries.append(entry)
self.report.num_total_queries = len(queries)
self.report.num_total_queries = len(queries)

with self.report.audit_log_preprocessing_timer:
# Preprocessing stage that deduplicates the queries using query hash per usage bucket
queries_deduped: MutableMapping[str, Dict[int, ObservedQuery]]
queries_deduped: FileBackedDict[Dict[int, ObservedQuery]]
queries_deduped = self.deduplicate_queries(queries)
self.report.num_unique_queries = len(queries_deduped)

with self.report.audit_log_load_timer:
i = 0
# Is FileBackedDict an OrderedDict? i.e. are keys/values retrieved in the same order they were added?
# Does the aggregator expect to see queries in the same order they were executed?
for query_instances in queries_deduped.values():
for _, query in query_instances.items():
if i > 0 and i % 1000 == 0:
if i > 0 and i % 10000 == 0:
logger.info(f"Added {i} query log entries to SQL aggregator")

logger.info(f"{query.query_hash}, {query.timestamp}")
self.aggregator.add(query)
i += 1

yield from auto_workunit(self.aggregator.gen_metadata())

def deduplicate_queries(
self, queries: FileBackedList[ObservedQuery]
) -> MutableMapping[str, Dict[int, ObservedQuery]]:
) -> FileBackedDict[Dict[int, ObservedQuery]]:

# This fingerprint-based deduplication is done here to reduce the performance hit of
# repetitive SQL parsing when adding observed queries to the aggregator, which would
# otherwise parse the same query multiple times. In the future, the aggregator may
# absorb this deduplication. With the current implementation, it is possible that an
# "Operation" (e.g. INSERT) is reported only once per day, although it may have
# happened multiple times throughout the day.

queries_deduped: FileBackedDict[Dict[int, ObservedQuery]] = FileBackedDict()
for query in queries:
time_bucket = (
datetime_to_ts_millis(

for i, query in enumerate(queries):
if i > 0 and i % 10000 == 0:
logger.info(f"Preprocessing completed for {i} query log entries")

# query = ObservedQuery(**asdict(query))

time_bucket = 0
if query.timestamp:
time_bucket = datetime_to_ts_millis(
get_time_bucket(query.timestamp, self.config.window.bucket_duration)
)
if query.timestamp
else 0
)
query_hash = get_query_fingerprint(

# Not using original BQ query hash as it's not always present
query.query_hash = get_query_fingerprint(
query.query, self.identifiers.platform, fast=True
)
query.query_hash = query_hash
if query_hash not in queries_deduped:
queries_deduped[query_hash] = {time_bucket: query}
else:
seen_query = queries_deduped[query_hash]
if time_bucket not in seen_query:
seen_query[time_bucket] = query
else:
observed_query = seen_query[time_bucket]
observed_query.usage_multiplier += 1
observed_query.timestamp = query.timestamp
queries_deduped[query_hash] = seen_query

query_instances = queries_deduped.setdefault(query.query_hash, {})

observed_query = query_instances.setdefault(time_bucket, query)

# If the query already exists for this time bucket, update its attributes
if observed_query is not query:
observed_query.usage_multiplier += 1
observed_query.timestamp = query.timestamp

return queries_deduped
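As a reading aid for the new `FileBackedDict`-based deduplication above, here is a minimal in-memory sketch of the same fingerprint-per-usage-bucket idea. `fingerprint` and `bucket_start_millis` are hypothetical stand-ins for DataHub's `get_query_fingerprint` and `get_time_bucket`/`datetime_to_ts_millis` helpers, and `ObservedQuerySketch` models only the fields used here:

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, Optional


@dataclass
class ObservedQuerySketch:
    # Simplified stand-in for ObservedQuery; the real class carries more fields.
    query: str
    timestamp: Optional[datetime]
    query_hash: Optional[str] = None
    usage_multiplier: int = 1


def fingerprint(sql: str) -> str:
    # Hypothetical stand-in for get_query_fingerprint(..., fast=True): just
    # normalizes whitespace and case so textually similar queries collide.
    return " ".join(sql.lower().split())


def bucket_start_millis(ts: datetime, bucket: timedelta) -> int:
    # Hypothetical stand-in for get_time_bucket + datetime_to_ts_millis,
    # assuming timezone-aware UTC timestamps.
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    ts_millis = int((ts - epoch).total_seconds() * 1000)
    bucket_millis = int(bucket.total_seconds() * 1000)
    return ts_millis - (ts_millis % bucket_millis)


def deduplicate(
    queries: Iterable[ObservedQuerySketch], bucket: timedelta = timedelta(days=1)
) -> Dict[str, Dict[int, ObservedQuerySketch]]:
    # Same {fingerprint: {time_bucket_millis: query}} shape as in the diff,
    # but using a plain in-memory dict instead of FileBackedDict.
    deduped: Dict[str, Dict[int, ObservedQuerySketch]] = {}
    for query in queries:
        time_bucket = (
            bucket_start_millis(query.timestamp, bucket) if query.timestamp else 0
        )
        query.query_hash = fingerprint(query.query)

        query_instances = deduped.setdefault(query.query_hash, {})
        observed = query_instances.setdefault(time_bucket, query)
        if observed is not query:
            # Same fingerprint seen again in the same bucket: count it instead
            # of storing (and later re-parsing) a duplicate.
            observed.usage_multiplier += 1
            observed.timestamp = query.timestamp
    return deduped

Fed two textually identical SELECTs with timestamps in the same day, the second occurrence only bumps `usage_multiplier` on the stored entry rather than adding a new one, which is the behavior the `setdefault` pattern in the diff relies on.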

@@ -292,55 +302,60 @@ def fetch_query_log(self, project: BigqueryProject) -> Iterable[ObservedQuery]:
regions = self.config.region_qualifiers

for region in regions:
# Each region needs to be a different query
query_log_query = _build_enriched_query_log_query(
project_id=project.id,
region=region,
start_time=self.config.window.start_time,
end_time=self.config.window.end_time,
)

with self.structured_report.report_exc(
f"Error fetching query log from BigQuery Project {project.id} Region {region}"
f"Error fetching query log from BQ Project {project.id} for {region}"
):
logger.info(
f"Fetching query log from BigQuery Project {project.id} Region {region}"
yield from self.fetch_region_query_log(project, region)

def fetch_region_query_log(
self, project: BigqueryProject, region: str
) -> Iterable[ObservedQuery]:

# Each region needs to be a different query
query_log_query = _build_enriched_query_log_query(
project_id=project.id,
region=region,
start_time=self.config.window.start_time,
end_time=self.config.window.end_time,
)

logger.info(f"Fetching query log from BQ Project {project.id} for {region}")
resp = self.connection.query(query_log_query)

for i, row in enumerate(resp):
if i > 0 and i % 1000 == 0:
logger.info(f"Processed {i} query log rows so far")
try:
entry = self._parse_audit_log_row(row)
except Exception as e:
self.structured_report.warning(
"Error parsing query log row",
context=f"{row}",
exc=e,
)
resp = self.connection.query(query_log_query)

for i, row in enumerate(resp):
if i > 0 and i % 1000 == 0:
logger.info(f"Processed {i} query log rows so far")

try:
entry = self._parse_audit_log_row(row)
except Exception as e:
self.structured_report.warning(
"Error parsing query log row",
context=f"{row}",
exc=e,
)
else:
yield entry
else:
yield entry

def _parse_audit_log_row(self, row: BigQueryJob) -> ObservedQuery:
timestamp: datetime = row["creation_time"]
timestamp = timestamp.astimezone(timezone.utc)

# https://cloud.google.com/bigquery/docs/multi-statement-queries
# The leading `_` also marks this as a temp dataset, as per the `temp_table_dataset_prefix` config
TEMP_TABLE_QUALIFIER = "_SESSION"

entry = ObservedQuery(
query=row["query"],
session_id=row["session_id"],
timestamp=row["creation_time"],
# TODO: Move user urn generation to BigQueryIdentifierBuilder
user=(
self.identifiers.gen_user_urn(row["user_email"])
if row["user_email"]
else None
),
default_db=row["project_id"],
default_schema=None,
# Not using BQ query hash as it's not always present
# query_hash=row["query_hash"],
default_schema=TEMP_TABLE_QUALIFIER,
query_hash=row["query_hash"],
)

return entry
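To make the `_SESSION` qualifier concrete, a small sketch of how a fetched JOBS row might map into the observed-query shape. The row values are invented placeholders, and the user urn shown is a rough simplification rather than DataHub's actual `gen_user_urn` logic:

from datetime import datetime, timezone

# Invented example row, mirroring the columns selected by the JOBS query below.
row = {
    "job_id": "job_abc",
    "project_id": "my-project",
    "creation_time": datetime(2024, 8, 8, 10, 30, tzinfo=timezone.utc),
    "user_email": "analyst@example.com",
    "query": "CREATE TEMP TABLE tmp AS SELECT 1; SELECT * FROM tmp",
    "session_id": "session_123",
    "query_hash": "d41d8cd98f00",
}

TEMP_TABLE_QUALIFIER = "_SESSION"

# Mapping in the spirit of _parse_audit_log_row: session-scoped temp tables from
# multi-statement queries resolve under the _SESSION dataset, so it is used as
# the default schema for SQL parsing.
entry = {
    "query": row["query"],
    "session_id": row["session_id"],
    "timestamp": row["creation_time"].astimezone(timezone.utc),
    "user": f"urn:li:corpuser:{row['user_email']}" if row["user_email"] else None,
    "default_db": row["project_id"],
    "default_schema": TEMP_TABLE_QUALIFIER,
    "query_hash": row["query_hash"],
}
print(entry["default_db"], entry["default_schema"])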
@@ -356,26 +371,44 @@ def _build_enriched_query_log_query(
audit_start_time = start_time.strftime(BQ_DATETIME_FORMAT)
audit_end_time = end_time.strftime(BQ_DATETIME_FORMAT)

# NOTE the use of creation_time as timestamp here
# as JOBS table is partitioned by creation_time.
# Using this column filter significantly reduces processed bytes.
# List of all statement types
# https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/BigQueryAuditMetadata.QueryStatementType
unsupported_statement_types = ",".join(
[
f"'{statement_type}'"
for statement_type in [
# procedure
"CREATE_PROCEDURE",
"DROP_PROCEDURE",
"CALL",
"SCRIPT", # individual statements in executed procedure are present as separate jobs
# schema
"CREATE_SCHEMA",
"DROP_SCHEMA",
# function
"CREATE_FUNCTION",
"CREATE_TABLE_FUNCTION",
"DROP_FUNCTION",
# policies
"CREATE_ROW_ACCESS_POLICY",
"DROP_ROW_ACCESS_POLICY",
]
]
)

# NOTE the use of partition column creation_time as timestamp here.
# Currently, only the required columns are fetched. More columns, such as
# total_slot_ms, statement_type, job_type, destination_table, referenced_tables,
# total_bytes_billed, and dml_statistics (inserted_row_count, etc.), may be fetched
# as required in the future. Refer to the link below for the list of all columns:
# https://cloud.google.com/bigquery/docs/information-schema-jobs#schema
return f"""
SELECT
job_id,
project_id,
creation_time,
start_time,
end_time,
total_slot_ms,
user_email,
statement_type,
job_type,
query,
destination_table,
referenced_tables,
total_bytes_billed,
total_bytes_processed,
dml_statistics,
session_info.session_id as session_id,
query_info.query_hashes.normalized_literals as query_hash
FROM
@@ -384,6 +417,7 @@ def _build_enriched_query_log_query(
creation_time >= '{audit_start_time}' AND
creation_time <= '{audit_end_time}' AND
error_result is null AND
not CONTAINS_SUBSTR(query, '.INFORMATION_SCHEMA.')
not CONTAINS_SUBSTR(query, '.INFORMATION_SCHEMA.') AND
statement_type not in ({unsupported_statement_types})
ORDER BY creation_time
"""
@@ -0,0 +1,22 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.dataset.foo,PROD)",
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"json": {
"timestampMillis": 1707182625000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"operationType": "CREATE",
"customProperties": {
"query_urn": "urn:li:query:f2e61c641cf14eae74147b6280ae40648516c4b7b58cfca6c4f7fb14ab255ce2"
},
"lastUpdatedTimestamp": 1707182625000
}
}
}
]
@@ -0,0 +1,79 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.derived_from_foo,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
"type": "TRANSFORMED",
"query": "urn:li:query:composite_39f4adf89c8ad4d6d307b628c82d8260e1c5cd7eb6fb3a8cbb437421f970c16f"
}
]
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:composite_39f4adf89c8ad4d6d307b628c82d8260e1c5cd7eb6fb3a8cbb437421f970c16f",
"changeType": "UPSERT",
"aspectName": "queryProperties",
"aspect": {
"json": {
"statement": {
"value": "CREATE TEMPORARY TABLE foo AS\nSELECT\n a,\n b + c AS c\nFROM bar;\n\nCREATE TABLE derived_from_foo AS\nSELECT\n *\nFROM foo",
"language": "SQL"
},
"source": "SYSTEM",
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
}
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:composite_39f4adf89c8ad4d6d307b628c82d8260e1c5cd7eb6fb3a8cbb437421f970c16f",
"changeType": "UPSERT",
"aspectName": "querySubjects",
"aspect": {
"json": {
"subjects": [
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.derived_from_foo,PROD)"
}
]
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:composite_39f4adf89c8ad4d6d307b628c82d8260e1c5cd7eb6fb3a8cbb437421f970c16f",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:redshift"
}
}
}
]
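Golden files like the two above are asserted against freshly generated output in tests. A minimal, library-agnostic sketch of that kind of check, assuming placeholder file names (DataHub's own test helpers perform a more tolerant comparison than strict equality):

import json
from pathlib import Path


def check_golden_file(output_path: Path, golden_path: Path) -> None:
    # Load both the freshly generated aspect JSON and the checked-in golden
    # file, and fail loudly on any difference.
    output = json.loads(output_path.read_text())
    golden = json.loads(golden_path.read_text())
    assert output == golden, f"{output_path} does not match golden file {golden_path}"


# Example usage with placeholder paths:
# check_golden_file(
#     Path("output/bigquery_queries_mcps.json"),
#     Path("tests/unit/bigquery/bigquery_queries_mcps_golden.json"),
# )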
