Skip to content

Commit

Permalink
feat(ingest): enable query usage stats by default (#11281)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Sep 6, 2024
1 parent 455c90f commit 9589ade
Show file tree
Hide file tree
Showing 7 changed files with 7,297 additions and 347 deletions.
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ disallow_untyped_defs = yes

[tool:pytest]
asyncio_mode = auto
addopts = --cov=src --cov-report= --cov-config setup.cfg --strict-markers
addopts = --cov=src --cov-report= --cov-config setup.cfg --strict-markers -p no:faker
markers =
slow: marks tests that are slow to run, including all docker-based tests (deselect with '-m "not slow"')
integration: marks all integration tests, across all batches (deselect with '-m "not integration"')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class BigQueryQueriesExtractorConfig(BigQueryBaseConfig):
include_lineage: bool = True
include_queries: bool = True
include_usage_statistics: bool = True
include_query_usage_statistics: bool = False
include_query_usage_statistics: bool = True
include_operations: bool = True

region_qualifiers: List[str] = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import pydantic
from typing_extensions import Self

from datahub.configuration.common import ConfigModel
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.time_window_config import (
BaseTimeWindowConfig,
BucketDuration,
Expand Down Expand Up @@ -67,8 +67,16 @@ class SnowflakeQueriesExtractorConfig(ConfigModel):
# TODO: Support stateful ingestion for the time windows.
window: BaseTimeWindowConfig = BaseTimeWindowConfig()

# TODO: make this a proper allow/deny pattern
deny_usernames: List[str] = []
pushdown_deny_usernames: List[str] = pydantic.Field(
default=[],
description="List of snowflake usernames which will not be considered for lineage/usage/queries extraction. "
"This is primarily useful for improving performance by filtering out users with extremely high query volumes.",
)

user_email_pattern: AllowDenyPattern = pydantic.Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for user emails to filter in usage.",
)

temporary_tables_pattern: List[str] = pydantic.Field(
default=DEFAULT_TEMP_TABLES_PATTERNS,
Expand All @@ -88,7 +96,7 @@ class SnowflakeQueriesExtractorConfig(ConfigModel):
include_lineage: bool = True
include_queries: bool = True
include_usage_statistics: bool = True
include_query_usage_statistics: bool = False
include_query_usage_statistics: bool = True
include_operations: bool = True


Expand Down Expand Up @@ -150,6 +158,7 @@ def __init__(
bucket_duration=self.config.window.bucket_duration,
start_time=self.config.window.start_time,
end_time=self.config.window.end_time,
user_email_pattern=self.config.user_email_pattern,
# TODO make the rest of the fields configurable
),
generate_operations=self.config.include_operations,
Expand Down Expand Up @@ -281,7 +290,7 @@ def fetch_query_log(
start_time=self.config.window.start_time,
end_time=self.config.window.end_time,
bucket_duration=self.config.window.bucket_duration,
deny_usernames=self.config.deny_usernames,
deny_usernames=self.config.pushdown_deny_usernames,
)

with self.structured_reporter.report_exc(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
include_lineage=self.config.include_table_lineage,
include_usage_statistics=self.config.include_usage_stats,
include_operations=self.config.include_operational_stats,
user_email_pattern=self.config.user_email_pattern,
),
structured_report=self.report,
filters=self.filters,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ class SqlAggregatorReport(Report):
# Usage-related.
usage_skipped_missing_timestamp: int = 0
num_query_usage_stats_generated: int = 0
num_query_usage_stats_outside_window: int = 0

# Operation-related.
num_operations_generated: int = 0
Expand Down Expand Up @@ -432,6 +433,7 @@ def _need_schemas(self) -> bool:
or self.generate_usage_statistics
or self.generate_queries
or self.generate_operations
or self.generate_query_usage_statistics
)

def register_schema(
Expand Down Expand Up @@ -1033,9 +1035,9 @@ def gen_metadata(self) -> Iterable[MetadataChangeProposalWrapper]:
queries_generated: Set[QueryId] = set()

yield from self._gen_lineage_mcps(queries_generated)
yield from self._gen_remaining_queries(queries_generated)
yield from self._gen_usage_statistics_mcps()
yield from self._gen_operation_mcps(queries_generated)
yield from self._gen_remaining_queries(queries_generated)

def _gen_lineage_mcps(
self, queries_generated: Set[QueryId]
Expand Down Expand Up @@ -1322,9 +1324,15 @@ def _gen_query(
query_counter = self._query_usage_counts.get(query_id)
if not query_counter:
return
for bucket in self.usage_config.buckets():
count = query_counter.get(bucket)
if not count:

all_buckets = self.usage_config.buckets()

for bucket, count in query_counter.items():
if bucket not in all_buckets:
# What happens if we get a query with a timestamp that's outside our configured window?
# Theoretically this should never happen, since the audit logs are also fetched
# for the window. However, it's useful to have reporting for it, just in case.
self.report.num_query_usage_stats_outside_window += 1
continue

yield MetadataChangeProposalWrapper(
Expand Down
Loading

0 comments on commit 9589ade

Please sign in to comment.