Skip to content

Commit

Permalink
Merge branch 'master' into cus3116-failing-cli-0.14.1
Browse files Browse the repository at this point in the history
  • Loading branch information
sid-acryl authored Dec 11, 2024
2 parents f7129af + b091e46 commit 5975131
Show file tree
Hide file tree
Showing 23 changed files with 1,155 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@
"displayName": "Athena",
"description": "Import Schemas, Tables, Views, and lineage to S3 from Athena.",
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/athena/",
"recipe": "source:\n type: athena\n config:\n # Coordinates\n aws_region: my_aws_region\n work_group: primary\n\n # Options\n s3_staging_dir: \"s3://my_staging_athena_results_bucket/results/\""
"recipe": "source:\n type: athena\n config:\n # AWS Keys (Optional - Required only if local aws credentials are not set)\n username: aws_access_key_id\n password: aws_secret_access_key\n # Coordinates\n aws_region: my_aws_region\n work_group: primary\n\n # Options\n s3_staging_dir: \"s3://my_staging_athena_results_bucket/results/\""
},
{
"urn": "urn:li:dataPlatform:clickhouse",
Expand Down
3 changes: 2 additions & 1 deletion docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
urn:li:dataset:(urn:li:dataPlatform:powerbi,[<PlatformInstance>.].<WorkspaceName>.<SemanticModelName>.<TableName>,<ENV>)
```

The config `include_workspace_name_in_dataset_urn` is default to `false` for backward compatiblity, However, we recommend enabling this flag after performing the necessary cleanup.
The config `include_workspace_name_in_dataset_urn` is default to `false` for backward compatibility, However, we recommend enabling this flag after performing the necessary cleanup.
If stateful ingestion is enabled, running ingestion with the latest CLI version will handle the cleanup automatically. Otherwise, we recommend soft deleting all powerbi data via the DataHub CLI:
`datahub delete --platform powerbi --soft` and then re-ingest with the latest CLI version, ensuring the `include_workspace_name_in_dataset_urn` configuration is set to true.

- #11701: The Fivetran `sources_to_database` field is deprecated in favor of setting directly within `sources_to_platform_instance.<key>.database`.
- #11742: For PowerBi ingestion, `use_powerbi_email` is now enabled by default when extracting ownership information.
- #12056: The DataHub Airflow plugin no longer supports Airflow 2.1 and Airflow 2.2.
- #12056: The DataHub Airflow plugin now defaults to the v2 plugin implementation.
- #12077: `Kafka` source no longer ingests schemas from schema registry as separate entities by default, set `ingest_schemas_as_entities` to `true` to ingest them
- OpenAPI Update: PIT Keep Alive parameter added to scroll. NOTE: This parameter requires the `pointInTimeCreationEnabled` feature flag to be enabled and the `elasticSearch.implementation` configuration to be `elasticsearch`. This feature is not supported for OpenSearch at this time and the parameter will not be respected without both of these set.
- OpenAPI Update 2: Previously there was an incorrectly marked parameter named `sort` on the generic list entities endpoint for v3. This parameter is deprecated and only supports a single string value while the documentation indicates it supports a list of strings. This documentation error has been fixed and the correct field, `sortCriteria`, is now documented which supports a list of strings.

Expand Down
5 changes: 5 additions & 0 deletions metadata-ingestion/docs/sources/athena/athena_recipe.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
source:
type: athena
config:

# AWS Keys (Optional - Required only if local aws credentials are not set)
username: my_aws_access_key_id
password: my_aws_secret_access_key

# Coordinates
aws_region: my_aws_region
work_group: primary
Expand Down
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ def _ensure_valid_gms_url_acryl_cloud(url: str) -> str:
url = f"{url}/gms"
elif url.endswith("acryl.io/"):
url = f"{url}gms"
if url.endswith("acryl.io/api/gms"):
url = url.replace("acryl.io/api/gms", "acryl.io/gms")

return url

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import logging
from typing import Iterable, Optional

from pydantic.fields import Field

from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import set_aspect
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import create_dataset_props_patch_builder
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
DatasetPropertiesClass,
MetadataChangeEventClass,
SystemMetadataClass,
)

logger = logging.getLogger(__name__)


def convert_dataset_properties_to_patch(
urn: str,
aspect: DatasetPropertiesClass,
system_metadata: Optional[SystemMetadataClass],
) -> MetadataWorkUnit:
patch_builder = create_dataset_props_patch_builder(urn, aspect, system_metadata)
mcp = next(iter(patch_builder.build()))
return MetadataWorkUnit(id=MetadataWorkUnit.generate_workunit_id(mcp), mcp_raw=mcp)


def auto_incremental_properties(
incremental_properties: bool,
stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
if not incremental_properties:
yield from stream
return # early exit

for wu in stream:
urn = wu.get_urn()

if isinstance(wu.metadata, MetadataChangeEventClass):
properties_aspect = wu.get_aspect_of_type(DatasetPropertiesClass)
set_aspect(wu.metadata, None, DatasetPropertiesClass)
if len(wu.metadata.proposedSnapshot.aspects) > 0:
yield wu

if properties_aspect:
yield convert_dataset_properties_to_patch(
urn, properties_aspect, wu.metadata.systemMetadata
)
elif isinstance(wu.metadata, MetadataChangeProposalWrapper) and isinstance(
wu.metadata.aspect, DatasetPropertiesClass
):
properties_aspect = wu.metadata.aspect
if properties_aspect:
yield convert_dataset_properties_to_patch(
urn, properties_aspect, wu.metadata.systemMetadata
)
else:
yield wu


# TODO: Use this in SQLCommonConfig. Currently only used in snowflake
class IncrementalPropertiesConfigMixin(ConfigModel):
incremental_properties: bool = Field(
default=False,
description="When enabled, emits dataset properties as incremental to existing dataset properties "
"in DataHub. When disabled, re-states dataset properties on each run.",
)
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
SchemaFieldClass,
SchemaMetadataClass,
StatusClass,
SystemMetadataClass,
TimeWindowSizeClass,
)
from datahub.metadata.urns import DatasetUrn, GlossaryTermUrn, TagUrn, Urn
Expand Down Expand Up @@ -65,9 +66,10 @@ def auto_workunit(
def create_dataset_props_patch_builder(
dataset_urn: str,
dataset_properties: DatasetPropertiesClass,
system_metadata: Optional[SystemMetadataClass] = None,
) -> DatasetPatchBuilder:
"""Creates a patch builder with a table's or view's attributes and dataset properties"""
patch_builder = DatasetPatchBuilder(dataset_urn)
patch_builder = DatasetPatchBuilder(dataset_urn, system_metadata)
patch_builder.set_display_name(dataset_properties.name)
patch_builder.set_description(dataset_properties.description)
patch_builder.set_created(dataset_properties.created)
Expand Down
4 changes: 4 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/abs/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List:
).infer_schema(file)
elif extension == ".json":
fields = json.JsonInferrer().infer_schema(file)
elif extension == ".jsonl":
fields = json.JsonInferrer(
max_rows=self.source_config.max_rows, format="jsonl"
).infer_schema(file)
elif extension == ".avro":
fields = avro.AvroInferrer().infer_schema(file)
else:
Expand Down
10 changes: 5 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,6 @@ def get_workunits_internal(
self.truncate_indices()
except Exception as e:
self.report.failure("While trying to truncate indices ", exc=e)
if self.dataprocess_cleanup:
try:
yield from self.dataprocess_cleanup.get_workunits_internal()
except Exception as e:
self.report.failure("While trying to cleanup data process ", exc=e)
if self.soft_deleted_entities_cleanup:
try:
self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
Expand All @@ -170,6 +165,11 @@ def get_workunits_internal(
self.execution_request_cleanup.run()
except Exception as e:
self.report.failure("While trying to cleanup execution request ", exc=e)
if self.dataprocess_cleanup:
try:
yield from self.dataprocess_cleanup.get_workunits_internal()
except Exception as e:
self.report.failure("While trying to cleanup data process ", exc=e)
yield from []

def truncate_indices(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class SoftDeletedEntitiesCleanupConfig(ConfigModel):
description="Query to filter entities",
)
limit_entities_delete: Optional[int] = Field(
10000, description="Max number of entities to delete."
25000, description="Max number of entities to delete."
)

runtime_limit_seconds: Optional[int] = Field(
Expand Down
29 changes: 18 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ class KafkaSourceConfig(
default=False,
description="Disables the utilization of the TopicRecordNameStrategy for Schema Registry subjects. For more information, visit: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#handling-differences-between-preregistered-and-client-derived-schemas:~:text=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
)
ingest_schemas_as_entities: bool = pydantic.Field(
default=False,
description="Enables ingesting schemas from schema registry as separate entities, in addition to the topics",
)


def get_kafka_consumer(
Expand Down Expand Up @@ -343,17 +347,20 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
else:
self.report.report_dropped(topic)

# Get all subjects from schema registry and ingest them as SCHEMA DatasetSubTypes
for subject in self.schema_registry_client.get_subjects():
try:
yield from self._extract_record(
subject, True, topic_detail=None, extra_topic_config=None
)
except Exception as e:
logger.warning(f"Failed to extract subject {subject}", exc_info=True)
self.report.report_warning(
"subject", f"Exception while extracting topic {subject}: {e}"
)
if self.source_config.ingest_schemas_as_entities:
# Get all subjects from schema registry and ingest them as SCHEMA DatasetSubTypes
for subject in self.schema_registry_client.get_subjects():
try:
yield from self._extract_record(
subject, True, topic_detail=None, extra_topic_config=None
)
except Exception as e:
logger.warning(
f"Failed to extract subject {subject}", exc_info=True
)
self.report.report_warning(
"subject", f"Exception while extracting topic {subject}: {e}"
)

def _extract_record(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ class RedshiftConfig(
description="Whether to extract column level lineage. This config works with rest-sink only.",
)

# TODO - use DatasetPropertiesConfigMixin instead
patch_custom_properties: bool = Field(
default=True,
description="Whether to patch custom properties on existing datasets rather than replace.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,8 @@ def gen_dataset_workunits(
customProperties=custom_properties,
)
if self.config.patch_custom_properties:
# TODO: use auto_incremental_properties workunit processor instead
# Deprecate use of patch_custom_properties
patch_builder = create_dataset_props_patch_builder(
dataset_urn, dataset_properties
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.configuration.validate_field_removal import pydantic_removed_field
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.api.incremental_properties_helper import (
IncrementalPropertiesConfigMixin,
)
from datahub.ingestion.glossary.classification_mixin import (
ClassificationSourceConfigMixin,
)
Expand Down Expand Up @@ -188,6 +191,7 @@ class SnowflakeV2Config(
StatefulUsageConfigMixin,
StatefulProfilingConfigMixin,
ClassificationSourceConfigMixin,
IncrementalPropertiesConfigMixin,
):
include_usage_stats: bool = Field(
default=True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
support_status,
)
from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage
from datahub.ingestion.api.incremental_properties_helper import (
auto_incremental_properties,
)
from datahub.ingestion.api.source import (
CapabilityReport,
MetadataWorkUnitProcessor,
Expand Down Expand Up @@ -446,6 +449,9 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
functools.partial(
auto_incremental_lineage, self.config.incremental_lineage
),
functools.partial(
auto_incremental_properties, self.config.incremental_properties
),
StaleEntityRemovalHandler.create(
self, self.config, self.ctx
).workunit_processor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,8 @@ def process_table(self, table: Table, schema: Schema) -> Iterable[MetadataWorkUn
)

if table_props:
# TODO: use auto_incremental_properties workunit processor instead
# Consider enabling incremental_properties by default
patch_builder = create_dataset_props_patch_builder(dataset_urn, table_props)
for patch_mcp in patch_builder.build():
yield MetadataWorkUnit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ run_id: kafka-test
source:
type: kafka
config:
ingest_schemas_as_entities: true
connection:
bootstrap: "localhost:29092"
schema_registry_url: "http://localhost:28081"
Expand Down
Loading

0 comments on commit 5975131

Please sign in to comment.