Skip to content

Commit

Permalink
feat(ingest): improve domain ingestion usability (#5366)
Browse files Browse the repository at this point in the history
  • Loading branch information
shirshanka authored Jul 11, 2022
1 parent 5bb7fe3 commit 860d475
Show file tree
Hide file tree
Showing 15 changed files with 303 additions and 41 deletions.
44 changes: 43 additions & 1 deletion docs/domains.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@ By default, you don't need to worry about this. DataHub will auto-generate an un
Once you've chosen a name and a description, click 'Create' to create the new Domain.


## Assigning an Asset to a Domain
## Assigning an Asset to a Domain

You can assign assets to Domain using the UI or programmatically using the API or during ingestion.

### UI-Based Assignment
To assign an asset to a Domain, simply navigate to the asset's profile page. At the bottom left-side menu bar, you'll
see a 'Domain' section. Click 'Set Domain', and then search for the Domain you'd like to add to. When you're done, click 'Add'.

Expand All @@ -59,6 +62,45 @@ To remove an asset from a Domain, click the 'x' icon on the Domain tag.
> Notice: Adding or removing an asset from a Domain requires the `Edit Domain` Metadata Privilege, which can be granted
> by a [Policy](authorization/policies.md).
### Ingestion-time Assignment
All SQL-based ingestion sources support assigning domains during ingestion using the `domain` configuration. Consult your source's configuration details page (e.g. [Snowflake](./generated/ingestion/sources/snowflake.md)), to verify that it supports the Domain capability.

:::note

Assignment of domains during ingestion will overwrite domains that you have assigned in the UI. A single table can only belong to one domain.

:::


Here is a quick example of a snowflake ingestion recipe that has been enhanced to attach the **Analytics** domain to all tables in the **long_tail_companions** database in the **analytics** schema, and the **Finance** domain to all tables in the **long_tail_companions** database in the **ecommerce** schema.

```yaml
source:
type: snowflake
config:
username: ${SNOW_USER}
password: ${SNOW_PASS}
account_id:
warehouse: COMPUTE_WH
role: accountadmin
database_pattern:
allow:
- "long_tail_companions"
schema_pattern:
deny:
- information_schema
profiling:
enabled: False
domain:
Analytics:
allow:
- "long_tail_companions.analytics.*"
Finance:
allow:
- "long_tail_companions.ecommerce.*"
```
## Searching by Domain
Expand Down
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/configuration/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ class AllowDenyPattern(ConfigModel):

allow: List[str] = Field(
default=[".*"],
description="List of regex patterns for process groups to include in ingestion",
description="List of regex patterns to include in ingestion",
)
deny: List[str] = Field(
default=[],
description="List of regex patterns for process groups to exclude from ingestion.",
description="List of regex patterns to exclude from ingestion.",
)
ignoreCase: Optional[bool] = Field(
default=True,
Expand Down
10 changes: 10 additions & 0 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,16 @@ def _emit_generic(self, url: str, payload: str) -> None:
"Unable to emit metadata to DataHub GMS", {"message": str(e)}
) from e

def __repr__(self) -> str:
token_str = (
f" with token: {self._token[:4]}**********{self._token[-4:]}"
if self._token
else ""
)
return (
f"DataHubRestEmitter: configured to talk to {self._gms_server}{token_str}"
)


class DatahubRestEmitter(DataHubRestEmitter):
"""This class exists as a pass-through for backwards compatibility"""
Expand Down
4 changes: 4 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,7 @@ def get_report(self) -> SinkReport:
@abstractmethod
def close(self) -> None:
pass

def configured(self) -> str:
"""Override this method to output a human-readable and scrubbed version of the configured sink"""
return ""
83 changes: 82 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import os
from json.decoder import JSONDecodeError
from typing import Any, Dict, List, Optional, Type
from typing import Any, Dict, Iterable, List, Optional, Type

from avro.schema import RecordSchema
from deprecated import deprecated
Expand Down Expand Up @@ -287,3 +287,84 @@ def get_latest_timeseries_value(
f"Failed to find {aspect_type} in response {aspect_json}"
)
return None

def _get_search_endpoint(self):
return f"{self.config.server}/entities?action=search"

def get_domain_urn_by_name(self, domain_name: str) -> Optional[str]:
"""Retrieve a domain urn based on its name. Returns None if there is no match found"""

filters = []
filter_criteria = [
{
"field": "name",
"value": domain_name,
"condition": "EQUAL",
}
]

filters.append({"and": filter_criteria})
search_body = {
"input": "*",
"entity": "domain",
"start": 0,
"count": 10,
"filter": {"or": filters},
}
results: Dict = self._post_generic(self._get_search_endpoint(), search_body)
num_entities = results.get("value", {}).get("numEntities", 0)
if num_entities > 1:
logger.warning(
f"Got {num_entities} results for domain name {domain_name}. Will return the first match."
)
entities_yielded: int = 0
entities = []
for x in results["value"]["entities"]:
entities_yielded += 1
logger.debug(f"yielding {x['entity']}")
entities.append(x["entity"])
return entities[0] if entities_yielded else None

def get_container_urns_by_filter(
self,
env: Optional[str] = None,
search_query: str = "*",
) -> Iterable[str]:
"""Return container urns that match based on query"""
url = self._get_search_endpoint()

container_filters = []
for container_subtype in ["Database", "Schema", "Project", "Dataset"]:
filter_criteria = []

filter_criteria.append(
{
"field": "customProperties",
"value": f"instance={env}",
"condition": "EQUAL",
}
)

filter_criteria.append(
{
"field": "typeNames",
"value": container_subtype,
"condition": "EQUAL",
}
)
container_filters.append({"and": filter_criteria})
search_body = {
"input": search_query,
"entity": "container",
"start": 0,
"count": 10000,
"filter": {"or": container_filters},
}
results: Dict = self._post_generic(url, search_body)
num_entities = results["value"]["numEntities"]
logger.debug(f"Matched {num_entities} containers")
entities_yielded: int = 0
for x in results["value"]["entities"]:
entities_yielded += 1
logger.debug(f"yielding {x['entity']}")
yield x["entity"]
101 changes: 76 additions & 25 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ class Pipeline:
sink: Sink
transformers: List[Transformer]

def _record_initialization_failure(self, e: Exception, msg: str) -> None:
self.pipeline_init_exception: Optional[Exception] = e
self.pipeline_init_failures: Optional[str] = f"{msg} due to {e}"
logger.error(e)

def __init__(
self,
config: PipelineConfig,
Expand All @@ -138,23 +143,59 @@ def __init__(
dry_run=dry_run,
preview_mode=preview_mode,
)
self.pipeline_init_failures = None
self.pipeline_init_exception = None

sink_type = self.config.sink.type
sink_class = sink_registry.get(sink_type)
sink_config = self.config.sink.dict().get("config") or {}
self.sink: Sink = sink_class.create(sink_config, self.ctx)
logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured")

source_type = self.config.source.type
source_class = source_registry.get(source_type)
self.source: Source = source_class.create(
self.config.source.dict().get("config", {}), self.ctx
)
logger.debug(f"Source type:{source_type},{source_class} configured")
try:
sink_class = sink_registry.get(sink_type)
except Exception as e:
self._record_initialization_failure(e, "Failed to create a sink")
return

try:
sink_config = self.config.sink.dict().get("config") or {}
self.sink: Sink = sink_class.create(sink_config, self.ctx)
logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured")
logger.info(f"Sink configured successfully. {self.sink.configured()}")
except Exception as e:
self._record_initialization_failure(
e, f"Failed to configure sink ({sink_type})"
)
return

try:
source_type = self.config.source.type
source_class = source_registry.get(source_type)
except Exception as e:
self._record_initialization_failure(e, "Failed to create source")
return

try:
self.source: Source = source_class.create(
self.config.source.dict().get("config", {}), self.ctx
)
logger.debug(f"Source type:{source_type},{source_class} configured")
except Exception as e:
self._record_initialization_failure(
e, f"Failed to configure source ({source_type})"
)
return

self.extractor_class = extractor_registry.get(self.config.source.extractor)
try:
self.extractor_class = extractor_registry.get(self.config.source.extractor)
except Exception as e:
self._record_initialization_failure(
e, f"Failed to configure extractor ({self.config.source.extractor})"
)
return

try:
self._configure_transforms()
except ValueError as e:
self._record_initialization_failure(e, "Failed to configure transformers")
return

self._configure_transforms()
self._configure_reporting()

def _configure_transforms(self) -> None:
Expand Down Expand Up @@ -209,6 +250,10 @@ def create(
def run(self) -> None:

callback = LoggingCallback()
if self.pipeline_init_failures:
# no point continuing, return early
return

extractor: Extractor = self.extractor_class()
for wu in itertools.islice(
self.source.get_workunits(),
Expand Down Expand Up @@ -296,6 +341,9 @@ def process_commits(self) -> None:
logger.info(f"Successfully committed changes for {name}.")

def raise_from_status(self, raise_warnings: bool = False) -> None:
if self.pipeline_init_exception:
raise self.pipeline_init_exception

if self.source.get_report().failures:
raise PipelineExecutionError(
"Source reported errors", self.source.get_report()
Expand All @@ -310,18 +358,18 @@ def raise_from_status(self, raise_warnings: bool = False) -> None:
)

def log_ingestion_stats(self) -> None:

telemetry.telemetry_instance.ping(
"ingest_stats",
{
"source_type": self.config.source.type,
"sink_type": self.config.sink.type,
"records_written": stats.discretize(
self.sink.get_report().records_written
),
},
self.ctx.graph,
)
if not self.pipeline_init_failures:
telemetry.telemetry_instance.ping(
"ingest_stats",
{
"source_type": self.config.source.type,
"sink_type": self.config.sink.type,
"records_written": stats.discretize(
self.sink.get_report().records_written
),
},
self.ctx.graph,
)

def _count_all_vals(self, d: Dict[str, List]) -> int:
result = 0
Expand All @@ -331,6 +379,9 @@ def _count_all_vals(self, d: Dict[str, List]) -> int:

def pretty_print_summary(self, warnings_as_failure: bool = False) -> int:
click.echo()
if self.pipeline_init_failures:
click.secho(f"{self.pipeline_init_failures}", fg="red")
return 1
click.secho(f"Source ({self.config.source.type}) report:", bold=True)
click.echo(self.source.get_report().as_string())
click.secho(f"Sink ({self.config.sink.type}) report:", bold=True)
Expand Down
6 changes: 6 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,9 @@ def get_report(self) -> SinkReport:

def close(self):
self.executor.shutdown(wait=True)

def __repr__(self) -> str:
return self.emitter.__repr__()

def configured(self) -> str:
return self.__repr__()
10 changes: 9 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
JobStatusClass,
SubTypesClass,
)
from datahub.utilities.registries.domain_registry import DomainRegistry

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -150,6 +151,11 @@ def __init__(self, config: KafkaSourceConfig, ctx: PipelineContext):
self.schema_registry_client: KafkaSchemaRegistryBase = (
KafkaSource.create_schema_registry(config, self.report)
)
if self.source_config.domain:
self.domain_registry = DomainRegistry(
cached_domains=[k for k in self.source_config.domain],
graph=self.ctx.graph,
)

def is_checkpointing_enabled(self, job_id: JobId) -> bool:
if (
Expand Down Expand Up @@ -333,7 +339,9 @@ def _extract_record(self, topic: str) -> Iterable[MetadataWorkUnit]:
# 6. Emit domains aspect MCPW
for domain, pattern in self.source_config.domain.items():
if pattern.allowed(dataset_name):
domain_urn = make_domain_urn(domain)
domain_urn = make_domain_urn(
self.domain_registry.get_domain_urn(domain)
)

if domain_urn:
wus = add_domain_to_entity_wu(
Expand Down
Loading

0 comments on commit 860d475

Please sign in to comment.