Skip to content

Commit

Permalink
chore(ingest): remove inferred args to MCPW, part 2 (#6905)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Jan 5, 2023
1 parent 8b1dc4b commit f651646
Show file tree
Hide file tree
Showing 16 changed files with 30 additions and 137 deletions.
99 changes: 23 additions & 76 deletions metadata-ingestion/src/datahub/emitter/mcp_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
from typing import Any, Dict, Iterable, List, Optional, TypeVar

from deprecated import deprecated
from pydantic.fields import Field
from pydantic.main import BaseModel

Expand All @@ -18,7 +19,6 @@
)
from datahub.metadata.com.linkedin.pegasus2avro.container import ContainerProperties
from datahub.metadata.schema_classes import (
ChangeTypeClass,
ContainerClass,
DomainsClass,
GlobalTagsClass,
Expand All @@ -31,7 +31,6 @@
TagAssociationClass,
_Aspect,
)
from datahub.utilities.urns.urn import guess_entity_type


def _stable_guid_from_dict(d: dict) -> str:
Expand Down Expand Up @@ -129,24 +128,18 @@ def default(self, obj: Any) -> Any:


def add_domain_to_entity_wu(
entity_type: str, entity_urn: str, domain_urn: str
entity_urn: str, domain_urn: str
) -> Iterable[MetadataWorkUnit]:
mcp = MetadataChangeProposalWrapper(
entityType=entity_type,
changeType=ChangeTypeClass.UPSERT,
yield MetadataChangeProposalWrapper(
entityUrn=f"{entity_urn}",
aspect=DomainsClass(domains=[domain_urn]),
)
wu = MetadataWorkUnit(id=f"{domain_urn}-to-{entity_urn}", mcp=mcp)
yield wu
).as_workunit()


def add_owner_to_entity_wu(
entity_type: str, entity_urn: str, owner_urn: str
) -> Iterable[MetadataWorkUnit]:
mcp = MetadataChangeProposalWrapper(
entityType=entity_type,
changeType=ChangeTypeClass.UPSERT,
yield MetadataChangeProposalWrapper(
entityUrn=f"{entity_urn}",
aspect=OwnershipClass(
owners=[
Expand All @@ -156,26 +149,22 @@ def add_owner_to_entity_wu(
)
]
),
)
wu = MetadataWorkUnit(id=f"{owner_urn}-to-{entity_urn}", mcp=mcp)
yield wu
).as_workunit()


def add_tags_to_entity_wu(
entity_type: str, entity_urn: str, tags: List[str]
) -> Iterable[MetadataWorkUnit]:
mcp = MetadataChangeProposalWrapper(
yield MetadataChangeProposalWrapper(
entityType=entity_type,
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{entity_urn}",
aspect=GlobalTagsClass(
tags=[TagAssociationClass(f"urn:li:tag:{tag}") for tag in tags]
),
)
wu = MetadataWorkUnit(id=f"tags-to-{entity_urn}", mcp=mcp)
yield wu
).as_workunit()


@deprecated("use MetadataChangeProposalWrapper(...).as_workunit() instead")
def wrap_aspect_as_workunit(
entityName: str,
entityUrn: str,
Expand Down Expand Up @@ -210,9 +199,7 @@ def gen_containers(
container_urn = make_container_urn(
guid=container_key.guid(),
)
mcp = MetadataChangeProposalWrapper(
entityType="container",
changeType=ChangeTypeClass.UPSERT,
yield MetadataChangeProposalWrapper(
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspect=ContainerProperties(
Expand All @@ -229,51 +216,32 @@ def gen_containers(
if last_modified is not None
else None,
),
)
wu = MetadataWorkUnit(id=f"container-info-{name}-{container_urn}", mcp=mcp)
yield wu
).as_workunit()

# add status
yield wrap_aspect_as_workunit(
entityName="container",
yield MetadataChangeProposalWrapper(
entityUrn=f"{container_urn}",
aspect=StatusClass(removed=False),
aspectName=StatusClass.get_aspect_name(),
)
).as_workunit()

mcp = MetadataChangeProposalWrapper(
entityType="container",
changeType=ChangeTypeClass.UPSERT,
yield MetadataChangeProposalWrapper(
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspect=DataPlatformInstance(
platform=f"{make_data_platform_urn(container_key.platform)}",
instance=f"{make_dataplatform_instance_urn(container_key.platform, container_key.instance)}"
if container_key.instance
else None,
),
)
wu = MetadataWorkUnit(
id=f"container-platforminstance-{name}-{container_urn}", mcp=mcp
)
yield wu
).as_workunit()

# Set subtype
subtype_mcp = MetadataChangeProposalWrapper(
entityType="container",
changeType=ChangeTypeClass.UPSERT,
yield MetadataChangeProposalWrapper(
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspect=SubTypesClass(typeNames=sub_types),
)
wu = MetadataWorkUnit(
id=f"container-subtypes-{name}-{container_urn}", mcp=subtype_mcp
)
yield wu
).as_workunit()

if domain_urn:
yield from add_domain_to_entity_wu(
entity_type="container",
entity_urn=container_urn,
domain_urn=domain_urn,
)
Expand All @@ -299,39 +267,23 @@ def gen_containers(

# Set database container
parent_container_mcp = MetadataChangeProposalWrapper(
entityType="container",
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspect=ContainerClass(container=parent_container_urn),
# aspect=ContainerKeyClass(guid=database_container_key.guid())
)
wu = MetadataWorkUnit(
id=f"container-parent-container-{name}-{container_urn}-{parent_container_urn}",
mcp=parent_container_mcp,
)

yield wu
yield parent_container_mcp.as_workunit()


def add_dataset_to_container(
# FIXME: Union requires two or more type arguments
container_key: KeyType,
dataset_urn: str,
container_key: KeyType, dataset_urn: str
) -> Iterable[MetadataWorkUnit]:
container_urn = make_container_urn(
guid=container_key.guid(),
)

mcp = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
yield MetadataChangeProposalWrapper(
entityUrn=f"{dataset_urn}",
aspect=ContainerClass(container=f"{container_urn}"),
# aspect=ContainerKeyClass(guid=schema_container_key.guid())
)
wu = MetadataWorkUnit(id=f"container-{container_urn}-to-{dataset_urn}", mcp=mcp)
yield wu
).as_workunit()


def add_entity_to_container(
Expand All @@ -340,23 +292,18 @@ def add_entity_to_container(
container_urn = make_container_urn(
guid=container_key.guid(),
)
mcp = MetadataChangeProposalWrapper(
yield MetadataChangeProposalWrapper(
entityType=entity_type,
changeType=ChangeTypeClass.UPSERT,
entityUrn=entity_urn,
aspect=ContainerClass(container=f"{container_urn}"),
)
wu = MetadataWorkUnit(id=f"container-{container_urn}-to-{entity_urn}", mcp=mcp)
yield wu
).as_workunit()


def mcps_from_mce(
mce: MetadataChangeEventClass,
) -> Iterable[MetadataChangeProposalWrapper]:
for aspect in mce.proposedSnapshot.aspects:
yield MetadataChangeProposalWrapper(
entityType=guess_entity_type(mce.proposedSnapshot.urn),
changeType=ChangeTypeClass.UPSERT,
entityUrn=mce.proposedSnapshot.urn,
auditHeader=mce.auditHeader,
aspect=aspect,
Expand Down
4 changes: 1 addition & 3 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -930,12 +930,11 @@ def _gen_domain_urn(self, dataset_name: str) -> Optional[str]:
return None

def _get_domain_wu(
self, dataset_name: str, entity_urn: str, entity_type: str
self, dataset_name: str, entity_urn: str
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(dataset_name)
if domain_urn:
wus = add_domain_to_entity_wu(
entity_type=entity_type,
entity_urn=entity_urn,
domain_urn=domain_urn,
)
Expand Down Expand Up @@ -985,7 +984,6 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
yield from self._get_domain_wu(
dataset_name=full_table_name,
entity_urn=dataset_urn,
entity_type="dataset",
)
yield from self.add_table_to_database_container(
dataset_urn=dataset_urn, db_name=database_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,12 +800,10 @@ def _get_domain_wu(
self,
dataset_name: str,
entity_urn: str,
entity_type: str,
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(dataset_name)
if domain_urn:
wus = add_domain_to_entity_wu(
entity_type=entity_type,
entity_urn=entity_urn,
domain_urn=domain_urn,
)
Expand Down Expand Up @@ -963,7 +961,6 @@ def gen_dataset_workunits(
yield from self._get_domain_wu(
dataset_name=str(datahub_dataset_name),
entity_urn=dataset_urn,
entity_type="dataset",
)

def gen_lineage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
)
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
ChangeTypeClass,
CorpGroupInfoClass,
CorpUserInfoClass,
GroupMembershipClass,
Expand Down Expand Up @@ -296,10 +295,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
yield wu

group_origin_mcp = MetadataChangeProposalWrapper(
entityType="corpGroup",
entityUrn=datahub_corp_group_snapshot.urn,
changeType=ChangeTypeClass.UPSERT,
aspectName="origin",
aspect=OriginClass(OriginTypeClass.EXTERNAL, "AZURE_AD"),
)
group_origin_wu_id = f"group-origin-{group_count + 1 if self.config.mask_group_id else datahub_corp_group_snapshot.urn}"
Expand All @@ -310,10 +306,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
yield group_origin_wu

group_status_mcp = MetadataChangeProposalWrapper(
entityType="corpGroup",
entityUrn=datahub_corp_group_snapshot.urn,
changeType=ChangeTypeClass.UPSERT,
aspectName="status",
aspect=StatusClass(removed=False),
)
group_status_wu_id = f"group-status-{group_count + 1 if self.config.mask_group_id else datahub_corp_group_snapshot.urn}"
Expand Down Expand Up @@ -445,10 +438,7 @@ def ingest_ad_users(
yield wu

user_origin_mcp = MetadataChangeProposalWrapper(
entityType="corpuser",
entityUrn=datahub_corp_user_snapshot.urn,
changeType=ChangeTypeClass.UPSERT,
aspectName="origin",
aspect=OriginClass(OriginTypeClass.EXTERNAL, "AZURE_AD"),
)
user_origin_wu_id = f"user-origin-{user_count + 1 if self.config.mask_user_id else datahub_corp_user_snapshot.urn}"
Expand All @@ -457,10 +447,7 @@ def ingest_ad_users(
yield user_origin_wu

user_status_mcp = MetadataChangeProposalWrapper(
entityType="corpuser",
entityUrn=datahub_corp_user_snapshot.urn,
changeType=ChangeTypeClass.UPSERT,
aspectName="status",
aspect=StatusClass(removed=False),
)
user_status_wu_id = f"user-status-{user_count + 1 if self.config.mask_user_id else datahub_corp_user_snapshot.urn}"
Expand Down
1 change: 0 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,6 @@ def _extract_record(

if domain_urn:
wus = add_domain_to_entity_wu(
entity_type="dataset",
entity_urn=dataset_urn,
domain_urn=domain_urn,
)
Expand Down
1 change: 0 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/pulsar.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,6 @@ def _extract_record(

if domain_urn:
wus = add_domain_to_entity_wu(
entity_type="dataset",
entity_urn=dataset_urn,
domain_urn=domain_urn,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ def get_domain_workunit(

if domain_urn:
yield from add_domain_to_entity_wu(
domain_urn=domain_urn, entity_type="dataset", entity_urn=datasetUrn
domain_urn=domain_urn, entity_urn=datasetUrn
)

def get_platform_instance_workunit(self, datasetUrn: str) -> WorkUnit:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,10 @@ def wrap_aspect_as_workunit(
aspectName: str,
aspect: _Aspect,
) -> MetadataWorkUnit:
id = f"{aspectName}-for-{entityUrn}"
if "timestampMillis" in aspect._inner_dict:
id = f"{aspectName}-{aspect.timestampMillis}-for-{entityUrn}" # type: ignore
wu = MetadataWorkUnit(
id=id,
mcp=MetadataChangeProposalWrapper(
entityUrn=entityUrn,
aspect=aspect,
),
)
wu = MetadataChangeProposalWrapper(
entityUrn=entityUrn,
aspect=aspect,
).as_workunit()
self.report.report_workunit(wu)
return wu

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,6 @@ def gen_dataset_workunits(
yield from self._get_domain_wu(
dataset_name=dataset_name,
entity_urn=dataset_urn,
entity_type="dataset",
)

if table.tags:
Expand Down Expand Up @@ -1211,12 +1210,10 @@ def _get_domain_wu(
self,
dataset_name: str,
entity_urn: str,
entity_type: str,
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(dataset_name)
if domain_urn:
wus = add_domain_to_entity_wu(
entity_type=entity_type,
entity_urn=entity_urn,
domain_urn=domain_urn,
)
Expand Down
Loading

0 comments on commit f651646

Please sign in to comment.