Skip to content

Commit 11eda4b

Browse files
authored
feat(ingest): refactor LineageConfig class (#10074)
1 parent f4d4e79 commit 11eda4b

File tree

9 files changed

+41
-48
lines changed

9 files changed

+41
-48
lines changed

metadata-ingestion/src/datahub/configuration/common.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -302,10 +302,3 @@ def value(self, string: str) -> List[str]:
302302

303303
class VersionedConfig(ConfigModel):
304304
version: str = "1"
305-
306-
307-
class LineageConfig(ConfigModel):
308-
incremental_lineage: bool = Field(
309-
default=False,
310-
description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.",
311-
)

metadata-ingestion/src/datahub/configuration/source_common.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ class DatasetSourceConfigMixin(PlatformInstanceConfigMixin, EnvConfigMixin):
5454
Any source that is a primary producer of Dataset metadata should inherit this class
5555
"""
5656

57+
# TODO: Deprecate this in favor of the more granular config mixins in order
58+
# to flatten our config inheritance hierarchies.
59+
5760

5861
class LowerCaseDatasetUrnConfigMixin(ConfigModel):
5962
convert_urns_to_lowercase: bool = Field(

metadata-ingestion/src/datahub/ingestion/api/incremental_lineage_helper.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import copy
22
from typing import Dict, Iterable, Optional
33

4+
from pydantic.fields import Field
5+
6+
from datahub.configuration.common import ConfigModel
47
from datahub.emitter.mce_builder import datahub_guid, set_aspect
58
from datahub.emitter.mcp import MetadataChangeProposalWrapper
69
from datahub.ingestion.api.workunit import MetadataWorkUnit
@@ -143,3 +146,10 @@ def auto_incremental_lineage(
143146
)
144147
else:
145148
yield wu
149+
150+
151+
class IncrementalLineageConfigMixin(ConfigModel):
152+
incremental_lineage: bool = Field(
153+
default=False,
154+
description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.",
155+
)

metadata-ingestion/src/datahub/ingestion/run/pipeline.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,7 @@ def log_ingestion_stats(self) -> None:
567567
"warnings": stats.discretize(
568568
source_warnings + sink_warnings + global_warnings
569569
),
570+
"has_pipeline_name": bool(self.config.pipeline_name),
570571
},
571572
self.ctx.graph,
572573
)

metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
ConfigEnum,
1717
ConfigModel,
1818
ConfigurationError,
19-
LineageConfig,
2019
)
21-
from datahub.configuration.source_common import DatasetSourceConfigMixin
20+
from datahub.configuration.source_common import (
21+
EnvConfigMixin,
22+
PlatformInstanceConfigMixin,
23+
)
2224
from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated
2325
from datahub.configuration.validate_field_removal import pydantic_removed_field
2426
from datahub.emitter import mce_builder
@@ -33,6 +35,7 @@
3335
support_status,
3436
)
3537
from datahub.ingestion.api.incremental_lineage_helper import (
38+
IncrementalLineageConfigMixin,
3639
convert_upstream_lineage_to_patch,
3740
)
3841
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
@@ -215,7 +218,10 @@ def is_only_test_results(self) -> bool:
215218

216219

217220
class DBTCommonConfig(
218-
StatefulIngestionConfigBase, DatasetSourceConfigMixin, LineageConfig
221+
StatefulIngestionConfigBase,
222+
PlatformInstanceConfigMixin,
223+
EnvConfigMixin,
224+
IncrementalLineageConfigMixin,
219225
):
220226
env: str = Field(
221227
default=mce_builder.DEFAULT_ENV,

metadata-ingestion/src/datahub/ingestion/source/redshift/config.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
from datahub.configuration.common import AllowDenyPattern
1010
from datahub.configuration.source_common import DatasetLineageProviderConfigBase
1111
from datahub.configuration.validate_field_removal import pydantic_removed_field
12+
from datahub.ingestion.api.incremental_lineage_helper import (
13+
IncrementalLineageConfigMixin,
14+
)
1215
from datahub.ingestion.glossary.classification_mixin import (
1316
ClassificationSourceConfigMixin,
1417
)
@@ -70,6 +73,7 @@ class RedshiftConfig(
7073
BasicSQLAlchemyConfig,
7174
DatasetLineageProviderConfigBase,
7275
S3DatasetLineageProviderConfigBase,
76+
IncrementalLineageConfigMixin,
7377
RedshiftUsageConfig,
7478
StatefulLineageConfigMixin,
7579
StatefulProfilingConfigMixin,
@@ -150,11 +154,6 @@ class RedshiftConfig(
150154
description="Whether to extract column level lineage. This config works with rest-sink only.",
151155
)
152156

153-
incremental_lineage: bool = Field(
154-
default=False,
155-
description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run. This config works with rest-sink only.",
156-
)
157-
158157
patch_custom_properties: bool = Field(
159158
default=True,
160159
description="Whether to patch custom properties on existing datasets rather than replace.",

metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,11 +1077,8 @@ def generate_lineage(
10771077
self.db_schemas[database][schema],
10781078
)
10791079
if lineage_info:
1080-
yield from gen_lineage(
1081-
dataset_urn,
1082-
(lineage_info, {}),
1083-
incremental_lineage=False, # incremental lineage generation is taken care by auto_incremental_lineage
1084-
)
1080+
# incremental lineage generation is taken care by auto_incremental_lineage
1081+
yield from gen_lineage(dataset_urn, lineage_info)
10851082

10861083
def add_config_to_report(self):
10871084
self.report.stateful_lineage_ingestion_enabled = (

metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@
66
from pydantic import Field
77
from sqlalchemy.engine import URL
88

9-
from datahub.configuration.common import AllowDenyPattern, ConfigModel, LineageConfig
9+
from datahub.configuration.common import AllowDenyPattern, ConfigModel
1010
from datahub.configuration.source_common import (
1111
DatasetSourceConfigMixin,
1212
LowerCaseDatasetUrnConfigMixin,
1313
)
1414
from datahub.configuration.validate_field_removal import pydantic_removed_field
15+
from datahub.ingestion.api.incremental_lineage_helper import (
16+
IncrementalLineageConfigMixin,
17+
)
1518
from datahub.ingestion.glossary.classification_mixin import (
1619
ClassificationSourceConfigMixin,
1720
)
@@ -31,7 +34,7 @@ class SQLCommonConfig(
3134
StatefulIngestionConfigBase,
3235
DatasetSourceConfigMixin,
3336
LowerCaseDatasetUrnConfigMixin,
34-
LineageConfig,
37+
IncrementalLineageConfigMixin,
3538
ClassificationSourceConfigMixin,
3639
):
3740
options: dict = pydantic.Field(

metadata-ingestion/src/datahub/ingestion/source/sql/sql_utils.py

Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Dict, Iterable, List, Optional, Tuple
1+
from typing import Dict, Iterable, List, Optional
22

33
from datahub.configuration.common import AllowDenyPattern
44
from datahub.emitter.mce_builder import (
@@ -19,7 +19,6 @@
1919
from datahub.metadata.com.linkedin.pegasus2avro.dataset import UpstreamLineage
2020
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
2121
from datahub.metadata.schema_classes import DataPlatformInstanceClass
22-
from datahub.specific.dataset import DatasetPatchBuilder
2322
from datahub.utilities.registries.domain_registry import DomainRegistry
2423
from datahub.utilities.urns.dataset_urn import DatasetUrn
2524

@@ -201,32 +200,14 @@ def get_dataplatform_instance_aspect(
201200

202201
def gen_lineage(
203202
dataset_urn: str,
204-
lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None,
205-
incremental_lineage: bool = True,
203+
upstream_lineage: Optional[UpstreamLineage],
206204
) -> Iterable[MetadataWorkUnit]:
207-
if lineage_info is None:
208-
return
209-
210-
upstream_lineage, upstream_column_props = lineage_info
211205
if upstream_lineage is not None:
212-
if incremental_lineage:
213-
patch_builder: DatasetPatchBuilder = DatasetPatchBuilder(urn=dataset_urn)
214-
for upstream in upstream_lineage.upstreams:
215-
patch_builder.add_upstream_lineage(upstream)
216-
217-
lineage_workunits = [
218-
MetadataWorkUnit(
219-
id=f"upstreamLineage-for-{dataset_urn}",
220-
mcp_raw=mcp,
221-
)
222-
for mcp in patch_builder.build()
223-
]
224-
else:
225-
lineage_workunits = [
226-
MetadataChangeProposalWrapper(
227-
entityUrn=dataset_urn, aspect=upstream_lineage
228-
).as_workunit()
229-
]
206+
lineage_workunits = [
207+
MetadataChangeProposalWrapper(
208+
entityUrn=dataset_urn, aspect=upstream_lineage
209+
).as_workunit()
210+
]
230211

231212
for wu in lineage_workunits:
232213
yield wu

0 commit comments

Comments
 (0)