diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
index 2660bb606fbf9..62c6c4c8b00cd 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
@@ -713,11 +713,9 @@ def get_last_checkpoint(
         if last_checkpoint is not None and is_conversion_required:
             # Map the BaseSQLAlchemyCheckpointState to DbtCheckpointState
             dbt_checkpoint_state: DbtCheckpointState = DbtCheckpointState()
-            dbt_checkpoint_state.encoded_node_urns = (
+            dbt_checkpoint_state.urns = (
                 cast(BaseSQLAlchemyCheckpointState, last_checkpoint.state)
-            ).encoded_table_urns
-            # Old dbt source was not supporting the assertion
-            dbt_checkpoint_state.encoded_assertion_urns = []
+            ).urns
             last_checkpoint.state = dbt_checkpoint_state
 
         return last_checkpoint
diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py
index f82b1e6813497..d426cc1f0275b 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py
@@ -60,7 +60,7 @@
     LookerAPI,
     LookerAPIConfig,
 )
-from datahub.ingestion.source.state.looker_state import LookerCheckpointState
+from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StaleEntityRemovalHandler,
     StatefulStaleMetadataRemovalConfig,
@@ -234,7 +234,7 @@ def __init__(self, config: LookerDashboardSourceConfig, ctx: PipelineContext):
         self.stale_entity_removal_handler = StaleEntityRemovalHandler(
             source=self,
             config=self.source_config,
-            state_type_class=LookerCheckpointState,
+            state_type_class=GenericCheckpointState,
             pipeline_name=self.ctx.pipeline_name,
             run_id=self.ctx.run_id,
         )
diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py
index 3283ad18d01a1..42cd557df98e9 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py
@@ -47,7 +47,7 @@
     LookerAPIConfig,
     TransportOptionsConfig,
 )
-from datahub.ingestion.source.state.lookml_state import LookMLCheckpointState
+from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StaleEntityRemovalHandler,
     StaleEntityRemovalSourceReport,
@@ -1089,7 +1089,7 @@ def __init__(self, config: LookMLSourceConfig, ctx: PipelineContext):
         self.stale_entity_removal_handler = StaleEntityRemovalHandler(
             source=self,
             config=self.source_config,
-            state_type_class=LookMLCheckpointState,
+            state_type_class=GenericCheckpointState,
             pipeline_name=self.ctx.pipeline_name,
             run_id=self.ctx.run_id,
         )
diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/checkpoint.py b/metadata-ingestion/src/datahub/ingestion/source/state/checkpoint.py
index 0df8c9ddce99c..b33d061e1a5b5 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/state/checkpoint.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/state/checkpoint.py
@@ -1,5 +1,6 @@
 import base64
 import bz2
+import contextlib
 import functools
 import json
 import logging
@@ -128,7 +129,9 @@ def create_from_checkpoint_aspect(
                 )
             elif checkpoint_aspect.state.serde == "base85":
                 state_obj = Checkpoint._from_base85_bytes(
-                    checkpoint_aspect, functools.partial(bz2.decompress)
+                    checkpoint_aspect,
+                    functools.partial(bz2.decompress),
+                    state_class,
                 )
             elif checkpoint_aspect.state.serde == "base85-bz2-json":
                 state_obj = Checkpoint._from_base85_json_bytes(
@@ -177,11 +180,18 @@ def _from_utf8_bytes(
     @staticmethod
     def _from_base85_bytes(
         checkpoint_aspect: DatahubIngestionCheckpointClass,
         decompressor: Callable[[bytes], bytes],
+        state_class: Type[StateType],
     ) -> StateType:
         state: StateType = pickle.loads(
             decompressor(base64.b85decode(checkpoint_aspect.state.payload))  # type: ignore
         )
+        with contextlib.suppress(Exception):
+            # When loading from pickle, the pydantic validators don't run.
+            # By re-serializing and re-parsing, we ensure that the state is valid.
+            # However, we also suppress any exceptions to make sure this doesn't blow up.
+            state = state_class.parse_obj(state.dict())
+
         # Because the base85 method is deprecated in favor of base85-bz2-json,
         # we will automatically switch the serde.
         state.serde = "base85-bz2-json"
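The checkpoint.py change above is what makes the state migration work for legacy payloads: pickled base85 checkpoints bypass pydantic validation, so the loaded state is round-tripped through parse_obj to give the new migration validators a chance to run. A minimal sketch of that idea, using a hypothetical State model in place of the real StateType:

```python
import contextlib
from typing import List

import pydantic


class State(pydantic.BaseModel):  # stand-in for the pickled StateType
    serde: str = "utf-8"
    urns: List[str] = []


state = State(serde="base85", urns=["urn:li:dataset:(urn:li:dataPlatform:mysql,db1.t1,PROD)"])

# pickle.loads() bypasses pydantic validation entirely, so re-serialize and
# re-parse: this runs root validators (e.g. a field-rename migration) against
# old payloads, and any failure is swallowed rather than aborting the load.
with contextlib.suppress(Exception):
    state = State.parse_obj(state.dict())
```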
""" - encoded_node_urns: List[str] = pydantic.Field(default_factory=list) - encoded_assertion_urns: List[str] = pydantic.Field(default_factory=list) - - @classmethod - def get_supported_types(cls) -> List[str]: - return ["assertion", "dataset"] - - @staticmethod - def _get_assertion_lightweight_repr(assertion_urn: str) -> str: - """Reduces the amount of text in the URNs for smaller state footprint.""" - urn = Urn.create_from_string(assertion_urn) - key = urn.get_entity_id_as_string() - assert key is not None - return key - - def _add_assertion_urn(self, assertion_urn: str) -> None: - self.encoded_assertion_urns.append( - self._get_assertion_lightweight_repr(assertion_urn) - ) - - def _get_assertion_urns_not_in( - self, checkpoint: "DbtCheckpointState" - ) -> Iterable[str]: - """ - Dbt assertion are mapped to DataHub assertion concept - """ - difference = CheckpointStateUtil.get_encoded_urns_not_in( - self.encoded_assertion_urns, checkpoint.encoded_assertion_urns - ) - for key in difference: - yield make_assertion_urn(key) - - def _get_node_urns_not_in(self, checkpoint: "DbtCheckpointState") -> Iterable[str]: - """ - Dbt node are mapped to DataHub dataset concept - """ - yield from CheckpointStateUtil.get_dataset_urns_not_in( - self.encoded_node_urns, checkpoint.encoded_node_urns - ) - - def _add_node_urn(self, node_urn: str) -> None: - self.encoded_node_urns.append( - CheckpointStateUtil.get_dataset_lightweight_repr(node_urn) - ) - - def add_checkpoint_urn(self, type: str, urn: str) -> None: - supported_entities_add_handlers: Dict[str, Callable[[str], None]] = { - "dataset": self._add_node_urn, - "assertion": self._add_assertion_urn, + _migration = pydantic_state_migrator( + { + "encoded_node_urns": "dataset", + "encoded_assertion_urns": "assertion", } - - if type not in supported_entities_add_handlers: - logger.error(f"Can not save Unknown entity {type} to checkpoint.") - - supported_entities_add_handlers[type](urn) - - def get_urns_not_in( - self, type: str, other_checkpoint_state: "DbtCheckpointState" - ) -> Iterable[str]: - assert type in self.get_supported_types() - if type == "dataset": - yield from self._get_node_urns_not_in(other_checkpoint_state) - elif type == "assertion": - yield from self._get_assertion_urns_not_in(other_checkpoint_state) - - def get_percent_entities_changed( - self, old_checkpoint_state: "DbtCheckpointState" - ) -> float: - return StaleEntityCheckpointStateBase.compute_percent_entities_changed( - [ - (self.encoded_node_urns, old_checkpoint_state.encoded_node_urns), - ( - self.encoded_assertion_urns, - old_checkpoint_state.encoded_assertion_urns, - ), - ] - ) + ) def prepare_for_commit(self) -> None: - self.encoded_node_urns = list(set(self.encoded_node_urns)) - self.encoded_assertion_urns = list(set(self.encoded_assertion_urns)) + self.urns = list(set(self.urns)) diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/entity_removal_state.py b/metadata-ingestion/src/datahub/ingestion/source/state/entity_removal_state.py new file mode 100644 index 0000000000000..7caf9ee7db766 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/state/entity_removal_state.py @@ -0,0 +1,76 @@ +from typing import Dict, Iterable, List, Type + +import pydantic + +from datahub.emitter.mce_builder import make_assertion_urn, make_container_urn +from datahub.ingestion.source.state.stale_entity_removal_handler import ( + StaleEntityCheckpointStateBase, +) +from datahub.utilities.checkpoint_state_util import CheckpointStateUtil +from datahub.utilities.urns.urn 
import guess_entity_type + + +class GenericCheckpointState(StaleEntityCheckpointStateBase["GenericCheckpointState"]): + urns: List[str] = pydantic.Field(default_factory=list) + + @classmethod + def get_supported_types(cls) -> List[str]: + return ["*"] + + def add_checkpoint_urn(self, type: str, urn: str) -> None: + # TODO: dedup + self.urns.append(urn) + + def get_urns_not_in( + self, type: str, other_checkpoint_state: "GenericCheckpointState" + ) -> Iterable[str]: + diff = set(self.urns) - set(other_checkpoint_state.urns) + + # To maintain backwards compatibility, we provide this filtering mechanism. + if type == "*": + yield from diff + else: + yield from (urn for urn in diff if guess_entity_type(urn) == type) + + def get_percent_entities_changed( + self, old_checkpoint_state: "GenericCheckpointState" + ) -> float: + return StaleEntityCheckpointStateBase.compute_percent_entities_changed( + [(self.urns, old_checkpoint_state.urns)] + ) + + +def pydantic_state_migrator(mapping: Dict[str, str]) -> classmethod: + # mapping would be something like: + # { + # 'encoded_view_urns': 'dataset', + # 'encoded_container_urns': 'container', + # } + + SUPPORTED_TYPES = [ + "dataset", + "container", + "assertion", + ] + assert set(mapping.values()) <= set(SUPPORTED_TYPES) + + def _validate_field_rename(cls: Type, values: dict) -> dict: + values.setdefault("urns", []) + + for old_field, mapped_type in mapping.items(): + if old_field not in values: + continue + + value = values.pop(old_field) + if mapped_type == "dataset": + values["urns"] += CheckpointStateUtil.get_dataset_urns_not_in(value, []) + elif mapped_type == "container": + values["urns"] += [make_container_urn(guid) for guid in value] + elif mapped_type == "assertion": + values["urns"] += [make_assertion_urn(encoded) for encoded in value] + else: + raise ValueError(f"Unsupported type {mapped_type}") + + return values + + return pydantic.root_validator(pre=True, allow_reuse=True)(_validate_field_rename) diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/looker_state.py b/metadata-ingestion/src/datahub/ingestion/source/state/looker_state.py deleted file mode 100644 index dadca4a255cf3..0000000000000 --- a/metadata-ingestion/src/datahub/ingestion/source/state/looker_state.py +++ /dev/null @@ -1,46 +0,0 @@ -import logging -from typing import Iterable, List - -import pydantic - -from datahub.ingestion.source.state.stale_entity_removal_handler import ( - StaleEntityCheckpointStateBase, -) -from datahub.utilities.urns.urn import guess_entity_type - -logger = logging.getLogger(__name__) - - -class LookerCheckpointState(StaleEntityCheckpointStateBase["LookerCheckpointState"]): - """ - Class for representing the checkpoint state for Looker sources. - Stores all datasets, charts and dashboards being ingested and is - used to remove any stale entities. - """ - - urns: List[str] = pydantic.Field(default_factory=list) - - @classmethod - def get_supported_types(cls) -> List[str]: - return ["*"] - - def add_checkpoint_urn(self, type: str, urn: str) -> None: - self.urns.append(urn) - - def get_urns_not_in( - self, type: str, other_checkpoint_state: "LookerCheckpointState" - ) -> Iterable[str]: - diff = set(self.urns) - set(other_checkpoint_state.urns) - - # To maintain backwards compatibility, we provide this filtering mechanism. 
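To illustrate how the migrator above behaves at parse time, here is a hedged sketch; the subclass is hypothetical, while the mapping and the "platform||name||env" encoded format mirror the dbt and SQL subclasses elsewhere in this diff:

```python
from datahub.ingestion.source.state.entity_removal_state import (
    GenericCheckpointState,
    pydantic_state_migrator,
)


class LegacyishState(GenericCheckpointState):  # hypothetical subclass for illustration
    _migration = pydantic_state_migrator({"encoded_view_urns": "dataset"})


# The pre=True root validator pops the legacy field and rewrites its encoded
# entries into full urns on the generic `urns` list before field validation.
state = LegacyishState.parse_obj({"encoded_view_urns": ["mysql||db1.v1||PROD"]})
assert state.urns == ["urn:li:dataset:(urn:li:dataPlatform:mysql,db1.v1,PROD)"]
```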
diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/looker_state.py b/metadata-ingestion/src/datahub/ingestion/source/state/looker_state.py
deleted file mode 100644
index dadca4a255cf3..0000000000000
--- a/metadata-ingestion/src/datahub/ingestion/source/state/looker_state.py
+++ /dev/null
@@ -1,46 +0,0 @@
-import logging
-from typing import Iterable, List
-
-import pydantic
-
-from datahub.ingestion.source.state.stale_entity_removal_handler import (
-    StaleEntityCheckpointStateBase,
-)
-from datahub.utilities.urns.urn import guess_entity_type
-
-logger = logging.getLogger(__name__)
-
-
-class LookerCheckpointState(StaleEntityCheckpointStateBase["LookerCheckpointState"]):
-    """
-    Class for representing the checkpoint state for Looker sources.
-    Stores all datasets, charts and dashboards being ingested and is
-    used to remove any stale entities.
-    """
-
-    urns: List[str] = pydantic.Field(default_factory=list)
-
-    @classmethod
-    def get_supported_types(cls) -> List[str]:
-        return ["*"]
-
-    def add_checkpoint_urn(self, type: str, urn: str) -> None:
-        self.urns.append(urn)
-
-    def get_urns_not_in(
-        self, type: str, other_checkpoint_state: "LookerCheckpointState"
-    ) -> Iterable[str]:
-        diff = set(self.urns) - set(other_checkpoint_state.urns)
-
-        # To maintain backwards compatibility, we provide this filtering mechanism.
-        if type == "*":
-            yield from diff
-        else:
-            yield from (urn for urn in diff if guess_entity_type(urn) == type)
-
-    def get_percent_entities_changed(
-        self, old_checkpoint_state: "LookerCheckpointState"
-    ) -> float:
-        return StaleEntityCheckpointStateBase.compute_percent_entities_changed(
-            [(self.urns, old_checkpoint_state.urns)]
-        )
diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/lookml_state.py b/metadata-ingestion/src/datahub/ingestion/source/state/lookml_state.py
deleted file mode 100644
index 5a013bbbbb970..0000000000000
--- a/metadata-ingestion/src/datahub/ingestion/source/state/lookml_state.py
+++ /dev/null
@@ -1,46 +0,0 @@
-import logging
-from typing import Iterable, List
-
-import pydantic
-
-from datahub.ingestion.source.state.stale_entity_removal_handler import (
-    StaleEntityCheckpointStateBase,
-)
-from datahub.utilities.urns.urn import guess_entity_type
-
-logger = logging.getLogger(__name__)
-
-
-class LookMLCheckpointState(StaleEntityCheckpointStateBase["LookMLCheckpointState"]):
-    """
-    Class for representing the checkpoint state for Looker sources.
-    Stores all datasets, charts and dashboards being ingested and is
-    used to remove any stale entities.
-    """
-
-    urns: List[str] = pydantic.Field(default_factory=list)
-
-    @classmethod
-    def get_supported_types(cls) -> List[str]:
-        return ["*"]
-
-    def add_checkpoint_urn(self, type: str, urn: str) -> None:
-        self.urns.append(urn)
-
-    def get_urns_not_in(
-        self, type: str, other_checkpoint_state: "LookMLCheckpointState"
-    ) -> Iterable[str]:
-        diff = set(self.urns) - set(other_checkpoint_state.urns)
-
-        # To maintain backwards compatibility, we provide this filtering mechanism.
-        if type == "*":
-            yield from diff
-        else:
-            yield from (urn for urn in diff if guess_entity_type(urn) == type)
-
-    def get_percent_entities_changed(
-        self, old_checkpoint_state: "LookMLCheckpointState"
-    ) -> float:
-        return StaleEntityCheckpointStateBase.compute_percent_entities_changed(
-            [(self.urns, old_checkpoint_state.urns)]
-        )
""" - encoded_table_urns: List[str] = pydantic.Field(default_factory=list) - encoded_view_urns: List[str] = pydantic.Field(default_factory=list) - encoded_container_urns: List[str] = pydantic.Field(default_factory=list) - encoded_assertion_urns: List[str] = pydantic.Field(default_factory=list) - - @classmethod - def get_supported_types(cls) -> List[str]: - return ["assertion", "container", "table", "view"] - - @staticmethod - def _get_lightweight_repr(dataset_urn: str) -> str: - """Reduces the amount of text in the URNs for smaller state footprint.""" - return CheckpointStateUtil.get_dataset_lightweight_repr(dataset_urn) - - @staticmethod - def _get_container_lightweight_repr(container_urn: str) -> str: - """Reduces the amount of text in the URNs for smaller state footprint.""" - key = container_urn_to_key(container_urn) - assert key is not None - return f"{key.guid}" - - @staticmethod - def _get_container_urns_not_in( - encoded_urns_1: List[str], encoded_urns_2: List[str] - ) -> Iterable[str]: - difference = CheckpointStateUtil.get_encoded_urns_not_in( - encoded_urns_1, encoded_urns_2 - ) - for guid in difference: - yield make_container_urn(guid) - - def _get_table_urns_not_in( - self, checkpoint: "BaseSQLAlchemyCheckpointState" - ) -> Iterable[str]: - """Tables are mapped to DataHub dataset concept.""" - yield from CheckpointStateUtil.get_dataset_urns_not_in( - self.encoded_table_urns, checkpoint.encoded_table_urns - ) - - def _get_view_urns_not_in( - self, checkpoint: "BaseSQLAlchemyCheckpointState" - ) -> Iterable[str]: - """Views are mapped to DataHub dataset concept.""" - yield from CheckpointStateUtil.get_dataset_urns_not_in( - self.encoded_view_urns, checkpoint.encoded_view_urns - ) - - def _get_assertion_urns_not_in( - self, checkpoint: "BaseSQLAlchemyCheckpointState" - ) -> Iterable[str]: - """Tables are mapped to DataHub dataset concept.""" - diff = CheckpointStateUtil.get_encoded_urns_not_in( - self.encoded_assertion_urns, checkpoint.encoded_assertion_urns - ) - for assertion_id in diff: - yield make_assertion_urn(assertion_id) - - def _add_table_urn(self, table_urn: str) -> None: - self.encoded_table_urns.append(self._get_lightweight_repr(table_urn)) - - def _add_assertion_urn(self, assertion_urn: str) -> None: - key = assertion_urn_to_key(assertion_urn) - assert key is not None - self.encoded_assertion_urns.append(key.assertionId) - - def _add_view_urn(self, view_urn: str) -> None: - self.encoded_view_urns.append(self._get_lightweight_repr(view_urn)) - - def _add_container_guid(self, container_urn: str) -> None: - self.encoded_container_urns.append( - self._get_container_lightweight_repr(container_urn) - ) - - def add_checkpoint_urn(self, type: str, urn: str) -> None: - assert type in self.get_supported_types() - if type == "assertion": - self._add_assertion_urn(urn) - elif type == "container": - self._add_container_guid(urn) - elif type == "table": - self._add_table_urn(urn) - elif type == "view": - self._add_view_urn(urn) - - def get_urns_not_in( - self, type: str, other_checkpoint_state: "BaseSQLAlchemyCheckpointState" - ) -> Iterable[str]: - assert type in self.get_supported_types() - if type == "assertion": - yield from self._get_assertion_urns_not_in(other_checkpoint_state) - if type == "container": - yield from self._get_container_urns_not_in( - self.encoded_container_urns, - other_checkpoint_state.encoded_container_urns, - ) - elif type == "table": - yield from self._get_table_urns_not_in(other_checkpoint_state) - elif type == "view": - yield from 
self._get_view_urns_not_in(other_checkpoint_state) - - def get_percent_entities_changed( - self, old_checkpoint_state: "BaseSQLAlchemyCheckpointState" - ) -> float: - return StaleEntityCheckpointStateBase.compute_percent_entities_changed( - [ - ( - self.encoded_assertion_urns, - old_checkpoint_state.encoded_assertion_urns, - ), - ( - self.encoded_container_urns, - old_checkpoint_state.encoded_container_urns, - ), - (self.encoded_table_urns, old_checkpoint_state.encoded_table_urns), - (self.encoded_view_urns, old_checkpoint_state.encoded_view_urns), - ] - ) + _migration = pydantic_state_migrator( + { + "encoded_table_urns": "dataset", + "encoded_view_urns": "dataset", + "encoded_container_urns": "container", + "encoded_assertion_urns": "assertion", + } + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/tableau_state.py b/metadata-ingestion/src/datahub/ingestion/source/state/tableau_state.py index 310c86fa50cbe..ae935b2d2cb09 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/tableau_state.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/tableau_state.py @@ -1,46 +1,9 @@ -import logging -from typing import Iterable, List +from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState -import pydantic -from datahub.ingestion.source.state.stale_entity_removal_handler import ( - StaleEntityCheckpointStateBase, -) -from datahub.utilities.urns.urn import guess_entity_type - -logger = logging.getLogger(__name__) - - -class TableauCheckpointState(StaleEntityCheckpointStateBase["TableauCheckpointState"]): +class TableauCheckpointState(GenericCheckpointState): """ Class for representing the checkpoint state for Tableau sources. Stores all datasets, charts and dashboards being ingested and is used to remove any stale entities. """ - - urns: List[str] = pydantic.Field(default_factory=list) - - @classmethod - def get_supported_types(cls) -> List[str]: - return ["*"] - - def add_checkpoint_urn(self, type: str, urn: str) -> None: - self.urns.append(urn) - - def get_urns_not_in( - self, type: str, other_checkpoint_state: "TableauCheckpointState" - ) -> Iterable[str]: - diff = set(self.urns) - set(other_checkpoint_state.urns) - - # To maintain backwards compatibility, we provide this filtering mechanism. 
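With tables and views both collapsed into the dataset entity type, the stale-entity diff is now computed over a single urn list and filtered by guessed entity type. A small sketch of the resulting behavior (the urns are illustrative):

```python
from datahub.ingestion.source.state.sql_common_state import (
    BaseSQLAlchemyCheckpointState,
)

state1 = BaseSQLAlchemyCheckpointState(
    urns=[
        "urn:li:dataset:(urn:li:dataPlatform:mysql,db1.t1,PROD)",
        "urn:li:container:1154d1da73a95376c9f33f47694cf1de",
    ]
)
state2 = BaseSQLAlchemyCheckpointState()

# type="dataset" now covers what used to be both "table" and "view";
# type="*" skips the filter and yields every stale urn.
datasets = list(state1.get_urns_not_in(type="dataset", other_checkpoint_state=state2))
everything = list(state1.get_urns_not_in(type="*", other_checkpoint_state=state2))
assert len(datasets) == 1 and len(everything) == 2
```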
diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/tableau_state.py b/metadata-ingestion/src/datahub/ingestion/source/state/tableau_state.py
index 310c86fa50cbe..ae935b2d2cb09 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/state/tableau_state.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/state/tableau_state.py
@@ -1,46 +1,9 @@
-import logging
-from typing import Iterable, List
+from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
 
-import pydantic
-
-from datahub.ingestion.source.state.stale_entity_removal_handler import (
-    StaleEntityCheckpointStateBase,
-)
-from datahub.utilities.urns.urn import guess_entity_type
-
-logger = logging.getLogger(__name__)
-
-
-class TableauCheckpointState(StaleEntityCheckpointStateBase["TableauCheckpointState"]):
+class TableauCheckpointState(GenericCheckpointState):
     """
     Class for representing the checkpoint state for Tableau sources.
     Stores all datasets, charts and dashboards being ingested and is
     used to remove any stale entities.
     """
-
-    urns: List[str] = pydantic.Field(default_factory=list)
-
-    @classmethod
-    def get_supported_types(cls) -> List[str]:
-        return ["*"]
-
-    def add_checkpoint_urn(self, type: str, urn: str) -> None:
-        self.urns.append(urn)
-
-    def get_urns_not_in(
-        self, type: str, other_checkpoint_state: "TableauCheckpointState"
-    ) -> Iterable[str]:
-        diff = set(self.urns) - set(other_checkpoint_state.urns)
-
-        # To maintain backwards compatibility, we provide this filtering mechanism.
-        if type == "*":
-            yield from diff
-        else:
-            yield from (urn for urn in diff if guess_entity_type(urn) == type)
-
-    def get_percent_entities_changed(
-        self, old_checkpoint_state: "TableauCheckpointState"
-    ) -> float:
-        return StaleEntityCheckpointStateBase.compute_percent_entities_changed(
-            [(self.urns, old_checkpoint_state.urns)]
-        )
diff --git a/metadata-ingestion/src/datahub/ingestion/source/state_handler/__init__.py b/metadata-ingestion/src/datahub_provider/py.typed
similarity index 100%
rename from metadata-ingestion/src/datahub/ingestion/source/state_handler/__init__.py
rename to metadata-ingestion/src/datahub_provider/py.typed
diff --git a/metadata-ingestion/tests/integration/dbt/test_dbt.py b/metadata-ingestion/tests/integration/dbt/test_dbt.py
index 2ae4097dad226..74477245b728f 100644
--- a/metadata-ingestion/tests/integration/dbt/test_dbt.py
+++ b/metadata-ingestion/tests/integration/dbt/test_dbt.py
@@ -360,7 +360,7 @@ def test_dbt_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
     state1 = cast(DbtCheckpointState, checkpoint1.state)
     state2 = cast(DbtCheckpointState, checkpoint2.state)
     difference_urns = list(
-        state1.get_urns_not_in(type="dataset", other_checkpoint_state=state2)
+        state1.get_urns_not_in(type="*", other_checkpoint_state=state2)
     )
 
     assert len(difference_urns) == 2
diff --git a/metadata-ingestion/tests/integration/looker/test_looker.py b/metadata-ingestion/tests/integration/looker/test_looker.py
index 537303aa4f32c..1237b174e2e73 100644
--- a/metadata-ingestion/tests/integration/looker/test_looker.py
+++ b/metadata-ingestion/tests/integration/looker/test_looker.py
@@ -29,7 +29,7 @@
 )
 from datahub.ingestion.source.looker.looker_source import LookerDashboardSource
 from datahub.ingestion.source.state.checkpoint import Checkpoint
-from datahub.ingestion.source.state.looker_state import LookerCheckpointState
+from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
 from tests.test_helpers import mce_helpers
 from tests.test_helpers.state_helpers import (
     validate_all_providers_have_committed_successfully,
@@ -689,8 +689,8 @@ def looker_source_config(sink_file_name):
     # Perform all assertions on the states. The deleted table should not be
     # part of the second state
-    state1 = cast(LookerCheckpointState, checkpoint1.state)
-    state2 = cast(LookerCheckpointState, checkpoint2.state)
+    state1 = cast(GenericCheckpointState, checkpoint1.state)
+    state2 = cast(GenericCheckpointState, checkpoint2.state)
 
     difference_dataset_urns = list(
         state1.get_urns_not_in(type="dataset", other_checkpoint_state=state2)
diff --git a/metadata-ingestion/tests/integration/lookml/test_lookml.py b/metadata-ingestion/tests/integration/lookml/test_lookml.py
index 33ddced1ddc53..4055ac1002e0d 100644
--- a/metadata-ingestion/tests/integration/lookml/test_lookml.py
+++ b/metadata-ingestion/tests/integration/lookml/test_lookml.py
@@ -10,7 +10,7 @@
 from datahub.ingestion.run.pipeline import Pipeline
 from datahub.ingestion.source.looker.lookml_source import LookMLSource
 from datahub.ingestion.source.state.checkpoint import Checkpoint
-from datahub.ingestion.source.state.lookml_state import LookMLCheckpointState
+from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
 from datahub.metadata.schema_classes import (
     DatasetSnapshotClass,
     MetadataChangeEventClass,
@@ -624,8 +624,8 @@ def test_lookml_ingest_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_
     # Perform all assertions on the states. The deleted table should not be
     # part of the second state
-    state1 = cast(LookMLCheckpointState, checkpoint1.state)
-    state2 = cast(LookMLCheckpointState, checkpoint2.state)
+    state1 = cast(GenericCheckpointState, checkpoint1.state)
+    state2 = cast(GenericCheckpointState, checkpoint2.state)
 
     difference_dataset_urns = list(
         state1.get_urns_not_in(type="dataset", other_checkpoint_state=state2)
diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_sql_common_state.py b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_sql_common_state.py
index 001ed969002fa..d91c80abe245d 100644
--- a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_sql_common_state.py
+++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_sql_common_state.py
@@ -16,15 +16,15 @@ def test_sql_common_state() -> None:
 
     state2 = BaseSQLAlchemyCheckpointState()
 
-    table_urns_diff = list(
-        state1.get_urns_not_in(type="table", other_checkpoint_state=state2)
+    dataset_urns_diff = list(
+        state1.get_urns_not_in(type="dataset", other_checkpoint_state=state2)
     )
-    assert len(table_urns_diff) == 1 and table_urns_diff[0] == test_table_urn
-
-    view_urns_diff = list(
-        state1.get_urns_not_in(type="view", other_checkpoint_state=state2)
+    assert len(dataset_urns_diff) == 2 and sorted(dataset_urns_diff) == sorted(
+        [
+            test_table_urn,
+            test_view_urn,
+        ]
     )
-    assert len(view_urns_diff) == 1 and view_urns_diff[0] == test_view_urn
 
     container_urns_diff = list(
         state1.get_urns_not_in(type="container", other_checkpoint_state=state2)
@@ -32,3 +32,22 @@ def test_sql_common_state() -> None:
     assert (
         len(container_urns_diff) == 1 and container_urns_diff[0] == test_container_urn
     )
+
+
+def test_backward_compat() -> None:
+    state = BaseSQLAlchemyCheckpointState.parse_obj(
+        dict(
+            encoded_table_urns=["mysql||db1.t1||PROD"],
+            encoded_view_urns=["mysql||db1.v1||PROD"],
+            encoded_container_urns=["1154d1da73a95376c9f33f47694cf1de"],
+            encoded_assertion_urns=["815963e1332b46a203504ba46ebfab24"],
+        )
+    )
+    assert state == BaseSQLAlchemyCheckpointState(
+        urns=[
+            "urn:li:dataset:(urn:li:dataPlatform:mysql,db1.t1,PROD)",
+            "urn:li:dataset:(urn:li:dataPlatform:mysql,db1.v1,PROD)",
+            "urn:li:container:1154d1da73a95376c9f33f47694cf1de",
+            "urn:li:assertion:815963e1332b46a203504ba46ebfab24",
+        ]
+    )
diff --git a/metadata-ingestion/tests/unit/test_glue_source.py b/metadata-ingestion/tests/unit/test_glue_source.py
index 9d6e64f326e34..0acd744e75fba 100644
--- a/metadata-ingestion/tests/unit/test_glue_source.py
+++ b/metadata-ingestion/tests/unit/test_glue_source.py
@@ -327,7 +327,7 @@ def test_glue_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
     state1 = cast(BaseSQLAlchemyCheckpointState, checkpoint1.state)
     state2 = cast(BaseSQLAlchemyCheckpointState, checkpoint2.state)
     difference_urns = list(
-        state1.get_urns_not_in(type="table", other_checkpoint_state=state2)
+        state1.get_urns_not_in(type="*", other_checkpoint_state=state2)
     )
 
     assert len(difference_urns) == 1
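The test updates from type="table"/"view" to type="dataset" or "*" lean on guess_entity_type, which (as imported in the new state module above) reads the entity type straight from the urn prefix. A quick sketch of that assumption, with illustrative urns:

```python
from datahub.utilities.urns.urn import guess_entity_type

# The type filter in GenericCheckpointState.get_urns_not_in compares against
# the entity type encoded in the urn itself.
assert guess_entity_type("urn:li:dataset:(urn:li:dataPlatform:glue,db.tbl,PROD)") == "dataset"
assert guess_entity_type("urn:li:container:1154d1da73a95376c9f33f47694cf1de") == "container"
```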