fix: implement tags-to-terms transformation based on tags

sagar-salvi-apptware committed Jun 20, 2024
1 parent d699660 commit 3b0eeef

Showing 5 changed files with 421 additions and 1 deletion.
52 changes: 51 additions & 1 deletion metadata-ingestion/docs/transformer/dataset_transformer.md
@@ -10,7 +10,7 @@ The below table shows transformer which can transform aspects of entity [Dataset
| `ownership` | - [Simple Add Dataset ownership](#simple-add-dataset-ownership)<br/> - [Pattern Add Dataset ownership](#pattern-add-dataset-ownership)<br/> - [Simple Remove Dataset Ownership](#simple-remove-dataset-ownership)<br/> - [Extract Ownership from Tags](#extract-ownership-from-tags)<br/> - [Clean suffix prefix from Ownership](#clean-suffix-prefix-from-ownership) |
| `globalTags` | - [Simple Add Dataset globalTags ](#simple-add-dataset-globaltags)<br/> - [Pattern Add Dataset globalTags](#pattern-add-dataset-globaltags)<br/> - [Add Dataset globalTags](#add-dataset-globaltags) |
| `browsePaths` | - [Set Dataset browsePath](#set-dataset-browsepath) |
| `glossaryTerms` | - [Simple Add Dataset glossaryTerms ](#simple-add-dataset-glossaryterms)<br/> - [Pattern Add Dataset glossaryTerms](#pattern-add-dataset-glossaryterms) |
| `glossaryTerms` | - [Simple Add Dataset glossaryTerms ](#simple-add-dataset-glossaryterms)<br/> - [Pattern Add Dataset glossaryTerms](#pattern-add-dataset-glossaryterms)<br/> - [Tags to Term Mapping](#tags-to-term-mapping) |
| `schemaMetadata` | - [Pattern Add Dataset Schema Field glossaryTerms](#pattern-add-dataset-schema-field-glossaryterms)<br/> - [Pattern Add Dataset Schema Field globalTags](#pattern-add-dataset-schema-field-globaltags) |
| `datasetProperties` | - [Simple Add Dataset datasetProperties](#simple-add-dataset-datasetproperties)<br/> - [Add Dataset datasetProperties](#add-dataset-datasetproperties) |
| `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)<br/> - [Pattern Add Dataset domains](#pattern-add-dataset-domains)<br/> - [Domain Mapping Based on Tags](#domain-mapping-based-on-tags) |
@@ -668,6 +668,56 @@ We can add glossary terms to datasets based on a regex filter.
".*example1.*": ["urn:li:glossaryTerm:Email", "urn:li:glossaryTerm:Address"]
".*example2.*": ["urn:li:glossaryTerm:PostalCode"]
```

## Tags to Term Mapping
### Config Details

| Field | Required | Type | Default | Description |
|---------------|----------|--------------------|-------------|-------------------------------------------------------------------------------------------------------|
| `tags` | ✅ | List[str] | | List of tag names based on which terms will be created and associated with the dataset. |
| `semantics` | | enum | "OVERWRITE" | Determines whether to OVERWRITE or PATCH the terms associated with the dataset on DataHub GMS. |

<br/>

The `tags_to_term` transformer maps specific tags to glossary terms within DataHub. It takes a configuration listing the tags that should be translated into corresponding glossary terms, and applies these mappings to matching tags found either at the column level or at the top level of the dataset.

When specifying tags in the configuration, use the tag's simple name rather than the full tag URN.

For example, instead of using the tag URN `urn:li:tag:snowflakedb.snowflakeschema.tag_name:tag_value`, you should specify just the tag name `tag_name` in the mapping configuration.

```yaml
transformers:
- type: "tags_to_term"
config:
semantics: OVERWRITE # OVERWRITE is the default behavior
tags:
- "tag_name"
```
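
As a concrete illustration of the naming rule above, the transformer derives the simple tag name from the full tag URN and then builds the matching glossary term URN. Below is a minimal sketch mirroring the extraction logic in `tags_to_terms.py`; the Snowflake-style tag URN is illustrative:

```python
from datahub.emitter.mce_builder import make_term_urn

# Hypothetical tag URN as emitted by a source such as Snowflake.
full_tag = "urn:li:tag:snowflakedb.snowflakeschema.tag_name:tag_value"

# Derive the simple tag name the same way the transformer does.
tag_name = full_tag.split("urn:li:tag:")[-1].split(".")[-1].split(":")[0]
assert tag_name == "tag_name"

# The glossary term URN that gets attached to the dataset.
print(make_term_urn(tag_name))  # urn:li:glossaryTerm:tag_name
```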

`tags_to_term` can be configured in the following ways:

- Add terms based on tags, however overwrite the terms already associated with the dataset on DataHub GMS
```yaml
transformers:
  - type: "tags_to_term"
    config:
      semantics: OVERWRITE  # OVERWRITE is the default behaviour
      tags:
        - "example1"
        - "example2"
        - "example3"
```
- Add terms based on tags, however keep the terms already associated with the dataset on DataHub GMS
```yaml
transformers:
  - type: "tags_to_term"
    config:
      semantics: PATCH
      tags:
        - "example1"
        - "example2"
```
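
A note on semantics, based on the transformer implementation in this commit: with `OVERWRITE`, the emitted `glossaryTerms` aspect replaces whatever terms the dataset already carries; with `PATCH`, the transformer fetches the terms already present on DataHub GMS and merges them with the newly derived terms, the transformed terms taking precedence when the same term URN exists on both sides. Note also that tags matching the configured list are hard-deleted from DataHub once their corresponding terms have been prepared.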

## Pattern Add Dataset Schema Field glossaryTerms
### Config Details
| Field | Required | Type | Default | Description |
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -709,6 +709,7 @@
"replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrl",
"pattern_cleanup_dataset_usage_user = datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user:PatternCleanupDatasetUsageUser",
"domain_mapping_based_on_tags = datahub.ingestion.transformer.dataset_domain_based_on_tags:DatasetTagDomainMapper",
"tags_to_term = datahub.ingestion.transformer.tags_to_terms:TagsToTermMapper",
],
"datahub.ingestion.sink.plugins": [
"file = datahub.ingestion.sink.file:FileSink",
metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py
@@ -27,6 +27,16 @@ def entity_types(self) -> List[str]:
return ["dataset"]


class TagTransformer(BaseTransformer, SingleAspectTransformer, metaclass=ABCMeta):
"""Transformer that does transform sequentially on each tag."""

def __init__(self):
super().__init__()

def entity_types(self) -> List[str]:
return ["dataset", "container"]


class DatasetOwnershipTransformer(DatasetTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "ownership"
@@ -128,3 +138,8 @@ def aspect_name(self) -> str:
class DatasetUsageStatisticsTransformer(DatasetTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "datasetUsageStatistics"


class TagsToTermTransformer(TagTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "glossaryTerms"
143 changes: 143 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/transformer/tags_to_terms.py
@@ -0,0 +1,143 @@
from typing import List, Optional, Set, cast

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import (
TransformerSemantics,
TransformerSemanticsConfigModel,
)
from datahub.emitter.mce_builder import Aspect, make_term_urn
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.transformer.dataset_transformer import TagsToTermTransformer
from datahub.metadata.schema_classes import (
AuditStampClass,
GlobalTagsClass,
GlossaryTermAssociationClass,
GlossaryTermsClass,
SchemaMetadataClass,
)


class TagsToTermMapperConfig(TransformerSemanticsConfigModel):
tags: List[str]


class TagsToTermMapper(TagsToTermTransformer):
"""This transformer maps specified tags to corresponding glossary terms for a dataset."""

def __init__(self, config: TagsToTermMapperConfig, ctx: PipelineContext):
super().__init__()
self.ctx: PipelineContext = ctx
self.config: TagsToTermMapperConfig = config

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "TagsToTermMapper":
config = TagsToTermMapperConfig.parse_obj(config_dict)
return cls(config, ctx)

@staticmethod
def _merge_with_server_glossary_terms(
graph: DataHubGraph,
urn: str,
glossary_terms_aspect: Optional[GlossaryTermsClass],
) -> Optional[GlossaryTermsClass]:
if not glossary_terms_aspect or not glossary_terms_aspect.terms:
# nothing to add, no need to consult server
return None

# Merge the transformed terms with existing server terms.
# The transformed terms takes precedence, which may change the term context.
server_glossary_terms_aspect = graph.get_glossary_terms(entity_urn=urn)
if server_glossary_terms_aspect is not None:
glossary_terms_aspect.terms = list(
{
**{term.urn: term for term in server_glossary_terms_aspect.terms},
**{term.urn: term for term in glossary_terms_aspect.terms},
}.values()
)

return glossary_terms_aspect

@staticmethod
def get_tags_from_global_tags(global_tags: Optional[GlobalTagsClass]) -> Set[str]:
"""Extracts tags urn from GlobalTagsClass."""
if not global_tags or not global_tags.tags:
return set()

return {tag_assoc.tag for tag_assoc in global_tags.tags}

@staticmethod
def get_tags_from_schema_metadata(
schema_metadata: Optional[SchemaMetadataClass],
) -> Set[str]:
"""Extracts globalTags from all fields in SchemaMetadataClass."""
if not schema_metadata or not schema_metadata.fields:
return set()
tags = set()
for field in schema_metadata.fields:
if field.globalTags:
tags.update(
TagsToTermMapper.get_tags_from_global_tags(field.globalTags)
)
return tags

def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
) -> Optional[Aspect]:

in_glossary_terms: Optional[GlossaryTermsClass] = cast(
Optional[GlossaryTermsClass], aspect
)

assert self.ctx.graph
in_global_tags_aspect: GlobalTagsClass = self.ctx.graph.get_tags(entity_urn)
in_schema_metadata_aspect: Optional[
SchemaMetadataClass
] = self.ctx.graph.get_schema_metadata(entity_urn)

if in_global_tags_aspect is None and in_schema_metadata_aspect is None:
return in_glossary_terms

global_tags = TagsToTermMapper.get_tags_from_global_tags(in_global_tags_aspect)
schema_metadata_tags = TagsToTermMapper.get_tags_from_schema_metadata(
in_schema_metadata_aspect
)

# Combine tags from both global and schema level
combined_tags = global_tags.union(schema_metadata_tags)

tag_set = set(self.config.tags)
terms_to_add = set()
tags_to_delete = set()

# Check each global tag against the configured tag list and prepare terms
for full_tag in combined_tags:
tag_name = full_tag.split("urn:li:tag:")[-1].split(".")[-1].split(":")[0]
if tag_name in tag_set:
term_urn = make_term_urn(tag_name)
terms_to_add.add(term_urn)
tags_to_delete.add(full_tag) # Full URN for deletion

if not terms_to_add:
return in_glossary_terms # No new terms to add

for tag_urn in tags_to_delete:
self.ctx.graph.hard_delete_entity(urn=tag_urn)

# Initialize the Glossary Terms properly
out_glossary_terms = GlossaryTermsClass(
terms=[GlossaryTermAssociationClass(urn=term) for term in terms_to_add],
auditStamp=AuditStampClass(
time=builder.get_sys_time(), actor="urn:li:corpUser:restEmitter"
),
)

if self.config.semantics == TransformerSemantics.PATCH:
patch_glossary_terms: Optional[
GlossaryTermsClass
] = TagsToTermMapper._merge_with_server_glossary_terms(
self.ctx.graph, entity_urn, out_glossary_terms
)
return cast(Optional[Aspect], patch_glossary_terms)
else:
return cast(Aspect, out_glossary_terms)
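
For reference, a minimal usage sketch (not part of the commit) showing how the new transformer can be instantiated programmatically; the run id and config values are illustrative, and a real run also needs `ctx.graph` pointed at a DataHub instance before `transform_aspect` is called:

```python
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.tags_to_terms import TagsToTermMapper

# Illustrative pipeline context; transform_aspect() reads tags and schema
# metadata from the server via ctx.graph, which is not set up here.
ctx = PipelineContext(run_id="tags-to-terms-demo")

# Same shape as the `config` block in the recipe examples above.
mapper = TagsToTermMapper.create(
    {"tags": ["example1", "example2"], "semantics": "PATCH"},
    ctx,
)
print(mapper.aspect_name())  # glossaryTerms
```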