Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/bq): integrate bigquery-queries into main source #11247

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
)
from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor
from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler
from datahub.ingestion.source.bigquery_v2.queries_extractor import (
BigQueryQueriesExtractor,
BigQueryQueriesExtractorConfig,
)
from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor
from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler
from datahub.ingestion.source.state.redundant_run_skip_handler import (
Expand All @@ -51,6 +55,7 @@
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionSourceBase,
)
from datahub.ingestion.source_report.ingestion_stage import QUERIES_EXTRACTION
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.utilities.registries.domain_registry import DomainRegistry

Expand Down Expand Up @@ -139,6 +144,7 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
self.lineage_extractor = BigqueryLineageExtractor(
config,
self.report,
schema_resolver=self.sql_parser_schema_resolver,
identifiers=self.identifiers,
redundant_run_skip_handler=redundant_lineage_run_skip_handler,
)
Expand Down Expand Up @@ -196,7 +202,9 @@ def test_connection(config_dict: dict) -> TestConnectionReport:

def _init_schema_resolver(self) -> SchemaResolver:
schema_resolution_required = (
self.config.lineage_parse_view_ddl or self.config.lineage_use_sql_parser
self.config.use_queries_v2
or self.config.lineage_parse_view_ddl
or self.config.lineage_use_sql_parser
)
schema_ingestion_enabled = (
self.config.include_schema_metadata
Expand Down Expand Up @@ -244,22 +252,54 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
for project in projects:
yield from self.bq_schema_extractor.get_project_workunits(project)

if self.config.include_usage_statistics:
yield from self.usage_extractor.get_usage_workunits(
[p.id for p in projects], self.bq_schema_extractor.table_refs
)
if self.config.use_queries_v2:
self.report.set_ingestion_stage("*", "View and Snapshot Lineage")

if self.config.include_table_lineage:
yield from self.lineage_extractor.get_lineage_workunits(
yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots(
[p.id for p in projects],
self.sql_parser_schema_resolver,
self.bq_schema_extractor.view_refs_by_project,
self.bq_schema_extractor.view_definitions,
self.bq_schema_extractor.snapshot_refs_by_project,
self.bq_schema_extractor.snapshots_by_ref,
self.bq_schema_extractor.table_refs,
)

self.report.set_ingestion_stage("*", QUERIES_EXTRACTION)

queries_extractor = BigQueryQueriesExtractor(
connection=self.config.get_bigquery_client(),
schema_api=self.bq_schema_extractor.schema_api,
config=BigQueryQueriesExtractorConfig(
window=self.config,
user_email_pattern=self.config.usage.user_email_pattern,
include_lineage=self.config.include_table_lineage,
include_usage_statistics=self.config.include_usage_statistics,
include_operations=self.config.usage.include_operational_stats,
top_n_queries=self.config.usage.top_n_queries,
),
structured_report=self.report,
filters=self.filters,
identifiers=self.identifiers,
schema_resolver=self.sql_parser_schema_resolver,
discovered_tables=self.bq_schema_extractor.table_refs,
)
self.report.queries_extractor = queries_extractor.report
yield from queries_extractor.get_workunits_internal()
else:
if self.config.include_usage_statistics:
yield from self.usage_extractor.get_usage_workunits(
[p.id for p in projects], self.bq_schema_extractor.table_refs
)

if self.config.include_table_lineage:
yield from self.lineage_extractor.get_lineage_workunits(
[p.id for p in projects],
self.bq_schema_extractor.view_refs_by_project,
self.bq_schema_extractor.view_definitions,
self.bq_schema_extractor.snapshot_refs_by_project,
self.bq_schema_extractor.snapshots_by_ref,
self.bq_schema_extractor.table_refs,
)

def get_report(self) -> BigQueryV2Report:
return self.report

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,11 @@ class BigQueryV2Config(
"enabled.",
)

use_queries_v2: bool = Field(
default=False,
description="If enabled, uses the new queries extractor to extract queries from bigquery.",
)

@property
def have_table_data_read_permission(self) -> bool:
return self.use_tables_list_query_v2 or self.is_profiling_enabled()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
BigQueryIdentifierConfig,
)
from datahub.ingestion.source.bigquery_v2.bigquery_report import (
BigQueryQueriesExtractorReport,
BigQuerySchemaApiPerfReport,
)
from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigQuerySchemaApi
Expand All @@ -25,7 +26,6 @@
from datahub.ingestion.source.bigquery_v2.queries_extractor import (
BigQueryQueriesExtractor,
BigQueryQueriesExtractorConfig,
BigQueryQueriesExtractorReport,
)

logger = logging.getLogger(__name__)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ class BigQueryProcessingPerfReport(Report):
usage_state_size: Optional[str] = None


@dataclass
class BigQueryQueriesExtractorReport(Report):
query_log_fetch_timer: PerfTimer = field(default_factory=PerfTimer)
audit_log_preprocessing_timer: PerfTimer = field(default_factory=PerfTimer)
audit_log_load_timer: PerfTimer = field(default_factory=PerfTimer)
sql_aggregator: Optional[SqlAggregatorReport] = None
num_queries_by_project: TopKDict[str, int] = field(default_factory=int_top_k_dict)

num_total_queries: int = 0
num_unique_queries: int = 0


@dataclass
class BigQueryV2Report(
ProfilingSqlReport,
Expand Down Expand Up @@ -143,10 +155,8 @@ class BigQueryV2Report(

snapshots_scanned: int = 0

num_view_definitions_parsed: int = 0
num_view_definitions_failed_parsing: int = 0
num_view_definitions_failed_column_parsing: int = 0
view_definitions_parsing_failures: LossyList[str] = field(default_factory=LossyList)
# view lineage
sql_aggregator: Optional[SqlAggregatorReport] = None

read_reasons_stat: Counter[str] = field(default_factory=collections.Counter)
operation_types_stat: Counter[str] = field(default_factory=collections.Counter)
Expand All @@ -171,8 +181,7 @@ class BigQueryV2Report(
usage_end_time: Optional[datetime] = None
stateful_usage_ingestion_enabled: bool = False

# lineage/usage v2
sql_aggregator: Optional[SqlAggregatorReport] = None
queries_extractor: Optional[BigQueryQueriesExtractorReport] = None

def set_ingestion_stage(self, project_id: str, stage: str) -> None:
self.report_ingestion_stage_start(f"{project_id}: {stage}")
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,10 @@ def lineage_capability_test(
report: BigQueryV2Report,
) -> CapabilityReport:
lineage_extractor = BigqueryLineageExtractor(
connection_conf, report, BigQueryIdentifierBuilder(connection_conf, report)
connection_conf,
report,
schema_resolver=SchemaResolver(platform="bigquery"),
identifiers=BigQueryIdentifierBuilder(connection_conf, report),
)
for project_id in project_ids:
try:
Expand Down
Loading
Loading