Skip to content

Commit

Permalink
feat(ingest): infer aspectName from aspect type in MCP (#5566)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Aug 7, 2022
1 parent c7f4778 commit 9790f3c
Show file tree
Hide file tree
Showing 12 changed files with 142 additions and 56 deletions.
97 changes: 88 additions & 9 deletions metadata-ingestion/scripts/avro_codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,20 @@
import types
import unittest.mock
from pathlib import Path
from typing import Dict, Iterable, List, Union
from typing import Any, Dict, Iterable, List, Union

import avro.schema
import click
from avrogen import write_schema_files


def load_schema_file(schema_file: str) -> str:
def load_schema_file(schema_file: Union[str, Path]) -> dict:
raw_schema_text = Path(schema_file).read_text()
return json.dumps(json.loads(raw_schema_text), indent=2)
return json.loads(raw_schema_text)


def merge_schemas(schemas: List[str]) -> str:
def merge_schemas(schemas_obj: List[Any]) -> str:
# Combine schemas.
schemas_obj = [json.loads(schema) for schema in schemas]
merged = ["null"] + schemas_obj

# Patch add_name method to NOT complain about duplicate names
Expand Down Expand Up @@ -114,11 +113,85 @@ def make_load_schema_methods(schemas: Iterable[str]) -> str:
)


def _find_aspect_name_slot(
    lines: List[str], class_def_line: int, class_name: str
) -> int:
    """Return the index of the blank line directly above ``RECORD_SCHEMA = ...``.

    The scan starts on the line after ``class_def_line``. The generated classes
    always have an empty line between the docstring and the RECORD_SCHEMA class
    attribute; that empty line is the slot that gets replaced.

    Raises:
        IndexError: if no such blank line exists before the end of the file.
    """
    # Stop one short of the end: a candidate needs a following line to inspect.
    for candidate in range(class_def_line + 1, len(lines) - 1):
        if not lines[candidate].strip() and lines[candidate + 1].strip().startswith(
            "RECORD_SCHEMA = "
        ):
            return candidate
    raise IndexError(
        f"Could not find the blank line before RECORD_SCHEMA for {class_name}"
    )


def annotate_aspects(aspects: List[dict], schema_class_file: Path) -> None:
    """Make every aspect class in the generated file inherit from ``_Aspect``.

    The file is edited in place:

    * an ``_Aspect`` base class is appended right after the ``__SCHEMAS``
      declaration line;
    * for each aspect schema, ``class <name>Class(DictWrapper):`` is rewritten to
      ``class <name>Class(_Aspect):`` and an ``ASPECT_NAME`` class attribute is
      inserted just above that class's ``RECORD_SCHEMA`` attribute.

    Args:
        aspects: Parsed aspect schemas; each needs ``["name"]`` and
            ``["Aspect"]["name"]``.
        schema_class_file: Path of the generated ``schema_classes.py``.

    Raises:
        KeyError: if the ``__SCHEMAS`` line or an aspect's class definition line
            is not present in the file.
        IndexError: if a class has no blank line followed by ``RECORD_SCHEMA``.
    """
    lines = schema_class_file.read_text().splitlines()
    # All edits below modify entries in place (never insert or delete list
    # items), so the indices captured here stay valid throughout.
    line_index = {line: i for i, line in enumerate(lines)}

    schemas_marker = "__SCHEMAS: Dict[str, RecordSchema] = {}"
    if schemas_marker not in line_index:
        raise KeyError(f"{schemas_marker!r} not found in {schema_class_file}")

    # Create the Aspect class.
    # We ensure that it cannot be instantiated directly, as
    # per https://stackoverflow.com/a/7989101/5004662.
    lines[line_index[schemas_marker]] += """
class _Aspect(DictWrapper):
    ASPECT_NAME: str = None  # type: ignore
    def __init__(self):
        if type(self) is _Aspect:
            raise TypeError("_Aspect is an abstract class, and cannot be instantiated directly.")
        super().__init__()
    @classmethod
    def get_aspect_name(cls) -> str:
        return cls.ASPECT_NAME  # type: ignore
"""

    for aspect in aspects:
        class_name = f'{aspect["name"]}Class'
        aspect_name = aspect["Aspect"]["name"]
        class_def_original = f"class {class_name}(DictWrapper):"
        if class_def_original not in line_index:
            raise KeyError(
                f"{class_def_original!r} not found in {schema_class_file}"
            )

        # Make the aspect inherit from the _Aspect class.
        class_def_line = line_index[class_def_original]
        lines[class_def_line] = f"class {class_name}(_Aspect):"

        # Define the ASPECT_NAME class attribute. The leading newline keeps the
        # blank separator line; the attribute is indented as a class member.
        slot = _find_aspect_name_slot(lines, class_def_line, class_name)
        lines[slot] = f"\n    ASPECT_NAME = '{aspect_name}'"

    # splitlines() dropped the final newline; put it back so the generated
    # module ends with a newline.
    schema_class_file.write_text("\n".join(lines) + "\n")


@click.command()
@click.argument("schema_files", type=click.Path(exists=True), nargs=-1, required=True)
@click.argument(
"schemas_path", type=click.Path(exists=True, file_okay=False), required=True
)
@click.argument("outdir", type=click.Path(), required=True)
def generate(schema_files: List[str], outdir: str) -> None:
schemas: Dict[str, str] = {}
def generate(schemas_path: str, outdir: str) -> None:
required_schema_files = {
"mxe/MetadataChangeEvent.avsc",
"mxe/MetadataChangeProposal.avsc",
"usage/UsageAggregation.avsc",
"mxe/MetadataChangeLog.avsc",
"mxe/PlatformEvent.avsc",
"platform/event/v1/EntityChangeEvent.avsc",
}

# Find all the aspect schemas / other important schemas.
aspect_file_stems: List[str] = []
schema_files: List[Path] = []
for schema_file in Path(schemas_path).glob("**/*.avsc"):
relative_path = schema_file.relative_to(schemas_path).as_posix()
if relative_path in required_schema_files:
schema_files.append(schema_file)
required_schema_files.remove(relative_path)
elif load_schema_file(schema_file).get("Aspect"):
aspect_file_stems.append(schema_file.stem)
schema_files.append(schema_file)

assert not required_schema_files, f"Schema files not found: {required_schema_files}"

schemas: Dict[str, dict] = {}
for schema_file in schema_files:
schema = load_schema_file(schema_file)
schemas[Path(schema_file).stem] = schema
Expand All @@ -130,12 +203,18 @@ def generate(schema_files: List[str], outdir: str) -> None:
# Schema files post-processing.
(Path(outdir) / "__init__.py").write_text("# This file is intentionally empty.\n")
add_avro_python3_warning(Path(outdir) / "schema_classes.py")
annotate_aspects(
[schemas[aspect_file_stem] for aspect_file_stem in aspect_file_stems],
Path(outdir) / "schema_classes.py",
)

# Save raw schema files in codegen as well.
schema_save_dir = Path(outdir) / "schemas"
schema_save_dir.mkdir()
for schema_out_file, schema in schemas.items():
(schema_save_dir / f"{schema_out_file}.avsc").write_text(schema)
(schema_save_dir / f"{schema_out_file}.avsc").write_text(
json.dumps(schema, indent=2)
)

# Add load_schema method.
with open(schema_save_dir / "__init__.py", "a") as schema_dir_init:
Expand Down
22 changes: 2 additions & 20 deletions metadata-ingestion/scripts/codegen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,6 @@ OUTDIR=./src/datahub/metadata
# Note: this assumes that datahub has already been built with `./gradlew build`.
DATAHUB_ROOT=..
SCHEMAS_ROOT="$DATAHUB_ROOT/metadata-events/mxe-schemas/src/renamed/avro/com/linkedin"
FILES="$SCHEMAS_ROOT/mxe/MetadataChangeEvent.avsc $SCHEMAS_ROOT/mxe/MetadataChangeProposal.avsc $SCHEMAS_ROOT/usage/UsageAggregation.avsc $SCHEMAS_ROOT/mxe/MetadataChangeLog.avsc $SCHEMAS_ROOT/mxe/PlatformEvent.avsc $SCHEMAS_ROOT/platform/event/v1/EntityChangeEvent.avsc"
# Since we depend on jq, check if jq is installed
if ! which jq > /dev/null; then
echo "jq is not installed. Please install jq and rerun (https://stedolan.github.io/jq/)"
exit 1
fi

find $SCHEMAS_ROOT -name "*.avsc" | sort | while read file
do
# Add all other files that are aspects but not included in the above
if (jq '.Aspect' -e $file > /dev/null)
then
FILES="${FILES} ${file}"
fi
echo $FILES > /tmp/codegen_files.txt
done

FILES=$(cat /tmp/codegen_files.txt)

rm -r $OUTDIR || true
python scripts/avro_codegen.py $FILES $OUTDIR
rm -r $OUTDIR 2>/dev/null || true
python scripts/avro_codegen.py $SCHEMAS_ROOT $OUTDIR
6 changes: 3 additions & 3 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import click
import requests
import yaml
from avrogen.dict_wrapper import DictWrapper
from pydantic import BaseModel, ValidationError
from requests.models import Response
from requests.sessions import Session
Expand Down Expand Up @@ -54,6 +53,7 @@
SubTypesClass,
UpstreamLineageClass,
ViewPropertiesClass,
_Aspect,
)
from datahub.utilities.urns.urn import Urn

Expand Down Expand Up @@ -712,7 +712,7 @@ def get_aspects_for_entity(
aspects: List[str] = [],
typed: bool = False,
cached_session_host: Optional[Tuple[Session, str]] = None,
) -> Dict[str, Union[dict, DictWrapper]]:
) -> Dict[str, Union[dict, _Aspect]]:
# Process non-timeseries aspects
non_timeseries_aspects: List[str] = [
a for a in aspects if a not in timeseries_class_to_aspect_name_map.values()
Expand Down Expand Up @@ -745,7 +745,7 @@ def get_aspects_for_entity(
aspect_cls.RECORD_SCHEMA.fullname.replace("pegasus2avro.", "")
] = aspect_value

aspect_map: Dict[str, Union[dict, DictWrapper]] = {}
aspect_map: Dict[str, Union[dict, _Aspect]] = {}
for a in aspect_list.values():
aspect_name = a["name"]
aspect_py_class: Optional[Type[Any]] = _get_pydantic_class_from_aspect_name(
Expand Down
5 changes: 3 additions & 2 deletions metadata-ingestion/src/datahub/cli/migration_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from avrogen.dict_wrapper import DictWrapper

from datahub.cli import cli_utils
from datahub.emitter.mce_builder import Aspect
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
ChangeTypeClass,
Expand Down Expand Up @@ -212,11 +213,11 @@ def container_modifier(

def modify_urn_list_for_aspect(
aspect_name: str,
aspect: DictWrapper,
aspect: Aspect,
relationship_type: str,
old_urn: str,
new_urn: str,
) -> DictWrapper:
) -> Aspect:

if hasattr(UrnListModifier, f"{aspect_name}_modifier"):
modifier = getattr(UrnListModifier, f"{aspect_name}_modifier")
Expand Down
7 changes: 2 additions & 5 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from typing import Any, List, Optional, Type, TypeVar, Union, cast, get_type_hints

import typing_inspect
from avrogen.dict_wrapper import DictWrapper

from datahub.configuration.source_common import DEFAULT_ENV as DEFAULT_ENV_CONFIGURATION
from datahub.emitter.serialization_helper import pre_json_transform
Expand All @@ -33,9 +32,11 @@
UpstreamClass,
UpstreamLineageClass,
)
from datahub.metadata.schema_classes import _Aspect as AspectAbstract
from datahub.utilities.urns.dataset_urn import DatasetUrn

logger = logging.getLogger(__name__)
Aspect = TypeVar("Aspect", bound=AspectAbstract)

DEFAULT_ENV = DEFAULT_ENV_CONFIGURATION
DEFAULT_FLOW_CLUSTER = "prod"
Expand Down Expand Up @@ -290,10 +291,6 @@ def make_lineage_mce(
return mce


# This bound isn't tight, but it's better than nothing.
Aspect = TypeVar("Aspect", bound=DictWrapper)


def can_add_aspect(mce: MetadataChangeEventClass, AspectType: Type[Aspect]) -> bool:
SnapshotType = type(mce.proposedSnapshot)

Expand Down
22 changes: 20 additions & 2 deletions metadata-ingestion/src/datahub/emitter/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
KafkaAuditHeaderClass,
MetadataChangeProposalClass,
SystemMetadataClass,
_Aspect,
)


Expand All @@ -23,15 +24,32 @@ def _make_generic_aspect(codegen_obj: DictWrapper) -> GenericAspectClass:

@dataclasses.dataclass
class MetadataChangeProposalWrapper:
# TODO: remove manually-specified aspectName arguments from the codebase
# TODO: (after) remove aspectName field from this class
# TODO: infer entityType from entityUrn
# TODO: set changeType's default to UPSERT

entityType: str
changeType: Union[str, ChangeTypeClass]
entityUrn: Union[None, str] = None
entityKeyAspect: Union[None, DictWrapper] = None
entityKeyAspect: Union[None, _Aspect] = None
auditHeader: Union[None, KafkaAuditHeaderClass] = None
aspectName: Union[None, str] = None
aspect: Union[None, DictWrapper] = None
aspect: Union[None, _Aspect] = None
systemMetadata: Union[None, SystemMetadataClass] = None

def __post_init__(self) -> None:
    """Infer ``aspectName`` from the aspect, or check that the two agree.

    Does nothing when no aspect is set. Otherwise an unset ``aspectName`` is
    filled in from ``aspect.get_aspect_name()``.

    Raises:
        ValueError: if ``aspectName`` is set and differs from the name
            reported by the aspect.
    """
    # Without an aspect there is nothing to infer or validate.
    if not self.aspect:
        return

    if not self.aspectName:
        self.aspectName = self.aspect.get_aspect_name()
        return

    if self.aspectName != self.aspect.get_aspect_name():
        raise ValueError(
            f"aspectName {self.aspectName} does not match aspect type {type(self.aspect)} with name {self.aspect.get_aspect_name()}"
        )

def make_mcp(self) -> MetadataChangeProposalClass:
serializedEntityKeyAspect: Union[None, GenericAspectClass] = None
if isinstance(self.entityKeyAspect, DictWrapper):
Expand Down
9 changes: 0 additions & 9 deletions metadata-ingestion/src/datahub/emitter/mcp_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ def add_domain_to_entity_wu(
entityType=entity_type,
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{entity_urn}",
aspectName="domains",
aspect=DomainsClass(domains=[domain_urn]),
)
wu = MetadataWorkUnit(id=f"{domain_urn}-to-{entity_urn}", mcp=mcp)
Expand All @@ -99,7 +98,6 @@ def add_owner_to_entity_wu(
entityType=entity_type,
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{entity_urn}",
aspectName="ownership",
aspect=OwnershipClass(
owners=[
OwnerClass(
Expand All @@ -120,7 +118,6 @@ def add_tags_to_entity_wu(
entityType=entity_type,
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{entity_urn}",
aspectName="globalTags",
aspect=GlobalTagsClass(
tags=[TagAssociationClass(f"urn:li:tag:{tag}") for tag in tags]
),
Expand Down Expand Up @@ -148,7 +145,6 @@ def gen_containers(
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspectName="containerProperties",
aspect=ContainerProperties(
name=name,
description=description,
Expand All @@ -164,7 +160,6 @@ def gen_containers(
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspectName="dataPlatformInstance",
aspect=DataPlatformInstance(
platform=f"{make_data_platform_urn(container_key.platform)}",
),
Expand All @@ -180,7 +175,6 @@ def gen_containers(
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspectName="subTypes",
aspect=SubTypesClass(typeNames=sub_types),
)
wu = MetadataWorkUnit(
Expand Down Expand Up @@ -220,7 +214,6 @@ def gen_containers(
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspectName="container",
aspect=ContainerClass(container=parent_container_urn),
# aspect=ContainerKeyClass(guid=database_container_key.guid())
)
Expand All @@ -245,7 +238,6 @@ def add_dataset_to_container(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{dataset_urn}",
aspectName="container",
aspect=ContainerClass(container=f"{container_urn}"),
# aspect=ContainerKeyClass(guid=schema_container_key.guid())
)
Expand All @@ -263,7 +255,6 @@ def add_entity_to_container(
entityType=entity_type,
changeType=ChangeTypeClass.UPSERT,
entityUrn=entity_urn,
aspectName="container",
aspect=ContainerClass(container=f"{container_urn}"),
)
wu = MetadataWorkUnit(id=f"container-{container_urn}-to-{entity_urn}", mcp=mcp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
DatasetProfileClass,
DatasetPropertiesClass,
DateTypeClass,
DictWrapper,
EnumTypeClass,
ForeignKeyConstraintClass,
GlobalTagsClass,
Expand Down Expand Up @@ -711,7 +710,7 @@ def get_foreign_keys_from_field(
)

def wrap_aspect_as_workunit(
self, entityName: str, entityUrn: str, aspectName: str, aspect: DictWrapper
self, entityName: str, entityUrn: str, aspectName: str, aspect: builder.Aspect
) -> WorkUnit:
wu = MetadataWorkUnit(
id=f"{aspectName}-for-{entityUrn}",
Expand Down
Loading

0 comments on commit 9790f3c

Please sign in to comment.