-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(docs): tutorial for writing a custom transformer (#2959)
- Loading branch information
Showing
6 changed files
with
350 additions
and
52 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
# Custom transformer script | ||
|
||
This script sets up a transformer that reads a list of owner URNs from a JSON file specified via `owners_json` and appends these owners to every dataset MCE (MCEs whose proposed snapshot is not a dataset snapshot are passed through unchanged). | ||
|
||
See the transformers tutorial (https://datahubproject.io/docs/metadata-ingestion/transformers) for how this module is built and run. |
77 changes: 77 additions & 0 deletions
77
metadata-ingestion/examples/transforms/custom_transform_example.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
# see https://datahubproject.io/docs/metadata-ingestion/transformers for original tutorial | ||
from datahub.configuration.common import ConfigModel | ||
|
||
|
||
class AddCustomOwnershipConfig(ConfigModel):
    """Configuration accepted by the custom ownership transformer."""

    # Path to a JSON file; the transformer opens it and iterates the parsed
    # content, treating each element as an owner URN.
    owners_json: str
|
||
|
||
import json | ||
from typing import Iterable | ||
|
||
import datahub.emitter.mce_builder as builder | ||
from datahub.configuration.common import ConfigModel | ||
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope | ||
from datahub.ingestion.api.transform import Transformer | ||
from datahub.metadata.schema_classes import ( | ||
DatasetSnapshotClass, | ||
MetadataChangeEventClass, | ||
OwnerClass, | ||
OwnershipClass, | ||
OwnershipTypeClass, | ||
) | ||
|
||
|
||
class AddCustomOwnership(Transformer):
    """Transformer that appends a fixed set of owners to dataset MCEs.

    The owner URNs are read once, at construction time, from the JSON file
    named by ``config.owners_json``. Each URN is wrapped in an ``OwnerClass``
    with the ``DATAOWNER`` ownership type and appended to the ownership
    aspect of every MCE whose proposed snapshot is a dataset snapshot.
    """

    # context param to generate run metadata such as a run ID
    ctx: PipelineContext
    # parsed transformer configuration; holds the path to the owners JSON file
    config: AddCustomOwnershipConfig

    def __init__(self, config: AddCustomOwnershipConfig, ctx: PipelineContext):
        """Store the config/context and load the owners to add.

        Args:
            config: Parsed configuration; ``config.owners_json`` is the path
                of the JSON file to read.
            ctx: Pipeline context for the current ingestion run.

        Raises:
            OSError: If the owners file cannot be opened.
            json.JSONDecodeError: If the owners file is not valid JSON.
        """
        self.ctx = ctx
        self.config = config

        # JSON text is UTF-8 by specification; pin the encoding so non-ASCII
        # URNs are not mis-decoded on platforms whose locale default differs.
        with open(self.config.owners_json, "r", encoding="utf-8") as f:
            raw_owner_urns = json.load(f)

        self.owners = [
            OwnerClass(owner=owner, type=OwnershipTypeClass.DATAOWNER)
            for owner in raw_owner_urns
        ]

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddCustomOwnership":
        """Build the transformer from a raw config dictionary.

        Args:
            config_dict: Unparsed configuration; parsed with
                ``AddCustomOwnershipConfig.parse_obj``.
            ctx: Pipeline context for the current ingestion run.

        Returns:
            A new ``AddCustomOwnership`` instance.
        """
        config = AddCustomOwnershipConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def transform(
        self, record_envelopes: Iterable[RecordEnvelope]
    ) -> Iterable[RecordEnvelope]:
        """Yield every incoming envelope, adding owners to those holding an MCE.

        Envelopes whose record is not a ``MetadataChangeEventClass`` are
        yielded untouched.
        """
        for envelope in record_envelopes:
            # only MCEs carry a snapshot that transform_one can add owners to
            if isinstance(envelope.record, MetadataChangeEventClass):
                envelope.record = self.transform_one(envelope.record)
            yield envelope

    def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass:
        """Append the configured owners to a single dataset MCE.

        Args:
            mce: The metadata change event to update.

        Returns:
            The same ``mce`` object. It is returned unchanged when its proposed
            snapshot is not a dataset snapshot or when no owners were loaded.
        """
        if not isinstance(mce.proposedSnapshot, DatasetSnapshotClass):
            return mce

        if self.owners:
            # Fetch the ownership aspect via the builder helper, passing an
            # empty OwnershipClass as the aspect to use if one must be added.
            ownership = builder.get_or_add_aspect(
                mce,
                OwnershipClass(
                    owners=[],
                ),
            )
            ownership.owners.extend(self.owners)

        return mce
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from setuptools import find_packages, setup | ||
|
||
# Minimal setuptools packaging for the example: declares the distribution
# name and version, and includes whatever packages find_packages() discovers.
setup(
    name="custom_transform_example",
    version="1.0",
    packages=find_packages(),
)
Oops, something went wrong.