Skip to content

Commit

Permalink
fixing up kafka and hana sources
Browse files Browse the repository at this point in the history
  • Loading branch information
shirshanka committed Jul 10, 2022
1 parent e0a1513 commit d00ccaa
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 3 deletions.
10 changes: 9 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
JobStatusClass,
SubTypesClass,
)
from datahub.utilities.registries.domain_registry import DomainRegistry

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -150,6 +151,11 @@ def __init__(self, config: KafkaSourceConfig, ctx: PipelineContext):
self.schema_registry_client: KafkaSchemaRegistryBase = (
KafkaSource.create_schema_registry(config, self.report)
)
if self.source_config.domain:
self.domain_registry = DomainRegistry(
cached_domains=[k for k in self.source_config.domain],
graph=self.ctx.graph,
)

def is_checkpointing_enabled(self, job_id: JobId) -> bool:
if (
Expand Down Expand Up @@ -333,7 +339,9 @@ def _extract_record(self, topic: str) -> Iterable[MetadataWorkUnit]:
# 6. Emit domains aspect MCPW
for domain, pattern in self.source_config.domain.items():
if pattern.allowed(dataset_name):
domain_urn = make_domain_urn(domain)
domain_urn = make_domain_urn(
self.domain_registry.get_domain_urn(domain)
)

if domain_urn:
wus = add_domain_to_entity_wu(
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/integration/hana/hana_to_file.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ source:
include_field_histogram: true
include_field_sample_values: true
domain:
sales:
"urn:li:domain:sales":
allow:
- "HOTEL"
sink:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ source:
bootstrap: "localhost:59092"
schema_registry_url: "http://localhost:58081"
domain:
sales:
"urn:li:domain:sales":
allow:
- "key_value_topic"
sink:
Expand Down

0 comments on commit d00ccaa

Please sign in to comment.