fix(bigquery): followups on bigquery queries v2 integration (#11291)

mayurinehate authored Sep 4, 2024
1 parent 2946131 commit 08a5956

Showing 6 changed files with 560 additions and 40 deletions.
@@ -116,7 +116,7 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
         )
 
         self.bigquery_data_dictionary = BigQuerySchemaApi(
-            report=BigQueryV2Report().schema_api_perf,
+            report=self.report.schema_api_perf,
             projects_client=config.get_projects_client(),
             client=config.get_bigquery_client(),
         )
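The one-line change above is easy to miss: `BigQueryV2Report().schema_api_perf` constructed a brand-new report object, so schema-API perf counters were recorded on an instance the pipeline's own `self.report` never saw. A minimal sketch of the difference, using hypothetical stand-in classes (not DataHub's real report types):

    # Hypothetical stand-ins for BigQueryV2Report and its schema_api_perf field.
    class PerfReport:
        def __init__(self) -> None:
            self.api_calls = 0

    class PipelineReport:
        def __init__(self) -> None:
            self.schema_api_perf = PerfReport()

    pipeline_report = PipelineReport()

    # Old pattern: a throwaway report object records the timings, then is dropped.
    lost_perf = PipelineReport().schema_api_perf
    lost_perf.api_calls += 1

    # New pattern: the schema API shares the pipeline's own report object.
    shared_perf = pipeline_report.schema_api_perf
    shared_perf.api_calls += 1

    print(pipeline_report.schema_api_perf.api_calls)  # 1 -- only the shared write is visible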
@@ -248,11 +248,11 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         if not projects:
             return
 
-        if self.config.include_schema_metadata:
-            for project in projects:
-                yield from self.bq_schema_extractor.get_project_workunits(project)
+        for project in projects:
+            yield from self.bq_schema_extractor.get_project_workunits(project)
 
         if self.config.use_queries_v2:
+            # Always ingest View and Snapshot lineage with schema ingestion
             self.report.set_ingestion_stage("*", "View and Snapshot Lineage")
 
             yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots(
@@ -263,6 +263,13 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
                 self.bq_schema_extractor.snapshots_by_ref,
             )
 
+            # if both usage and lineage are disabled then skip queries extractor piece
+            if (
+                not self.config.include_usage_statistics
+                and not self.config.include_table_lineage
+            ):
+                return
+
             self.report.set_ingestion_stage("*", QUERIES_EXTRACTION)
 
             queries_extractor = BigQueryQueriesExtractor(
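The early return added above means the queries extractor only runs when at least one of its outputs is wanted. A small sketch of the gate, using the flag names from the diff (the surrounding pipeline is elided):

    def should_run_queries_extractor(
        include_usage_statistics: bool, include_table_lineage: bool
    ) -> bool:
        # Mirrors the added guard: skip queries extraction only when *both*
        # usage statistics and table lineage are disabled.
        return include_usage_statistics or include_table_lineage

    assert should_run_queries_extractor(True, False)
    assert should_run_queries_extractor(False, True)
    assert not should_run_queries_extractor(False, False)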
@@ -467,6 +467,7 @@ def have_table_data_read_permission(self) -> bool:
         default=True,
         description="Option to enable/disable lineage generation. Is enabled by default.",
     )
+
     max_query_duration: timedelta = Field(
         default=timedelta(minutes=15),
         description="Correction to pad start_time and end_time with. For handling the case where the read happens within our time range but the query completion event is delayed and happens after the configured end time.",
@@ -521,6 +522,30 @@ def have_table_data_read_permission(self) -> bool:
         " Set to 1 to disable.",
     )
 
+    # include_view_lineage and include_view_column_lineage are inherited from SQLCommonConfig
+    # but not used in bigquery so we hide them from docs.
+    include_view_lineage: bool = Field(default=True, hidden_from_docs=True)
+
+    include_view_column_lineage: bool = Field(default=True, hidden_from_docs=True)
+
+    @root_validator(pre=True)
+    def set_include_schema_metadata(cls, values: Dict) -> Dict:
+        # Historically this is used to disable schema ingestion
+        if (
+            "include_tables" in values
+            and "include_views" in values
+            and not values["include_tables"]
+            and not values["include_views"]
+        ):
+            values["include_schema_metadata"] = False
+            values["include_table_snapshots"] = False
+            logger.info(
+                "include_tables and include_views are both set to False."
+                " Disabling schema metadata ingestion for tables, views, and snapshots."
+            )
+
+        return values
+
     @root_validator(skip_on_failure=True)
     def profile_default_settings(cls, values: Dict) -> Dict:
         # Extra default SQLAlchemy option for better connection pooling and threading.
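The new `pre=True` validator preserves an older idiom: setting both `include_tables` and `include_views` to `False` used to be the way to turn schema ingestion off, and it still works. A standalone sketch of the same logic (a plain function, not the real pydantic model):

    from typing import Any, Dict

    def set_include_schema_metadata(values: Dict[str, Any]) -> Dict[str, Any]:
        # Same logic as the validator in the diff, minus the pydantic machinery.
        if (
            "include_tables" in values
            and "include_views" in values
            and not values["include_tables"]
            and not values["include_views"]
        ):
            values["include_schema_metadata"] = False
            values["include_table_snapshots"] = False
        return values

    print(set_include_schema_metadata({"include_tables": False, "include_views": False}))
    # {'include_tables': False, 'include_views': False,
    #  'include_schema_metadata': False, 'include_table_snapshots': False}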
@@ -2,7 +2,8 @@
 from collections import defaultdict
 from dataclasses import dataclass, field
 from datetime import datetime, timezone
-from typing import Any, Dict, Iterable, Iterator, List, Optional
+from functools import lru_cache
+from typing import Any, Dict, FrozenSet, Iterable, Iterator, List, Optional
 
 from google.api_core import retry
 from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3
@@ -175,6 +176,7 @@ def _should_retry(exc: BaseException) -> bool:
         )
         return resp.result()
 
+    @lru_cache(maxsize=1)
     def get_projects(self, max_results_per_page: int = 100) -> List[BigqueryProject]:
         def _should_retry(exc: BaseException) -> bool:
             logger.debug(
@@ -222,7 +224,8 @@ def _should_retry(exc: BaseException) -> bool:
                 return []
             return projects
 
-    def get_projects_with_labels(self, labels: List[str]) -> List[BigqueryProject]:
+    @lru_cache(maxsize=1)
+    def get_projects_with_labels(self, labels: FrozenSet[str]) -> List[BigqueryProject]:
         with self.report.list_projects_with_labels_timer:
             try:
                 projects = []
@@ -675,7 +678,9 @@ def query_project_list_from_labels(
     report: SourceReport,
     filters: BigQueryFilter,
 ) -> Iterable[BigqueryProject]:
-    projects = schema_api.get_projects_with_labels(filters.filter_config.project_labels)
+    projects = schema_api.get_projects_with_labels(
+        frozenset(filters.filter_config.project_labels)
+    )
 
     if not projects:  # Report failure on exception and if empty list is returned
         report.report_failure(
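The `List[str]` to `FrozenSet[str]` signature change goes hand in hand with the new `@lru_cache` decorator: `lru_cache` keys its cache on the call arguments, so every argument must be hashable, and a `list` is not. That is also why the call site above now wraps `project_labels` in `frozenset(...)`. A self-contained demonstration (hypothetical fetch function, not the real API):

    from functools import lru_cache
    from typing import FrozenSet, List

    @lru_cache(maxsize=1)
    def get_projects_with_labels(labels: FrozenSet[str]) -> List[str]:
        print("listing projects...")  # printed only on a cache miss
        return [f"project-for-{label}" for label in sorted(labels)]

    labels = ["env:prod", "team:data"]
    # get_projects_with_labels(labels)           # TypeError: unhashable type: 'list'
    get_projects_with_labels(frozenset(labels))  # cache miss: prints, then returns
    get_projects_with_labels(frozenset(labels))  # cache hit: returns silently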
@@ -188,7 +188,11 @@ def __init__(
 
     @property
     def store_table_refs(self):
-        return self.config.include_table_lineage or self.config.include_usage_statistics
+        return (
+            self.config.include_table_lineage
+            or self.config.include_usage_statistics
+            or self.config.use_queries_v2
+        )
 
     def get_project_workunits(
         self, project: BigqueryProject
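A plausible reading of the widened property: with `use_queries_v2`, table refs must be collected even when the usage and lineage flags are off at this layer, presumably because the queries extractor downstream resolves queries against known tables ("Need table_refs to calculate lineage and usage" in a later hunk). The new predicate, as a standalone truth check:

    def store_table_refs(
        include_table_lineage: bool,
        include_usage_statistics: bool,
        use_queries_v2: bool,
    ) -> bool:
        # Mirrors the updated property: queries v2 alone is now enough.
        return include_table_lineage or include_usage_statistics or use_queries_v2

    assert store_table_refs(False, False, True)       # new: queries v2 forces refs
    assert not store_table_refs(False, False, False)  # unchanged: nothing wants refs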
@@ -312,15 +316,17 @@ def _process_project(
                     f"Excluded project '{project_id}' since no datasets were found. {action_message}"
                 )
             else:
-                yield from self.gen_project_id_containers(project_id)
+                if self.config.include_schema_metadata:
+                    yield from self.gen_project_id_containers(project_id)
                 self.report.warning(
                     title="No datasets found in project",
                     message=action_message,
                     context=project_id,
                 )
             return
 
-        yield from self.gen_project_id_containers(project_id)
+        if self.config.include_schema_metadata:
+            yield from self.gen_project_id_containers(project_id)
 
         self.report.num_project_datasets_to_scan[project_id] = len(
             bigquery_project.datasets
@@ -392,9 +398,10 @@ def _process_schema(
     ) -> Iterable[MetadataWorkUnit]:
         dataset_name = bigquery_dataset.name
 
-        yield from self.gen_dataset_containers(
-            dataset_name, project_id, bigquery_dataset.labels
-        )
+        if self.config.include_schema_metadata:
+            yield from self.gen_dataset_containers(
+                dataset_name, project_id, bigquery_dataset.labels
+            )
 
         columns = None
 
@@ -404,11 +411,7 @@
                 max_calls=self.config.requests_per_min, period=60
             )
 
-        if (
-            self.config.include_tables
-            or self.config.include_views
-            or self.config.include_table_snapshots
-        ):
+        if self.config.include_schema_metadata:
             columns = self.schema_api.get_columns_for_dataset(
                 project_id=project_id,
                 dataset_name=dataset_name,
@@ -418,6 +421,27 @@
                 report=self.report,
                 rate_limiter=rate_limiter,
             )
+        elif self.store_table_refs:
+            # Need table_refs to calculate lineage and usage
+            for table_item in self.schema_api.list_tables(dataset_name, project_id):
+                identifier = BigqueryTableIdentifier(
+                    project_id=project_id,
+                    dataset=dataset_name,
+                    table=table_item.table_id,
+                )
+                if not self.config.table_pattern.allowed(identifier.raw_table_name()):
+                    self.report.report_dropped(identifier.raw_table_name())
+                    continue
+                try:
+                    self.table_refs.add(
+                        str(BigQueryTableRef(identifier).get_sanitized_table_ref())
+                    )
+                except Exception as e:
+                    logger.warning(
+                        f"Could not create table ref for {table_item.path}: {e}"
+                    )
+            yield from []
+            return
 
         if self.config.include_tables:
             db_tables[dataset_name] = list(
@@ -447,25 +471,6 @@
                 )
             ),
         )
-        elif self.store_table_refs:
-            # Need table_refs to calculate lineage and usage
-            for table_item in self.schema_api.list_tables(dataset_name, project_id):
-                identifier = BigqueryTableIdentifier(
-                    project_id=project_id,
-                    dataset=dataset_name,
-                    table=table_item.table_id,
-                )
-                if not self.config.table_pattern.allowed(identifier.raw_table_name()):
-                    self.report.report_dropped(identifier.raw_table_name())
-                    continue
-                try:
-                    self.table_refs.add(
-                        str(BigQueryTableRef(identifier).get_sanitized_table_ref())
-                    )
-                except Exception as e:
-                    logger.warning(
-                        f"Could not create table ref for {table_item.path}: {e}"
-                    )
 
         if self.config.include_views:
             db_views[dataset_name] = list(
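The relocated `elif self.store_table_refs:` branch now runs when schema metadata is skipped entirely: it lists tables only to build sanitized refs for lineage and usage, filters them through `table_pattern`, and emits no workunits. A rough, self-contained sketch of that flow with a stand-in pattern filter (the real `AllowDenyPattern`, identifier classes, and list-tables API are DataHub's and are not reproduced here):

    import re
    from typing import List, Set

    def allowed(pattern: str, name: str) -> bool:
        # Stand-in for AllowDenyPattern.allowed(): a single allow regex.
        return re.fullmatch(pattern, name) is not None

    def collect_table_refs(
        project_id: str, dataset: str, tables: List[str], table_pattern: str
    ) -> Set[str]:
        table_refs: Set[str] = set()
        for table in tables:
            raw_name = f"{project_id}.{dataset}.{table}"
            if not allowed(table_pattern, raw_name):
                continue  # dropped, mirroring report.report_dropped()
            # The real code also sanitizes sharded/temp table names here.
            table_refs.add(f"bigquery:{raw_name}")
        return table_refs

    print(collect_table_refs("my-project", "sales", ["orders", "tmp_x"], r".*\.sales\.orders"))
    # {'bigquery:my-project.sales.orders'}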