Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): enable stateful_ingestion by default for DataHub rest sink #9934

1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

- #9601 - The Unity Catalog(UC) ingestion source config `include_hive_metastore` is now enabled by default. This requires config `warehouse_id` to be set. You can disable `include_hive_metastore` by setting it to `False` to avoid ingesting legacy hive metastore catalog in Databricks.
- #9904 - The default Redshift `table_lineage_mode` is now `MIXED`, instead of `STL_SCAN_BASED`. Improved lineage generation is also available by enabling `use_lineage_v2`. This v2 implementation will become the default in a future release.
- #9934 - Stateful ingestion (`stateful_ingestion`) is now enabled by default if the `datahub-rest` sink is used or if a `datahub_api` is specified.

### Potential Downtime

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class StatefulIngestionConfig(ConfigModel):

enabled: bool = Field(
default=False,
description="The type of the ingestion state provider registered with datahub.",
description="Whether or not to enable stateful ingest. "
"Default: True if datahub-rest sink is used or if a `datahub_api` is specified, otherwise False",
)
max_checkpoint_state_size: pydantic.PositiveInt = Field(
default=2**24, # 16 MB
Expand Down Expand Up @@ -231,6 +232,16 @@ def _initialize_checkpointing_state_provider(self) -> None:
self.ingestion_checkpointing_state_provider: Optional[
IngestionCheckpointingProviderBase
] = None

if self.stateful_ingestion_config is None and self.ctx.graph:
logger.info(
"Stateful ingestion got enabled by default, as datahub-rest sink is used or `datahub_api` is specified"
)
self.stateful_ingestion_config = StatefulIngestionConfig(
enabled=True,
state_provider=DynamicTypedStateProviderConfig(type="datahub"),
)

if (
self.stateful_ingestion_config is not None
and self.stateful_ingestion_config.state_provider is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
from datahub.ingestion.source.state.sql_common_state import (
BaseSQLAlchemyCheckpointState,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfig,
StateProviderWrapper,
)
from datahub.ingestion.source.state.usage_common_state import (
BaseTimeWindowCheckpointState,
)
Expand Down Expand Up @@ -181,3 +185,34 @@ def test_providers(self):
state_class=type(job2_state_obj),
)
self.assertEqual(job2_last_checkpoint, job2_checkpoint)

def test_state_provider_wrapper_with_config_provided(self):
    """Exercise StateProviderWrapper when a config is passed explicitly.

    Both scenarios attach the mock graph to the pipeline context. The test
    asserts that an enabled config results in a checkpointing state provider
    being set on the wrapper, and that a disabled config does not; in both
    scenarios the wrapper's ``stateful_ingestion_config`` must be truthy.
    """
    # Scenario 1: stateful_ingestion_config.enabled is True.
    enabled_context = PipelineContext(
        run_id=self.run_id, pipeline_name=self.pipeline_name
    )
    enabled_context.graph = self.mock_graph
    enabled_wrapper = StateProviderWrapper(
        StatefulIngestionConfig(enabled=True), enabled_context
    )
    assert enabled_wrapper.stateful_ingestion_config
    assert enabled_wrapper.ingestion_checkpointing_state_provider

    # Scenario 2: stateful_ingestion_config.enabled is False.
    disabled_context = PipelineContext(
        run_id=self.run_id, pipeline_name=self.pipeline_name
    )
    disabled_context.graph = self.mock_graph
    disabled_wrapper = StateProviderWrapper(
        StatefulIngestionConfig(enabled=False), disabled_context
    )
    assert disabled_wrapper.stateful_ingestion_config
    assert not disabled_wrapper.ingestion_checkpointing_state_provider

def test_state_provider_wrapper_with_config_not_provided(self):
    """Exercise StateProviderWrapper when no config (``None``) is passed.

    The test asserts that with a graph attached to the context the wrapper
    ends up with both a truthy ``stateful_ingestion_config`` and a
    checkpointing state provider, and that without a graph it ends up with
    neither.
    """
    # Scenario 1: the context carries a graph object.
    context_with_graph = PipelineContext(
        run_id=self.run_id, pipeline_name=self.pipeline_name
    )
    context_with_graph.graph = self.mock_graph
    wrapper_with_graph = StateProviderWrapper(None, context_with_graph)
    assert wrapper_with_graph.stateful_ingestion_config
    assert wrapper_with_graph.ingestion_checkpointing_state_provider

    # Scenario 2: no graph is assigned to the context.
    context_without_graph = PipelineContext(
        run_id=self.run_id, pipeline_name=self.pipeline_name
    )
    wrapper_without_graph = StateProviderWrapper(None, context_without_graph)
    assert not wrapper_without_graph.stateful_ingestion_config
    assert not wrapper_without_graph.ingestion_checkpointing_state_provider
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/unit/test_dbt_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def create_owners_list_from_urn_list(


def create_mocked_dbt_source() -> DBTCoreSource:
ctx = PipelineContext("test-run-id")
ctx = PipelineContext(run_id="test-run-id", pipeline_name="dbt-source")
graph = mock.MagicMock()
graph.get_ownership.return_value = mce_builder.make_ownership_aspect_from_urn_list(
["urn:li:corpuser:test_user"], "AUDIT"
Expand Down
Loading