Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): enable stateful_ingestion by default for DataHub rest sink #9934

Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class StatefulIngestionConfig(ConfigModel):

enabled: bool = Field(
default=False,
description="The type of the ingestion state provider registered with datahub.",
description="Default as True if datahub-rest sink is used or if datahub_api is specified, otherwise False",
shubhamjagtap639 marked this conversation as resolved.
Show resolved Hide resolved
)
max_checkpoint_state_size: pydantic.PositiveInt = Field(
default=2**24, # 16 MB
Expand Down Expand Up @@ -231,6 +231,14 @@ def _initialize_checkpointing_state_provider(self) -> None:
self.ingestion_checkpointing_state_provider: Optional[
IngestionCheckpointingProviderBase
] = None

if self.stateful_ingestion_config is None and self.ctx.graph:
# If stateful ingestion config not set, enable it by default if graph object is not none
shubhamjagtap639 marked this conversation as resolved.
Show resolved Hide resolved
self.stateful_ingestion_config = StatefulIngestionConfig(
enabled=True,
state_provider=DynamicTypedStateProviderConfig(type="datahub"),
)

if (
self.stateful_ingestion_config is not None
and self.stateful_ingestion_config.state_provider is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
from datahub.ingestion.source.state.sql_common_state import (
BaseSQLAlchemyCheckpointState,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfig,
StateProviderWrapper,
)
from datahub.ingestion.source.state.usage_common_state import (
BaseTimeWindowCheckpointState,
)
Expand Down Expand Up @@ -181,3 +185,32 @@ def test_providers(self):
state_class=type(job2_state_obj),
)
self.assertEqual(job2_last_checkpoint, job2_checkpoint)

def test_state_provider_wrapper(self):
ctx: PipelineContext = PipelineContext(
run_id=self.run_id, pipeline_name=self.pipeline_name
)
ctx.graph = self.mock_graph
# Test 1: stateful_ingestion_config provided with enabled as true
state_provider = StateProviderWrapper(
StatefulIngestionConfig(enabled=True), ctx
)
assert state_provider.stateful_ingestion_config
assert state_provider.ingestion_checkpointing_state_provider
ctx.checkpointers = {}
shubhamjagtap639 marked this conversation as resolved.
Show resolved Hide resolved
# Test 2: stateful_ingestion_config provided with enabled as false
state_provider = StateProviderWrapper(
StatefulIngestionConfig(enabled=False), ctx
)
assert state_provider.stateful_ingestion_config
assert not state_provider.ingestion_checkpointing_state_provider
# Test 3: stateful_ingestion_config not provided but graph object is present
state_provider = StateProviderWrapper(None, ctx)
assert state_provider.stateful_ingestion_config
assert state_provider.ingestion_checkpointing_state_provider
ctx.checkpointers = {}
# Test 4: stateful_ingestion_config not provided and graph object is none
ctx.graph = None
state_provider = StateProviderWrapper(None, ctx)
assert not state_provider.stateful_ingestion_config
assert not state_provider.ingestion_checkpointing_state_provider
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/unit/test_dbt_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def create_owners_list_from_urn_list(


def create_mocked_dbt_source() -> DBTCoreSource:
ctx = PipelineContext("test-run-id")
ctx = PipelineContext(run_id="test-run-id", pipeline_name="dbt-source")
graph = mock.MagicMock()
graph.get_ownership.return_value = mce_builder.make_ownership_aspect_from_urn_list(
["urn:li:corpuser:test_user"], "AUDIT"
Expand Down
Loading