feat(docs): tutorial for writing a custom transformer #2959

Merged
merged 10 commits into from
Jul 28, 2021
Changes from 9 commits
4 changes: 4 additions & 0 deletions docs-website/sidebars.js
@@ -22,6 +22,8 @@ function list_ids_in_directory(directory) {
return ids;
}

// note: to handle errors where you don't want a markdown file in the sidebar, add it as a comment.
// this will fix errors like `Error: File not accounted for in sidebar: ...`
module.exports = {
// users
// architects
@@ -73,6 +75,8 @@ module.exports = {
"docs/docker/development",
"metadata-ingestion/adding-source",
"metadata-ingestion/s3-ingestion",
//"metadata-ingestion/examples/transforms/README"
"metadata-ingestion/transformers",
//"docs/what/graph",
//"docs/what/search-index",
//"docs/how/add-new-aspect",
55 changes: 3 additions & 52 deletions metadata-ingestion/README.md
@@ -948,58 +948,9 @@ sink:

## Transformations

Beyond basic ingestion, sometimes there might exist a need to modify the source data before passing it on to the sink.
Example use cases could be to add ownership information, add extra tags etc.
If you'd like to modify data before it reaches the ingestion sinks – for instance, adding additional owners or tags – you can use a transformer to write your own module and integrate it with DataHub.

In such a scenario, it is possible to configure a recipe with a list of transformers.

```yml
transformers:
  - type: "fully-qualified-class-name-of-transformer"
    config:
      some_property: "some.value"
```

A transformer class needs to inherit from [`Transformer`](./src/datahub/ingestion/api/transform.py).

### `simple_add_dataset_ownership`

Adds a set of owners to every dataset.

```yml
transformers:
  - type: "simple_add_dataset_ownership"
    config:
      owner_urns:
        - "urn:li:corpuser:username1"
        - "urn:li:corpuser:username2"
        - "urn:li:corpGroup:groupname"
```

:::tip

If you'd like to add more complex logic for assigning ownership, you can use the more generic [`add_dataset_ownership` transformer](./src/datahub/ingestion/transformer/add_dataset_ownership.py), which calls a user-provided function to determine the ownership of each dataset.

:::

### `simple_add_dataset_tags`

Adds a set of tags to every dataset.

```yml
transformers:
  - type: "simple_add_dataset_tags"
    config:
      tag_urns:
        - "urn:li:tag:NeedsDocumentation"
        - "urn:li:tag:Legacy"
```

:::tip

If you'd like to add more complex logic for assigning tags, you can use the more generic [`add_dataset_tags` transformer](./src/datahub/ingestion/transformer/add_dataset_tags.py), which calls a user-provided function to determine the tags for each dataset.

:::
Check out the [transformers guide](./transformers.md) for more info!

## Using as a library

@@ -1067,4 +1018,4 @@ In order to use this example, you must first configure the Datahub hook. Like in

## Developing

See the [developing guide](./developing.md) or the [adding a source guide](./adding-source.md).
See the guides on [developing](./developing.md), [adding a source](./adding-source.md) and [using transformers](./transformers.md).
5 changes: 5 additions & 0 deletions metadata-ingestion/examples/transforms/README.md
@@ -0,0 +1,5 @@
# Custom transformer script

This script sets up a transformer that reads in a list of owner URNs from a JSON file specified via `owners_json` and appends these owners to every MCE.

See the transformers tutorial (https://datahubproject.io/docs/metadata-ingestion/transformers) for how this module is built and run.
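
As a rough illustration of the input the script expects: the example transformer iterates over the result of `json.load`, so the file named by `owners_json` is presumably a flat JSON list of owner URN strings. The sketch below writes and reads such a file using only the standard library. The helper `load_owner_urns`, its validation step, and the example URNs are assumptions made for illustration and are not part of the script.

```python
import json
import os
import tempfile
from typing import List


def load_owner_urns(path: str) -> List[str]:
    """Read owner URNs from a JSON file holding a flat list of strings."""
    with open(path, "r") as f:
        raw = json.load(f)
    # Assumed shape: a JSON array of URN strings; reject anything else early.
    if not isinstance(raw, list) or not all(isinstance(u, str) for u in raw):
        raise ValueError(f"{path} must contain a JSON list of owner URN strings")
    return raw


# Hypothetical owners, written to a temporary file for the demonstration.
example_owners = ["urn:li:corpuser:username1", "urn:li:corpGroup:groupname"]
with tempfile.TemporaryDirectory() as tmp_dir:
    owners_path = os.path.join(tmp_dir, "owners.json")
    with open(owners_path, "w") as f:
        json.dump(example_owners, f)
    loaded_owners = load_owner_urns(owners_path)
```

Validating the shape up front is a design choice of this sketch: a malformed file fails when it is loaded rather than later, when individual owners are constructed.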
77 changes: 77 additions & 0 deletions metadata-ingestion/examples/transforms/custom_transform_example.py
@@ -0,0 +1,77 @@
# see https://datahubproject.io/docs/metadata-ingestion/transformers for original tutorial
from datahub.configuration.common import ConfigModel


class AddCustomOwnershipConfig(ConfigModel):
    owners_json: str


import json
from typing import Iterable

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.api.transform import Transformer
from datahub.metadata.schema_classes import (
    DatasetSnapshotClass,
    MetadataChangeEventClass,
    OwnerClass,
    OwnershipClass,
    OwnershipTypeClass,
)


class AddCustomOwnership(Transformer):
    """Transformer that adds owners, read from a JSON file, to every dataset."""

    # context param to generate run metadata such as a run ID
    ctx: PipelineContext
    # as defined in the previous block
    config: AddCustomOwnershipConfig

    def __init__(self, config: AddCustomOwnershipConfig, ctx: PipelineContext):
        self.ctx = ctx
        self.config = config

        with open(self.config.owners_json, "r") as f:
            raw_owner_urns = json.load(f)

        self.owners = [
            OwnerClass(owner=owner, type=OwnershipTypeClass.DATAOWNER)
            for owner in raw_owner_urns
        ]

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddCustomOwnership":
        config = AddCustomOwnershipConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def transform(
        self, record_envelopes: Iterable[RecordEnvelope]
    ) -> Iterable[RecordEnvelope]:

        # loop over envelopes
        for envelope in record_envelopes:

            # if envelope is an MCE, add the ownership classes
            if isinstance(envelope.record, MetadataChangeEventClass):
                envelope.record = self.transform_one(envelope.record)
            yield envelope

    def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass:
        if not isinstance(mce.proposedSnapshot, DatasetSnapshotClass):
            return mce

        owners_to_add = self.owners

        if owners_to_add:
            ownership = builder.get_or_add_aspect(
                mce,
                OwnershipClass(
                    owners=[],
                ),
            )
            ownership.owners.extend(owners_to_add)

        return mce
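
The control flow of `transform` above can be tried without installing DataHub. The following is a minimal sketch under stated assumptions: `DatasetRecord`, `Envelope`, and `add_owners` are hypothetical stand-ins written for this illustration, not DataHub classes or functions. It shows only the pattern the transformer appears to follow, which is to modify matching records in place and yield every envelope so that nothing is dropped from the stream.

```python
from dataclasses import dataclass, field
from typing import Any, Iterable, Iterator, List


@dataclass
class DatasetRecord:
    """Hypothetical stand-in for an MCE that carries a dataset snapshot."""

    urn: str
    owners: List[str] = field(default_factory=list)


@dataclass
class Envelope:
    """Hypothetical stand-in for RecordEnvelope: wraps one record of any type."""

    record: Any


def add_owners(envelopes: Iterable[Envelope], owners: List[str]) -> Iterator[Envelope]:
    for envelope in envelopes:
        # Only dataset records are modified.
        if isinstance(envelope.record, DatasetRecord):
            envelope.record.owners.extend(owners)
        # Every envelope is yielded, matching or not, so the stream stays complete.
        yield envelope


stream = [
    Envelope(DatasetRecord("urn:li:dataset:example")),
    Envelope({"other": "record"}),
]
result = list(add_owners(stream, ["urn:li:corpuser:username1"]))
```

Because `add_owners` is a generator, records are processed one at a time as the consumer pulls them, and the whole stream never needs to be held in memory.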
7 changes: 7 additions & 0 deletions metadata-ingestion/examples/transforms/setup.py
@@ -0,0 +1,7 @@
from setuptools import find_packages, setup

setup(
    name="custom_transform_example",
    version="1.0",
    packages=find_packages(),
)