Skip to content

Commit

Permalink
fix(ingestion): fix AssertionError in base_transformer (#7702)
Browse files Browse the repository at this point in the history
Co-authored-by: Sergio Gomez Villamor <[email protected]>
  • Loading branch information
sgomezvillamor and Sergio Gomez Villamor authored Mar 29, 2023
1 parent d7ea162 commit 2580847
Showing 1 changed file with 4 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from abc import ABCMeta, abstractmethod
from typing import Any, Dict, Iterable, List, Optional, Type, Union
from typing import Any, Dict, Iterable, List, Optional, Union

import datahub.emitter.mce_builder as builder
from datahub.emitter.aspect import ASPECT_MAP
Expand All @@ -9,13 +9,10 @@
from datahub.ingestion.api.common import ControlRecord, EndOfStream, RecordEnvelope
from datahub.ingestion.api.transform import Transformer
from datahub.metadata.schema_classes import (
DataFlowSnapshotClass,
DataJobSnapshotClass,
DatasetSnapshotClass,
MetadataChangeEventClass,
MetadataChangeProposalClass,
)
from datahub.utilities.urns.urn import Urn
from datahub.utilities.urns.urn import Urn, guess_entity_type

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -56,11 +53,6 @@ def entity_types(self) -> List[str]:

def __init__(self):
self.entity_map: Dict[str, Dict[str, Any]] = {}
self.entity_type_mappings: Dict[str, Type] = {
"dataset": DatasetSnapshotClass,
"dataFlow": DataFlowSnapshotClass,
"dataJob": DataJobSnapshotClass,
}
mixedin = False
for mixin in [LegacyMCETransformer, SingleAspectTransformer]:
mixedin = mixedin or isinstance(self, mixin)
Expand All @@ -83,14 +75,8 @@ def _should_process(
if "*" in entity_types:
return True
if isinstance(record, MetadataChangeEventClass):
for e in entity_types:
assert (
e in self.entity_type_mappings
), f"Do not have a class mapping for {e}. Subscription to this entity will not work for transforming MCE-s"
if isinstance(record.proposedSnapshot, self.entity_type_mappings[e]):
return True
# fall through, no entity type matched
return False
entity_type = guess_entity_type(record.proposedSnapshot.urn)
return entity_type in entity_types
elif isinstance(
record, (MetadataChangeProposalWrapper, MetadataChangeProposalClass)
):
Expand Down

0 comments on commit 2580847

Please sign in to comment.