Skip to content

Commit

Permalink
feat(ingest): Produce browse paths v2 on demand and with platform instance (#8173)
Browse files Browse the repository at this point in the history

Co-authored-by: Harshal Sheth <[email protected]>
Co-authored-by: Pedro Silva <[email protected]>
  • Loading branch information
3 people authored Jun 9, 2023
1 parent 43d37ff commit f2c66fd
Show file tree
Hide file tree
Showing 6 changed files with 394 additions and 139 deletions.
48 changes: 33 additions & 15 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
from pydantic import BaseModel

from datahub.configuration.common import ConfigModel
from datahub.emitter.mcp_builder import mcps_from_mce
from datahub.configuration.source_common import PlatformInstanceConfigMixin
from datahub.emitter.mcp_builder import PlatformKey, mcps_from_mce
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
from datahub.ingestion.api.report import Report
Expand Down Expand Up @@ -187,26 +188,15 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
self.ctx.pipeline_config
and self.ctx.pipeline_config.flags.generate_browse_path_v2
):
platform = getattr(self, "platform", None) or getattr(
self.get_config(), "platform", None
)
env = getattr(self.get_config(), "env", None)
browse_path_drop_dirs = [
platform,
platform and platform.lower(),
env,
env and env.lower(),
]
browse_path_processor = partial(
auto_browse_path_v2,
[s for s in browse_path_drop_dirs if s is not None],
browse_path_processor = self._get_browse_path_processor(
self.ctx.pipeline_config.flags.generate_browse_path_v2_dry_run
)

return [
auto_status_aspect,
auto_materialize_referenced_tags,
partial(auto_workunit_reporter, self.get_report()),
browse_path_processor,
partial(auto_workunit_reporter, self.get_report()),
]

@staticmethod
Expand Down Expand Up @@ -248,6 +238,34 @@ def get_report(self) -> SourceReport:
def close(self) -> None:
pass

def _get_browse_path_processor(self, dry_run: bool) -> MetadataWorkUnitProcessor:
    """Build an ``auto_browse_path_v2`` processor configured for this source.

    Pulls the platform and env from the source (or its config), uses them as
    directory names to drop from legacy browse paths, and — when the config
    declares a platform instance — passes a platform key so the instance is
    prepended to every generated path.
    """
    source_config = self.get_config()
    source_platform = getattr(self, "platform", None) or getattr(
        source_config, "platform", None
    )
    source_env = getattr(source_config, "env", None)

    # Directories to strip from legacy browse paths: platform and env,
    # each in both its original and lowercased form, skipping unset values.
    drop_dirs: List[str] = []
    for candidate in (source_platform, source_env):
        if candidate is not None:
            drop_dirs.append(candidate)
            drop_dirs.append(candidate.lower())

    platform_key: Optional[PlatformKey] = None
    if (
        source_platform
        and isinstance(source_config, PlatformInstanceConfigMixin)
        and source_config.platform_instance
    ):
        platform_key = PlatformKey(
            platform=source_platform, instance=source_config.platform_instance
        )

    return partial(
        auto_browse_path_v2,
        platform_key=platform_key,
        drop_dirs=drop_dirs,
        dry_run=dry_run,
    )


class TestableSource(Source):
@staticmethod
Expand Down
187 changes: 129 additions & 58 deletions metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
from collections import defaultdict
from typing import (
TYPE_CHECKING,
Callable,
Expand All @@ -9,11 +8,14 @@
Optional,
Sequence,
Set,
Tuple,
TypeVar,
Union,
)

from datahub.emitter.mce_builder import make_dataplatform_instance_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import PlatformKey
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
BrowsePathEntryClass,
Expand All @@ -25,6 +27,7 @@
StatusClass,
TagKeyClass,
)
from datahub.telemetry import telemetry
from datahub.utilities.urns.tag_urn import TagUrn
from datahub.utilities.urns.urn import guess_entity_type
from datahub.utilities.urns.urn_iter import list_urns
Expand Down Expand Up @@ -166,68 +169,136 @@ def auto_materialize_referenced_tags(


def auto_browse_path_v2(
    stream: Iterable[MetadataWorkUnit],
    *,
    dry_run: bool = False,
    drop_dirs: Sequence[str] = (),
    platform_key: Optional[PlatformKey] = None,
) -> Iterable[MetadataWorkUnit]:
    """Generate BrowsePathsV2 from Container and BrowsePaths aspects.

    Generates browse paths v2 on demand, rather than waiting for end of ingestion,
    for better UI experience while ingestion is running.

    To do this, assumes entities in container structure arrive in topological order
    and that all relevant aspects (Container, BrowsePaths, BrowsePathsV2) for an urn
    arrive together in a batch.

    Calculates the correct BrowsePathsV2 at end of workunit stream,
    and emits "corrections", i.e. a final BrowsePathsV2 for any urns that have changed.
    """

    # For telemetry, to see if our sources violate assumptions
    num_out_of_order = 0
    num_out_of_batch = 0

    # Maps each container (or urn with a Container aspect) to its browse path.
    # Used to construct container paths while iterating through the stream;
    # assumes topological order of entities in the stream.
    paths: Dict[str, List[BrowsePathEntryClass]] = {}

    emitted_urns: Set[str] = set()
    containers_used_as_parent: Set[str] = set()
    for urn, batch in _batch_workunits_by_urn(stream):
        container_path: Optional[List[BrowsePathEntryClass]] = None
        legacy_path: Optional[List[BrowsePathEntryClass]] = None
        has_browse_path_v2 = False

        for wu in batch:
            yield wu
            if not wu.is_primary_source:
                continue

            container_aspect = wu.get_aspect_of_type(ContainerClass)
            if container_aspect:
                parent_urn = container_aspect.container
                containers_used_as_parent.add(parent_urn)
                paths[urn] = [
                    *paths.setdefault(parent_urn, []),  # Guess parent has no parents
                    BrowsePathEntryClass(id=parent_urn, urn=parent_urn),
                ]
                container_path = paths[urn]

                if urn in containers_used_as_parent:
                    # Topological order invariant violated; we've used the previous value of paths[urn]
                    # TODO: Add sentry alert
                    num_out_of_order += 1

            browse_path_aspect = wu.get_aspect_of_type(BrowsePathsClass)
            if browse_path_aspect and browse_path_aspect.paths:
                # Only the first legacy path is considered; empty and dropped
                # directory segments are filtered out.
                legacy_path = [
                    BrowsePathEntryClass(id=p.strip())
                    for p in browse_path_aspect.paths[0].strip("/").split("/")
                    if p.strip() and p.strip() not in drop_dirs
                ]

            if wu.get_aspect_of_type(BrowsePathsV2Class):
                has_browse_path_v2 = True

        # Container-derived paths take precedence over legacy BrowsePaths.
        path = container_path or legacy_path
        if (path is not None or has_browse_path_v2) and urn in emitted_urns:
            # Batch invariant violated
            # TODO: Add sentry alert
            num_out_of_batch += 1
        elif has_browse_path_v2:
            # Source already produced its own BrowsePathsV2; don't override.
            emitted_urns.add(urn)
        elif path is not None:
            emitted_urns.add(urn)
            if not dry_run:
                yield MetadataChangeProposalWrapper(
                    entityUrn=urn,
                    aspect=BrowsePathsV2Class(
                        path=_prepend_platform_instance(path, platform_key)
                    ),
                ).as_workunit()
        elif urn not in emitted_urns and guess_entity_type(urn) == "container":
            # Root containers have no Container aspect, so they are not handled above
            emitted_urns.add(urn)
            if not dry_run:
                yield MetadataChangeProposalWrapper(
                    entityUrn=urn,
                    aspect=BrowsePathsV2Class(
                        path=_prepend_platform_instance([], platform_key)
                    ),
                ).as_workunit()

    if num_out_of_batch or num_out_of_order:
        properties = {
            "platform": platform_key.platform if platform_key else None,
            "has_platform_instance": bool(platform_key.instance)
            if platform_key
            else False,
            "num_out_of_batch": num_out_of_batch,
            "num_out_of_order": num_out_of_order,
        }
        telemetry.telemetry_instance.ping("incorrect_browse_path_v2", properties)


def _batch_workunits_by_urn(
stream: Iterable[MetadataWorkUnit],
) -> Iterable[Tuple[str, List[MetadataWorkUnit]]]:
batch: List[MetadataWorkUnit] = []
batch_urn: Optional[str] = None
for wu in stream:
if wu.get_urn() != batch_urn:
if batch_urn is not None:
yield batch_urn, batch
batch = []

if wu.get_aspects_of_type(BrowsePathsV2Class):
ignore_urns.add(urn)
batch.append(wu)
batch_urn = wu.get_urn()

paths: Dict[str, List[str]] = {} # Maps urn -> list of urns in path
# Yield browse paths v2 in topological order, starting with root containers
processed_urns = set()
nodes = container_urns - parent_container_map.keys()
while nodes:
node = nodes.pop()
nodes.update(children[node])
if batch_urn is not None:
yield batch_urn, batch

if node not in parent_container_map: # root
paths[node] = []
else:
parent = parent_container_map[node]
paths[node] = [*paths[parent], parent]
if node not in ignore_urns:
yield MetadataChangeProposalWrapper(
entityUrn=node,
aspect=BrowsePathsV2Class(
path=[BrowsePathEntryClass(id=urn, urn=urn) for urn in paths[node]]
),
).as_workunit()
processed_urns.add(node)

# Yield browse paths v2 based on browse paths v1 (legacy)
# Only done if the entity is not part of a container hierarchy
for urn in legacy_browse_paths.keys() - processed_urns - ignore_urns:
yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=BrowsePathsV2Class(
path=[BrowsePathEntryClass(id=p) for p in legacy_browse_paths[urn]]
),
).as_workunit()

def _prepend_platform_instance(
    entries: List[BrowsePathEntryClass], platform_key: Optional[PlatformKey]
) -> List[BrowsePathEntryClass]:
    """Prefix *entries* with a platform-instance entry when one is configured.

    Returns *entries* unchanged when no platform key (or no instance on it)
    is available.
    """
    # Nothing to prepend without both a platform key and an instance on it.
    if not (platform_key and platform_key.instance):
        return entries

    instance_urn = make_dataplatform_instance_urn(
        platform_key.platform, platform_key.instance
    )
    return [BrowsePathEntryClass(id=instance_urn, urn=instance_urn), *entries]
12 changes: 9 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/api/workunit.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from dataclasses import dataclass
from typing import Iterable, List, Optional, Type, TypeVar, Union, overload
from typing import Iterable, Optional, Type, TypeVar, Union, overload

from deprecated import deprecated

Expand All @@ -11,6 +12,8 @@
)
from datahub.metadata.schema_classes import UsageAggregationClass, _Aspect

logger = logging.getLogger(__name__)

T_Aspect = TypeVar("T_Aspect", bound=_Aspect)


Expand Down Expand Up @@ -90,7 +93,7 @@ def get_urn(self) -> str:
assert self.metadata.entityUrn
return self.metadata.entityUrn

def get_aspects_of_type(self, aspect_cls: Type[T_Aspect]) -> List[T_Aspect]:
def get_aspect_of_type(self, aspect_cls: Type[T_Aspect]) -> Optional[T_Aspect]:
aspects: list
if isinstance(self.metadata, MetadataChangeEvent):
aspects = self.metadata.proposedSnapshot.aspects
Expand All @@ -109,7 +112,10 @@ def get_aspects_of_type(self, aspect_cls: Type[T_Aspect]) -> List[T_Aspect]:
else:
raise ValueError(f"Unexpected type {type(self.metadata)}")

return [a for a in aspects if isinstance(a, aspect_cls)]
aspects = [a for a in aspects if isinstance(a, aspect_cls)]
if len(aspects) > 1:
logger.warning(f"Found multiple aspects of type {aspect_cls} in MCE {self}")
return aspects[-1] if aspects else None

def decompose_mce_into_mcps(self) -> Iterable["MetadataWorkUnit"]:
from datahub.emitter.mcp_builder import mcps_from_mce
Expand Down
12 changes: 10 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,18 @@ class FlagsConfig(ConfigModel):
"""

generate_browse_path_v2: bool = Field(
default=False,
default=True,
description="Generate BrowsePathsV2 aspects from container hierarchy and existing BrowsePaths aspects.",
)

generate_browse_path_v2_dry_run: bool = Field(
default=True,
description=(
"Run through browse paths v2 generation but do not actually write the aspects to DataHub. "
"Requires `generate_browse_path_v2` to also be enabled."
),
)


class PipelineConfig(ConfigModel):
# Once support for discriminated unions gets merged into Pydantic, we can
Expand All @@ -58,7 +66,7 @@ class PipelineConfig(ConfigModel):
source: SourceConfig
sink: DynamicTypedConfig
transformers: Optional[List[DynamicTypedConfig]]
flags: FlagsConfig = Field(default=FlagsConfig())
flags: FlagsConfig = Field(default=FlagsConfig(), hidden_from_docs=True)
reporting: List[ReporterConfig] = []
run_id: str = DEFAULT_RUN_ID
datahub_api: Optional[DatahubClientConfig] = None
Expand Down
Loading

0 comments on commit f2c66fd

Please sign in to comment.