diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py index 25c6e632b6959..78686579d0021 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py @@ -1,6 +1,6 @@ import logging from abc import ABCMeta, abstractmethod -from typing import Any, Dict, Iterable, List, Optional, Type, Union +from typing import Any, Dict, Iterable, List, Optional, Union import datahub.emitter.mce_builder as builder from datahub.emitter.aspect import ASPECT_MAP @@ -9,13 +9,10 @@ from datahub.ingestion.api.common import ControlRecord, EndOfStream, RecordEnvelope from datahub.ingestion.api.transform import Transformer from datahub.metadata.schema_classes import ( - DataFlowSnapshotClass, - DataJobSnapshotClass, - DatasetSnapshotClass, MetadataChangeEventClass, MetadataChangeProposalClass, ) -from datahub.utilities.urns.urn import Urn +from datahub.utilities.urns.urn import Urn, guess_entity_type log = logging.getLogger(__name__) @@ -56,11 +53,6 @@ def entity_types(self) -> List[str]: def __init__(self): self.entity_map: Dict[str, Dict[str, Any]] = {} - self.entity_type_mappings: Dict[str, Type] = { - "dataset": DatasetSnapshotClass, - "dataFlow": DataFlowSnapshotClass, - "dataJob": DataJobSnapshotClass, - } mixedin = False for mixin in [LegacyMCETransformer, SingleAspectTransformer]: mixedin = mixedin or isinstance(self, mixin) @@ -83,14 +75,8 @@ def _should_process( if "*" in entity_types: return True if isinstance(record, MetadataChangeEventClass): - for e in entity_types: - assert ( - e in self.entity_type_mappings - ), f"Do not have a class mapping for {e}. Subscription to this entity will not work for transforming MCE-s" - if isinstance(record.proposedSnapshot, self.entity_type_mappings[e]): - return True - # fall through, no entity type matched - return False + entity_type = guess_entity_type(record.proposedSnapshot.urn) + return entity_type in entity_types elif isinstance( record, (MetadataChangeProposalWrapper, MetadataChangeProposalClass) ):