diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 8ab979ef7a719..4a90d4b05fbc2 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -22,7 +22,8 @@ from pydantic import BaseModel from datahub.configuration.common import ConfigModel -from datahub.emitter.mcp_builder import mcps_from_mce +from datahub.configuration.source_common import PlatformInstanceConfigMixin +from datahub.emitter.mcp_builder import PlatformKey, mcps_from_mce from datahub.ingestion.api.closeable import Closeable from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit from datahub.ingestion.api.report import Report @@ -187,26 +188,15 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: self.ctx.pipeline_config and self.ctx.pipeline_config.flags.generate_browse_path_v2 ): - platform = getattr(self, "platform", None) or getattr( - self.get_config(), "platform", None - ) - env = getattr(self.get_config(), "env", None) - browse_path_drop_dirs = [ - platform, - platform and platform.lower(), - env, - env and env.lower(), - ] - browse_path_processor = partial( - auto_browse_path_v2, - [s for s in browse_path_drop_dirs if s is not None], + browse_path_processor = self._get_browse_path_processor( + self.ctx.pipeline_config.flags.generate_browse_path_v2_dry_run ) return [ auto_status_aspect, auto_materialize_referenced_tags, - partial(auto_workunit_reporter, self.get_report()), browse_path_processor, + partial(auto_workunit_reporter, self.get_report()), ] @staticmethod @@ -248,6 +238,34 @@ def get_report(self) -> SourceReport: def close(self) -> None: pass + def _get_browse_path_processor(self, dry_run: bool) -> MetadataWorkUnitProcessor: + config = self.get_config() + platform = getattr(self, "platform", None) or getattr(config, "platform", None) + env = getattr(config, "env", None) + browse_path_drop_dirs = [ + platform, + platform and platform.lower(), + env, + env and env.lower(), + ] + + platform_key: Optional[PlatformKey] = None + if ( + platform + and isinstance(config, PlatformInstanceConfigMixin) + and config.platform_instance + ): + platform_key = PlatformKey( + platform=platform, instance=config.platform_instance + ) + + return partial( + auto_browse_path_v2, + platform_key=platform_key, + drop_dirs=[s for s in browse_path_drop_dirs if s is not None], + dry_run=dry_run, + ) + class TestableSource(Source): @staticmethod diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 7951f37c98076..2e6e002a55cbe 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -1,5 +1,4 @@ import logging -from collections import defaultdict from typing import ( TYPE_CHECKING, Callable, @@ -9,11 +8,14 @@ Optional, Sequence, Set, + Tuple, TypeVar, Union, ) +from datahub.emitter.mce_builder import make_dataplatform_instance_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.mcp_builder import PlatformKey from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.metadata.schema_classes import ( BrowsePathEntryClass, @@ -25,6 +27,7 @@ StatusClass, TagKeyClass, ) +from datahub.telemetry import telemetry from datahub.utilities.urns.tag_urn import TagUrn from datahub.utilities.urns.urn import guess_entity_type from datahub.utilities.urns.urn_iter import list_urns @@ -166,68 +169,136 @@ def auto_materialize_referenced_tags( def auto_browse_path_v2( - drop_dirs: Sequence[str], stream: Iterable[MetadataWorkUnit], + *, + dry_run: bool = False, + drop_dirs: Sequence[str] = (), + platform_key: Optional[PlatformKey] = None, ) -> Iterable[MetadataWorkUnit]: - """Generate BrowsePathsV2 from Container and BrowsePaths aspects.""" + """Generate BrowsePathsV2 from Container and BrowsePaths aspects. - ignore_urns: Set[str] = set() - legacy_browse_paths: Dict[str, List[str]] = defaultdict(list) - container_urns: Set[str] = set() - parent_container_map: Dict[str, str] = {} - children: Dict[str, List[str]] = defaultdict(list) - for wu in stream: - yield wu + Generates browse paths v2 on demand, rather than waiting for end of ingestion, + for better UI experience while ingestion is running. - urn = wu.get_urn() - if guess_entity_type(urn) == "container": - container_urns.add(urn) - - container_aspects = wu.get_aspects_of_type(ContainerClass) - for c_aspect in container_aspects: - parent = c_aspect.container - parent_container_map[urn] = parent - children[parent].append(urn) - - browse_path_aspects = wu.get_aspects_of_type(BrowsePathsClass) - for b_aspect in browse_path_aspects: - if b_aspect.paths: - path = b_aspect.paths[0] # Only take first path - legacy_browse_paths[urn] = [ - p for p in path.strip("/").split("/") if p.strip() not in drop_dirs + To do this, assumes entities in container structure arrive in topological order + and that all relevant aspects (Container, BrowsePaths, BrowsePathsV2) for an urn + arrive together in a batch. + + Calculates the correct BrowsePathsV2 at end of workunit stream, + and emits "corrections", i.e. a final BrowsePathsV2 for any urns that have changed. + """ + + # For telemetry, to see if our sources violate assumptions + num_out_of_order = 0 + num_out_of_batch = 0 + + # Set for all containers and urns with a Container aspect + # Used to construct container paths while iterating through stream + # Assumes topological order of entities in stream + paths: Dict[str, List[BrowsePathEntryClass]] = {} + + emitted_urns: Set[str] = set() + containers_used_as_parent: Set[str] = set() + for urn, batch in _batch_workunits_by_urn(stream): + container_path: Optional[List[BrowsePathEntryClass]] = None + legacy_path: Optional[List[BrowsePathEntryClass]] = None + has_browse_path_v2 = False + + for wu in batch: + yield wu + if not wu.is_primary_source: + continue + + container_aspect = wu.get_aspect_of_type(ContainerClass) + if container_aspect: + parent_urn = container_aspect.container + containers_used_as_parent.add(parent_urn) + paths[urn] = [ + *paths.setdefault(parent_urn, []), # Guess parent has no parents + BrowsePathEntryClass(id=parent_urn, urn=parent_urn), ] + container_path = paths[urn] + + if urn in containers_used_as_parent: + # Topological order invariant violated; we've used the previous value of paths[urn] + # TODO: Add sentry alert + num_out_of_order += 1 + + browse_path_aspect = wu.get_aspect_of_type(BrowsePathsClass) + if browse_path_aspect and browse_path_aspect.paths: + legacy_path = [ + BrowsePathEntryClass(id=p.strip()) + for p in browse_path_aspect.paths[0].strip("/").split("/") + if p.strip() and p.strip() not in drop_dirs + ] + + if wu.get_aspect_of_type(BrowsePathsV2Class): + has_browse_path_v2 = True + + path = container_path or legacy_path + if (path is not None or has_browse_path_v2) and urn in emitted_urns: + # Batch invariant violated + # TODO: Add sentry alert + num_out_of_batch += 1 + elif has_browse_path_v2: + emitted_urns.add(urn) + elif path is not None: + emitted_urns.add(urn) + if not dry_run: + yield MetadataChangeProposalWrapper( + entityUrn=urn, + aspect=BrowsePathsV2Class( + path=_prepend_platform_instance(path, platform_key) + ), + ).as_workunit() + elif urn not in emitted_urns and guess_entity_type(urn) == "container": + # Root containers have no Container aspect, so they are not handled above + emitted_urns.add(urn) + if not dry_run: + yield MetadataChangeProposalWrapper( + entityUrn=urn, + aspect=BrowsePathsV2Class( + path=_prepend_platform_instance([], platform_key) + ), + ).as_workunit() + + if num_out_of_batch or num_out_of_order: + properties = { + "platform": platform_key.platform if platform_key else None, + "has_platform_instance": bool(platform_key.instance) + if platform_key + else False, + "num_out_of_batch": num_out_of_batch, + "num_out_of_order": num_out_of_order, + } + telemetry.telemetry_instance.ping("incorrect_browse_path_v2", properties) + + +def _batch_workunits_by_urn( + stream: Iterable[MetadataWorkUnit], +) -> Iterable[Tuple[str, List[MetadataWorkUnit]]]: + batch: List[MetadataWorkUnit] = [] + batch_urn: Optional[str] = None + for wu in stream: + if wu.get_urn() != batch_urn: + if batch_urn is not None: + yield batch_urn, batch + batch = [] - if wu.get_aspects_of_type(BrowsePathsV2Class): - ignore_urns.add(urn) + batch.append(wu) + batch_urn = wu.get_urn() - paths: Dict[str, List[str]] = {} # Maps urn -> list of urns in path - # Yield browse paths v2 in topological order, starting with root containers - processed_urns = set() - nodes = container_urns - parent_container_map.keys() - while nodes: - node = nodes.pop() - nodes.update(children[node]) + if batch_urn is not None: + yield batch_urn, batch - if node not in parent_container_map: # root - paths[node] = [] - else: - parent = parent_container_map[node] - paths[node] = [*paths[parent], parent] - if node not in ignore_urns: - yield MetadataChangeProposalWrapper( - entityUrn=node, - aspect=BrowsePathsV2Class( - path=[BrowsePathEntryClass(id=urn, urn=urn) for urn in paths[node]] - ), - ).as_workunit() - processed_urns.add(node) - - # Yield browse paths v2 based on browse paths v1 (legacy) - # Only done if the entity is not part of a container hierarchy - for urn in legacy_browse_paths.keys() - processed_urns - ignore_urns: - yield MetadataChangeProposalWrapper( - entityUrn=urn, - aspect=BrowsePathsV2Class( - path=[BrowsePathEntryClass(id=p) for p in legacy_browse_paths[urn]] - ), - ).as_workunit() + +def _prepend_platform_instance( + entries: List[BrowsePathEntryClass], platform_key: Optional[PlatformKey] +) -> List[BrowsePathEntryClass]: + if platform_key and platform_key.instance: + urn = make_dataplatform_instance_urn( + platform_key.platform, platform_key.instance + ) + return [BrowsePathEntryClass(id=urn, urn=urn)] + entries + + return entries diff --git a/metadata-ingestion/src/datahub/ingestion/api/workunit.py b/metadata-ingestion/src/datahub/ingestion/api/workunit.py index 3a80b900f1444..8eea3514a22af 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/workunit.py +++ b/metadata-ingestion/src/datahub/ingestion/api/workunit.py @@ -1,5 +1,6 @@ +import logging from dataclasses import dataclass -from typing import Iterable, List, Optional, Type, TypeVar, Union, overload +from typing import Iterable, Optional, Type, TypeVar, Union, overload from deprecated import deprecated @@ -11,6 +12,8 @@ ) from datahub.metadata.schema_classes import UsageAggregationClass, _Aspect +logger = logging.getLogger(__name__) + T_Aspect = TypeVar("T_Aspect", bound=_Aspect) @@ -90,7 +93,7 @@ def get_urn(self) -> str: assert self.metadata.entityUrn return self.metadata.entityUrn - def get_aspects_of_type(self, aspect_cls: Type[T_Aspect]) -> List[T_Aspect]: + def get_aspect_of_type(self, aspect_cls: Type[T_Aspect]) -> Optional[T_Aspect]: aspects: list if isinstance(self.metadata, MetadataChangeEvent): aspects = self.metadata.proposedSnapshot.aspects @@ -109,7 +112,10 @@ def get_aspects_of_type(self, aspect_cls: Type[T_Aspect]) -> List[T_Aspect]: else: raise ValueError(f"Unexpected type {type(self.metadata)}") - return [a for a in aspects if isinstance(a, aspect_cls)] + aspects = [a for a in aspects if isinstance(a, aspect_cls)] + if len(aspects) > 1: + logger.warning(f"Found multiple aspects of type {aspect_cls} in MCE {self}") + return aspects[-1] if aspects else None def decompose_mce_into_mcps(self) -> Iterable["MetadataWorkUnit"]: from datahub.emitter.mcp_builder import mcps_from_mce diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py index 0ae2cf307c60b..83b73d144ec52 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py @@ -45,10 +45,18 @@ class FlagsConfig(ConfigModel): """ generate_browse_path_v2: bool = Field( - default=False, + default=True, description="Generate BrowsePathsV2 aspects from container hierarchy and existing BrowsePaths aspects.", ) + generate_browse_path_v2_dry_run: bool = Field( + default=True, + description=( + "Run through browse paths v2 generation but do not actually write the aspects to DataHub. " + "Requires `generate_browse_path_v2` to also be enabled." + ), + ) + class PipelineConfig(ConfigModel): # Once support for discriminated unions gets merged into Pydantic, we can @@ -58,7 +66,7 @@ class PipelineConfig(ConfigModel): source: SourceConfig sink: DynamicTypedConfig transformers: Optional[List[DynamicTypedConfig]] - flags: FlagsConfig = Field(default=FlagsConfig()) + flags: FlagsConfig = Field(default=FlagsConfig(), hidden_from_docs=True) reporting: List[ReporterConfig] = [] run_id: str = DEFAULT_RUN_ID datahub_api: Optional[DatahubClientConfig] = None diff --git a/metadata-ingestion/tests/unit/test_source_helpers.py b/metadata-ingestion/tests/unit/test_source_helpers.py index f8ac967af22c5..9f88f0d635958 100644 --- a/metadata-ingestion/tests/unit/test_source_helpers.py +++ b/metadata-ingestion/tests/unit/test_source_helpers.py @@ -1,8 +1,14 @@ from typing import Any, Dict, Iterable, List, Union +from unittest.mock import patch import datahub.metadata.schema_classes as models -from datahub.emitter.mce_builder import make_container_urn, make_dataset_urn +from datahub.emitter.mce_builder import ( + make_container_urn, + make_dataplatform_instance_urn, + make_dataset_urn, +) from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.mcp_builder import PlatformKey from datahub.ingestion.api.source_helpers import ( auto_browse_path_v2, auto_status_aspect, @@ -82,22 +88,38 @@ def test_auto_status_aspect(): assert list(auto_status_aspect(initial_wu)) == expected -def _create_container_aspects(d: Dict[str, Any]) -> Iterable[MetadataWorkUnit]: +def _create_container_aspects( + d: Dict[str, Any], + other_aspects: Dict[str, List[models._Aspect]] = {}, + root: bool = True, +) -> Iterable[MetadataWorkUnit]: for k, v in d.items(): + urn = make_container_urn(k) yield MetadataChangeProposalWrapper( - entityUrn=make_container_urn(k), - aspect=models.StatusClass(removed=False), + entityUrn=urn, aspect=models.StatusClass(removed=False) ).as_workunit() + for aspect in other_aspects.pop(k, []): + yield MetadataChangeProposalWrapper( + entityUrn=urn, aspect=aspect + ).as_workunit() + for child in list(v): yield MetadataChangeProposalWrapper( entityUrn=make_container_urn(child), - aspect=models.ContainerClass( - container=make_container_urn(k), - ), + aspect=models.ContainerClass(container=urn), ).as_workunit() if isinstance(v, dict): - yield from _create_container_aspects(v) + yield from _create_container_aspects( + v, other_aspects=other_aspects, root=False + ) + + if root: + for k, v in other_aspects.items(): + for aspect in v: + yield MetadataChangeProposalWrapper( + entityUrn=make_container_urn(k), aspect=aspect + ).as_workunit() def _make_container_browse_path_entries( @@ -118,14 +140,15 @@ def _get_browse_paths_from_wu( ) -> Dict[str, List[models.BrowsePathEntryClass]]: paths = {} for wu in stream: - browse_path_v2 = wu.get_aspects_of_type(models.BrowsePathsV2Class) + browse_path_v2 = wu.get_aspect_of_type(models.BrowsePathsV2Class) if browse_path_v2: name = wu.get_urn().split(":")[-1] - paths[name] = browse_path_v2[0].path + paths[name] = browse_path_v2.path return paths -def test_auto_browse_path_v2_by_container_hierarchy(): +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_by_container_hierarchy(telemetry_ping_mock): structure = { "one": { "a": {"i": ["1", "2", "3"], "ii": ["4"]}, @@ -140,12 +163,13 @@ def test_auto_browse_path_v2_by_container_hierarchy(): wus = list(auto_status_aspect(_create_container_aspects(structure))) assert ( # Sanity check - sum(len(wu.get_aspects_of_type(models.StatusClass)) for wu in wus) == 21 + sum(bool(wu.get_aspect_of_type(models.StatusClass)) for wu in wus) == 21 ) - new_wus = list(auto_browse_path_v2([], wus)) + new_wus = list(auto_browse_path_v2(wus)) + assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list assert ( - sum(len(wu.get_aspects_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) == 21 ) @@ -159,38 +183,53 @@ def test_auto_browse_path_v2_by_container_hierarchy(): assert paths["d"] == _make_container_browse_path_entries(["three"]) assert paths["i"] == _make_container_browse_path_entries(["one", "a"]) + # Check urns emitted on demand -- not all at end + for urn in set(wu.get_urn() for wu in new_wus): + try: + idx = next( + i + for i, wu in enumerate(new_wus) + if wu.get_aspect_of_type(models.ContainerClass) and wu.get_urn() == urn + ) + except StopIteration: + idx = next( + i + for i, wu in enumerate(new_wus) + if wu.get_aspect_of_type(models.StatusClass) and wu.get_urn() == urn + ) + assert new_wus[idx + 1].get_aspect_of_type( + models.BrowsePathsV2Class + ) or new_wus[idx + 2].get_aspect_of_type(models.BrowsePathsV2Class) + -def test_auto_browse_path_v2_ignores_urns_already_with(): +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_ignores_urns_already_with(telemetry_ping_mock): structure = {"a": {"b": {"c": {"d": ["e"]}}}} - mcps = [ - *MetadataChangeProposalWrapper.construct_many( - entityUrn=make_container_urn("f"), - aspects=[ - models.BrowsePathsClass(paths=["/one/two"]), - models.BrowsePathsV2Class( - path=_make_browse_path_entries(["my", "path"]) - ), - ], - ), - MetadataChangeProposalWrapper( - entityUrn=make_container_urn("c"), - aspect=models.BrowsePathsV2Class( - path=_make_container_browse_path_entries(["custom", "path"]) - ), - ), - ] wus = [ *auto_status_aspect( - [ - *_create_container_aspects(structure), - *(mcp.as_workunit() for mcp in mcps), - ] + _create_container_aspects( + structure, + other_aspects={ + "f": [ + models.BrowsePathsClass(paths=["/one/two"]), + models.BrowsePathsV2Class( + path=_make_browse_path_entries(["my", "path"]) + ), + ], + "c": [ + models.BrowsePathsV2Class( + path=_make_container_browse_path_entries(["custom", "path"]) + ) + ], + }, + ), ) ] - new_wus = list(auto_browse_path_v2([], wus)) + new_wus = list(auto_browse_path_v2(wus)) + assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list assert ( - sum(len(wu.get_aspects_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) == 6 ) @@ -201,7 +240,8 @@ def test_auto_browse_path_v2_ignores_urns_already_with(): assert paths["f"] == _make_browse_path_entries(["my", "path"]) -def test_auto_browse_path_v2_legacy_browse_path(): +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_legacy_browse_path(telemetry_ping_mock): platform = "platform" env = "PROD" wus = [ @@ -218,7 +258,8 @@ def test_auto_browse_path_v2_legacy_browse_path(): aspect=models.BrowsePathsClass([f"/{platform}/one/two"]), ).as_workunit(), ] - new_wus = list(auto_browse_path_v2(["platform", "PROD", "unused"], wus)) + new_wus = list(auto_browse_path_v2(wus, drop_dirs=["platform", "PROD", "unused"])) + assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list assert len(new_wus) == 6 paths = _get_browse_paths_from_wu(new_wus) assert ( @@ -229,22 +270,21 @@ def test_auto_browse_path_v2_legacy_browse_path(): assert paths["platform,dataset-2,PROD)"] == _make_browse_path_entries(["something"]) -def test_auto_browse_path_v2_container_over_legacy_browse_path(): +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_container_over_legacy_browse_path(telemetry_ping_mock): structure = {"a": {"b": ["c"]}} wus = list( auto_status_aspect( - [ - *_create_container_aspects(structure), - MetadataChangeProposalWrapper( - entityUrn=make_container_urn("b"), - aspect=models.BrowsePathsClass(paths=["/one/two"]), - ).as_workunit(), - ] + _create_container_aspects( + structure, + other_aspects={"b": [models.BrowsePathsClass(paths=["/one/two"])]}, + ), ) ) - new_wus = list(auto_browse_path_v2([], wus)) + new_wus = list(auto_browse_path_v2(wus)) + assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list assert ( - sum(len(wu.get_aspects_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) == 3 ) @@ -252,3 +292,115 @@ def test_auto_browse_path_v2_container_over_legacy_browse_path(): assert paths["a"] == [] assert paths["b"] == _make_container_browse_path_entries(["a"]) assert paths["c"] == _make_container_browse_path_entries(["a", "b"]) + + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_with_platform_instsance(telemetry_ping_mock): + platform = "my_platform" + platform_instance = "my_instance" + platform_key = PlatformKey(platform=platform, instance=platform_instance) + platform_instance_urn = make_dataplatform_instance_urn(platform, platform_instance) + platform_instance_entry = models.BrowsePathEntryClass( + platform_instance_urn, platform_instance_urn + ) + + structure = {"a": {"b": ["c"]}} + wus = list(auto_status_aspect(_create_container_aspects(structure))) + + new_wus = list( + auto_browse_path_v2( + wus, + platform_key=platform_key, + ) + ) + assert telemetry_ping_mock.call_count == 0 + + assert ( + sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + == 3 + ) + paths = _get_browse_paths_from_wu(new_wus) + assert paths["a"] == [platform_instance_entry] + assert paths["b"] == [ + platform_instance_entry, + *_make_container_browse_path_entries(["a"]), + ] + assert paths["c"] == [ + platform_instance_entry, + *_make_container_browse_path_entries(["a", "b"]), + ] + + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_invalid_batch_telemetry(telemetry_ping_mock): + structure = {"a": {"b": ["c"]}} + b_urn = make_container_urn("b") + wus = [ + *_create_container_aspects(structure), + MetadataChangeProposalWrapper( # Browse path for b separate from its Container aspect + entityUrn=b_urn, + aspect=models.BrowsePathsClass(paths=["/one/two"]), + ).as_workunit(), + ] + wus = list(auto_status_aspect(wus)) + + assert telemetry_ping_mock.call_count == 0 + _ = list(auto_browse_path_v2(wus)) + assert telemetry_ping_mock.call_count == 1 + assert telemetry_ping_mock.call_args_list[0][0][0] == "incorrect_browse_path_v2" + assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_order"] == 0 + assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_batch"] == 1 + + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_no_invalid_batch_telemetry_for_unrelated_aspects( + telemetry_ping_mock, +): + structure = {"a": {"b": ["c"]}} + b_urn = make_container_urn("b") + wus = [ + *_create_container_aspects(structure), + MetadataChangeProposalWrapper( # Browse path for b separate from its Container aspect + entityUrn=b_urn, + aspect=models.ContainerPropertiesClass("container name"), + ).as_workunit(), + ] + wus = list(auto_status_aspect(wus)) + + assert telemetry_ping_mock.call_count == 0 + _ = list(auto_browse_path_v2(wus)) + assert telemetry_ping_mock.call_count == 0 + + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_invalid_order_telemetry(telemetry_ping_mock): + structure = {"a": {"b": ["c"]}} + wus = list(reversed(list(_create_container_aspects(structure)))) + wus = list(auto_status_aspect(wus)) + + assert telemetry_ping_mock.call_count == 0 + new_wus = list(auto_browse_path_v2(wus)) + assert ( + sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + > 0 + ) + assert telemetry_ping_mock.call_count == 1 + assert telemetry_ping_mock.call_args_list[0][0][0] == "incorrect_browse_path_v2" + assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_order"] == 1 + assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_batch"] == 0 + + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_dry_run(telemetry_ping_mock): + structure = {"a": {"b": ["c"]}} + wus = list(reversed(list(_create_container_aspects(structure)))) + wus = list(auto_status_aspect(wus)) + + assert telemetry_ping_mock.call_count == 0 + new_wus = list(auto_browse_path_v2(wus, dry_run=True)) + assert wus == new_wus + assert ( + sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + == 0 + ) + assert telemetry_ping_mock.call_count == 1 diff --git a/metadata-ingestion/tests/unit/test_workunit.py b/metadata-ingestion/tests/unit/test_workunit.py index e036acc9d50ea..9d31b3fc60866 100644 --- a/metadata-ingestion/tests/unit/test_workunit.py +++ b/metadata-ingestion/tests/unit/test_workunit.py @@ -19,8 +19,8 @@ def test_get_aspects_of_type_mcp(): wu = MetadataChangeProposalWrapper( entityUrn="urn:li:container:asdf", aspect=aspect ).as_workunit() - assert wu.get_aspects_of_type(StatusClass) == [aspect] - assert wu.get_aspects_of_type(ContainerClass) == [] + assert wu.get_aspect_of_type(StatusClass) == aspect + assert wu.get_aspect_of_type(ContainerClass) is None def test_get_aspects_of_type_mce(): @@ -34,9 +34,9 @@ def test_get_aspects_of_type_mce(): ) ) wu = MetadataWorkUnit(id="id", mce=mce) - assert wu.get_aspects_of_type(StatusClass) == [status_aspect, status_aspect_2] - assert wu.get_aspects_of_type(UpstreamLineageClass) == [lineage_aspect] - assert wu.get_aspects_of_type(ContainerClass) == [] + assert wu.get_aspect_of_type(StatusClass) == status_aspect_2 + assert wu.get_aspect_of_type(UpstreamLineageClass) == lineage_aspect + assert wu.get_aspect_of_type(ContainerClass) is None def test_get_aspects_of_type_mcpc(): @@ -52,8 +52,8 @@ def test_get_aspects_of_type_mcpc(): ), ) wu = MetadataWorkUnit(id="id", mcp_raw=mcpc) - assert wu.get_aspects_of_type(StatusClass) == [aspect] - assert wu.get_aspects_of_type(ContainerClass) == [] + assert wu.get_aspect_of_type(StatusClass) == aspect + assert wu.get_aspect_of_type(ContainerClass) is None # Failure scenarios mcpc = MetadataChangeProposalClass( @@ -67,7 +67,7 @@ def test_get_aspects_of_type_mcpc(): ), ) wu = MetadataWorkUnit(id="id", mcp_raw=mcpc) - assert wu.get_aspects_of_type(StatusClass) == [] + assert wu.get_aspect_of_type(StatusClass) is None mcpc = MetadataChangeProposalClass( entityUrn="urn:li:container:asdf", @@ -80,7 +80,7 @@ def test_get_aspects_of_type_mcpc(): ), ) wu = MetadataWorkUnit(id="id", mcp_raw=mcpc) - assert wu.get_aspects_of_type(StatusClass) == [] + assert wu.get_aspect_of_type(StatusClass) is None mcpc = MetadataChangeProposalClass( entityUrn="urn:li:container:asdf", @@ -93,7 +93,7 @@ def test_get_aspects_of_type_mcpc(): ), ) wu = MetadataWorkUnit(id="id", mcp_raw=mcpc) - assert wu.get_aspects_of_type(StatusClass) == [] + assert wu.get_aspect_of_type(StatusClass) is None mcpc = MetadataChangeProposalClass( entityUrn="urn:li:container:asdf", @@ -106,4 +106,4 @@ def test_get_aspects_of_type_mcpc(): ), ) wu = MetadataWorkUnit(id="id", mcp_raw=mcpc) - assert wu.get_aspects_of_type(StatusClass) == [] + assert wu.get_aspect_of_type(StatusClass) is None