
Commit

Merge branch 'master' into fix-empty-description
david-leifker authored Oct 2, 2024
2 parents 78bf585 + e1514d5 commit 9f1bfaf
Showing 15 changed files with 1,084 additions and 74 deletions.
@@ -235,4 +235,100 @@ describe('filterSchemaRows', () => {
expect(filteredRows).toMatchObject([{ fieldPath: 'shipment' }]);
expect(expandedRowsFromFilter).toMatchObject(new Set());
});

it('should properly filter schema rows based on business attribute properties description', () => {
const rowsWithSchemaFieldEntity = [
{
fieldPath: 'customer',
schemaFieldEntity: {
businessAttributes: {
businessAttribute: {
businessAttribute: { properties: { description: 'customer description' } },
},
},
},
},
{
fieldPath: 'testing',
schemaFieldEntity: {
businessAttributes: {
businessAttribute: {
businessAttribute: { properties: { description: 'testing description' } },
},
},
},
},
{
fieldPath: 'shipment',
schemaFieldEntity: {
businessAttributes: {
businessAttribute: {
businessAttribute: { properties: { description: 'shipment description' } },
},
},
},
},
] as SchemaField[];
const filterText = 'testing description';
const editableSchemaMetadata = { editableSchemaFieldInfo: [] };
const { filteredRows, expandedRowsFromFilter } = filterSchemaRows(
rowsWithSchemaFieldEntity,
editableSchemaMetadata,
filterText,
testEntityRegistry,
);

expect(filteredRows).toMatchObject([{ fieldPath: 'testing' }]);
expect(expandedRowsFromFilter).toMatchObject(new Set());
});

it('should properly filter schema rows based on business attribute properties tags', () => {
const rowsWithSchemaFieldEntity = [
{
fieldPath: 'customer',
schemaFieldEntity: {
businessAttributes: {
businessAttribute: {
businessAttribute: { properties: { tags: { tags: [{ tag: sampleTag }] } } },
},
},
},
},
{
fieldPath: 'testing',
schemaFieldEntity: {
businessAttributes: {
businessAttribute: {
businessAttribute: {
properties: { tags: { tags: [{ tag: { properties: { name: 'otherTag' } } }] } },
},
},
},
},
},
{
fieldPath: 'shipment',
schemaFieldEntity: {
businessAttributes: {
businessAttribute: {
businessAttribute: {
properties: { tags: { tags: [{ tag: { properties: { name: 'anotherTag' } } }] } },
},
},
},
},
},
] as SchemaField[];
const filterText = sampleTag.properties.name;
const editableSchemaMetadata = { editableSchemaFieldInfo: [] };
const { filteredRows, expandedRowsFromFilter } = filterSchemaRows(
rowsWithSchemaFieldEntity,
editableSchemaMetadata,
filterText,
testEntityRegistry,
);

expect(filteredRows).toMatchObject([{ fieldPath: 'customer' }]);
expect(expandedRowsFromFilter).toMatchObject(new Set());
});
});
@@ -16,6 +16,25 @@ function matchesTagsOrTermsOrDescription(field: SchemaField, filterText: string,
);
}

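// returns whether the field's business attribute name, description, glossary terms, or tags match the filterText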
function matchesBusinessAttributesProperties(field: SchemaField, filterText: string, entityRegistry: EntityRegistry) {
if (!field.schemaFieldEntity?.businessAttributes) return false;
const businessAttributeProperties =
field.schemaFieldEntity?.businessAttributes?.businessAttribute?.businessAttribute?.properties;
return (
businessAttributeProperties?.description?.toLocaleLowerCase().includes(filterText) ||
businessAttributeProperties?.name?.toLocaleLowerCase().includes(filterText) ||
businessAttributeProperties?.glossaryTerms?.terms?.find((termAssociation) =>
entityRegistry
.getDisplayName(EntityType.GlossaryTerm, termAssociation.term)
.toLocaleLowerCase()
.includes(filterText),
) ||
businessAttributeProperties?.tags?.tags?.find((tagAssociation) =>
entityRegistry.getDisplayName(EntityType.Tag, tagAssociation.tag).toLocaleLowerCase().includes(filterText),
)
);
}

// returns list of fieldPaths for fields that have Terms or Tags or Descriptions matching the filterText
function getFilteredFieldPathsByMetadata(editableSchemaMetadata: any, entityRegistry, filterText) {
return (
@@ -56,7 +75,8 @@ export function filterSchemaRows(
if (
matchesFieldName(row.fieldPath, formattedFilterText) ||
matchesEditableTagsOrTermsOrDescription(row, filteredFieldPathsByEditableMetadata) ||
matchesTagsOrTermsOrDescription(row, formattedFilterText, entityRegistry) // non-editable tags, terms and description
matchesTagsOrTermsOrDescription(row, formattedFilterText, entityRegistry) || // non-editable tags, terms and description
matchesBusinessAttributesProperties(row, formattedFilterText, entityRegistry)
) {
finalFieldPaths.add(row.fieldPath);
}
@@ -65,7 +85,8 @@
if (
matchesFieldName(fieldName, formattedFilterText) ||
matchesEditableTagsOrTermsOrDescription(row, filteredFieldPathsByEditableMetadata) ||
matchesTagsOrTermsOrDescription(row, formattedFilterText, entityRegistry) // non-editable tags, terms and description
matchesTagsOrTermsOrDescription(row, formattedFilterText, entityRegistry) || // non-editable tags, terms and description
matchesBusinessAttributesProperties(row, formattedFilterText, entityRegistry)
) {
// if we match specifically on this field (not just its parent), add and expand all parents
splitFieldPath.reduce((previous, current) => {
62 changes: 62 additions & 0 deletions metadata-ingestion/docs/transformer/dataset_transformer.md
@@ -197,9 +197,12 @@ transformers:
| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) |
| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
| `is_container` | | bool | `false` | Whether to also apply ownership to the dataset's container. If true, ownership will be attached to both the dataset and its container. |

Let’s suppose we’d like to append a series of users who we know own certain datasets but aren't detected during normal ingestion. To do so, we can use the `pattern_add_dataset_ownership` module that’s included in the ingestion framework. It matches each pattern against the dataset `urn` and assigns the respective owners.

If the `is_container` field is set to `true`, the module will not only attach the owners to the matching datasets but will also find the containers associated with those datasets and attach the owners to them as well. This means that both the datasets and their containers will be associated with the specified owners.

The config, which we’d append to our ingestion recipe YAML, would look like this:

```yaml
@@ -251,6 +254,35 @@ The config, which we’d append to our ingestion recipe YAML, would look like this:
".*example2.*": ["urn:li:corpuser:username2"]
ownership_type: "PRODUCER"
```
- Add owner to dataset and its containers
```yaml
transformers:
  - type: "pattern_add_dataset_ownership"
    config:
      is_container: true
      replace_existing: true # false is default behaviour
      semantics: PATCH / OVERWRITE # Based on user
      owner_pattern:
        rules:
          ".*example1.*": ["urn:li:corpuser:username1"]
          ".*example2.*": ["urn:li:corpuser:username2"]
      ownership_type: "PRODUCER"
```
⚠️ Warning:
When two datasets in the same container match different owner patterns, the container receives the union of all matched owners.

For example:
```yaml
transformers:
  - type: "pattern_add_dataset_ownership"
    config:
      is_container: true
      owner_pattern:
        rules:
          ".*example1.*": ["urn:li:corpuser:username1"]
          ".*example2.*": ["urn:li:corpuser:username2"]
```
If `example1` and `example2` are in the same container, then both urns `urn:li:corpuser:username1` and `urn:li:corpuser:username2` will be added to that shared container.

## Simple Remove Dataset ownership
If we want to clear existing owners sent by the ingestion source, we can use the `simple_remove_dataset_ownership` transformer, which removes all owners sent by the ingestion source.
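For reference, a minimal recipe snippet using this transformer might look like the following sketch (assuming no options are required, so the `config` block is left empty):
```yaml
transformers:
  - type: "simple_remove_dataset_ownership"
    config: {}
```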
@@ -1074,10 +1106,13 @@ transformers:
| `domain_pattern` | ✅ | map[regex, list[union[urn, str]]] | | Map of dataset urn regex patterns to the list of simple domain names or domain urns to apply to matching datasets. |
| `replace_existing` | | boolean | `false` | Whether to remove domains from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
| `is_container` | | bool | `false` | Whether to also apply domains to the dataset's container. If true, domains will be attached to both the dataset and its container. |

Let’s suppose we’d like to append a series of domains to specific datasets. To do so, we can use the `pattern_add_dataset_domain` transformer that’s included in the ingestion framework.
It matches each regex pattern against the dataset urn and assigns the respective domain urns given in the array.

If the `is_container` field is set to `true`, the module will not only attach the domains to the matching datasets but will also find the containers associated with those datasets and attach the domains to them as well. This means that both the datasets and their containers will be associated with the specified domains.

The config, which we’d append to our ingestion recipe YAML, would look like this:
Here we can set each entry in the domain list to either a urn (e.g. `urn:li:domain:hr`) or a simple domain name (e.g. `hr`).
In both cases, the domain should already be provisioned on DataHub GMS.
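For instance, a rule list mixing both forms might look like this sketch (the patterns and domain values are illustrative only):
```yaml
transformers:
  - type: "pattern_add_dataset_domain"
    config:
      domain_pattern:
        rules:
          ".*example1.*": ["hr"]                     # simple domain name
          ".*example2.*": ["urn:li:domain:finance"]  # full domain urn
```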
@@ -1129,6 +1164,33 @@ in both of the cases domain should be provisioned on DataHub GMS
          'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.t.*': ["urn:li:domain:finance"]
```

- Add domains to dataset and its containers
```yaml
transformers:
  - type: "pattern_add_dataset_domain"
    config:
      is_container: true
      semantics: PATCH / OVERWRITE # Based on user
      domain_pattern:
        rules:
          'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.n.*': ["hr"]
          'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.t.*': ["urn:li:domain:finance"]
```
⚠️ Warning:
When two datasets in the same container match different domain patterns, the container receives the union of all matched domains.

For example:
```yaml
transformers:
  - type: "pattern_add_dataset_domain"
    config:
      is_container: true
      domain_pattern:
        rules:
          ".*example1.*": ["hr"]
          ".*example2.*": ["urn:li:domain:finance"]
```
If `example1` and `example2` are in the same container, then both domains `hr` and `finance` will be added to that shared container.


## Domain Mapping Based on Tags
@@ -4,11 +4,14 @@
from dataclasses import dataclass
from typing import Dict, List, Optional, Set

from datahub.ingestion.source.looker.lkml_patched import load_lkml
from datahub.ingestion.source.looker.looker_connection import LookerConnectionDefinition
from datahub.ingestion.source.looker.looker_template_language import (
load_and_preprocess_file,
)
from datahub.ingestion.source.looker.lookml_config import (
_BASE_PROJECT_NAME,
_EXPLORE_FILE_EXTENSION,
LookMLSourceConfig,
LookMLSourceReport,
)

@@ -43,6 +46,7 @@ def from_looker_dict(
root_project_name: Optional[str],
base_projects_folders: Dict[str, pathlib.Path],
path: str,
source_config: LookMLSourceConfig,
reporter: LookMLSourceReport,
) -> "LookerModel":
logger.debug(f"Loading model from {path}")
@@ -54,6 +58,7 @@
root_project_name,
base_projects_folders,
path,
source_config,
reporter,
seen_so_far=set(),
traversal_path=pathlib.Path(path).stem,
@@ -68,7 +73,10 @@
]
for included_file in explore_files:
try:
parsed = load_lkml(included_file)
parsed = load_and_preprocess_file(
path=included_file,
source_config=source_config,
)
included_explores = parsed.get("explores", [])
explores.extend(included_explores)
except Exception as e:
@@ -94,6 +102,7 @@ def resolve_includes(
root_project_name: Optional[str],
base_projects_folder: Dict[str, pathlib.Path],
path: str,
source_config: LookMLSourceConfig,
reporter: LookMLSourceReport,
seen_so_far: Set[str],
traversal_path: str = "", # a cosmetic parameter to aid debugging
@@ -206,7 +215,10 @@
f"Will be loading {included_file}, traversed here via {traversal_path}"
)
try:
parsed = load_lkml(included_file)
parsed = load_and_preprocess_file(
path=included_file,
source_config=source_config,
)
seen_so_far.add(included_file)
if "includes" in parsed: # we have more includes to resolve!
resolved.extend(
@@ -216,6 +228,7 @@
root_project_name,
base_projects_folder,
included_file,
source_config,
reporter,
seen_so_far,
traversal_path=traversal_path
@@ -259,6 +272,7 @@ def from_looker_dict(
root_project_name: Optional[str],
base_projects_folder: Dict[str, pathlib.Path],
raw_file_content: str,
source_config: LookMLSourceConfig,
reporter: LookMLSourceReport,
) -> "LookerViewFile":
logger.debug(f"Loading view file at {absolute_file_path}")
@@ -272,6 +286,7 @@
root_project_name,
base_projects_folder,
absolute_file_path,
source_config,
reporter,
seen_so_far=seen_so_far,
)
@@ -3,11 +3,10 @@
from dataclasses import replace
from typing import Dict, Optional

from datahub.ingestion.source.looker.lkml_patched import load_lkml
from datahub.ingestion.source.looker.looker_config import LookerConnectionDefinition
from datahub.ingestion.source.looker.looker_dataclasses import LookerViewFile
from datahub.ingestion.source.looker.looker_template_language import (
process_lookml_template_language,
load_and_preprocess_file,
)
from datahub.ingestion.source.looker.lookml_config import (
_EXPLORE_FILE_EXTENSION,
@@ -72,10 +71,8 @@ def _load_viewfile(
try:
logger.debug(f"Loading viewfile {path}")

parsed = load_lkml(path)

process_lookml_template_language(
view_lkml_file_dict=parsed,
parsed = load_and_preprocess_file(
path=path,
source_config=self.source_config,
)

@@ -86,6 +83,7 @@
root_project_name=self._root_project_name,
base_projects_folder=self._base_projects_folder,
raw_file_content=raw_file_content,
source_config=self.source_config,
reporter=reporter,
)
logger.debug(f"adding viewfile for path {path} to the cache")