From 09ee5173ffa676027da49b55c4d2cae6aa7426fa Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Mon, 5 Jun 2023 14:59:33 -0700 Subject: [PATCH 1/7] feat(ingest): Produce browse paths v2 on demand and with platform instance --- .../src/datahub/ingestion/api/source.py | 48 +++-- .../datahub/ingestion/api/source_helpers.py | 179 ++++++++++++----- .../src/datahub/ingestion/api/workunit.py | 12 +- .../tests/unit/test_source_helpers.py | 188 +++++++++++++----- .../tests/unit/test_workunit.py | 22 +- 5 files changed, 316 insertions(+), 133 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 8ab979ef7a719..518b87103a353 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -22,7 +22,8 @@ from pydantic import BaseModel from datahub.configuration.common import ConfigModel -from datahub.emitter.mcp_builder import mcps_from_mce +from datahub.configuration.source_common import PlatformInstanceConfigMixin +from datahub.emitter.mcp_builder import PlatformKey, mcps_from_mce from datahub.ingestion.api.closeable import Closeable from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit from datahub.ingestion.api.report import Report @@ -187,26 +188,13 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: self.ctx.pipeline_config and self.ctx.pipeline_config.flags.generate_browse_path_v2 ): - platform = getattr(self, "platform", None) or getattr( - self.get_config(), "platform", None - ) - env = getattr(self.get_config(), "env", None) - browse_path_drop_dirs = [ - platform, - platform and platform.lower(), - env, - env and env.lower(), - ] - browse_path_processor = partial( - auto_browse_path_v2, - [s for s in browse_path_drop_dirs if s is not None], - ) + browse_path_processor = self._get_browse_path_processor() return [ auto_status_aspect, auto_materialize_referenced_tags, - partial(auto_workunit_reporter, self.get_report()), browse_path_processor, + partial(auto_workunit_reporter, self.get_report()), ] @staticmethod @@ -248,6 +236,34 @@ def get_report(self) -> SourceReport: def close(self) -> None: pass + def _get_browse_path_processor(self) -> MetadataWorkUnitProcessor: + config = self.get_config() + platform = getattr(self, "platform", None) or getattr(config, "platform", None) + env = getattr(config, "env", None) + browse_path_drop_dirs = [ + platform, + platform and platform.lower(), + env, + env and env.lower(), + ] + + platform_key: Optional[PlatformKey] = None + if ( + platform + and isinstance(config, PlatformInstanceConfigMixin) + and config.platform_instance + ): + platform_key = PlatformKey( + platform=platform, instance=config.platform_instance + ) + + return partial( + auto_browse_path_v2, + platform_key=platform_key, + drop_dirs=[s for s in browse_path_drop_dirs if s is not None], + pipeline_config=self.ctx.pipeline_config, + ) + class TestableSource(Source): @staticmethod diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 7951f37c98076..31af3566bdcc6 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -1,5 +1,4 @@ import logging -from collections import defaultdict from typing import ( TYPE_CHECKING, Callable, @@ -9,12 +8,16 @@ Optional, Sequence, Set, + Tuple, TypeVar, Union, ) +from datahub.emitter.mce_builder import make_dataplatform_instance_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.mcp_builder import PlatformKey from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.run.pipeline_config import PipelineConfig from datahub.metadata.schema_classes import ( BrowsePathEntryClass, BrowsePathsClass, @@ -25,6 +28,7 @@ StatusClass, TagKeyClass, ) +from datahub.telemetry import telemetry from datahub.utilities.urns.tag_urn import TagUrn from datahub.utilities.urns.urn import guess_entity_type from datahub.utilities.urns.urn_iter import list_urns @@ -166,68 +170,135 @@ def auto_materialize_referenced_tags( def auto_browse_path_v2( - drop_dirs: Sequence[str], stream: Iterable[MetadataWorkUnit], + *, + drop_dirs: Sequence[str] = (), + platform_key: Optional[PlatformKey] = None, + pipeline_config: Optional[PipelineConfig] = None, ) -> Iterable[MetadataWorkUnit]: - """Generate BrowsePathsV2 from Container and BrowsePaths aspects.""" + """Generate BrowsePathsV2 from Container and BrowsePaths aspects. - ignore_urns: Set[str] = set() - legacy_browse_paths: Dict[str, List[str]] = defaultdict(list) - container_urns: Set[str] = set() - parent_container_map: Dict[str, str] = {} - children: Dict[str, List[str]] = defaultdict(list) - for wu in stream: - yield wu + Generates browse paths v2 on demand, rather than waiting for end of ingestion, + for better UI experience while ingestion is running. - urn = wu.get_urn() - if guess_entity_type(urn) == "container": - container_urns.add(urn) - - container_aspects = wu.get_aspects_of_type(ContainerClass) - for c_aspect in container_aspects: - parent = c_aspect.container - parent_container_map[urn] = parent - children[parent].append(urn) - - browse_path_aspects = wu.get_aspects_of_type(BrowsePathsClass) - for b_aspect in browse_path_aspects: - if b_aspect.paths: - path = b_aspect.paths[0] # Only take first path - legacy_browse_paths[urn] = [ - p for p in path.strip("/").split("/") if p.strip() not in drop_dirs - ] + To do this, assumes entities in container structure arrive in topological order + and that all relevant aspects (Container, BrowsePaths, BrowsePathsV2) for an urn + arrive together in a batch. - if wu.get_aspects_of_type(BrowsePathsV2Class): - ignore_urns.add(urn) + Calculates the correct BrowsePathsV2 at end of workunit stream, + and emits "corrections", i.e. a final BrowsePathsV2 for any urns that have changed. + """ - paths: Dict[str, List[str]] = {} # Maps urn -> list of urns in path - # Yield browse paths v2 in topological order, starting with root containers - processed_urns = set() - nodes = container_urns - parent_container_map.keys() - while nodes: - node = nodes.pop() - nodes.update(children[node]) + telemetry_metadata = { + "platform": platform_key.platform if platform_key else None, + "run_id": pipeline_config.run_id if pipeline_config else None, + "pipeline_name": pipeline_config.pipeline_name if pipeline_config else None, + "server": pipeline_config.datahub_api.server + if pipeline_config and pipeline_config.datahub_api + else None, + } + + # Set for all containers and urns with a Container aspect + # Used to construct container paths while iterating through stream + # Assumes topological order of entities in stream + paths: Dict[str, List[BrowsePathEntryClass]] = {} + + emitted_urns: Set[str] = set() + containers_used_as_parent: Set[str] = set() + for urn, batch in _batch_workunits_by_urn(stream): + container_path: Optional[List[BrowsePathEntryClass]] = None + legacy_path: Optional[List[BrowsePathEntryClass]] = None + has_browse_path_v2 = False + + for wu in batch: + yield wu + if not wu.is_primary_source: + continue + + container_aspect = wu.get_aspect_of_type(ContainerClass) + if container_aspect: + parent_urn = container_aspect.container + containers_used_as_parent.add(parent_urn) + paths[urn] = [ + *paths.setdefault(parent_urn, []), # Guess parent has no parents + BrowsePathEntryClass(id=parent_urn, urn=parent_urn), + ] + container_path = paths[urn] + + if urn in containers_used_as_parent: + # Topological order invariant violated; we've used the previous value of paths[urn] + telemetry.telemetry_instance.ping( + "incorrect_browse_path_v2", + { + **telemetry_metadata, + "reason": f"Container aspect for {urn} emitted out of order", + }, + ) + + browse_path_aspect = wu.get_aspect_of_type(BrowsePathsClass) + if browse_path_aspect: + legacy_path = [ + BrowsePathEntryClass(id=p) + for p in browse_path_aspect.paths[0].strip("/").split("/") + if p.strip() not in drop_dirs + ] - if node not in parent_container_map: # root - paths[node] = [] - else: - parent = parent_container_map[node] - paths[node] = [*paths[parent], parent] - if node not in ignore_urns: + if wu.get_aspect_of_type(BrowsePathsV2Class): + has_browse_path_v2 = True + + path = container_path or legacy_path + if (path is not None or has_browse_path_v2) and urn in emitted_urns: + # Batch invariant violated + telemetry.telemetry_instance.ping( + "incorrect_browse_path_v2", + {**telemetry_metadata, "reason": f"Data for {urn} not in batch"}, + ) + elif has_browse_path_v2: + emitted_urns.add(urn) + elif path is not None: + yield MetadataChangeProposalWrapper( + entityUrn=urn, + aspect=BrowsePathsV2Class( + path=_prepend_platform_instance(path, platform_key) + ), + ).as_workunit() + emitted_urns.add(urn) + elif urn not in emitted_urns and guess_entity_type(urn) == "container": + # Root containers have no Container aspect, so they are not handled above yield MetadataChangeProposalWrapper( - entityUrn=node, + entityUrn=urn, aspect=BrowsePathsV2Class( - path=[BrowsePathEntryClass(id=urn, urn=urn) for urn in paths[node]] + path=_prepend_platform_instance([], platform_key) ), ).as_workunit() - processed_urns.add(node) + emitted_urns.add(urn) - # Yield browse paths v2 based on browse paths v1 (legacy) - # Only done if the entity is not part of a container hierarchy - for urn in legacy_browse_paths.keys() - processed_urns - ignore_urns: - yield MetadataChangeProposalWrapper( - entityUrn=urn, - aspect=BrowsePathsV2Class( - path=[BrowsePathEntryClass(id=p) for p in legacy_browse_paths[urn]] - ), - ).as_workunit() + +def _batch_workunits_by_urn( + stream: Iterable[MetadataWorkUnit], +) -> Iterable[Tuple[str, List[MetadataWorkUnit]]]: + batch: List[MetadataWorkUnit] = [] + batch_urn: Optional[str] = None + for wu in stream: + if wu.get_urn() != batch_urn: + if batch_urn is not None: + yield batch_urn, batch + batch = [] + + batch.append(wu) + batch_urn = wu.get_urn() + + if batch_urn is not None: + yield batch_urn, batch + + +def _prepend_platform_instance( + entries: List[BrowsePathEntryClass], platform_key: Optional[PlatformKey] +) -> List[BrowsePathEntryClass]: + if platform_key and platform_key.instance: + urn = make_dataplatform_instance_urn( + platform_key.platform, platform_key.instance + ) + return [BrowsePathEntryClass(id=urn, urn=urn)] + entries + + return entries diff --git a/metadata-ingestion/src/datahub/ingestion/api/workunit.py b/metadata-ingestion/src/datahub/ingestion/api/workunit.py index 3a80b900f1444..8eea3514a22af 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/workunit.py +++ b/metadata-ingestion/src/datahub/ingestion/api/workunit.py @@ -1,5 +1,6 @@ +import logging from dataclasses import dataclass -from typing import Iterable, List, Optional, Type, TypeVar, Union, overload +from typing import Iterable, Optional, Type, TypeVar, Union, overload from deprecated import deprecated @@ -11,6 +12,8 @@ ) from datahub.metadata.schema_classes import UsageAggregationClass, _Aspect +logger = logging.getLogger(__name__) + T_Aspect = TypeVar("T_Aspect", bound=_Aspect) @@ -90,7 +93,7 @@ def get_urn(self) -> str: assert self.metadata.entityUrn return self.metadata.entityUrn - def get_aspects_of_type(self, aspect_cls: Type[T_Aspect]) -> List[T_Aspect]: + def get_aspect_of_type(self, aspect_cls: Type[T_Aspect]) -> Optional[T_Aspect]: aspects: list if isinstance(self.metadata, MetadataChangeEvent): aspects = self.metadata.proposedSnapshot.aspects @@ -109,7 +112,10 @@ def get_aspects_of_type(self, aspect_cls: Type[T_Aspect]) -> List[T_Aspect]: else: raise ValueError(f"Unexpected type {type(self.metadata)}") - return [a for a in aspects if isinstance(a, aspect_cls)] + aspects = [a for a in aspects if isinstance(a, aspect_cls)] + if len(aspects) > 1: + logger.warning(f"Found multiple aspects of type {aspect_cls} in MCE {self}") + return aspects[-1] if aspects else None def decompose_mce_into_mcps(self) -> Iterable["MetadataWorkUnit"]: from datahub.emitter.mcp_builder import mcps_from_mce diff --git a/metadata-ingestion/tests/unit/test_source_helpers.py b/metadata-ingestion/tests/unit/test_source_helpers.py index f8ac967af22c5..304c644a47a3a 100644 --- a/metadata-ingestion/tests/unit/test_source_helpers.py +++ b/metadata-ingestion/tests/unit/test_source_helpers.py @@ -1,4 +1,5 @@ from typing import Any, Dict, Iterable, List, Union +from unittest.mock import patch import datahub.metadata.schema_classes as models from datahub.emitter.mce_builder import make_container_urn, make_dataset_urn @@ -82,22 +83,38 @@ def test_auto_status_aspect(): assert list(auto_status_aspect(initial_wu)) == expected -def _create_container_aspects(d: Dict[str, Any]) -> Iterable[MetadataWorkUnit]: +def _create_container_aspects( + d: Dict[str, Any], + other_aspects: Dict[str, List[models._Aspect]] = {}, + root: bool = True, +) -> Iterable[MetadataWorkUnit]: for k, v in d.items(): + urn = make_container_urn(k) yield MetadataChangeProposalWrapper( - entityUrn=make_container_urn(k), - aspect=models.StatusClass(removed=False), + entityUrn=urn, aspect=models.StatusClass(removed=False) ).as_workunit() + for aspect in other_aspects.pop(k, []): + yield MetadataChangeProposalWrapper( + entityUrn=urn, aspect=aspect + ).as_workunit() + for child in list(v): yield MetadataChangeProposalWrapper( entityUrn=make_container_urn(child), - aspect=models.ContainerClass( - container=make_container_urn(k), - ), + aspect=models.ContainerClass(container=urn), ).as_workunit() if isinstance(v, dict): - yield from _create_container_aspects(v) + yield from _create_container_aspects( + v, other_aspects=other_aspects, root=False + ) + + if root: + for k, v in other_aspects.items(): + for aspect in v: + yield MetadataChangeProposalWrapper( + entityUrn=make_container_urn(k), aspect=aspect + ).as_workunit() def _make_container_browse_path_entries( @@ -118,14 +135,15 @@ def _get_browse_paths_from_wu( ) -> Dict[str, List[models.BrowsePathEntryClass]]: paths = {} for wu in stream: - browse_path_v2 = wu.get_aspects_of_type(models.BrowsePathsV2Class) + browse_path_v2 = wu.get_aspect_of_type(models.BrowsePathsV2Class) if browse_path_v2: name = wu.get_urn().split(":")[-1] - paths[name] = browse_path_v2[0].path + paths[name] = browse_path_v2.path return paths -def test_auto_browse_path_v2_by_container_hierarchy(): +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_by_container_hierarchy(telemetry_ping_mock): structure = { "one": { "a": {"i": ["1", "2", "3"], "ii": ["4"]}, @@ -140,12 +158,13 @@ def test_auto_browse_path_v2_by_container_hierarchy(): wus = list(auto_status_aspect(_create_container_aspects(structure))) assert ( # Sanity check - sum(len(wu.get_aspects_of_type(models.StatusClass)) for wu in wus) == 21 + sum(bool(wu.get_aspect_of_type(models.StatusClass)) for wu in wus) == 21 ) - new_wus = list(auto_browse_path_v2([], wus)) + new_wus = list(auto_browse_path_v2(wus)) + assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list assert ( - sum(len(wu.get_aspects_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) == 21 ) @@ -159,38 +178,53 @@ def test_auto_browse_path_v2_by_container_hierarchy(): assert paths["d"] == _make_container_browse_path_entries(["three"]) assert paths["i"] == _make_container_browse_path_entries(["one", "a"]) + # Check urns emitted on demand -- not all at end + for urn in set(wu.get_urn() for wu in new_wus): + try: + idx = next( + i + for i, wu in enumerate(new_wus) + if wu.get_aspect_of_type(models.ContainerClass) and wu.get_urn() == urn + ) + except StopIteration: + idx = next( + i + for i, wu in enumerate(new_wus) + if wu.get_aspect_of_type(models.StatusClass) and wu.get_urn() == urn + ) + assert new_wus[idx + 1].get_aspect_of_type( + models.BrowsePathsV2Class + ) or new_wus[idx + 2].get_aspect_of_type(models.BrowsePathsV2Class) -def test_auto_browse_path_v2_ignores_urns_already_with(): + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_ignores_urns_already_with(telemetry_ping_mock): structure = {"a": {"b": {"c": {"d": ["e"]}}}} - mcps = [ - *MetadataChangeProposalWrapper.construct_many( - entityUrn=make_container_urn("f"), - aspects=[ - models.BrowsePathsClass(paths=["/one/two"]), - models.BrowsePathsV2Class( - path=_make_browse_path_entries(["my", "path"]) - ), - ], - ), - MetadataChangeProposalWrapper( - entityUrn=make_container_urn("c"), - aspect=models.BrowsePathsV2Class( - path=_make_container_browse_path_entries(["custom", "path"]) - ), - ), - ] wus = [ *auto_status_aspect( - [ - *_create_container_aspects(structure), - *(mcp.as_workunit() for mcp in mcps), - ] + _create_container_aspects( + structure, + other_aspects={ + "f": [ + models.BrowsePathsClass(paths=["/one/two"]), + models.BrowsePathsV2Class( + path=_make_browse_path_entries(["my", "path"]) + ), + ], + "c": [ + models.BrowsePathsV2Class( + path=_make_container_browse_path_entries(["custom", "path"]) + ) + ], + }, + ), ) ] - new_wus = list(auto_browse_path_v2([], wus)) + new_wus = list(auto_browse_path_v2(wus)) + assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list assert ( - sum(len(wu.get_aspects_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) == 6 ) @@ -201,7 +235,8 @@ def test_auto_browse_path_v2_ignores_urns_already_with(): assert paths["f"] == _make_browse_path_entries(["my", "path"]) -def test_auto_browse_path_v2_legacy_browse_path(): +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_legacy_browse_path(telemetry_ping_mock): platform = "platform" env = "PROD" wus = [ @@ -218,7 +253,8 @@ def test_auto_browse_path_v2_legacy_browse_path(): aspect=models.BrowsePathsClass([f"/{platform}/one/two"]), ).as_workunit(), ] - new_wus = list(auto_browse_path_v2(["platform", "PROD", "unused"], wus)) + new_wus = list(auto_browse_path_v2(wus, drop_dirs=["platform", "PROD", "unused"])) + assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list assert len(new_wus) == 6 paths = _get_browse_paths_from_wu(new_wus) assert ( @@ -229,22 +265,21 @@ def test_auto_browse_path_v2_legacy_browse_path(): assert paths["platform,dataset-2,PROD)"] == _make_browse_path_entries(["something"]) -def test_auto_browse_path_v2_container_over_legacy_browse_path(): +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_container_over_legacy_browse_path(telemetry_ping_mock): structure = {"a": {"b": ["c"]}} wus = list( auto_status_aspect( - [ - *_create_container_aspects(structure), - MetadataChangeProposalWrapper( - entityUrn=make_container_urn("b"), - aspect=models.BrowsePathsClass(paths=["/one/two"]), - ).as_workunit(), - ] + _create_container_aspects( + structure, + other_aspects={"b": [models.BrowsePathsClass(paths=["/one/two"])]}, + ), ) ) - new_wus = list(auto_browse_path_v2([], wus)) + new_wus = list(auto_browse_path_v2(wus)) + assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list assert ( - sum(len(wu.get_aspects_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) == 3 ) @@ -252,3 +287,58 @@ def test_auto_browse_path_v2_container_over_legacy_browse_path(): assert paths["a"] == [] assert paths["b"] == _make_container_browse_path_entries(["a"]) assert paths["c"] == _make_container_browse_path_entries(["a", "b"]) + + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_invalid_batch_telemetry(telemetry_ping_mock): + structure = {"a": {"b": ["c"]}} + b_urn = make_container_urn("b") + wus = [ + *_create_container_aspects(structure), + MetadataChangeProposalWrapper( # Browse path for b separate from its Container aspect + entityUrn=b_urn, + aspect=models.BrowsePathsClass(paths=["/one/two"]), + ).as_workunit(), + ] + wus = list(auto_status_aspect(wus)) + + assert telemetry_ping_mock.call_count == 0 + _ = list(auto_browse_path_v2(wus)) + assert telemetry_ping_mock.call_count == 1 + assert telemetry_ping_mock.call_args_list[0][0][0] == "incorrect_browse_path_v2" + assert b_urn in telemetry_ping_mock.call_args_list[0][0][1]["reason"] + assert "batch" in telemetry_ping_mock.call_args_list[0][0][1]["reason"] + + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_no_invalid_batch_telemetry_for_unrelated_aspects(telemetry_ping_mock): + structure = {"a": {"b": ["c"]}} + b_urn = make_container_urn("b") + wus = [ + *_create_container_aspects(structure), + MetadataChangeProposalWrapper( # Browse path for b separate from its Container aspect + entityUrn=b_urn, + aspect=models.ContainerPropertiesClass("container name"), + ).as_workunit(), + ] + wus = list(auto_status_aspect(wus)) + + assert telemetry_ping_mock.call_count == 0 + _ = list(auto_browse_path_v2(wus)) + assert telemetry_ping_mock.call_count == 0 + + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_invalid_order_telemetry(telemetry_ping_mock): + structure = {"a": {"b": ["c"]}} + wus = list(reversed(list(_create_container_aspects(structure)))) + wus = list(auto_status_aspect(wus)) + + assert telemetry_ping_mock.call_count == 0 + _ = list(auto_browse_path_v2(wus)) + assert telemetry_ping_mock.call_count == 1 + assert telemetry_ping_mock.call_args_list[0][0][0] == "incorrect_browse_path_v2" + assert ( + make_container_urn("b") in telemetry_ping_mock.call_args_list[0][0][1]["reason"] + ) + assert "order" in telemetry_ping_mock.call_args_list[0][0][1]["reason"] diff --git a/metadata-ingestion/tests/unit/test_workunit.py b/metadata-ingestion/tests/unit/test_workunit.py index e036acc9d50ea..9d31b3fc60866 100644 --- a/metadata-ingestion/tests/unit/test_workunit.py +++ b/metadata-ingestion/tests/unit/test_workunit.py @@ -19,8 +19,8 @@ def test_get_aspects_of_type_mcp(): wu = MetadataChangeProposalWrapper( entityUrn="urn:li:container:asdf", aspect=aspect ).as_workunit() - assert wu.get_aspects_of_type(StatusClass) == [aspect] - assert wu.get_aspects_of_type(ContainerClass) == [] + assert wu.get_aspect_of_type(StatusClass) == aspect + assert wu.get_aspect_of_type(ContainerClass) is None def test_get_aspects_of_type_mce(): @@ -34,9 +34,9 @@ def test_get_aspects_of_type_mce(): ) ) wu = MetadataWorkUnit(id="id", mce=mce) - assert wu.get_aspects_of_type(StatusClass) == [status_aspect, status_aspect_2] - assert wu.get_aspects_of_type(UpstreamLineageClass) == [lineage_aspect] - assert wu.get_aspects_of_type(ContainerClass) == [] + assert wu.get_aspect_of_type(StatusClass) == status_aspect_2 + assert wu.get_aspect_of_type(UpstreamLineageClass) == lineage_aspect + assert wu.get_aspect_of_type(ContainerClass) is None def test_get_aspects_of_type_mcpc(): @@ -52,8 +52,8 @@ def test_get_aspects_of_type_mcpc(): ), ) wu = MetadataWorkUnit(id="id", mcp_raw=mcpc) - assert wu.get_aspects_of_type(StatusClass) == [aspect] - assert wu.get_aspects_of_type(ContainerClass) == [] + assert wu.get_aspect_of_type(StatusClass) == aspect + assert wu.get_aspect_of_type(ContainerClass) is None # Failure scenarios mcpc = MetadataChangeProposalClass( @@ -67,7 +67,7 @@ def test_get_aspects_of_type_mcpc(): ), ) wu = MetadataWorkUnit(id="id", mcp_raw=mcpc) - assert wu.get_aspects_of_type(StatusClass) == [] + assert wu.get_aspect_of_type(StatusClass) is None mcpc = MetadataChangeProposalClass( entityUrn="urn:li:container:asdf", @@ -80,7 +80,7 @@ def test_get_aspects_of_type_mcpc(): ), ) wu = MetadataWorkUnit(id="id", mcp_raw=mcpc) - assert wu.get_aspects_of_type(StatusClass) == [] + assert wu.get_aspect_of_type(StatusClass) is None mcpc = MetadataChangeProposalClass( entityUrn="urn:li:container:asdf", @@ -93,7 +93,7 @@ def test_get_aspects_of_type_mcpc(): ), ) wu = MetadataWorkUnit(id="id", mcp_raw=mcpc) - assert wu.get_aspects_of_type(StatusClass) == [] + assert wu.get_aspect_of_type(StatusClass) is None mcpc = MetadataChangeProposalClass( entityUrn="urn:li:container:asdf", @@ -106,4 +106,4 @@ def test_get_aspects_of_type_mcpc(): ), ) wu = MetadataWorkUnit(id="id", mcp_raw=mcpc) - assert wu.get_aspects_of_type(StatusClass) == [] + assert wu.get_aspect_of_type(StatusClass) is None From ab20401a71cd8a813808c9db96014100ce480ceb Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Mon, 5 Jun 2023 15:05:21 -0700 Subject: [PATCH 2/7] add platform instance test --- .../tests/unit/test_source_helpers.py | 52 +++++++++++++++++-- 1 file changed, 48 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/tests/unit/test_source_helpers.py b/metadata-ingestion/tests/unit/test_source_helpers.py index 304c644a47a3a..fe7cac730bc91 100644 --- a/metadata-ingestion/tests/unit/test_source_helpers.py +++ b/metadata-ingestion/tests/unit/test_source_helpers.py @@ -2,8 +2,13 @@ from unittest.mock import patch import datahub.metadata.schema_classes as models -from datahub.emitter.mce_builder import make_container_urn, make_dataset_urn +from datahub.emitter.mce_builder import ( + make_container_urn, + make_dataset_urn, + make_dataplatform_instance_urn, +) from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.mcp_builder import PlatformKey from datahub.ingestion.api.source_helpers import ( auto_browse_path_v2, auto_status_aspect, @@ -290,7 +295,44 @@ def test_auto_browse_path_v2_container_over_legacy_browse_path(telemetry_ping_mo @patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") -def test_invalid_batch_telemetry(telemetry_ping_mock): +def test_auto_browse_path_v2_with_platform_instsance(telemetry_ping_mock): + platform = "my_platform" + platform_instance = "my_instance" + platform_key = PlatformKey(platform=platform, instance=platform_instance) + platform_instance_urn = make_dataplatform_instance_urn(platform, platform_instance) + platform_instance_entry = models.BrowsePathEntryClass( + platform_instance_urn, platform_instance_urn + ) + + structure = {"a": {"b": ["c"]}} + wus = list(auto_status_aspect(_create_container_aspects(structure))) + + new_wus = list( + auto_browse_path_v2( + wus, + platform_key=platform_key, + ) + ) + assert telemetry_ping_mock.call_count == 0 + + assert ( + sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + == 3 + ) + paths = _get_browse_paths_from_wu(new_wus) + assert paths["a"] == [platform_instance_entry] + assert paths["b"] == [ + platform_instance_entry, + *_make_container_browse_path_entries(["a"]), + ] + assert paths["c"] == [ + platform_instance_entry, + *_make_container_browse_path_entries(["a", "b"]), + ] + + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_invalid_batch_telemetry(telemetry_ping_mock): structure = {"a": {"b": ["c"]}} b_urn = make_container_urn("b") wus = [ @@ -311,7 +353,9 @@ def test_invalid_batch_telemetry(telemetry_ping_mock): @patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") -def test_no_invalid_batch_telemetry_for_unrelated_aspects(telemetry_ping_mock): +def test_auto_browse_path_v2_no_invalid_batch_telemetry_for_unrelated_aspects( + telemetry_ping_mock, +): structure = {"a": {"b": ["c"]}} b_urn = make_container_urn("b") wus = [ @@ -329,7 +373,7 @@ def test_no_invalid_batch_telemetry_for_unrelated_aspects(telemetry_ping_mock): @patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") -def test_invalid_order_telemetry(telemetry_ping_mock): +def test_auto_browse_path_v2_invalid_order_telemetry(telemetry_ping_mock): structure = {"a": {"b": ["c"]}} wus = list(reversed(list(_create_container_aspects(structure)))) wus = list(auto_status_aspect(wus)) From 24aa6a62758a3cfac6ffc11e10495c9a1b9695f2 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Mon, 5 Jun 2023 15:18:28 -0700 Subject: [PATCH 3/7] lint --- metadata-ingestion/tests/unit/test_source_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/tests/unit/test_source_helpers.py b/metadata-ingestion/tests/unit/test_source_helpers.py index fe7cac730bc91..56e255943fbfb 100644 --- a/metadata-ingestion/tests/unit/test_source_helpers.py +++ b/metadata-ingestion/tests/unit/test_source_helpers.py @@ -4,8 +4,8 @@ import datahub.metadata.schema_classes as models from datahub.emitter.mce_builder import ( make_container_urn, - make_dataset_urn, make_dataplatform_instance_urn, + make_dataset_urn, ) from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import PlatformKey From da092cb5b7d45d1075d8e5ee3a9794711c01a0ad Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 7 Jun 2023 01:54:08 -0700 Subject: [PATCH 4/7] simplify telemetry --- .../src/datahub/ingestion/api/source.py | 1 - .../datahub/ingestion/api/source_helpers.py | 39 +++++++++---------- .../tests/unit/test_source_helpers.py | 10 ++--- 3 files changed, 22 insertions(+), 28 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 518b87103a353..7f34579190c11 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -261,7 +261,6 @@ def _get_browse_path_processor(self) -> MetadataWorkUnitProcessor: auto_browse_path_v2, platform_key=platform_key, drop_dirs=[s for s in browse_path_drop_dirs if s is not None], - pipeline_config=self.ctx.pipeline_config, ) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 31af3566bdcc6..1f9bf646ee9a0 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -17,7 +17,6 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import PlatformKey from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.ingestion.run.pipeline_config import PipelineConfig from datahub.metadata.schema_classes import ( BrowsePathEntryClass, BrowsePathsClass, @@ -174,7 +173,6 @@ def auto_browse_path_v2( *, drop_dirs: Sequence[str] = (), platform_key: Optional[PlatformKey] = None, - pipeline_config: Optional[PipelineConfig] = None, ) -> Iterable[MetadataWorkUnit]: """Generate BrowsePathsV2 from Container and BrowsePaths aspects. @@ -189,14 +187,9 @@ def auto_browse_path_v2( and emits "corrections", i.e. a final BrowsePathsV2 for any urns that have changed. """ - telemetry_metadata = { - "platform": platform_key.platform if platform_key else None, - "run_id": pipeline_config.run_id if pipeline_config else None, - "pipeline_name": pipeline_config.pipeline_name if pipeline_config else None, - "server": pipeline_config.datahub_api.server - if pipeline_config and pipeline_config.datahub_api - else None, - } + # For telemetry, to see if our sources violate assumptions + num_out_of_order = 0 + num_out_of_batch = 0 # Set for all containers and urns with a Container aspect # Used to construct container paths while iterating through stream @@ -227,13 +220,8 @@ def auto_browse_path_v2( if urn in containers_used_as_parent: # Topological order invariant violated; we've used the previous value of paths[urn] - telemetry.telemetry_instance.ping( - "incorrect_browse_path_v2", - { - **telemetry_metadata, - "reason": f"Container aspect for {urn} emitted out of order", - }, - ) + # TODO: Add sentry alert + num_out_of_order += 1 browse_path_aspect = wu.get_aspect_of_type(BrowsePathsClass) if browse_path_aspect: @@ -249,10 +237,8 @@ def auto_browse_path_v2( path = container_path or legacy_path if (path is not None or has_browse_path_v2) and urn in emitted_urns: # Batch invariant violated - telemetry.telemetry_instance.ping( - "incorrect_browse_path_v2", - {**telemetry_metadata, "reason": f"Data for {urn} not in batch"}, - ) + # TODO: Add sentry alert + num_out_of_batch += 1 elif has_browse_path_v2: emitted_urns.add(urn) elif path is not None: @@ -273,6 +259,17 @@ def auto_browse_path_v2( ).as_workunit() emitted_urns.add(urn) + if num_out_of_batch or num_out_of_order: + properties = { + "platform": platform_key.platform if platform_key else None, + "has_platform_instance": bool(platform_key.instance) + if platform_key + else False, + "num_out_of_batch": num_out_of_batch, + "num_out_of_order": num_out_of_order, + } + telemetry.telemetry_instance.ping("incorrect_browse_path_v2", properties) + def _batch_workunits_by_urn( stream: Iterable[MetadataWorkUnit], diff --git a/metadata-ingestion/tests/unit/test_source_helpers.py b/metadata-ingestion/tests/unit/test_source_helpers.py index 56e255943fbfb..cefb80cf8be7e 100644 --- a/metadata-ingestion/tests/unit/test_source_helpers.py +++ b/metadata-ingestion/tests/unit/test_source_helpers.py @@ -348,8 +348,8 @@ def test_auto_browse_path_v2_invalid_batch_telemetry(telemetry_ping_mock): _ = list(auto_browse_path_v2(wus)) assert telemetry_ping_mock.call_count == 1 assert telemetry_ping_mock.call_args_list[0][0][0] == "incorrect_browse_path_v2" - assert b_urn in telemetry_ping_mock.call_args_list[0][0][1]["reason"] - assert "batch" in telemetry_ping_mock.call_args_list[0][0][1]["reason"] + assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_order"] == 0 + assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_batch"] == 1 @patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") @@ -382,7 +382,5 @@ def test_auto_browse_path_v2_invalid_order_telemetry(telemetry_ping_mock): _ = list(auto_browse_path_v2(wus)) assert telemetry_ping_mock.call_count == 1 assert telemetry_ping_mock.call_args_list[0][0][0] == "incorrect_browse_path_v2" - assert ( - make_container_urn("b") in telemetry_ping_mock.call_args_list[0][0][1]["reason"] - ) - assert "order" in telemetry_ping_mock.call_args_list[0][0][1]["reason"] + assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_order"] == 1 + assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_batch"] == 0 From 6c519cec902961645946fab73309cfab8928e708 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 7 Jun 2023 10:40:48 -0700 Subject: [PATCH 5/7] add if check --- .../src/datahub/ingestion/api/source_helpers.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 1f9bf646ee9a0..384e619b6fec0 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -224,11 +224,11 @@ def auto_browse_path_v2( num_out_of_order += 1 browse_path_aspect = wu.get_aspect_of_type(BrowsePathsClass) - if browse_path_aspect: + if browse_path_aspect and browse_path_aspect.paths: legacy_path = [ - BrowsePathEntryClass(id=p) + BrowsePathEntryClass(id=p.strip()) for p in browse_path_aspect.paths[0].strip("/").split("/") - if p.strip() not in drop_dirs + if p.strip() and p.strip() not in drop_dirs ] if wu.get_aspect_of_type(BrowsePathsV2Class): From a14fc5bc365b5f9dfaa9aeda0e053882e857cc92 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 7 Jun 2023 13:40:12 -0700 Subject: [PATCH 6/7] dry run; hide flags from docs --- .../src/datahub/ingestion/api/source.py | 1 + .../datahub/ingestion/api/source_helpers.py | 27 ++++++++++--------- .../datahub/ingestion/run/pipeline_config.py | 12 +++++++-- .../tests/unit/test_source_helpers.py | 22 ++++++++++++++- 4 files changed, 47 insertions(+), 15 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 7f34579190c11..3979b1eea0e4c 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -261,6 +261,7 @@ def _get_browse_path_processor(self) -> MetadataWorkUnitProcessor: auto_browse_path_v2, platform_key=platform_key, drop_dirs=[s for s in browse_path_drop_dirs if s is not None], + dry_run=self.ctx.pipeline_config.flags.generate_browse_path_v2_dry_run, ) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 384e619b6fec0..2e6e002a55cbe 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -171,6 +171,7 @@ def auto_materialize_referenced_tags( def auto_browse_path_v2( stream: Iterable[MetadataWorkUnit], *, + dry_run: bool = False, drop_dirs: Sequence[str] = (), platform_key: Optional[PlatformKey] = None, ) -> Iterable[MetadataWorkUnit]: @@ -242,22 +243,24 @@ def auto_browse_path_v2( elif has_browse_path_v2: emitted_urns.add(urn) elif path is not None: - yield MetadataChangeProposalWrapper( - entityUrn=urn, - aspect=BrowsePathsV2Class( - path=_prepend_platform_instance(path, platform_key) - ), - ).as_workunit() emitted_urns.add(urn) + if not dry_run: + yield MetadataChangeProposalWrapper( + entityUrn=urn, + aspect=BrowsePathsV2Class( + path=_prepend_platform_instance(path, platform_key) + ), + ).as_workunit() elif urn not in emitted_urns and guess_entity_type(urn) == "container": # Root containers have no Container aspect, so they are not handled above - yield MetadataChangeProposalWrapper( - entityUrn=urn, - aspect=BrowsePathsV2Class( - path=_prepend_platform_instance([], platform_key) - ), - ).as_workunit() emitted_urns.add(urn) + if not dry_run: + yield MetadataChangeProposalWrapper( + entityUrn=urn, + aspect=BrowsePathsV2Class( + path=_prepend_platform_instance([], platform_key) + ), + ).as_workunit() if num_out_of_batch or num_out_of_order: properties = { diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py index 0ae2cf307c60b..83b73d144ec52 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py @@ -45,10 +45,18 @@ class FlagsConfig(ConfigModel): """ generate_browse_path_v2: bool = Field( - default=False, + default=True, description="Generate BrowsePathsV2 aspects from container hierarchy and existing BrowsePaths aspects.", ) + generate_browse_path_v2_dry_run: bool = Field( + default=True, + description=( + "Run through browse paths v2 generation but do not actually write the aspects to DataHub. " + "Requires `generate_browse_path_v2` to also be enabled." + ), + ) + class PipelineConfig(ConfigModel): # Once support for discriminated unions gets merged into Pydantic, we can @@ -58,7 +66,7 @@ class PipelineConfig(ConfigModel): source: SourceConfig sink: DynamicTypedConfig transformers: Optional[List[DynamicTypedConfig]] - flags: FlagsConfig = Field(default=FlagsConfig()) + flags: FlagsConfig = Field(default=FlagsConfig(), hidden_from_docs=True) reporting: List[ReporterConfig] = [] run_id: str = DEFAULT_RUN_ID datahub_api: Optional[DatahubClientConfig] = None diff --git a/metadata-ingestion/tests/unit/test_source_helpers.py b/metadata-ingestion/tests/unit/test_source_helpers.py index cefb80cf8be7e..9f88f0d635958 100644 --- a/metadata-ingestion/tests/unit/test_source_helpers.py +++ b/metadata-ingestion/tests/unit/test_source_helpers.py @@ -379,8 +379,28 @@ def test_auto_browse_path_v2_invalid_order_telemetry(telemetry_ping_mock): wus = list(auto_status_aspect(wus)) assert telemetry_ping_mock.call_count == 0 - _ = list(auto_browse_path_v2(wus)) + new_wus = list(auto_browse_path_v2(wus)) + assert ( + sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + > 0 + ) assert telemetry_ping_mock.call_count == 1 assert telemetry_ping_mock.call_args_list[0][0][0] == "incorrect_browse_path_v2" assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_order"] == 1 assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_batch"] == 0 + + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_dry_run(telemetry_ping_mock): + structure = {"a": {"b": ["c"]}} + wus = list(reversed(list(_create_container_aspects(structure)))) + wus = list(auto_status_aspect(wus)) + + assert telemetry_ping_mock.call_count == 0 + new_wus = list(auto_browse_path_v2(wus, dry_run=True)) + assert wus == new_wus + assert ( + sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + == 0 + ) + assert telemetry_ping_mock.call_count == 1 From fb1fee7b962059b1069a95b3838b95f4e9f9c8e9 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 7 Jun 2023 14:09:14 -0700 Subject: [PATCH 7/7] lint --- metadata-ingestion/src/datahub/ingestion/api/source.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 3979b1eea0e4c..4a90d4b05fbc2 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -188,7 +188,9 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: self.ctx.pipeline_config and self.ctx.pipeline_config.flags.generate_browse_path_v2 ): - browse_path_processor = self._get_browse_path_processor() + browse_path_processor = self._get_browse_path_processor( + self.ctx.pipeline_config.flags.generate_browse_path_v2_dry_run + ) return [ auto_status_aspect, @@ -236,7 +238,7 @@ def get_report(self) -> SourceReport: def close(self) -> None: pass - def _get_browse_path_processor(self) -> MetadataWorkUnitProcessor: + def _get_browse_path_processor(self, dry_run: bool) -> MetadataWorkUnitProcessor: config = self.get_config() platform = getattr(self, "platform", None) or getattr(config, "platform", None) env = getattr(config, "env", None) @@ -261,7 +263,7 @@ def _get_browse_path_processor(self) -> MetadataWorkUnitProcessor: auto_browse_path_v2, platform_key=platform_key, drop_dirs=[s for s in browse_path_drop_dirs if s is not None], - dry_run=self.ctx.pipeline_config.flags.generate_browse_path_v2_dry_run, + dry_run=dry_run, )