Commit
feat(ingest/BigQuery): refactor + parallelize dataset metadata extraction
mayurinehate committed Jul 10, 2024
1 parent cd932c3 commit aecd4ca
Showing 9 changed files with 1,410 additions and 1,253 deletions.
1,225 changes: 32 additions & 1,193 deletions metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py

Large diffs are not rendered by default.

@@ -66,6 +66,7 @@ def get_exported_bigquery_audit_metadata(
rate_limiter = RateLimiter(max_calls=self.requests_per_min, period=60)

with self.report.get_exported_log_entries as current_timer:
self.report.num_get_exported_log_entries_api_requests += 1
for dataset in bigquery_audit_metadata_datasets:
logger.info(
f"Start loading log entries from BigQueryAuditMetadata in {dataset}"
@@ -115,6 +116,7 @@ def get_bigquery_log_entries_via_gcp_logging(
)

with self.report.list_log_entries as current_timer:
self.report.num_list_log_entries_api_requests += 1
list_entries = client.list_entries(
filter_=filter,
page_size=log_page_size,
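The audit-log reader above wraps its export queries in a shared RateLimiter, constructed as RateLimiter(max_calls=self.requests_per_min, period=60). Below is a minimal sketch of that pattern, assuming RateLimiter is used as a context manager that blocks once the per-period call budget is spent; fetch_audit_page and the dataset names are illustrative stand-ins, not the connector's real code.

from datahub.utilities.ratelimiter import RateLimiter

# Illustrative stand-in; the real code pages through BigQueryAuditMetadata tables.
def fetch_audit_page(dataset: str) -> list:
    return []

requests_per_min = 60
rate_limiter = RateLimiter(max_calls=requests_per_min, period=60)

for dataset in ["my-project.audit_dataset_a", "my-project.audit_dataset_b"]:
    # Entering the context blocks once max_calls have been made within the period.
    with rate_limiter:
        entries = fetch_audit_page(dataset)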
@@ -175,12 +175,12 @@ class BigQueryV2Config(

number_of_datasets_process_in_batch: int = Field(
hidden_from_docs=True,
default=500,
default=10000,
description="Number of table queried in batch when getting metadata. This is a low level config property which should be touched with care.",
)

number_of_datasets_process_in_batch_if_profiling_enabled: int = Field(
default=200,
default=1000,
description="Number of partitioned table queried in batch when getting metadata. This is a low level config property which should be touched with care. This restriction is needed because we query partitions system view which throws error if we try to touch too many tables.",
)

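These batch sizes control how many datasets' tables are fetched per metadata query; the profiling-enabled default stays much smaller because the partitions system view errors out when too many tables are touched at once. A hedged sketch of overriding the knobs in a source config follows; only the two batch-size field names come from this diff, and the surrounding structure is an illustrative fragment rather than a complete recipe.

# Illustrative recipe fragment expressed as a Python dict; "project_ids" is shown
# only for context and everything else a real recipe needs is omitted.
bigquery_source_config = {
    "project_ids": ["my-gcp-project"],
    "number_of_datasets_process_in_batch": 10000,
    "number_of_datasets_process_in_batch_if_profiling_enabled": 1000,
}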
@@ -20,20 +20,32 @@

@dataclass
class BigQuerySchemaApiPerfReport(Report):
num_list_projects: int = 0
num_listed_projects: int = 0
num_list_projects_retry_request: int = 0
num_list_projects_api_requests: int = 0
num_list_datasets_api_requests: int = 0
num_get_columns_for_dataset_api_requests: int = 0
num_get_tables_for_dataset_api_requests: int = 0
num_list_tables_api_requests: int = 0
num_get_views_for_dataset_api_requests: int = 0
num_get_snapshots_for_dataset_api_requests: int = 0

list_projects: PerfTimer = field(default_factory=PerfTimer)
list_datasets: PerfTimer = field(default_factory=PerfTimer)
get_columns_for_dataset: PerfTimer = field(default_factory=PerfTimer)
get_tables_for_dataset: PerfTimer = field(default_factory=PerfTimer)
list_tables: PerfTimer = field(default_factory=PerfTimer)
get_views_for_dataset: PerfTimer = field(default_factory=PerfTimer)
get_snapshots_for_dataset: PerfTimer = field(default_factory=PerfTimer)

get_columns_for_dataset_sec: float = 0
get_tables_for_dataset_sec: float = 0
list_tables_sec: float = 0
get_views_for_dataset_sec: float = 0
get_snapshots_for_dataset_sec: float = 0


@dataclass
class BigQueryAuditLogApiPerfReport(Report):
num_get_exported_log_entries_api_requests: int = 0
get_exported_log_entries: PerfTimer = field(default_factory=PerfTimer)

num_list_log_entries_api_requests: int = 0
list_log_entries: PerfTimer = field(default_factory=PerfTimer)


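Each API surface now gets both a request counter and a PerfTimer, so the report can answer "how many calls" and "how much wall time" separately. A minimal sketch of how the pair is used together, assuming only what the diff shows (PerfTimer is a context manager from datahub.utilities.perf_timer); the toy report class and list_widgets call are placeholders, and the real report classes additionally subclass datahub's Report.

from dataclasses import dataclass, field

from datahub.utilities.perf_timer import PerfTimer


@dataclass
class ToyApiPerfReport:  # the real reports also subclass datahub's Report
    num_list_widgets_api_requests: int = 0
    list_widgets: PerfTimer = field(default_factory=PerfTimer)


def list_widgets(project: str) -> list:
    return ["w1", "w2"]  # placeholder for a real API call


report = ToyApiPerfReport()
with report.list_widgets:  # PerfTimer as a context manager, accumulating wall time
    report.num_list_widgets_api_requests += 1
    widgets = list_widgets("my-project")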
@@ -1,4 +1,5 @@
import logging
import os
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
@@ -24,9 +25,11 @@
BigqueryTableType,
)
from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.ratelimiter import RateLimiter

logger: logging.Logger = logging.getLogger(__name__)
SCHEMA_PARALLELISM = int(os.getenv("DATAHUB_BIGQUERY_SCHEMA_PARALLELISM", 20))


@dataclass
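SCHEMA_PARALLELISM is what the commit title refers to: per-dataset metadata extraction can fan out across worker threads, with the width controlled by the DATAHUB_BIGQUERY_SCHEMA_PARALLELISM environment variable (default 20). The sketch below shows the general idea with concurrent.futures; it is not the exact executor the connector uses, and process_dataset is a stand-in for the real per-dataset extraction.

import os
from concurrent.futures import ThreadPoolExecutor, as_completed

SCHEMA_PARALLELISM = int(os.getenv("DATAHUB_BIGQUERY_SCHEMA_PARALLELISM", 20))


def process_dataset(project_id: str, dataset_name: str) -> str:
    # Stand-in for fetching tables, views, snapshots and columns for one dataset.
    return f"{project_id}.{dataset_name}"


def extract_project(project_id: str, dataset_names: list) -> list:
    results = []
    with ThreadPoolExecutor(max_workers=SCHEMA_PARALLELISM) as pool:
        futures = {
            pool.submit(process_dataset, project_id, name): name
            for name in dataset_names
        }
        for future in as_completed(futures):
            results.append(future.result())
    return results


print(extract_project("my-project", ["sales", "marketing", "finance"]))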
@@ -163,6 +166,7 @@ def _should_retry(exc: BaseException) -> bool:
return True

with self.report.list_projects:
self.report.num_list_projects_api_requests += 1
try:
# The BigQuery API limits projects.list calls to 2 requests per second.
# https://cloud.google.com/bigquery/quotas#api_request_quotas
@@ -180,7 +184,7 @@ def _should_retry(exc: BaseException) -> bool:
BigqueryProject(id=p.project_id, name=p.friendly_name)
for p in projects_iterator
]
self.report.num_list_projects = len(projects)
self.report.num_listed_projects = len(projects)
return projects
except Exception as e:
logger.error(f"Error getting projects. {e}", exc_info=True)
@@ -190,6 +194,7 @@ def get_datasets_for_project_id(
self, project_id: str, maxResults: Optional[int] = None
) -> List[BigqueryDataset]:
with self.report.list_datasets:
self.report.num_list_datasets_api_requests += 1
datasets = self.bq_client.list_datasets(project_id, max_results=maxResults)
return [
BigqueryDataset(name=d.dataset_id, labels=d.labels) for d in datasets
@@ -222,10 +227,12 @@ def get_datasets_for_project_id_with_information_schema(
def list_tables(
self, dataset_name: str, project_id: str
) -> Iterator[TableListItem]:
with self.report.list_tables as current_timer:
with PerfTimer() as current_timer:
for table in self.bq_client.list_tables(f"{project_id}.{dataset_name}"):
with current_timer.pause():
yield table
self.report.num_list_tables_api_requests += 1
self.report.list_tables_sec += current_timer.elapsed_seconds()

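list_tables (and get_columns_for_dataset further down) switch to a local PerfTimer and pause it while yielding, so the reported seconds cover only time spent inside the BigQuery client, not whatever the consumer does with each row. A small sketch of that pattern under the same assumption the diff relies on, namely that PerfTimer exposes pause() as a context manager and elapsed_seconds(); the fake API pages and report dict are illustrative.

import time
from typing import Iterator

from datahub.utilities.perf_timer import PerfTimer

report = {"num_list_tables_api_requests": 0, "list_tables_sec": 0.0}


def fake_api_pages() -> Iterator[str]:
    for name in ("orders", "customers"):
        time.sleep(0.01)  # pretend network latency that we do want to measure
        yield name


def list_tables() -> Iterator[str]:
    with PerfTimer() as timer:
        for table in fake_api_pages():
            with timer.pause():  # consumer time is excluded from the measurement
                yield table
        report["num_list_tables_api_requests"] += 1
        report["list_tables_sec"] += timer.elapsed_seconds()


for table in list_tables():
    time.sleep(0.05)  # slow consumer; should not inflate list_tables_sec
print(f"seconds spent inside the API iterator: {report['list_tables_sec']:.3f}")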
def get_tables_for_dataset(
self,
@@ -235,7 +242,7 @@
with_data_read_permission: bool = False,
report: Optional[BigQueryV2Report] = None,
) -> Iterator[BigqueryTable]:
with self.report.get_tables_for_dataset as current_timer:
with PerfTimer() as current_timer:
filter_clause: str = ", ".join(f"'{table}'" for table in tables.keys())

if with_data_read_permission:
@@ -284,6 +291,8 @@ def get_tables_for_dataset(
"metadata-extraction",
f"Failed to get table {table_name}: {e}",
)
self.report.num_get_tables_for_dataset_api_requests += 1
self.report.get_tables_for_dataset_sec += current_timer.elapsed_seconds()

@staticmethod
def _make_bigquery_table(
@@ -332,7 +341,7 @@ def get_views_for_dataset(
has_data_read: bool,
report: BigQueryV2Report,
) -> Iterator[BigqueryView]:
with self.report.get_views_for_dataset as current_timer:
with PerfTimer() as current_timer:
if has_data_read:
# If profiling is enabled
cur = self.get_query_result(
@@ -361,6 +370,8 @@
"metadata-extraction",
f"Failed to get view {view_name}: {e}",
)
self.report.num_get_views_for_dataset_api_requests += 1
self.report.get_views_for_dataset_sec += current_timer.elapsed_seconds()

@staticmethod
def _make_bigquery_view(view: bigquery.Row) -> BigqueryView:
@@ -445,7 +456,7 @@ def get_columns_for_dataset(
rate_limiter: Optional[RateLimiter] = None,
) -> Optional[Dict[str, List[BigqueryColumn]]]:
columns: Dict[str, List[BigqueryColumn]] = defaultdict(list)
with self.report.get_columns_for_dataset:
with PerfTimer() as timer:
try:
cur = self.get_query_result(
(
@@ -468,43 +479,47 @@

last_seen_table: str = ""
for column in cur:
if (
column_limit
and column.table_name in columns
and len(columns[column.table_name]) >= column_limit
):
if last_seen_table != column.table_name:
logger.warning(
f"{project_id}.{dataset_name}.{column.table_name} contains more than {column_limit} columns, only processing {column_limit} columns"
)
last_seen_table = column.table_name
else:
columns[column.table_name].append(
BigqueryColumn(
name=column.column_name,
ordinal_position=column.ordinal_position,
field_path=column.field_path,
is_nullable=column.is_nullable == "YES",
data_type=column.data_type,
comment=column.comment,
is_partition_column=column.is_partitioning_column == "YES",
cluster_column_position=column.clustering_ordinal_position,
policy_tags=(
list(
self.get_policy_tags_for_column(
project_id,
dataset_name,
column.table_name,
column.column_name,
report,
rate_limiter,
with timer.pause():
if (
column_limit
and column.table_name in columns
and len(columns[column.table_name]) >= column_limit
):
if last_seen_table != column.table_name:
logger.warning(
f"{project_id}.{dataset_name}.{column.table_name} contains more than {column_limit} columns, only processing {column_limit} columns"
)
last_seen_table = column.table_name
else:
columns[column.table_name].append(
BigqueryColumn(
name=column.column_name,
ordinal_position=column.ordinal_position,
field_path=column.field_path,
is_nullable=column.is_nullable == "YES",
data_type=column.data_type,
comment=column.comment,
is_partition_column=column.is_partitioning_column
== "YES",
cluster_column_position=column.clustering_ordinal_position,
policy_tags=(
list(
self.get_policy_tags_for_column(
project_id,
dataset_name,
column.table_name,
column.column_name,
report,
rate_limiter,
)
)
)
if extract_policy_tags_from_catalog
else []
),
if extract_policy_tags_from_catalog
else []
),
)
)
)
self.report.num_get_columns_for_dataset_api_requests += 1
self.report.get_columns_for_dataset_sec += timer.elapsed_seconds()

return columns

@@ -554,7 +569,7 @@ def get_snapshots_for_dataset(
has_data_read: bool,
report: BigQueryV2Report,
) -> Iterator[BigqueryTableSnapshot]:
with self.report.get_snapshots_for_dataset as current_timer:
with PerfTimer() as current_timer:
if has_data_read:
# If profiling is enabled
cur = self.get_query_result(
@@ -583,6 +598,8 @@
"metadata-extraction",
f"Failed to get view {snapshot_name}: {e}",
)
self.report.num_get_snapshots_for_dataset_api_requests += 1
self.report.get_snapshots_for_dataset_sec += current_timer.elapsed_seconds()

@staticmethod
def _make_bigquery_table_snapshot(snapshot: bigquery.Row) -> BigqueryTableSnapshot:
