Commit

fix(ingest): only auto-enable stateful ingestion if pipeline name is set (#10075)
hsheth2 authored Mar 18, 2024
1 parent 3a4bdef commit 104e787
Showing 2 changed files with 8 additions and 4 deletions.
2 changes: 1 addition & 1 deletion docs/how/updating-datahub.md
@@ -20,7 +20,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

### Breaking Changes

-- #9934 - Stateful ingestion is now enabled by default if datahub-rest sink is used or if a `datahub_api` is specified. It will still be disabled by default when any other sink type is used.
+- #9934 and #10075 - Stateful ingestion is now enabled by default if a `pipeline_name` is set and either a datahub-rest sink or `datahub_api` is specified. It will still be disabled by default when any other sink type is used or if there is no pipeline name set.
- #10002 - The `DataHubGraph` client no longer makes a request to the backend during initialization. If you want to preserve the old behavior, call `graph.test_connection()` after constructing the client.

### Potential Downtime
@@ -53,7 +53,7 @@ class StatefulIngestionConfig(ConfigModel):
     enabled: bool = Field(
         default=False,
         description="Whether or not to enable stateful ingest. "
-        "Default: True if datahub-rest sink is used or if a `datahub_api` is specified, otherwise False",
+        "Default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False",
     )
     max_checkpoint_state_size: pydantic.PositiveInt = Field(
         default=2**24,  # 16 MB
@@ -233,9 +233,13 @@ def _initialize_checkpointing_state_provider(self) -> None:
             IngestionCheckpointingProviderBase
         ] = None
 
-        if self.stateful_ingestion_config is None and self.ctx.graph:
+        if (
+            self.stateful_ingestion_config is None
+            and self.ctx.graph
+            and self.ctx.pipeline_name
+        ):
             logger.info(
-                "Stateful ingestion got enabled by default, as datahub-rest sink is used or `datahub_api` is specified"
+                "Stateful ingestion will be automatically enabled, as datahub-rest sink is used or `datahub_api` is specified"
             )
             self.stateful_ingestion_config = StatefulIngestionConfig(
                 enabled=True,

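The condition changed in `_initialize_checkpointing_state_provider` can be read as a small predicate. Below is a minimal, self-contained sketch of that decision, not DataHub's actual code: `ContextSketch` and `should_auto_enable` are hypothetical names introduced only for illustration, and `graph` is assumed to stand in for the client object that exists when a datahub-rest sink or `datahub_api` is configured.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ContextSketch:
    """Hypothetical stand-in for the pipeline context; not a DataHub class."""

    pipeline_name: Optional[str] = None
    # Assumed to be non-None when a datahub-rest sink or `datahub_api` is configured.
    graph: Optional[object] = None


def should_auto_enable(
    stateful_ingestion_config: Optional[object], ctx: ContextSketch
) -> bool:
    """Mirror the three-part check from the diff.

    Auto-enable only when the user supplied no stateful ingestion config,
    a graph client is available, and a pipeline name is set.
    """
    return (
        stateful_ingestion_config is None
        and bool(ctx.graph)
        and bool(ctx.pipeline_name)
    )


if __name__ == "__main__":
    graph = object()  # placeholder for a real client

    # Before this commit, a graph alone was enough; now a name is also required.
    print(should_auto_enable(None, ContextSketch(graph=graph)))
    print(should_auto_enable(None, ContextSketch(pipeline_name="my_pipeline", graph=graph)))
```

In this sketch, an explicitly provided config always short-circuits the check, which matches the diff: the automatic default applies only when `stateful_ingestion_config` is `None`.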