Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(ingest): remove inferred args to MCPW, part 2 #6905

Merged
merged 2 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 23 additions & 76 deletions metadata-ingestion/src/datahub/emitter/mcp_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
from typing import Any, Dict, Iterable, List, Optional, TypeVar

from deprecated import deprecated
from pydantic.fields import Field
from pydantic.main import BaseModel

Expand All @@ -18,7 +19,6 @@
)
from datahub.metadata.com.linkedin.pegasus2avro.container import ContainerProperties
from datahub.metadata.schema_classes import (
ChangeTypeClass,
ContainerClass,
DomainsClass,
GlobalTagsClass,
Expand All @@ -31,7 +31,6 @@
TagAssociationClass,
_Aspect,
)
from datahub.utilities.urns.urn import guess_entity_type


def _stable_guid_from_dict(d: dict) -> str:
Expand Down Expand Up @@ -129,24 +128,18 @@ def default(self, obj: Any) -> Any:


def add_domain_to_entity_wu(
entity_type: str, entity_urn: str, domain_urn: str
entity_urn: str, domain_urn: str
) -> Iterable[MetadataWorkUnit]:
mcp = MetadataChangeProposalWrapper(
entityType=entity_type,
changeType=ChangeTypeClass.UPSERT,
yield MetadataChangeProposalWrapper(
entityUrn=f"{entity_urn}",
aspect=DomainsClass(domains=[domain_urn]),
)
wu = MetadataWorkUnit(id=f"{domain_urn}-to-{entity_urn}", mcp=mcp)
yield wu
).as_workunit()


def add_owner_to_entity_wu(
entity_type: str, entity_urn: str, owner_urn: str
) -> Iterable[MetadataWorkUnit]:
mcp = MetadataChangeProposalWrapper(
entityType=entity_type,
changeType=ChangeTypeClass.UPSERT,
yield MetadataChangeProposalWrapper(
entityUrn=f"{entity_urn}",
aspect=OwnershipClass(
owners=[
Expand All @@ -156,26 +149,22 @@ def add_owner_to_entity_wu(
)
]
),
)
wu = MetadataWorkUnit(id=f"{owner_urn}-to-{entity_urn}", mcp=mcp)
yield wu
).as_workunit()


def add_tags_to_entity_wu(
entity_type: str, entity_urn: str, tags: List[str]
) -> Iterable[MetadataWorkUnit]:
mcp = MetadataChangeProposalWrapper(
yield MetadataChangeProposalWrapper(
entityType=entity_type,
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{entity_urn}",
aspect=GlobalTagsClass(
tags=[TagAssociationClass(f"urn:li:tag:{tag}") for tag in tags]
),
)
wu = MetadataWorkUnit(id=f"tags-to-{entity_urn}", mcp=mcp)
yield wu
).as_workunit()


@deprecated("use MetadataChangeProposalWrapper(...).as_workunit() instead")
def wrap_aspect_as_workunit(
entityName: str,
entityUrn: str,
Expand Down Expand Up @@ -210,9 +199,7 @@ def gen_containers(
container_urn = make_container_urn(
guid=container_key.guid(),
)
mcp = MetadataChangeProposalWrapper(
entityType="container",
changeType=ChangeTypeClass.UPSERT,
yield MetadataChangeProposalWrapper(
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspect=ContainerProperties(
Expand All @@ -229,51 +216,32 @@ def gen_containers(
if last_modified is not None
else None,
),
)
wu = MetadataWorkUnit(id=f"container-info-{name}-{container_urn}", mcp=mcp)
yield wu
).as_workunit()

# add status
yield wrap_aspect_as_workunit(
entityName="container",
yield MetadataChangeProposalWrapper(
entityUrn=f"{container_urn}",
aspect=StatusClass(removed=False),
aspectName=StatusClass.get_aspect_name(),
)
).as_workunit()

mcp = MetadataChangeProposalWrapper(
entityType="container",
changeType=ChangeTypeClass.UPSERT,
yield MetadataChangeProposalWrapper(
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspect=DataPlatformInstance(
platform=f"{make_data_platform_urn(container_key.platform)}",
instance=f"{make_dataplatform_instance_urn(container_key.platform, container_key.instance)}"
if container_key.instance
else None,
),
)
wu = MetadataWorkUnit(
id=f"container-platforminstance-{name}-{container_urn}", mcp=mcp
)
yield wu
).as_workunit()

# Set subtype
subtype_mcp = MetadataChangeProposalWrapper(
entityType="container",
changeType=ChangeTypeClass.UPSERT,
yield MetadataChangeProposalWrapper(
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspect=SubTypesClass(typeNames=sub_types),
)
wu = MetadataWorkUnit(
id=f"container-subtypes-{name}-{container_urn}", mcp=subtype_mcp
)
yield wu
).as_workunit()

if domain_urn:
yield from add_domain_to_entity_wu(
entity_type="container",
entity_urn=container_urn,
domain_urn=domain_urn,
)
Expand All @@ -299,39 +267,23 @@ def gen_containers(

# Set database container
parent_container_mcp = MetadataChangeProposalWrapper(
entityType="container",
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspect=ContainerClass(container=parent_container_urn),
# aspect=ContainerKeyClass(guid=database_container_key.guid())
)
wu = MetadataWorkUnit(
id=f"container-parent-container-{name}-{container_urn}-{parent_container_urn}",
mcp=parent_container_mcp,
)

yield wu
yield parent_container_mcp.as_workunit()


def add_dataset_to_container(
# FIXME: Union requires two or more type arguments
container_key: KeyType,
dataset_urn: str,
container_key: KeyType, dataset_urn: str
) -> Iterable[MetadataWorkUnit]:
container_urn = make_container_urn(
guid=container_key.guid(),
)

mcp = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
yield MetadataChangeProposalWrapper(
entityUrn=f"{dataset_urn}",
aspect=ContainerClass(container=f"{container_urn}"),
# aspect=ContainerKeyClass(guid=schema_container_key.guid())
)
wu = MetadataWorkUnit(id=f"container-{container_urn}-to-{dataset_urn}", mcp=mcp)
yield wu
).as_workunit()


def add_entity_to_container(
Expand All @@ -340,23 +292,18 @@ def add_entity_to_container(
container_urn = make_container_urn(
guid=container_key.guid(),
)
mcp = MetadataChangeProposalWrapper(
yield MetadataChangeProposalWrapper(
entityType=entity_type,
changeType=ChangeTypeClass.UPSERT,
entityUrn=entity_urn,
aspect=ContainerClass(container=f"{container_urn}"),
)
wu = MetadataWorkUnit(id=f"container-{container_urn}-to-{entity_urn}", mcp=mcp)
yield wu
).as_workunit()


def mcps_from_mce(
mce: MetadataChangeEventClass,
) -> Iterable[MetadataChangeProposalWrapper]:
for aspect in mce.proposedSnapshot.aspects:
yield MetadataChangeProposalWrapper(
entityType=guess_entity_type(mce.proposedSnapshot.urn),
changeType=ChangeTypeClass.UPSERT,
entityUrn=mce.proposedSnapshot.urn,
auditHeader=mce.auditHeader,
aspect=aspect,
Expand Down
4 changes: 1 addition & 3 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -930,12 +930,11 @@ def _gen_domain_urn(self, dataset_name: str) -> Optional[str]:
return None

def _get_domain_wu(
self, dataset_name: str, entity_urn: str, entity_type: str
self, dataset_name: str, entity_urn: str
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(dataset_name)
if domain_urn:
wus = add_domain_to_entity_wu(
entity_type=entity_type,
entity_urn=entity_urn,
domain_urn=domain_urn,
)
Expand Down Expand Up @@ -985,7 +984,6 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
yield from self._get_domain_wu(
dataset_name=full_table_name,
entity_urn=dataset_urn,
entity_type="dataset",
)
yield from self.add_table_to_database_container(
dataset_urn=dataset_urn, db_name=database_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,12 +800,10 @@ def _get_domain_wu(
self,
dataset_name: str,
entity_urn: str,
entity_type: str,
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(dataset_name)
if domain_urn:
wus = add_domain_to_entity_wu(
entity_type=entity_type,
entity_urn=entity_urn,
domain_urn=domain_urn,
)
Expand Down Expand Up @@ -963,7 +961,6 @@ def gen_dataset_workunits(
yield from self._get_domain_wu(
dataset_name=str(datahub_dataset_name),
entity_urn=dataset_urn,
entity_type="dataset",
)

def gen_lineage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
)
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
ChangeTypeClass,
CorpGroupInfoClass,
CorpUserInfoClass,
GroupMembershipClass,
Expand Down Expand Up @@ -296,10 +295,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
yield wu

group_origin_mcp = MetadataChangeProposalWrapper(
entityType="corpGroup",
entityUrn=datahub_corp_group_snapshot.urn,
changeType=ChangeTypeClass.UPSERT,
aspectName="origin",
aspect=OriginClass(OriginTypeClass.EXTERNAL, "AZURE_AD"),
)
group_origin_wu_id = f"group-origin-{group_count + 1 if self.config.mask_group_id else datahub_corp_group_snapshot.urn}"
Expand All @@ -310,10 +306,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
yield group_origin_wu

group_status_mcp = MetadataChangeProposalWrapper(
entityType="corpGroup",
entityUrn=datahub_corp_group_snapshot.urn,
changeType=ChangeTypeClass.UPSERT,
aspectName="status",
aspect=StatusClass(removed=False),
)
group_status_wu_id = f"group-status-{group_count + 1 if self.config.mask_group_id else datahub_corp_group_snapshot.urn}"
Expand Down Expand Up @@ -445,10 +438,7 @@ def ingest_ad_users(
yield wu

user_origin_mcp = MetadataChangeProposalWrapper(
entityType="corpuser",
entityUrn=datahub_corp_user_snapshot.urn,
changeType=ChangeTypeClass.UPSERT,
aspectName="origin",
aspect=OriginClass(OriginTypeClass.EXTERNAL, "AZURE_AD"),
)
user_origin_wu_id = f"user-origin-{user_count + 1 if self.config.mask_user_id else datahub_corp_user_snapshot.urn}"
Expand All @@ -457,10 +447,7 @@ def ingest_ad_users(
yield user_origin_wu

user_status_mcp = MetadataChangeProposalWrapper(
entityType="corpuser",
entityUrn=datahub_corp_user_snapshot.urn,
changeType=ChangeTypeClass.UPSERT,
aspectName="status",
aspect=StatusClass(removed=False),
)
user_status_wu_id = f"user-status-{user_count + 1 if self.config.mask_user_id else datahub_corp_user_snapshot.urn}"
Expand Down
1 change: 0 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,6 @@ def _extract_record(

if domain_urn:
wus = add_domain_to_entity_wu(
entity_type="dataset",
entity_urn=dataset_urn,
domain_urn=domain_urn,
)
Expand Down
1 change: 0 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/pulsar.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,6 @@ def _extract_record(

if domain_urn:
wus = add_domain_to_entity_wu(
entity_type="dataset",
entity_urn=dataset_urn,
domain_urn=domain_urn,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ def get_domain_workunit(

if domain_urn:
yield from add_domain_to_entity_wu(
domain_urn=domain_urn, entity_type="dataset", entity_urn=datasetUrn
domain_urn=domain_urn, entity_urn=datasetUrn
)

def get_platform_instance_workunit(self, datasetUrn: str) -> WorkUnit:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,10 @@ def wrap_aspect_as_workunit(
aspectName: str,
aspect: _Aspect,
) -> MetadataWorkUnit:
id = f"{aspectName}-for-{entityUrn}"
if "timestampMillis" in aspect._inner_dict:
id = f"{aspectName}-{aspect.timestampMillis}-for-{entityUrn}" # type: ignore
wu = MetadataWorkUnit(
id=id,
mcp=MetadataChangeProposalWrapper(
entityUrn=entityUrn,
aspect=aspect,
),
)
wu = MetadataChangeProposalWrapper(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unclear why we were doing this in the first place

entityUrn=entityUrn,
aspect=aspect,
).as_workunit()
self.report.report_workunit(wu)
return wu

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,6 @@ def gen_dataset_workunits(
yield from self._get_domain_wu(
dataset_name=dataset_name,
entity_urn=dataset_urn,
entity_type="dataset",
)

if (
Expand Down Expand Up @@ -1082,12 +1081,10 @@ def _get_domain_wu(
self,
dataset_name: str,
entity_urn: str,
entity_type: str,
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(dataset_name)
if domain_urn:
wus = add_domain_to_entity_wu(
entity_type=entity_type,
entity_urn=entity_urn,
domain_urn=domain_urn,
)
Expand Down
Loading