
feat(ingest): Produce browse paths v2 on demand and with platform instance #8173

Merged: 10 commits, Jun 9, 2023
48 changes: 33 additions & 15 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -22,7 +22,8 @@
from pydantic import BaseModel

from datahub.configuration.common import ConfigModel
from datahub.emitter.mcp_builder import mcps_from_mce
from datahub.configuration.source_common import PlatformInstanceConfigMixin
from datahub.emitter.mcp_builder import PlatformKey, mcps_from_mce
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
from datahub.ingestion.api.report import Report
@@ -187,26 +188,15 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
self.ctx.pipeline_config
and self.ctx.pipeline_config.flags.generate_browse_path_v2
):
platform = getattr(self, "platform", None) or getattr(
self.get_config(), "platform", None
)
env = getattr(self.get_config(), "env", None)
browse_path_drop_dirs = [
platform,
platform and platform.lower(),
env,
env and env.lower(),
]
browse_path_processor = partial(
auto_browse_path_v2,
[s for s in browse_path_drop_dirs if s is not None],
browse_path_processor = self._get_browse_path_processor(
self.ctx.pipeline_config.flags.generate_browse_path_v2_dry_run
)

return [
auto_status_aspect,
auto_materialize_referenced_tags,
partial(auto_workunit_reporter, self.get_report()),
browse_path_processor,
partial(auto_workunit_reporter, self.get_report()),
Collaborator: I think the original order made more sense.

Collaborator (Author): Don't we want to report these browse path workunits?

Collaborator: Oh, are these run top to bottom? For some reason I thought the bottom was the innermost one. In that case this is fine.

]
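
The thread above turns on how this processor list is applied. A minimal sketch of that composition, consistent with the "top to bottom" reading the reviewers settle on (the helper name and type aliases here are illustrative, not the actual DataHub plumbing):

from typing import Callable, Iterable, List, Optional

# Illustrative aliases; the real types live in datahub.ingestion.api.
WorkUnitStream = Iterable["MetadataWorkUnit"]
WorkUnitProcessor = Callable[[WorkUnitStream], WorkUnitStream]

def apply_workunit_processors(
    stream: WorkUnitStream,
    processors: List[Optional[WorkUnitProcessor]],
) -> WorkUnitStream:
    # Earlier entries wrap the raw source stream first; later entries consume
    # their output. Placing the reporter after the browse path processor thus
    # lets it count the generated BrowsePathsV2 workunits as well.
    for processor in processors:
        if processor is not None:
            stream = processor(stream)
    return stream
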

@staticmethod
@@ -248,6 +238,34 @@ def get_report(self) -> SourceReport:
def close(self) -> None:
pass

def _get_browse_path_processor(self, dry_run: bool) -> MetadataWorkUnitProcessor:
config = self.get_config()
platform = getattr(self, "platform", None) or getattr(config, "platform", None)
env = getattr(config, "env", None)
browse_path_drop_dirs = [
platform,
platform and platform.lower(),
env,
env and env.lower(),
]

platform_key: Optional[PlatformKey] = None
if (
platform
and isinstance(config, PlatformInstanceConfigMixin)
and config.platform_instance
):
platform_key = PlatformKey(
platform=platform, instance=config.platform_instance
)

return partial(
auto_browse_path_v2,
platform_key=platform_key,
drop_dirs=[s for s in browse_path_drop_dirs if s is not None],
dry_run=dry_run,
)
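
For a source whose config mixes in PlatformInstanceConfigMixin, the helper above effectively builds a processor like the following (the platform, env, and instance values are invented for illustration):

from functools import partial

from datahub.emitter.mcp_builder import PlatformKey
from datahub.ingestion.api.source_helpers import auto_browse_path_v2

# Hypothetical Snowflake source with env "PROD" and platform_instance "uswest".
processor = partial(
    auto_browse_path_v2,
    platform_key=PlatformKey(platform="snowflake", instance="uswest"),
    drop_dirs=["snowflake", "snowflake", "PROD", "prod"],  # platform, platform.lower(), env, env.lower()
    dry_run=False,
)

The resulting partial behaves like any other MetadataWorkUnitProcessor and slots into the list returned by get_workunit_processors above.
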


class TestableSource(Source):
@staticmethod
187 changes: 129 additions & 58 deletions metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
@@ -1,5 +1,4 @@
import logging
from collections import defaultdict
from typing import (
TYPE_CHECKING,
Callable,
@@ -9,11 +8,14 @@
Optional,
Sequence,
Set,
Tuple,
TypeVar,
Union,
)

from datahub.emitter.mce_builder import make_dataplatform_instance_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import PlatformKey
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
BrowsePathEntryClass,
@@ -25,6 +27,7 @@
StatusClass,
TagKeyClass,
)
from datahub.telemetry import telemetry
from datahub.utilities.urns.tag_urn import TagUrn
from datahub.utilities.urns.urn import guess_entity_type
from datahub.utilities.urns.urn_iter import list_urns
@@ -166,68 +169,136 @@ def auto_materialize_referenced_tags(


def auto_browse_path_v2(
drop_dirs: Sequence[str],
stream: Iterable[MetadataWorkUnit],
*,
dry_run: bool = False,
drop_dirs: Sequence[str] = (),
platform_key: Optional[PlatformKey] = None,
) -> Iterable[MetadataWorkUnit]:
"""Generate BrowsePathsV2 from Container and BrowsePaths aspects."""
"""Generate BrowsePathsV2 from Container and BrowsePaths aspects.

ignore_urns: Set[str] = set()
legacy_browse_paths: Dict[str, List[str]] = defaultdict(list)
container_urns: Set[str] = set()
parent_container_map: Dict[str, str] = {}
children: Dict[str, List[str]] = defaultdict(list)
for wu in stream:
yield wu
Generates browse paths v2 on demand, rather than waiting for end of ingestion,
for better UI experience while ingestion is running.

urn = wu.get_urn()
if guess_entity_type(urn) == "container":
container_urns.add(urn)

container_aspects = wu.get_aspects_of_type(ContainerClass)
for c_aspect in container_aspects:
parent = c_aspect.container
parent_container_map[urn] = parent
children[parent].append(urn)

browse_path_aspects = wu.get_aspects_of_type(BrowsePathsClass)
for b_aspect in browse_path_aspects:
if b_aspect.paths:
path = b_aspect.paths[0] # Only take first path
legacy_browse_paths[urn] = [
p for p in path.strip("/").split("/") if p.strip() not in drop_dirs
To do this, assumes entities in container structure arrive in topological order
and that all relevant aspects (Container, BrowsePaths, BrowsePathsV2) for an urn
arrive together in a batch.

Calculates the correct BrowsePathsV2 at end of workunit stream,
and emits "corrections", i.e. a final BrowsePathsV2 for any urns that have changed.
"""

# For telemetry, to see if our sources violate assumptions
num_out_of_order = 0
Collaborator (suggested change): rename num_out_of_order to num_containers_out_of_order.

Collaborator (Author): I think this name can be confusing because it's num_container_aspects_out_of_order rather than num_container_entities_out_of_order. I'll think about it.

Collaborator: That's fine too - I just wanted the word container in there.

num_out_of_batch = 0
Collaborator (suggested change): rename num_out_of_batch to num_aspects_out_of_batch.


# Set for all containers and urns with a Container aspect
# Used to construct container paths while iterating through stream
# Assumes topological order of entities in stream
paths: Dict[str, List[BrowsePathEntryClass]] = {}
Collaborator: Maybe note that this one does not contain platform instance details.

emitted_urns: Set[str] = set()
containers_used_as_parent: Set[str] = set()
for urn, batch in _batch_workunits_by_urn(stream):
container_path: Optional[List[BrowsePathEntryClass]] = None
legacy_path: Optional[List[BrowsePathEntryClass]] = None
has_browse_path_v2 = False

for wu in batch:
yield wu
if not wu.is_primary_source:
continue

container_aspect = wu.get_aspect_of_type(ContainerClass)
if container_aspect:
parent_urn = container_aspect.container
containers_used_as_parent.add(parent_urn)
paths[urn] = [
*paths.setdefault(parent_urn, []), # Guess parent has no parents
Collaborator: I think it'd be more clear from a code readability perspective to split this into two lines. Also, do we need paths.setdefault here or can we just use paths.get(parent_urn, [])? The mutation is throwing me off a bit:

    parent_path = paths.setdefault(parent_urn, [])
    paths[urn] = [...]

Collaborator (Author): I think this used to have an impact but looking at the code, doesn't seem like it anymore. I'll just replace it with the .get(...).

BrowsePathEntryClass(id=parent_urn, urn=parent_urn),
]
container_path = paths[urn]

if urn in containers_used_as_parent:
# Topological order invariant violated; we've used the previous value of paths[urn]
# TODO: Add sentry alert
num_out_of_order += 1

browse_path_aspect = wu.get_aspect_of_type(BrowsePathsClass)
if browse_path_aspect and browse_path_aspect.paths:
legacy_path = [
BrowsePathEntryClass(id=p.strip())
for p in browse_path_aspect.paths[0].strip("/").split("/")
Collaborator: Slight nit - we call p.strip() three times, so maybe we should have an inner generator that breaks those apart, or maybe a helper method like _split_legacy_browse_path_entry?

Collaborator (Author): Yeah, I didn't like this much, but it's solved by upgrading to Python 3.8 and using the walrus operator, which we will have to do quite soon. I can add a TODO to update once we upgrade.

if p.strip() and p.strip() not in drop_dirs
]

if wu.get_aspect_of_type(BrowsePathsV2Class):
has_browse_path_v2 = True

path = container_path or legacy_path
if (path is not None or has_browse_path_v2) and urn in emitted_urns:
# Batch invariant violated
# TODO: Add sentry alert
num_out_of_batch += 1
elif has_browse_path_v2:
emitted_urns.add(urn)
elif path is not None:
emitted_urns.add(urn)
if not dry_run:
yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=BrowsePathsV2Class(
path=_prepend_platform_instance(path, platform_key)
),
).as_workunit()
elif urn not in emitted_urns and guess_entity_type(urn) == "container":
# Root containers have no Container aspect, so they are not handled above
emitted_urns.add(urn)
if not dry_run:
yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=BrowsePathsV2Class(
path=_prepend_platform_instance([], platform_key)
),
).as_workunit()

if num_out_of_batch or num_out_of_order:
properties = {
"platform": platform_key.platform if platform_key else None,
"has_platform_instance": bool(platform_key.instance)
if platform_key
else False,
"num_out_of_batch": num_out_of_batch,
"num_out_of_order": num_out_of_order,
}
telemetry.telemetry_instance.ping("incorrect_browse_path_v2", properties)
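
As a rough end-to-end illustration of the rewritten helper (the urns and aspects below are invented, not a test from the PR), a root container followed by a dataset that points at it yields BrowsePathsV2 aspects for both:

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import auto_browse_path_v2
from datahub.metadata.schema_classes import (
    BrowsePathsV2Class,
    ContainerClass,
    StatusClass,
)

container_urn = "urn:li:container:folder1"
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:s3,bucket.table,PROD)"

stream = [
    # Root container arrives first (topological order) and has no Container aspect.
    MetadataChangeProposalWrapper(
        entityUrn=container_urn, aspect=StatusClass(removed=False)
    ).as_workunit(),
    # Dataset inside that container.
    MetadataChangeProposalWrapper(
        entityUrn=dataset_urn, aspect=ContainerClass(container=container_urn)
    ).as_workunit(),
]

for wu in auto_browse_path_v2(stream):
    v2 = wu.get_aspect_of_type(BrowsePathsV2Class)
    if v2 is not None:
        # Expect: [] for the root container, ["urn:li:container:folder1"] for the dataset.
        print(wu.get_urn(), [entry.id for entry in v2.path])
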


def _batch_workunits_by_urn(
stream: Iterable[MetadataWorkUnit],
) -> Iterable[Tuple[str, List[MetadataWorkUnit]]]:
batch: List[MetadataWorkUnit] = []
batch_urn: Optional[str] = None
for wu in stream:
if wu.get_urn() != batch_urn:
if batch_urn is not None:
yield batch_urn, batch
batch = []

if wu.get_aspects_of_type(BrowsePathsV2Class):
ignore_urns.add(urn)
batch.append(wu)
batch_urn = wu.get_urn()

paths: Dict[str, List[str]] = {} # Maps urn -> list of urns in path
# Yield browse paths v2 in topological order, starting with root containers
processed_urns = set()
nodes = container_urns - parent_container_map.keys()
while nodes:
node = nodes.pop()
nodes.update(children[node])
if batch_urn is not None:
yield batch_urn, batch

if node not in parent_container_map: # root
paths[node] = []
else:
parent = parent_container_map[node]
paths[node] = [*paths[parent], parent]
if node not in ignore_urns:
yield MetadataChangeProposalWrapper(
entityUrn=node,
aspect=BrowsePathsV2Class(
path=[BrowsePathEntryClass(id=urn, urn=urn) for urn in paths[node]]
),
).as_workunit()
processed_urns.add(node)

# Yield browse paths v2 based on browse paths v1 (legacy)
# Only done if the entity is not part of a container hierarchy
for urn in legacy_browse_paths.keys() - processed_urns - ignore_urns:
yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=BrowsePathsV2Class(
path=[BrowsePathEntryClass(id=p) for p in legacy_browse_paths[urn]]
),
).as_workunit()

def _prepend_platform_instance(
entries: List[BrowsePathEntryClass], platform_key: Optional[PlatformKey]
) -> List[BrowsePathEntryClass]:
if platform_key and platform_key.instance:
urn = make_dataplatform_instance_urn(
platform_key.platform, platform_key.instance
)
return [BrowsePathEntryClass(id=urn, urn=urn)] + entries

return entries
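
For reference, a sketch (values invented) of the entry this prepends when a platform instance is configured; the urn follows the usual dataPlatformInstance format produced by make_dataplatform_instance_urn:

from datahub.emitter.mcp_builder import PlatformKey
from datahub.metadata.schema_classes import BrowsePathEntryClass

key = PlatformKey(platform="kafka", instance="cluster-1")
entries = [BrowsePathEntryClass(id="topics")]

# _prepend_platform_instance(entries, key) returns roughly:
# [
#     BrowsePathEntryClass(
#         id="urn:li:dataPlatformInstance:(urn:li:dataPlatform:kafka,cluster-1)",
#         urn="urn:li:dataPlatformInstance:(urn:li:dataPlatform:kafka,cluster-1)",
#     ),
#     BrowsePathEntryClass(id="topics"),
# ]
# so the browse path for this entity starts with the "cluster-1" instance.
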
12 changes: 9 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/api/workunit.py
@@ -1,5 +1,6 @@
import logging
from dataclasses import dataclass
from typing import Iterable, List, Optional, Type, TypeVar, Union, overload
from typing import Iterable, Optional, Type, TypeVar, Union, overload

from deprecated import deprecated

@@ -11,6 +12,8 @@
)
from datahub.metadata.schema_classes import UsageAggregationClass, _Aspect

logger = logging.getLogger(__name__)

T_Aspect = TypeVar("T_Aspect", bound=_Aspect)


@@ -90,7 +93,7 @@ def get_urn(self) -> str:
assert self.metadata.entityUrn
return self.metadata.entityUrn

def get_aspects_of_type(self, aspect_cls: Type[T_Aspect]) -> List[T_Aspect]:
def get_aspect_of_type(self, aspect_cls: Type[T_Aspect]) -> Optional[T_Aspect]:
aspects: list
if isinstance(self.metadata, MetadataChangeEvent):
aspects = self.metadata.proposedSnapshot.aspects
@@ -109,7 +112,10 @@ def get_aspects_of_type(self, aspect_cls: Type[T_Aspect]) -> List[T_Aspect]:
else:
raise ValueError(f"Unexpected type {type(self.metadata)}")

return [a for a in aspects if isinstance(a, aspect_cls)]
aspects = [a for a in aspects if isinstance(a, aspect_cls)]
if len(aspects) > 1:
logger.warning(f"Found multiple aspects of type {aspect_cls} in MCE {self}")
return aspects[-1] if aspects else None
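
A small illustration (not from the PR) of the behavioral change: the accessor now returns a single aspect, or None, instead of a list.

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import BrowsePathsV2Class, ContainerClass

wu = MetadataChangeProposalWrapper(
    entityUrn="urn:li:container:child",
    aspect=ContainerClass(container="urn:li:container:parent"),
).as_workunit()

container = wu.get_aspect_of_type(ContainerClass)      # a ContainerClass instance
browse_v2 = wu.get_aspect_of_type(BrowsePathsV2Class)  # None: aspect not present
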

def decompose_mce_into_mcps(self) -> Iterable["MetadataWorkUnit"]:
from datahub.emitter.mcp_builder import mcps_from_mce
12 changes: 10 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py
@@ -45,10 +45,18 @@ class FlagsConfig(ConfigModel):
"""

generate_browse_path_v2: bool = Field(
default=False,
default=True,
description="Generate BrowsePathsV2 aspects from container hierarchy and existing BrowsePaths aspects.",
)

generate_browse_path_v2_dry_run: bool = Field(
default=True,
description=(
"Run through browse paths v2 generation but do not actually write the aspects to DataHub. "
"Requires `generate_browse_path_v2` to also be enabled."
),
)
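
With the defaults above, browse paths v2 are generated for every source but only in dry-run mode; a minimal sketch of explicitly opting into real emission (field names come straight from FlagsConfig; how the flags block is wired into a recipe is unchanged):

from datahub.ingestion.run.pipeline_config import FlagsConfig

# After this PR: generate_browse_path_v2 defaults to True and dry_run to True.
# Turning dry_run off makes the processor actually emit BrowsePathsV2 aspects.
flags = FlagsConfig(
    generate_browse_path_v2=True,
    generate_browse_path_v2_dry_run=False,
)
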


class PipelineConfig(ConfigModel):
# Once support for discriminated unions gets merged into Pydantic, we can
@@ -58,7 +66,7 @@ class PipelineConfig(ConfigModel):
source: SourceConfig
sink: DynamicTypedConfig
transformers: Optional[List[DynamicTypedConfig]]
flags: FlagsConfig = Field(default=FlagsConfig())
flags: FlagsConfig = Field(default=FlagsConfig(), hidden_from_docs=True)
Collaborator: The pipeline config docs aren't autogenerated anywhere, so this is a no-op, but still nice to have.

reporting: List[ReporterConfig] = []
run_id: str = DEFAULT_RUN_ID
datahub_api: Optional[DatahubClientConfig] = None