Skip to content

Commit

Permalink
feat(ingest/snowflake): integrate snowflake-queries into main source (d…
Browse files — browse the repository at this point in the history
  • Loading branch information
hsheth2 authored and aviv-julienjehannet committed Jul 25, 2024
1 parent 433ff40 commit ff76d07
Show file tree
Hide file tree
Showing 20 changed files with 616 additions and 482 deletions.
35 changes: 31 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
import datetime
import logging
from abc import ABCMeta, abstractmethod
Expand All @@ -10,6 +11,7 @@
Dict,
Generic,
Iterable,
Iterator,
List,
Optional,
Sequence,
Expand Down Expand Up @@ -97,6 +99,7 @@ def report_log(
context: Optional[str] = None,
exc: Optional[BaseException] = None,
log: bool = False,
stacklevel: int = 1,
) -> None:
"""
Report a user-facing warning for the ingestion run.
Expand All @@ -109,7 +112,8 @@ def report_log(
exc: The exception associated with the event. We'll show the stack trace when in debug mode.
"""

stacklevel = 2
# One for this method, and one for the containing report_* call.
stacklevel = stacklevel + 2

log_key = f"{title}-{message}"
entries = self._entries[level]
Expand All @@ -118,6 +122,8 @@ def report_log(
context = f"{context[:_MAX_CONTEXT_STRING_LENGTH]} ..."

log_content = f"{message} => {context}" if context else message
if title:
log_content = f"{title}: {log_content}"
if exc:
log_content += f"{log_content}: {exc}"

Expand Down Expand Up @@ -255,9 +261,10 @@ def report_failure(
context: Optional[str] = None,
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log: bool = True,
) -> None:
self._structured_logs.report_log(
StructuredLogLevel.ERROR, message, title, context, exc, log=False
StructuredLogLevel.ERROR, message, title, context, exc, log=log
)

def failure(
Expand All @@ -266,9 +273,10 @@ def failure(
context: Optional[str] = None,
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log: bool = True,
) -> None:
self._structured_logs.report_log(
StructuredLogLevel.ERROR, message, title, context, exc, log=True
StructuredLogLevel.ERROR, message, title, context, exc, log=log
)

def info(
Expand All @@ -277,11 +285,30 @@ def info(
context: Optional[str] = None,
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log: bool = True,
) -> None:
self._structured_logs.report_log(
StructuredLogLevel.INFO, message, title, context, exc, log=True
StructuredLogLevel.INFO, message, title, context, exc, log=log
)

@contextlib.contextmanager
def report_exc(
self,
message: LiteralString,
title: Optional[LiteralString] = None,
context: Optional[str] = None,
level: StructuredLogLevel = StructuredLogLevel.ERROR,
) -> Iterator[None]:
# Convenience method that helps avoid boilerplate try/except blocks.
# TODO: I'm not super happy with the naming here - it's not obvious that this
# suppresses the exception in addition to reporting it.
try:
yield
except Exception as exc:
self._structured_logs.report_log(
level, message=message, title=title, context=context, exc=exc
)

def __post_init__(self) -> None:
self.start_time = datetime.datetime.now()
self.running_time: datetime.timedelta = datetime.timedelta(seconds=0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.snowflake.snowflake_config import (
SnowflakeIdentifierConfig,
SnowflakeV2Config,
)
from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config
from datahub.ingestion.source.snowflake.snowflake_connection import SnowflakeConnection
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report
from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeIdentifierMixin
from datahub.ingestion.source.snowflake.snowflake_utils import (
SnowflakeIdentifierBuilder,
)
from datahub.metadata.com.linkedin.pegasus2avro.assertion import (
AssertionResult,
AssertionResultType,
Expand All @@ -40,23 +39,20 @@ class DataQualityMonitoringResult(BaseModel):
VALUE: int


class SnowflakeAssertionsHandler(SnowflakeIdentifierMixin):
class SnowflakeAssertionsHandler:
def __init__(
self,
config: SnowflakeV2Config,
report: SnowflakeV2Report,
connection: SnowflakeConnection,
identifiers: SnowflakeIdentifierBuilder,
) -> None:
self.config = config
self.report = report
self.logger = logger
self.connection = connection
self.identifiers = identifiers
self._urns_processed: List[str] = []

@property
def identifier_config(self) -> SnowflakeIdentifierConfig:
return self.config

def get_assertion_workunits(
self, discovered_datasets: List[str]
) -> Iterable[MetadataWorkUnit]:
Expand All @@ -80,10 +76,10 @@ def _gen_platform_instance_wu(self, urn: str) -> MetadataWorkUnit:
return MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=DataPlatformInstance(
platform=make_data_platform_urn(self.platform),
platform=make_data_platform_urn(self.identifiers.platform),
instance=(
make_dataplatform_instance_urn(
self.platform, self.config.platform_instance
self.identifiers.platform, self.config.platform_instance
)
if self.config.platform_instance
else None
Expand All @@ -98,7 +94,7 @@ def _process_result_row(
result = DataQualityMonitoringResult.parse_obj(result_row)
assertion_guid = result.METRIC_NAME.split("__")[-1].lower()
status = bool(result.VALUE) # 1 if PASS, 0 if FAIL
assertee = self.get_dataset_identifier(
assertee = self.identifiers.get_dataset_identifier(
result.TABLE_NAME, result.TABLE_SCHEMA, result.TABLE_DATABASE
)
if assertee in discovered_datasets:
Expand All @@ -107,7 +103,7 @@ def _process_result_row(
aspect=AssertionRunEvent(
timestampMillis=datetime_to_ts_millis(result.MEASUREMENT_TIME),
runId=result.MEASUREMENT_TIME.strftime("%Y-%m-%dT%H:%M:%SZ"),
asserteeUrn=self.gen_dataset_urn(assertee),
asserteeUrn=self.identifiers.gen_dataset_urn(assertee),
status=AssertionRunStatus.COMPLETE,
assertionUrn=make_assertion_urn(assertion_guid),
result=AssertionResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class SnowflakeIdentifierConfig(
# Changing default value here.
convert_urns_to_lowercase: bool = Field(
default=True,
description="Whether to convert dataset urns to lowercase.",
)


Expand Down Expand Up @@ -210,8 +211,13 @@ class SnowflakeV2Config(
description="Populates view->view and table->view column lineage using DataHub's sql parser.",
)

lazy_schema_resolver: bool = Field(
use_queries_v2: bool = Field(
default=False,
description="If enabled, uses the new queries extractor to extract queries from snowflake.",
)

lazy_schema_resolver: bool = Field(
default=True,
description="If enabled, uses lazy schema resolver to resolve schemas for tables and views. "
"This is useful if you have a large number of schemas and want to avoid bulk fetching the schema for each table/view.",
)
Expand Down
Loading

0 comments on commit ff76d07

Please sign in to comment.