diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py index 1c29d38273d82..ccc1267cd97f7 100644 --- a/metadata-ingestion/src/datahub/emitter/mce_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py @@ -367,7 +367,7 @@ def make_ml_model_group_urn(platform: str, group_name: str, env: str) -> str: ) -def get_class_fields(_class: Type[object]) -> Iterable[str]: +def _get_enum_options(_class: Type[object]) -> Iterable[str]: return [ f for f in dir(_class) @@ -378,7 +378,8 @@ def get_class_fields(_class: Type[object]) -> Iterable[str]: def validate_ownership_type(ownership_type: str) -> Tuple[str, Optional[str]]: if ownership_type.startswith("urn:li:"): return OwnershipTypeClass.CUSTOM, ownership_type - if ownership_type in get_class_fields(OwnershipTypeClass): + ownership_type = ownership_type.upper() + if ownership_type in _get_enum_options(OwnershipTypeClass): return ownership_type, None raise ValueError(f"Unexpected ownership type: {ownership_type}") diff --git a/metadata-ingestion/src/datahub/utilities/mapping.py b/metadata-ingestion/src/datahub/utilities/mapping.py index 5d26c3af54d5e..391657f841167 100644 --- a/metadata-ingestion/src/datahub/utilities/mapping.py +++ b/metadata-ingestion/src/datahub/utilities/mapping.py @@ -6,8 +6,13 @@ from functools import reduce from typing import Any, Dict, List, Mapping, Match, Optional, Union, cast +from datahub.configuration.common import ConfigModel from datahub.emitter import mce_builder -from datahub.emitter.mce_builder import OwnerType +from datahub.emitter.mce_builder import ( + OwnerType, + make_user_urn, + validate_ownership_type, +) from datahub.metadata.schema_classes import ( AuditStampClass, InstitutionalMemoryClass, @@ -83,6 +88,36 @@ class Constants: SEPARATOR = "separator" +class _MappingOwner(ConfigModel): + owner: str + owner_type: str = OwnershipTypeClass.DATAOWNER + + +class _DatahubProps(ConfigModel): + owners: List[Union[str, _MappingOwner]] + + def make_owner_category_list(self) -> List[Dict]: + res = [] + for owner in self.owners: + if isinstance(owner, str): + owner_id = owner + owner_category = OwnershipTypeClass.DATAOWNER + else: + owner_id = owner.owner + owner_category = owner.owner_type + owner_id = make_user_urn(owner_id) + owner_category, owner_category_urn = validate_ownership_type(owner_category) + + res.append( + { + "urn": owner_id, + "category": owner_category, + "categoryUrn": owner_category_urn, + } + ) + return res + + class OperationProcessor: """ A general class that processes a dictionary of properties and operations defined on it. @@ -128,7 +163,7 @@ def __init__( self.owner_source_type = owner_source_type self.match_nested_props = match_nested_props - def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]: + def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]: # noqa: C901 # Defining the following local variables - # operations_map - the final resulting map when operations are processed. # Against each operation the values to be applied are stored. @@ -137,9 +172,35 @@ def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]: # operation config: map which contains the parameters to carry out that operation. # For e.g for add_tag operation config will have the tag value. # operation_type: the type of operation (add_tag, add_term, etc.) - aspect_map: Dict[str, Any] = {} # map of aspect name to aspect object + + # Process the special "datahub" property, which supports tags, terms, and owners. + operations_map: Dict[str, list] = {} + try: + datahub_prop = raw_props.get("datahub") + if datahub_prop and isinstance(datahub_prop, dict): + if datahub_prop.get("tags"): + # Note that tags get converted to urns later because we need to support the tag prefix. + tags = datahub_prop["tags"] + operations_map.setdefault(Constants.ADD_TAG_OPERATION, []).extend( + tags + ) + + if datahub_prop.get("terms"): + terms = datahub_prop["terms"] + operations_map.setdefault(Constants.ADD_TERM_OPERATION, []).extend( + mce_builder.make_term_urn(term) for term in terms + ) + + if datahub_prop.get("owners"): + owners = _DatahubProps.parse_obj_allow_extras(datahub_prop) + operations_map.setdefault(Constants.ADD_OWNER_OPERATION, []).extend( + owners.make_owner_category_list() + ) + except Exception as e: + logger.error(f"Error while processing datahub property: {e}") + + # Process the actual directives. try: - operations_map: Dict[str, Union[set, list]] = {} for operation_key in self.operation_defs: operation_type = self.operation_defs.get(operation_key, {}).get( Constants.OPERATION @@ -177,42 +238,36 @@ def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]: isinstance(operation, list) and operation_type == Constants.ADD_OWNER_OPERATION ): - operation_value_list = operations_map.get( - operation_type, list() - ) - cast(List, operation_value_list).extend( + operations_map.setdefault(operation_type, []).extend( operation - ) # cast to silent the lint - operations_map[operation_type] = operation_value_list + ) elif isinstance(operation, (str, list)): - operations_value_set = operations_map.get( - operation_type, set() + operations_map.setdefault(operation_type, []).extend( + operation + if isinstance(operation, list) + else [operation] ) - if isinstance(operation, list): - operations_value_set.update(operation) # type: ignore - else: - operations_value_set.add(operation) # type: ignore - operations_map[operation_type] = operations_value_set else: - operations_value_list = operations_map.get( - operation_type, list() + operations_map.setdefault(operation_type, []).append( + operation ) - operations_value_list.append(operation) # type: ignore - operations_map[operation_type] = operations_value_list - aspect_map = self.convert_to_aspects(operations_map) except Exception as e: logger.error(f"Error while processing operation defs over raw_props: {e}") + + aspect_map: Dict[str, Any] = {} # map of aspect name to aspect object + try: + aspect_map = self.convert_to_aspects(operations_map) + except Exception as e: + logger.error(f"Error while converting operations map to aspects: {e}") return aspect_map - def convert_to_aspects( - self, operation_map: Dict[str, Union[set, list]] - ) -> Dict[str, Any]: + def convert_to_aspects(self, operation_map: Dict[str, list]) -> Dict[str, Any]: aspect_map: Dict[str, Any] = {} if Constants.ADD_TAG_OPERATION in operation_map: tag_aspect = mce_builder.make_global_tag_aspect_with_tag_list( - sorted(operation_map[Constants.ADD_TAG_OPERATION]) + sorted(set(operation_map[Constants.ADD_TAG_OPERATION])) ) aspect_map[Constants.ADD_TAG_OPERATION] = tag_aspect @@ -240,7 +295,7 @@ def convert_to_aspects( if Constants.ADD_TERM_OPERATION in operation_map: term_aspect = mce_builder.make_glossary_terms_aspect_from_urn_list( - sorted(operation_map[Constants.ADD_TERM_OPERATION]) + sorted(set(operation_map[Constants.ADD_TERM_OPERATION])) ) aspect_map[Constants.ADD_TERM_OPERATION] = term_aspect @@ -319,12 +374,7 @@ def get_operation_value( operation_config.get(Constants.OWNER_CATEGORY) or OwnershipTypeClass.DATAOWNER ) - owner_category_urn: Optional[str] = None - if owner_category.startswith("urn:li:"): - owner_category_urn = owner_category - owner_category = OwnershipTypeClass.DATAOWNER - else: - owner_category = owner_category.upper() + owner_category, owner_category_urn = validate_ownership_type(owner_category) if self.strip_owner_email_id: owner_ids = [ diff --git a/metadata-ingestion/tests/unit/test_mapping.py b/metadata-ingestion/tests/unit/test_mapping.py index 755a62fa32912..42b13f6dbefc7 100644 --- a/metadata-ingestion/tests/unit/test_mapping.py +++ b/metadata-ingestion/tests/unit/test_mapping.py @@ -235,7 +235,7 @@ def test_operation_processor_ownership_category(): new_owner = ownership_aspect.owners[2] assert new_owner.owner == "urn:li:corpuser:bob" assert new_owner.source and new_owner.source.type == "SOURCE_CONTROL" - assert new_owner.type == OwnershipTypeClass.DATAOWNER # dummy value + assert new_owner.type == OwnershipTypeClass.CUSTOM assert new_owner.typeUrn == "urn:li:ownershipType:architect" @@ -347,3 +347,52 @@ def test_operation_processor_matching_dot_props(): tag_aspect: GlobalTagsClass = aspect_map["add_tag"] assert len(tag_aspect.tags) == 1 assert tag_aspect.tags[0].tag == "urn:li:tag:pii" + + +def test_operation_processor_datahub_props(): + raw_props = { + "datahub": { + "tags": ["tag1", "tag2"], + "terms": ["term1", "term2"], + "owners": [ + "owner1", + "urn:li:corpGroup:group1", + { + "owner": "owner2", + "owner_type": "urn:li:ownershipType:steward", + }, + { + "owner": "urn:li:corpGroup:group2", + "owner_type": "urn:li:ownershipType:steward", + }, + ], + } + } + + processor = OperationProcessor( + operation_defs={}, + owner_source_type="SOURCE_CONTROL", + ) + aspect_map = processor.process(raw_props) + + assert isinstance(aspect_map["add_owner"], OwnershipClass) + assert [ + (owner.owner, owner.type, owner.typeUrn) + for owner in aspect_map["add_owner"].owners + ] == [ + ("urn:li:corpGroup:group1", "DATAOWNER", None), + ("urn:li:corpGroup:group2", "CUSTOM", "urn:li:ownershipType:steward"), + ("urn:li:corpuser:owner1", "DATAOWNER", None), + ("urn:li:corpuser:owner2", "CUSTOM", "urn:li:ownershipType:steward"), + ] + + assert isinstance(aspect_map["add_tag"], GlobalTagsClass) + assert [tag_association.tag for tag_association in aspect_map["add_tag"].tags] == [ + "urn:li:tag:tag1", + "urn:li:tag:tag2", + ] + + assert isinstance(aspect_map["add_term"], GlossaryTermsClass) + assert [ + term_association.urn for term_association in aspect_map["add_term"].terms + ] == ["urn:li:glossaryTerm:term1", "urn:li:glossaryTerm:term2"]