Skip to content

Commit

Permalink
feat(ingest/kafka): Flag for optional schemas ingestion (#12077)
Browse files Browse the repository at this point in the history
  • Loading branch information
skrydal authored Dec 11, 2024
1 parent e6cc676 commit b091e46
Show file tree
Hide file tree
Showing 7 changed files with 620 additions and 18 deletions.
3 changes: 2 additions & 1 deletion docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
urn:li:dataset:(urn:li:dataPlatform:powerbi,[<PlatformInstance>.].<WorkspaceName>.<SemanticModelName>.<TableName>,<ENV>)
```

The config `include_workspace_name_in_dataset_urn` is default to `false` for backward compatiblity, However, we recommend enabling this flag after performing the necessary cleanup.
The config `include_workspace_name_in_dataset_urn` is default to `false` for backward compatibility, However, we recommend enabling this flag after performing the necessary cleanup.
If stateful ingestion is enabled, running ingestion with the latest CLI version will handle the cleanup automatically. Otherwise, we recommend soft deleting all powerbi data via the DataHub CLI:
`datahub delete --platform powerbi --soft` and then re-ingest with the latest CLI version, ensuring the `include_workspace_name_in_dataset_urn` configuration is set to true.

- #11701: The Fivetran `sources_to_database` field is deprecated in favor of setting directly within `sources_to_platform_instance.<key>.database`.
- #11742: For PowerBi ingestion, `use_powerbi_email` is now enabled by default when extracting ownership information.
- #12056: The DataHub Airflow plugin no longer supports Airflow 2.1 and Airflow 2.2.
- #12056: The DataHub Airflow plugin now defaults to the v2 plugin implementation.
- #12077: `Kafka` source no longer ingests schemas from schema registry as separate entities by default, set `ingest_schemas_as_entities` to `true` to ingest them
- OpenAPI Update: PIT Keep Alive parameter added to scroll. NOTE: This parameter requires the `pointInTimeCreationEnabled` feature flag to be enabled and the `elasticSearch.implementation` configuration to be `elasticsearch`. This feature is not supported for OpenSearch at this time and the parameter will not be respected without both of these set.
- OpenAPI Update 2: Previously there was an incorrectly marked parameter named `sort` on the generic list entities endpoint for v3. This parameter is deprecated and only supports a single string value while the documentation indicates it supports a list of strings. This documentation error has been fixed and the correct field, `sortCriteria`, is now documented which supports a list of strings.

Expand Down
29 changes: 18 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ class KafkaSourceConfig(
default=False,
description="Disables the utilization of the TopicRecordNameStrategy for Schema Registry subjects. For more information, visit: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#handling-differences-between-preregistered-and-client-derived-schemas:~:text=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
)
ingest_schemas_as_entities: bool = pydantic.Field(
default=False,
description="Enables ingesting schemas from schema registry as separate entities, in addition to the topics",
)


def get_kafka_consumer(
Expand Down Expand Up @@ -343,17 +347,20 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
else:
self.report.report_dropped(topic)

# Get all subjects from schema registry and ingest them as SCHEMA DatasetSubTypes
for subject in self.schema_registry_client.get_subjects():
try:
yield from self._extract_record(
subject, True, topic_detail=None, extra_topic_config=None
)
except Exception as e:
logger.warning(f"Failed to extract subject {subject}", exc_info=True)
self.report.report_warning(
"subject", f"Exception while extracting topic {subject}: {e}"
)
if self.source_config.ingest_schemas_as_entities:
# Get all subjects from schema registry and ingest them as SCHEMA DatasetSubTypes
for subject in self.schema_registry_client.get_subjects():
try:
yield from self._extract_record(
subject, True, topic_detail=None, extra_topic_config=None
)
except Exception as e:
logger.warning(
f"Failed to extract subject {subject}", exc_info=True
)
self.report.report_warning(
"subject", f"Exception while extracting topic {subject}: {e}"
)

def _extract_record(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ run_id: kafka-test
source:
type: kafka
config:
ingest_schemas_as_entities: true
connection:
bootstrap: "localhost:29092"
schema_registry_url: "http://localhost:28081"
Expand Down
Loading

0 comments on commit b091e46

Please sign in to comment.