Skip to content

Commit

Permalink
feat(ingest/dbt): support a datahub section in meta mappings (datah…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Apr 26, 2024
1 parent 7e69247 commit 3ab4ec9
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 36 deletions.
5 changes: 3 additions & 2 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ def make_ml_model_group_urn(platform: str, group_name: str, env: str) -> str:
)


def get_class_fields(_class: Type[object]) -> Iterable[str]:
def _get_enum_options(_class: Type[object]) -> Iterable[str]:
return [
f
for f in dir(_class)
Expand All @@ -378,7 +378,8 @@ def get_class_fields(_class: Type[object]) -> Iterable[str]:
def validate_ownership_type(ownership_type: str) -> Tuple[str, Optional[str]]:
if ownership_type.startswith("urn:li:"):
return OwnershipTypeClass.CUSTOM, ownership_type
if ownership_type in get_class_fields(OwnershipTypeClass):
ownership_type = ownership_type.upper()
if ownership_type in _get_enum_options(OwnershipTypeClass):
return ownership_type, None
raise ValueError(f"Unexpected ownership type: {ownership_type}")

Expand Down
116 changes: 83 additions & 33 deletions metadata-ingestion/src/datahub/utilities/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@
from functools import reduce
from typing import Any, Dict, List, Mapping, Match, Optional, Union, cast

from datahub.configuration.common import ConfigModel
from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import OwnerType
from datahub.emitter.mce_builder import (
OwnerType,
make_user_urn,
validate_ownership_type,
)
from datahub.metadata.schema_classes import (
AuditStampClass,
InstitutionalMemoryClass,
Expand Down Expand Up @@ -83,6 +88,36 @@ class Constants:
SEPARATOR = "separator"


class _MappingOwner(ConfigModel):
    """A single structured owner entry from the ``owners`` list of a ``datahub`` property.

    Used when an owner is given as a mapping (``owner`` / ``owner_type``)
    rather than as a bare string.
    """

    # Owner identifier: either a plain name or an already-formed urn.
    owner: str
    # Ownership type name, or a "urn:li:..." ownership type urn.
    # Falls back to DATAOWNER when the entry does not specify one.
    owner_type: str = OwnershipTypeClass.DATAOWNER


class _DatahubProps(ConfigModel):
    """Parsed form of the ``owners`` list found under the special ``datahub`` property."""

    # Each entry is either a bare owner string or a structured _MappingOwner.
    owners: List[Union[str, _MappingOwner]]

    def make_owner_category_list(self) -> List[Dict]:
        """Normalize every configured owner into an owner-operation dict.

        Returns:
            One dict per entry in ``self.owners``, in the same order, with
            the keys ``urn``, ``category`` and ``categoryUrn``.

        Any exception raised by ``validate_ownership_type`` for an
        unrecognized ownership type is propagated to the caller.
        """
        owner_entries = []
        for entry in self.owners:
            # A bare string is shorthand for an owner with the default
            # DATAOWNER ownership type.
            raw_owner, raw_category = (
                (entry, OwnershipTypeClass.DATAOWNER)
                if isinstance(entry, str)
                else (entry.owner, entry.owner_type)
            )

            owner_urn = make_user_urn(raw_owner)
            category, category_urn = validate_ownership_type(raw_category)

            owner_entries.append(
                {
                    "urn": owner_urn,
                    "category": category,
                    "categoryUrn": category_urn,
                }
            )
        return owner_entries


class OperationProcessor:
"""
A general class that processes a dictionary of properties and operations defined on it.
Expand Down Expand Up @@ -128,7 +163,7 @@ def __init__(
self.owner_source_type = owner_source_type
self.match_nested_props = match_nested_props

def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]:
def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]: # noqa: C901
# Defining the following local variables -
# operations_map - the final resulting map when operations are processed.
# Against each operation the values to be applied are stored.
Expand All @@ -137,9 +172,35 @@ def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]:
# operation config: map which contains the parameters to carry out that operation.
# For e.g for add_tag operation config will have the tag value.
# operation_type: the type of operation (add_tag, add_term, etc.)
aspect_map: Dict[str, Any] = {} # map of aspect name to aspect object

# Process the special "datahub" property, which supports tags, terms, and owners.
operations_map: Dict[str, list] = {}
try:
datahub_prop = raw_props.get("datahub")
if datahub_prop and isinstance(datahub_prop, dict):
if datahub_prop.get("tags"):
# Note that tags get converted to urns later because we need to support the tag prefix.
tags = datahub_prop["tags"]
operations_map.setdefault(Constants.ADD_TAG_OPERATION, []).extend(
tags
)

if datahub_prop.get("terms"):
terms = datahub_prop["terms"]
operations_map.setdefault(Constants.ADD_TERM_OPERATION, []).extend(
mce_builder.make_term_urn(term) for term in terms
)

if datahub_prop.get("owners"):
owners = _DatahubProps.parse_obj_allow_extras(datahub_prop)
operations_map.setdefault(Constants.ADD_OWNER_OPERATION, []).extend(
owners.make_owner_category_list()
)
except Exception as e:
logger.error(f"Error while processing datahub property: {e}")

# Process the actual directives.
try:
operations_map: Dict[str, Union[set, list]] = {}
for operation_key in self.operation_defs:
operation_type = self.operation_defs.get(operation_key, {}).get(
Constants.OPERATION
Expand Down Expand Up @@ -177,42 +238,36 @@ def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]:
isinstance(operation, list)
and operation_type == Constants.ADD_OWNER_OPERATION
):
operation_value_list = operations_map.get(
operation_type, list()
)
cast(List, operation_value_list).extend(
operations_map.setdefault(operation_type, []).extend(
operation
) # cast to silent the lint
operations_map[operation_type] = operation_value_list
)

elif isinstance(operation, (str, list)):
operations_value_set = operations_map.get(
operation_type, set()
operations_map.setdefault(operation_type, []).extend(
operation
if isinstance(operation, list)
else [operation]
)
if isinstance(operation, list):
operations_value_set.update(operation) # type: ignore
else:
operations_value_set.add(operation) # type: ignore
operations_map[operation_type] = operations_value_set
else:
operations_value_list = operations_map.get(
operation_type, list()
operations_map.setdefault(operation_type, []).append(
operation
)
operations_value_list.append(operation) # type: ignore
operations_map[operation_type] = operations_value_list
aspect_map = self.convert_to_aspects(operations_map)
except Exception as e:
logger.error(f"Error while processing operation defs over raw_props: {e}")

aspect_map: Dict[str, Any] = {} # map of aspect name to aspect object
try:
aspect_map = self.convert_to_aspects(operations_map)
except Exception as e:
logger.error(f"Error while converting operations map to aspects: {e}")
return aspect_map

def convert_to_aspects(
self, operation_map: Dict[str, Union[set, list]]
) -> Dict[str, Any]:
def convert_to_aspects(self, operation_map: Dict[str, list]) -> Dict[str, Any]:
aspect_map: Dict[str, Any] = {}

if Constants.ADD_TAG_OPERATION in operation_map:
tag_aspect = mce_builder.make_global_tag_aspect_with_tag_list(
sorted(operation_map[Constants.ADD_TAG_OPERATION])
sorted(set(operation_map[Constants.ADD_TAG_OPERATION]))
)

aspect_map[Constants.ADD_TAG_OPERATION] = tag_aspect
Expand Down Expand Up @@ -240,7 +295,7 @@ def convert_to_aspects(

if Constants.ADD_TERM_OPERATION in operation_map:
term_aspect = mce_builder.make_glossary_terms_aspect_from_urn_list(
sorted(operation_map[Constants.ADD_TERM_OPERATION])
sorted(set(operation_map[Constants.ADD_TERM_OPERATION]))
)
aspect_map[Constants.ADD_TERM_OPERATION] = term_aspect

Expand Down Expand Up @@ -319,12 +374,7 @@ def get_operation_value(
operation_config.get(Constants.OWNER_CATEGORY)
or OwnershipTypeClass.DATAOWNER
)
owner_category_urn: Optional[str] = None
if owner_category.startswith("urn:li:"):
owner_category_urn = owner_category
owner_category = OwnershipTypeClass.DATAOWNER
else:
owner_category = owner_category.upper()
owner_category, owner_category_urn = validate_ownership_type(owner_category)

if self.strip_owner_email_id:
owner_ids = [
Expand Down
51 changes: 50 additions & 1 deletion metadata-ingestion/tests/unit/test_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def test_operation_processor_ownership_category():
new_owner = ownership_aspect.owners[2]
assert new_owner.owner == "urn:li:corpuser:bob"
assert new_owner.source and new_owner.source.type == "SOURCE_CONTROL"
assert new_owner.type == OwnershipTypeClass.DATAOWNER # dummy value
assert new_owner.type == OwnershipTypeClass.CUSTOM
assert new_owner.typeUrn == "urn:li:ownershipType:architect"


Expand Down Expand Up @@ -347,3 +347,52 @@ def test_operation_processor_matching_dot_props():
tag_aspect: GlobalTagsClass = aspect_map["add_tag"]
assert len(tag_aspect.tags) == 1
assert tag_aspect.tags[0].tag == "urn:li:tag:pii"


def test_operation_processor_datahub_props():
    """The special ``datahub`` property yields owner, tag and term aspects
    even when no operation definitions are configured."""
    steward_type_urn = "urn:li:ownershipType:steward"
    raw_props = {
        "datahub": {
            "tags": ["tag1", "tag2"],
            "terms": ["term1", "term2"],
            "owners": [
                # Bare strings: a plain name and an already-formed group urn.
                "owner1",
                "urn:li:corpGroup:group1",
                # Structured entries carrying an ownership type urn.
                {"owner": "owner2", "owner_type": steward_type_urn},
                {"owner": "urn:li:corpGroup:group2", "owner_type": steward_type_urn},
            ],
        }
    }

    processor = OperationProcessor(
        operation_defs={},
        owner_source_type="SOURCE_CONTROL",
    )
    aspect_map = processor.process(raw_props)

    # Owners
    ownership_aspect = aspect_map["add_owner"]
    assert isinstance(ownership_aspect, OwnershipClass)
    observed_owners = [
        (owner.owner, owner.type, owner.typeUrn) for owner in ownership_aspect.owners
    ]
    assert observed_owners == [
        ("urn:li:corpGroup:group1", "DATAOWNER", None),
        ("urn:li:corpGroup:group2", "CUSTOM", "urn:li:ownershipType:steward"),
        ("urn:li:corpuser:owner1", "DATAOWNER", None),
        ("urn:li:corpuser:owner2", "CUSTOM", "urn:li:ownershipType:steward"),
    ]

    # Tags
    tags_aspect = aspect_map["add_tag"]
    assert isinstance(tags_aspect, GlobalTagsClass)
    observed_tags = [association.tag for association in tags_aspect.tags]
    assert observed_tags == ["urn:li:tag:tag1", "urn:li:tag:tag2"]

    # Glossary terms
    terms_aspect = aspect_map["add_term"]
    assert isinstance(terms_aspect, GlossaryTermsClass)
    observed_terms = [association.urn for association in terms_aspect.terms]
    assert observed_terms == [
        "urn:li:glossaryTerm:term1",
        "urn:li:glossaryTerm:term2",
    ]

0 comments on commit 3ab4ec9

Please sign in to comment.