feat(docs): tutorial for writing a custom transformer #2959

Merged (10 commits) on Jul 28, 2021
Changes from 6 commits
1 change: 1 addition & 0 deletions docs-website/sidebars.js
@@ -73,6 +73,7 @@ module.exports = {
"docs/docker/development",
"metadata-ingestion/adding-source",
"metadata-ingestion/s3-ingestion",
"metadata-ingestion/transformers",
//"docs/what/graph",
//"docs/what/search-index",
//"docs/how/add-new-aspect",
55 changes: 2 additions & 53 deletions metadata-ingestion/README.md
@@ -948,58 +948,7 @@ sink:

## Transformations

Beyond basic ingestion, sometimes there might exist a need to modify the source data before passing it on to the sink.
Example use cases could be to add ownership information, add extra tags etc.

In such a scenario, it is possible to configure a recipe with a list of transformers.

```yml
transformers:
  - type: "fully-qualified-class-name-of-transformer"
    config:
      some_property: "some.value"
```

A transformer class needs to inherit from [`Transformer`](./src/datahub/ingestion/api/transform.py).

### `simple_add_dataset_ownership`

Adds a set of owners to every dataset.

```yml
transformers:
  - type: "simple_add_dataset_ownership"
    config:
      owner_urns:
        - "urn:li:corpuser:username1"
        - "urn:li:corpuser:username2"
        - "urn:li:corpGroup:groupname"
```

:::tip

If you'd like to add more complex logic for assigning ownership, you can use the more generic [`add_dataset_ownership` transformer](./src/datahub/ingestion/transformer/add_dataset_ownership.py), which calls a user-provided function to determine the ownership of each dataset.

:::

### `simple_add_dataset_tags`

Adds a set of tags to every dataset.

```yml
transformers:
  - type: "simple_add_dataset_tags"
    config:
      tag_urns:
        - "urn:li:tag:NeedsDocumentation"
        - "urn:li:tag:Legacy"
```

:::tip

If you'd like to add more complex logic for assigning tags, you can use the more generic [`add_dataset_tags` transformer](./src/datahub/ingestion/transformer/add_dataset_tags.py), which calls a user-provided function to determine the tags for each dataset.

:::
See the [transformers guide](./transformers.md).

## Using as a library

@@ -1067,4 +1016,4 @@ In order to use this example, you must first configure the Datahub hook. Like in

## Developing

See the [developing guide](./developing.md) or the [adding a source guide](./adding-source.md).
See the [developing guide](./developing.md), [adding a source guide](./adding-source.md) and the [using transformers](./transformers.md) guides.
Review comment (Collaborator):

nit:

Suggested change
See the [developing guide](./developing.md), [adding a source guide](./adding-source.md) and the [using transformers](./transformers.md) guides.
See the [developing guide](./developing.md), [adding a source guide](./adding-source.md), and the [using transformers guide](./transformers.md).

76 changes: 76 additions & 0 deletions metadata-ingestion/examples/transforms/custom_transform_example.py
@@ -0,0 +1,76 @@
from datahub.configuration.common import ConfigModel


class AddCustomOwnershipConfig(ConfigModel):
    owners_json: str


import json
from typing import Iterable

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.api.transform import Transformer
from datahub.metadata.schema_classes import (
    DatasetSnapshotClass,
    MetadataChangeEventClass,
    OwnerClass,
    OwnershipClass,
    OwnershipTypeClass,
)


class AddCustomOwnership(Transformer):
    """Transformer that adds owners to datasets according to a callback function."""

    # context param to generate run metadata such as a run ID
    ctx: PipelineContext
    # as defined in the previous block
    config: AddCustomOwnershipConfig

    def __init__(self, config: AddCustomOwnershipConfig, ctx: PipelineContext):
        self.ctx = ctx
        self.config = config

        with open(self.config.owners_json, "r") as f:
            raw_owner_urns = json.load(f)

        self.owners = [
            OwnerClass(owner=owner, type=OwnershipTypeClass.DATAOWNER)
            for owner in raw_owner_urns
        ]

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddCustomOwnership":
        config = AddCustomOwnershipConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def transform(
        self, record_envelopes: Iterable[RecordEnvelope]
    ) -> Iterable[RecordEnvelope]:

        # loop over envelopes
        for envelope in record_envelopes:

            # if envelope is an MCE, add the ownership classes
            if isinstance(envelope.record, MetadataChangeEventClass):
                envelope.record = self.transform_one(envelope.record)
            yield envelope

    def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass:
        if not isinstance(mce.proposedSnapshot, DatasetSnapshotClass):
            return mce

        owners_to_add = self.owners

        if owners_to_add:
            ownership = builder.get_or_add_aspect(
                mce,
                OwnershipClass(
                    owners=[],
                ),
            )
            ownership.owners.extend(owners_to_add)

        return mce
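
The control flow of `transform` above (modify matching records, pass everything else through unchanged) can be tried without a DataHub installation. The following is a minimal sketch, not the library's API: `FakeDatasetRecord`, `FakeEnvelope`, `load_owner_urns`, `add_owners`, and `demo` are hypothetical stand-ins, the dataset URN is a placeholder, and the owners file is assumed to be a JSON list of URN strings, which is how the example above reads it.

```python
import json
import os
import tempfile
from dataclasses import dataclass, field
from typing import Iterable, Iterator, List


@dataclass
class FakeDatasetRecord:
    """Hypothetical stand-in for an MCE that carries a dataset snapshot."""

    urn: str
    owners: List[str] = field(default_factory=list)


@dataclass
class FakeEnvelope:
    """Hypothetical stand-in for RecordEnvelope: wraps a single record."""

    record: object


def load_owner_urns(path: str) -> List[str]:
    # Assumption: the file holds a JSON list of owner URN strings.
    with open(path, "r") as f:
        return json.load(f)


def add_owners(
    envelopes: Iterable[FakeEnvelope], owner_urns: List[str]
) -> Iterator[FakeEnvelope]:
    for envelope in envelopes:
        # Only dataset-like records are modified.
        if isinstance(envelope.record, FakeDatasetRecord):
            envelope.record.owners.extend(owner_urns)
        # Every envelope is yielded, whether or not it was modified.
        yield envelope


def demo() -> List[FakeEnvelope]:
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, "owners.json")
        with open(path, "w") as f:
            json.dump(["urn:li:corpuser:username1", "urn:li:corpGroup:groupname"], f)
        owner_urns = load_owner_urns(path)

    stream = [
        FakeEnvelope(FakeDatasetRecord(urn="urn:li:dataset:placeholder")),
        FakeEnvelope("not a dataset"),
    ]
    return list(add_owners(stream, owner_urns))
```

Because `add_owners` is a generator, nothing is processed until the result is consumed, here by `list(...)` inside `demo()`.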
7 changes: 7 additions & 0 deletions metadata-ingestion/examples/transforms/setup.py
@@ -0,0 +1,7 @@
from setuptools import find_packages, setup

setup(
    name="custom_transform_example",
    version="1.0",
    packages=find_packages(),
)
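
Installing this package makes the example module importable, which presumably is what lets a recipe's `type` field name the transformer by its fully qualified class name. How DataHub resolves that name internally is not shown in this PR. The sketch below only illustrates the general technique of dotted-path resolution with `importlib`; `resolve_class` is a hypothetical helper, and it is demonstrated on a standard-library class so that it runs anywhere.

```python
import importlib
from typing import Any


def resolve_class(dotted_path: str) -> Any:
    """Import 'package.module.ClassName' and return the named attribute."""
    module_name, _, class_name = dotted_path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.ClassName', got {dotted_path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


# Demonstration with a standard-library class (no DataHub required).
ordered_dict_cls = resolve_class("collections.OrderedDict")
```

If the module is not installed or not on the import path, `importlib.import_module` raises `ModuleNotFoundError`, which is the kind of failure that installing the package avoids.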