Skip to content

Commit

Permalink
feat(ingest): enable stateful_ingestion by default for DataHub rest s…
Browse files Browse the repository at this point in the history
…ink (#9934)
  • Loading branch information
shubhamjagtap639 authored Mar 5, 2024
1 parent c0aedd4 commit fda5eb8
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 2 deletions.
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

- #9601 - The Unity Catalog(UC) ingestion source config `include_hive_metastore` is now enabled by default. This requires config `warehouse_id` to be set. You can disable `include_hive_metastore` by setting it to `False` to avoid ingesting legacy hive metastore catalog in Databricks.
- #9904 - The default Redshift `table_lineage_mode` is now MIXED, instead of `STL_SCAN_BASED`. Improved lineage generation is also available by enabling `use_lineaege_v2`. This v2 implementation will become the default in a future release.
- #9934 - The stateful_ingestion is now enabled by default, if datahub-rest sink is used or if a `datahub_api` is specified

### Potential Downtime

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class StatefulIngestionConfig(ConfigModel):

enabled: bool = Field(
default=False,
description="The type of the ingestion state provider registered with datahub.",
description="Whether or not to enable stateful ingest. "
"Default: True if datahub-rest sink is used or if a `datahub_api` is specified, otherwise False",
)
max_checkpoint_state_size: pydantic.PositiveInt = Field(
default=2**24, # 16 MB
Expand Down Expand Up @@ -231,6 +232,16 @@ def _initialize_checkpointing_state_provider(self) -> None:
self.ingestion_checkpointing_state_provider: Optional[
IngestionCheckpointingProviderBase
] = None

if self.stateful_ingestion_config is None and self.ctx.graph:
logger.info(
"Stateful ingestion got enabled by default, as datahub-rest sink is used or `datahub_api` is specified"
)
self.stateful_ingestion_config = StatefulIngestionConfig(
enabled=True,
state_provider=DynamicTypedStateProviderConfig(type="datahub"),
)

if (
self.stateful_ingestion_config is not None
and self.stateful_ingestion_config.state_provider is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
from datahub.ingestion.source.state.sql_common_state import (
BaseSQLAlchemyCheckpointState,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfig,
StateProviderWrapper,
)
from datahub.ingestion.source.state.usage_common_state import (
BaseTimeWindowCheckpointState,
)
Expand Down Expand Up @@ -181,3 +185,34 @@ def test_providers(self):
state_class=type(job2_state_obj),
)
self.assertEqual(job2_last_checkpoint, job2_checkpoint)

def test_state_provider_wrapper_with_config_provided(self):
# stateful_ingestion_config.enabled as true
ctx = PipelineContext(run_id=self.run_id, pipeline_name=self.pipeline_name)
ctx.graph = self.mock_graph
state_provider = StateProviderWrapper(
StatefulIngestionConfig(enabled=True), ctx
)
assert state_provider.stateful_ingestion_config
assert state_provider.ingestion_checkpointing_state_provider
# stateful_ingestion_config.enabled as false
ctx = PipelineContext(run_id=self.run_id, pipeline_name=self.pipeline_name)
ctx.graph = self.mock_graph
state_provider = StateProviderWrapper(
StatefulIngestionConfig(enabled=False), ctx
)
assert state_provider.stateful_ingestion_config
assert not state_provider.ingestion_checkpointing_state_provider

def test_state_provider_wrapper_with_config_not_provided(self):
# graph object is present
ctx = PipelineContext(run_id=self.run_id, pipeline_name=self.pipeline_name)
ctx.graph = self.mock_graph
state_provider = StateProviderWrapper(None, ctx)
assert state_provider.stateful_ingestion_config
assert state_provider.ingestion_checkpointing_state_provider
# graph object is none
ctx = PipelineContext(run_id=self.run_id, pipeline_name=self.pipeline_name)
state_provider = StateProviderWrapper(None, ctx)
assert not state_provider.stateful_ingestion_config
assert not state_provider.ingestion_checkpointing_state_provider
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/unit/test_dbt_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def create_owners_list_from_urn_list(


def create_mocked_dbt_source() -> DBTCoreSource:
ctx = PipelineContext("test-run-id")
ctx = PipelineContext(run_id="test-run-id", pipeline_name="dbt-source")
graph = mock.MagicMock()
graph.get_ownership.return_value = mce_builder.make_ownership_aspect_from_urn_list(
["urn:li:corpuser:test_user"], "AUDIT"
Expand Down

0 comments on commit fda5eb8

Please sign in to comment.