refactor(ingest/stateful): remove IngestionJobStateProvider (datahu…
hsheth2 authored and szalai1 committed Dec 22, 2022
1 parent 1e97f47 commit 2aa30e4
Showing 13 changed files with 82 additions and 159 deletions.
6 changes: 4 additions & 2 deletions metadata-ingestion/src/datahub/cli/state_cli.py
@@ -6,7 +6,9 @@
from click_default_group import DefaultGroup

from datahub.cli.cli_utils import get_url_and_token
from datahub.ingestion.api.ingestion_job_state_provider import IngestionJobStateProvider
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import (
IngestionCheckpointingProviderBase,
)
from datahub.ingestion.graph.client import DataHubGraph, DataHubGraphConfig
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
@@ -49,7 +51,7 @@ def inspect(pipeline_name: str, platform: str, platform_instance: str) -> None:

job_name = StaleEntityRemovalHandler.compute_job_id(platform)

data_job_urn = IngestionJobStateProvider.get_data_job_urn(
data_job_urn = IngestionCheckpointingProviderBase.get_data_job_urn(
DatahubIngestionCheckpointingProvider.orchestrator_name,
pipeline_name,
job_name,
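With IngestionJobStateProvider gone, the CLI now mints the checkpoint data-job URN through the retained IngestionCheckpointingProviderBase. A minimal sketch of the renamed static helper, assuming the metadata-ingestion package at this commit is installed; the orchestrator, pipeline, job, and platform-instance values are illustrative placeholders, not values taken from the commit:

```python
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import (
    IngestionCheckpointingProviderBase,
    JobId,
)

# Placeholder values; the real CLI passes DatahubIngestionCheckpointingProvider.orchestrator_name.
data_job_urn = IngestionCheckpointingProviderBase.get_data_job_urn(
    orchestrator="datahub",  # assumed orchestrator name
    pipeline_name="my_snowflake_pipeline",
    job_name=JobId("stale_entity_removal_snowflake"),  # hypothetical job id
    platform_instance_id="prod_instance",
)
print(data_job_urn)
```
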
15 changes: 4 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/api/committable.py
@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum, auto
from typing import Generic, List, Optional, TypeVar
from typing import Generic, TypeVar


class CommitPolicy(Enum):
@@ -14,7 +14,6 @@ class CommitPolicy(Enum):
class Committable(ABC):
name: str
commit_policy: CommitPolicy
committed: bool = False

@abstractmethod
def commit(self) -> None:
@@ -23,28 +22,22 @@ def commit(self) -> None:

StateKeyType = TypeVar("StateKeyType")
StateType = TypeVar("StateType")
# TODO: Add a better alternative to a string for the filter.
FilterType = TypeVar("FilterType")


class StatefulCommittable(
Committable,
Generic[StateKeyType, StateType, FilterType],
Generic[StateKeyType, StateType],
):
def __init__(
self, name: str, commit_policy: CommitPolicy, state_to_commit: StateType
):
super().__init__(name=name, commit_policy=commit_policy)
self.committed: bool = False
self.state_to_commit: StateType = state_to_commit

def has_successfully_committed(self) -> bool:
return bool(not self.state_to_commit or self.committed)

@abstractmethod
def get_previous_states(
self,
state_key: StateKeyType,
last_only: bool = True,
filter_opt: Optional[FilterType] = None,
) -> List[StateType]:
def get_last_state(self, state_key: StateKeyType) -> StateType:
pass
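StatefulCommittable drops the filter type parameter and replaces get_previous_states with a single-state get_last_state. A toy subclass sketch under the new two-parameter generic signature; the class name and state shape are invented for illustration:

```python
from typing import Dict

from datahub.ingestion.api.committable import CommitPolicy, StatefulCommittable


class InMemoryCommittable(StatefulCommittable[str, Dict[str, str]]):
    """Illustrative committable that 'commits' its state to an in-process dict."""

    def __init__(self) -> None:
        super().__init__(
            name="in-memory",
            commit_policy=CommitPolicy.ON_NO_ERRORS,
            state_to_commit={},
        )
        self._store: Dict[str, Dict[str, str]] = {}

    def get_last_state(self, state_key: str) -> Dict[str, str]:
        # Only the last committed state is retrievable now; no filters, no history.
        return self._store.get(state_key, {})

    def commit(self) -> None:
        self._store["latest"] = dict(self.state_to_commit)
        self.committed = True


c = InMemoryCommittable()
c.state_to_commit = {"urn": "urn:li:corpuser:datahub"}
c.commit()
assert c.has_successfully_committed()
assert c.get_last_state("latest") == {"urn": "urn:li:corpuser:datahub"}
```
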
12 changes: 0 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/api/common.py
@@ -56,7 +56,6 @@ def __init__(
self.pipeline_name = pipeline_name
self.dry_run_mode = dry_run
self.preview_mode = preview_mode
self.reporters: Dict[str, Committable] = {}
self.checkpointers: Dict[str, Committable] = {}
try:
self.graph = DataHubGraph(datahub_api) if datahub_api is not None else None
@@ -83,16 +82,5 @@ def register_checkpointer(self, committable: Committable) -> None:
)
self.checkpointers[committable.name] = committable

def register_reporter(self, committable: Committable) -> None:
if committable.name in self.reporters:
raise IndexError(
f"Reporting provider {committable.name} already registered."
)
self.reporters[committable.name] = committable

def get_reporters(self) -> Iterable[Committable]:
yield from self.reporters.values()

def get_committables(self) -> Iterable[Tuple[str, Committable]]:
yield from self.reporters.items()
yield from self.checkpointers.items()
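With the reporter registry removed, PipelineContext.get_committables() now yields only registered checkpointers. A small sketch, assuming PipelineContext can be constructed from just a run_id and reusing a trivial committable like the one above:

```python
from typing import Dict

from datahub.ingestion.api.committable import CommitPolicy, StatefulCommittable
from datahub.ingestion.api.common import PipelineContext


class NoopCheckpointer(StatefulCommittable[str, Dict[str, str]]):
    """Minimal committable, defined only to demonstrate registration."""

    def __init__(self) -> None:
        super().__init__(
            name="noop", commit_policy=CommitPolicy.ON_NO_ERRORS, state_to_commit={}
        )

    def get_last_state(self, state_key: str) -> Dict[str, str]:
        return {}

    def commit(self) -> None:
        self.committed = True


ctx = PipelineContext(run_id="demo-run")  # run_id-only construction assumed
ctx.register_checkpointer(NoopCheckpointer())

# Registering a second committable with the same name would raise IndexError.
print([name for name, _ in ctx.get_committables()])  # ['noop']
```
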
metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py
@@ -1,64 +1,73 @@
from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, NewType, Type, TypeVar

from datahub.ingestion.api.committable import CommitPolicy
import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.committable import CommitPolicy, StatefulCommittable
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.ingestion_job_state_provider import (
IngestionJobStateProvider,
IngestionJobStateProviderConfig,
JobId,
JobStateFilterType,
JobStateKey,
JobStatesMap,
)
from datahub.metadata.schema_classes import DatahubIngestionCheckpointClass

#
# Common type exports
#
JobId = JobId
JobStateKey = JobStateKey
JobStateFilterType = JobStateFilterType

#
# Checkpoint state specific types
#
JobId = NewType("JobId", str)
CheckpointJobStateType = DatahubIngestionCheckpointClass
CheckpointJobStatesMap = JobStatesMap[CheckpointJobStateType]
CheckpointJobStatesMap = Dict[JobId, CheckpointJobStateType]


@dataclass
class JobStateKey:
pipeline_name: str
platform_instance_id: str
job_names: List[JobId]

class IngestionCheckpointingProviderConfig(IngestionJobStateProviderConfig):

class IngestionCheckpointingProviderConfig(ConfigModel):
pass


_Self = TypeVar("_Self", bound="IngestionCheckpointingProviderBase")


@dataclass()
class IngestionCheckpointingProviderBase(
IngestionJobStateProvider[CheckpointJobStateType]
StatefulCommittable[JobStateKey, CheckpointJobStatesMap]
):
"""
The base class(non-abstract) for all checkpointing state provider implementations.
This class is implemented this way as a concrete class is needed to work with the registry,
but we don't want to implement any of the functionality yet.
The base class for all checkpointing state provider implementations.
"""

def __init__(
self, name: str, commit_policy: CommitPolicy = CommitPolicy.ON_NO_ERRORS
):
super(IngestionCheckpointingProviderBase, self).__init__(name, commit_policy)
# Set the initial state to an empty dict.
super().__init__(name, commit_policy, {})

@classmethod
def create(
cls, config_dict: Dict[str, Any], ctx: PipelineContext, name: str
) -> "IngestionJobStateProvider":
cls: Type[_Self], config_dict: Dict[str, Any], ctx: PipelineContext, name: str
) -> "_Self":
raise NotImplementedError("Sub-classes must override this method.")

def get_previous_states(
@abstractmethod
def get_last_state(
self,
state_key: JobStateKey,
last_only: bool = True,
filter_opt: Optional[JobStateFilterType] = None,
) -> List[CheckpointJobStatesMap]:
raise NotImplementedError("Sub-classes must override this method.")
) -> CheckpointJobStatesMap:
...

@abstractmethod
def commit(self) -> None:
raise NotImplementedError("Sub-classes must override this method.")
...

@staticmethod
def get_data_job_urn(
orchestrator: str,
pipeline_name: str,
job_name: JobId,
platform_instance_id: str,
) -> str:
"""
Standardizes datajob urn minting for all ingestion job state providers.
"""
return builder.make_data_job_urn(
orchestrator, f"{pipeline_name}_{platform_instance_id}", job_name
)

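JobId, JobStateKey, and CheckpointJobStatesMap now live in this module instead of being re-exported from the deleted ingestion_job_state_provider. A sketch of the new types; all concrete values are placeholders:

```python
from typing import Dict

from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import (
    JobId,
    JobStateKey,
)
from datahub.metadata.schema_classes import DatahubIngestionCheckpointClass

# JobId is a NewType over str, so plain strings are wrapped explicitly.
job = JobId("stale_entity_removal_snowflake")  # hypothetical job name

key = JobStateKey(
    pipeline_name="my_snowflake_pipeline",
    platform_instance_id="prod_instance",
    job_names=[job],
)

# CheckpointJobStatesMap is just Dict[JobId, DatahubIngestionCheckpointClass];
# an empty map is what a provider's get_last_state returns when nothing is stored yet.
last_states: Dict[JobId, DatahubIngestionCheckpointClass] = {}
```
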
metadata-ingestion/src/datahub/ingestion/api/ingestion_job_state_provider.py
This file was deleted.

@@ -47,7 +47,7 @@
platform_name,
support_status,
)
from datahub.ingestion.api.ingestion_job_state_provider import JobId
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws import s3_util
from datahub.ingestion.source.aws.aws_common import AwsSourceConfig
@@ -3,7 +3,7 @@

import pydantic

from datahub.ingestion.api.ingestion_job_state_provider import JobId
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfig,
@@ -6,7 +6,7 @@
import pydantic

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.ingestion_job_state_provider import JobId
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.checkpoint import Checkpoint, CheckpointStateBase
from datahub.ingestion.source.state.stateful_ingestion_base import (
metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py
@@ -162,10 +162,12 @@ def _initialize_checkpointing_state_provider(self) -> None:
Dict[str, Any],
self.stateful_ingestion_config.state_provider.dict().get("config", {}),
)
self.ingestion_checkpointing_state_provider = checkpointing_state_provider_class.create( # type: ignore
config_dict=config_dict,
ctx=self.ctx,
name=checkpointing_state_provider_class.__name__,
self.ingestion_checkpointing_state_provider = (
checkpointing_state_provider_class.create(
config_dict=config_dict,
ctx=self.ctx,
name=checkpointing_state_provider_class.__name__,
)
)
assert self.ingestion_checkpointing_state_provider
if self.stateful_ingestion_config.ignore_old_state:
@@ -1,6 +1,6 @@
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Optional

from datahub.configuration.common import ConfigurationError
from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -10,7 +10,6 @@
IngestionCheckpointingProviderBase,
IngestionCheckpointingProviderConfig,
JobId,
JobStateFilterType,
JobStateKey,
)
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
@@ -41,7 +40,7 @@ def __init__(self, graph: DataHubGraph, name: str):
@classmethod
def create(
cls, config_dict: Dict[str, Any], ctx: PipelineContext, name: str
) -> IngestionCheckpointingProviderBase:
) -> "DatahubIngestionCheckpointingProvider":
if ctx.graph:
# Use the pipeline-level graph if set
return cls(ctx.graph, name)
@@ -103,30 +102,19 @@ def get_latest_checkpoint(

return None

def get_previous_states(
def get_last_state(
self,
state_key: JobStateKey,
last_only: bool = True,
filter_opt: Optional[JobStateFilterType] = None,
) -> List[CheckpointJobStatesMap]:
if not last_only:
raise NotImplementedError(
"Currently supports retrieving only the last commited state."
)
if filter_opt is not None:
raise NotImplementedError(
"Support for optional filters is not implemented yet."
)
checkpoints: List[CheckpointJobStatesMap] = []
) -> CheckpointJobStatesMap:
last_job_checkpoint_map: CheckpointJobStatesMap = {}
for job_name in state_key.job_names:
last_job_checkpoint = self.get_latest_checkpoint(
state_key.pipeline_name, state_key.platform_instance_id, job_name
)
if last_job_checkpoint is not None:
last_job_checkpoint_map[job_name] = last_job_checkpoint
checkpoints.append(last_job_checkpoint_map)
return checkpoints

return last_job_checkpoint_map

def commit(self) -> None:
if not self.state_to_commit:
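The DataHub-backed provider now returns a single map from get_last_state instead of a one-element list from get_previous_states. A usage sketch; the provider's module path is an assumption, a reachable GMS at localhost:8080 is assumed, and the pipeline/job values are placeholders:

```python
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import (
    JobId,
    JobStateKey,
)
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph

# Assumed module path for the provider at this commit.
from datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider import (
    DatahubIngestionCheckpointingProvider,
)

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
provider = DatahubIngestionCheckpointingProvider(graph, name="datahub")

key = JobStateKey(
    pipeline_name="my_snowflake_pipeline",
    platform_instance_id="prod_instance",
    job_names=[JobId("stale_entity_removal_snowflake")],
)

# Jobs with no stored checkpoint are simply absent from the returned map.
for job_name, checkpoint in provider.get_last_state(key).items():
    print(job_name, type(checkpoint).__name__)
```
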
10 changes: 9 additions & 1 deletion metadata-ingestion/tests/test_helpers/type_helpers.py
@@ -1,7 +1,15 @@
from _pytest.config import Config as PytestConfig # noqa: F401
from typing import Optional, TypeVar

# The current PytestConfig solution is somewhat ugly and not ideal.
# However, it is currently the best solution available, as the type itself is not
# exported: https://docs.pytest.org/en/stable/reference.html#config.
# As pytest's type support improves, this will likely change.
# TODO: revisit pytestconfig as https://github.com/pytest-dev/pytest/issues/7469 progresses.
from _pytest.config import Config as PytestConfig # noqa: F401

_T = TypeVar("_T")


def assert_not_null(value: Optional[_T]) -> _T:
assert value is not None, "value is unexpectedly None"
return value
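assert_not_null is a small helper for narrowing Optional values in tests. A usage sketch; the import path assumes the call site lives inside the metadata-ingestion test tree where the tests package is importable:

```python
from typing import Optional

from tests.test_helpers.type_helpers import assert_not_null  # assumed test-local import path

maybe_token: Optional[str] = "abc123"
token: str = assert_not_null(maybe_token)  # returns the value, or fails the test if it is None
```
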