Skip to content

Commit

Permalink
test(ingest/dbt): add test for column meta match (#7673)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored and Hyejin Yoon committed Apr 3, 2023
1 parent 0699f24 commit 466ea31
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 36 deletions.
51 changes: 27 additions & 24 deletions metadata-ingestion/src/datahub/utilities/mapping.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
import logging
import re
from typing import Any, Dict, List, Match, Optional, Union
Expand All @@ -11,6 +12,29 @@
OwnershipTypeClass,
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


def _get_best_match(the_match: Match, group_name: str) -> str:
    """
    Return the most specific captured text available in a regex match.

    Candidates are tried in order: the named group `group_name`, then the
    first positional group, and finally the whole match. A candidate is
    skipped when the pattern does not define that group, or when the group
    is defined but did not take part in the match.
    """
    for group in (group_name, 1):
        # Match.group raises IndexError when the pattern has no such group.
        with contextlib.suppress(IndexError):
            value = the_match.group(group)
            # A defined-but-unmatched group (e.g. an optional one) yields
            # None rather than raising; skip it so we always return a str.
            if value is not None:
                return value

    # Group 0 is the entire match and is always present.
    return the_match.group(0)


# Matches the "{{ $match }}" placeholder, with optional whitespace inside the braces.
_match_regexp = re.compile(r"{{\s*\$match\s*}}", flags=re.MULTILINE)


def _insert_match_value(original_value: str, match_value: str) -> str:
    """
    If the original value is something like "foo{{ $match }}bar", then we insert the match value
    e.g. "foo<match_value>bar". Otherwise, it will leave the original value unchanged.

    The match value is inserted verbatim: backslashes and group references
    (e.g. "\\1", "\\g<0>") inside it are not interpreted.
    """
    # A callable replacement bypasses re's template processing. Passing the
    # string directly would raise on values like "C:\data" and would expand
    # sequences such as "\n".
    return _match_regexp.sub(lambda _: match_value, original_value)


class Constants:
ADD_TAG_OPERATION = "add_tag"
Expand Down Expand Up @@ -60,7 +84,6 @@ class OperationProcessor:
"""

operation_defs: Dict[str, Dict] = {}
logger = logging.getLogger(__name__)
tag_prefix: str = ""

def __init__(
Expand Down Expand Up @@ -128,7 +151,7 @@ def process(self, raw_props: Dict[str, Any]) -> Dict[str, Any]:

aspect_map = self.convert_to_aspects(operations_map)
except Exception as e:
self.logger.error("Error while processing operation defs over raw_props", e)
logger.error(f"Error while processing operation defs over raw_props: {e}")
return aspect_map

def convert_to_aspects(
Expand Down Expand Up @@ -171,30 +194,12 @@ def get_operation_value(
operation_config: Dict,
match: Match,
) -> Optional[Union[str, Dict, List[str]]]:
def _get_best_match(the_match: Match, group_name: str) -> str:
result = the_match.group(0)
try:
result = the_match.group(group_name)
return result
except IndexError:
pass
try:
result = the_match.group(1)
return result
except IndexError:
pass
return result

match_regexp = r"{{\s*\$match\s*}}"

if (
operation_type == Constants.ADD_TAG_OPERATION
and operation_config[Constants.TAG]
):
tag = operation_config[Constants.TAG]
tag_id = _get_best_match(match, "tag")
if isinstance(tag_id, str):
tag = re.sub(match_regexp, tag_id, tag, 0, re.MULTILINE)
tag = _insert_match_value(tag, _get_best_match(match, "tag"))

if self.tag_prefix:
tag = self.tag_prefix + tag
Expand Down Expand Up @@ -226,9 +231,7 @@ def _get_best_match(the_match: Match, group_name: str) -> str:
and operation_config[Constants.TERM]
):
term = operation_config[Constants.TERM]
captured_term_id = _get_best_match(match, "term")
if isinstance(captured_term_id, str):
term = re.sub(match_regexp, captured_term_id, term, 0, re.MULTILINE)
term = _insert_match_value(term, _get_best_match(match, "term"))
return mce_builder.make_term_urn(term)
elif operation_type == Constants.ADD_TERMS_OPERATION:
separator = operation_config.get(Constants.SEPARATOR, ",")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,9 @@
{
"urn": "urn:li:glossaryTerm:customer_id"
},
{
"urn": "urn:li:glossaryTerm:maturity_beta"
},
{
"urn": "urn:li:glossaryTerm:pii"
}
Expand Down
25 changes: 13 additions & 12 deletions metadata-ingestion/tests/integration/dbt/sample_dbt_manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -7573,9 +7573,9 @@
"depends_on": {
"macros": [],
"nodes": [
"source.sample_dbt.pagila.customer",
"source.sample_dbt.pagila.city",
"source.sample_dbt.pagila.address",
"source.sample_dbt.pagila.customer",
"snapshot.sample_dbt.customer_snapshot"
]
},
Expand Down Expand Up @@ -7612,15 +7612,15 @@
"sources": [
[
"pagila",
"city"
"customer"
],
[
"pagila",
"address"
"city"
],
[
"pagila",
"customer"
"address"
]
],
"tags": [],
Expand Down Expand Up @@ -7786,12 +7786,12 @@
"depends_on": {
"macros": [],
"nodes": [
"source.sample_dbt.pagila.payment_p2020_01",
"source.sample_dbt.pagila.payment_p2020_06",
"source.sample_dbt.pagila.payment_p2020_03",
"source.sample_dbt.pagila.payment_p2020_06",
"source.sample_dbt.pagila.payment_p2020_01",
"source.sample_dbt.pagila.payment_p2020_04",
"source.sample_dbt.pagila.payment_p2020_05",
"source.sample_dbt.pagila.payment_p2020_02"
"source.sample_dbt.pagila.payment_p2020_02",
"source.sample_dbt.pagila.payment_p2020_05"
]
},
"description": "",
Expand Down Expand Up @@ -7823,27 +7823,27 @@
"sources": [
[
"pagila",
"payment_p2020_01"
"payment_p2020_03"
],
[
"pagila",
"payment_p2020_06"
],
[
"pagila",
"payment_p2020_03"
"payment_p2020_01"
],
[
"pagila",
"payment_p2020_04"
],
[
"pagila",
"payment_p2020_05"
"payment_p2020_02"
],
[
"pagila",
"payment_p2020_02"
"payment_p2020_05"
]
],
"tags": [],
Expand Down Expand Up @@ -7882,6 +7882,7 @@
"description": "description for customer_id from dbt",
"meta": {
"is_sensitive": true,
"maturity": "beta",
"terms": "pii, customer_id"
},
"name": "customer_id",
Expand Down
5 changes: 5 additions & 0 deletions metadata-ingestion/tests/integration/dbt/test_dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ def set_paths(
"operation": "add_tag",
"config": {"tag": "sensitive"},
},
"maturity": {
"match": ".*",
"operation": "add_term",
"config": {"term": "maturity_{{ $match }}"},
},
},
"entities_enabled": {
"test_definitions": "NO",
Expand Down

0 comments on commit 466ea31

Please sign in to comment.