From a0787684de29d67dc7d6049ec7025d861e26522e Mon Sep 17 00:00:00 2001 From: deepgarg-visa <149145061+deepgarg-visa@users.noreply.github.com> Date: Tue, 1 Oct 2024 19:27:08 +0530 Subject: [PATCH 1/4] =?UTF-8?q?feat(businessattribute):=20filter=20schema?= =?UTF-8?q?=20rows=20on=20business-attribute=20pro=E2=80=A6=20(#11502)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Schema/__tests__/filterSchemaRows.test.ts | 96 +++++++++++++++++++ .../Dataset/Schema/utils/filterSchemaRows.ts | 25 ++++- 2 files changed, 119 insertions(+), 2 deletions(-) diff --git a/datahub-web-react/src/app/entity/shared/tabs/Dataset/Schema/__tests__/filterSchemaRows.test.ts b/datahub-web-react/src/app/entity/shared/tabs/Dataset/Schema/__tests__/filterSchemaRows.test.ts index 27c0af87fc833..87fca3b898c83 100644 --- a/datahub-web-react/src/app/entity/shared/tabs/Dataset/Schema/__tests__/filterSchemaRows.test.ts +++ b/datahub-web-react/src/app/entity/shared/tabs/Dataset/Schema/__tests__/filterSchemaRows.test.ts @@ -235,4 +235,100 @@ describe('filterSchemaRows', () => { expect(filteredRows).toMatchObject([{ fieldPath: 'shipment' }]); expect(expandedRowsFromFilter).toMatchObject(new Set()); }); + + it('should properly filter schema rows based on business attribute properties description', () => { + const rowsWithSchemaFieldEntity = [ + { + fieldPath: 'customer', + schemaFieldEntity: { + businessAttributes: { + businessAttribute: { + businessAttribute: { properties: { description: 'customer description' } }, + }, + }, + }, + }, + { + fieldPath: 'testing', + schemaFieldEntity: { + businessAttributes: { + businessAttribute: { + businessAttribute: { properties: { description: 'testing description' } }, + }, + }, + }, + }, + { + fieldPath: 'shipment', + schemaFieldEntity: { + businessAttributes: { + businessAttribute: { + businessAttribute: { properties: { description: 'shipment description' } }, + }, + }, + }, + }, + ] as SchemaField[]; + const filterText = 'testing description'; + const editableSchemaMetadata = { editableSchemaFieldInfo: [] }; + const { filteredRows, expandedRowsFromFilter } = filterSchemaRows( + rowsWithSchemaFieldEntity, + editableSchemaMetadata, + filterText, + testEntityRegistry, + ); + + expect(filteredRows).toMatchObject([{ fieldPath: 'testing' }]); + expect(expandedRowsFromFilter).toMatchObject(new Set()); + }); + + it('should properly filter schema rows based on business attribute properties tags', () => { + const rowsWithSchemaFieldEntity = [ + { + fieldPath: 'customer', + schemaFieldEntity: { + businessAttributes: { + businessAttribute: { + businessAttribute: { properties: { tags: { tags: [{ tag: sampleTag }] } } }, + }, + }, + }, + }, + { + fieldPath: 'testing', + schemaFieldEntity: { + businessAttributes: { + businessAttribute: { + businessAttribute: { + properties: { tags: { tags: [{ tag: { properties: { name: 'otherTag' } } }] } }, + }, + }, + }, + }, + }, + { + fieldPath: 'shipment', + schemaFieldEntity: { + businessAttributes: { + businessAttribute: { + businessAttribute: { + properties: { tags: { tags: [{ tag: { properties: { name: 'anotherTag' } } }] } }, + }, + }, + }, + }, + }, + ] as SchemaField[]; + const filterText = sampleTag.properties.name; + const editableSchemaMetadata = { editableSchemaFieldInfo: [] }; + const { filteredRows, expandedRowsFromFilter } = filterSchemaRows( + rowsWithSchemaFieldEntity, + editableSchemaMetadata, + filterText, + testEntityRegistry, + ); + + expect(filteredRows).toMatchObject([{ fieldPath: 'customer' 
}]); + expect(expandedRowsFromFilter).toMatchObject(new Set()); + }); }); diff --git a/datahub-web-react/src/app/entity/shared/tabs/Dataset/Schema/utils/filterSchemaRows.ts b/datahub-web-react/src/app/entity/shared/tabs/Dataset/Schema/utils/filterSchemaRows.ts index 96505e1bee785..53b76d53f886a 100644 --- a/datahub-web-react/src/app/entity/shared/tabs/Dataset/Schema/utils/filterSchemaRows.ts +++ b/datahub-web-react/src/app/entity/shared/tabs/Dataset/Schema/utils/filterSchemaRows.ts @@ -16,6 +16,25 @@ function matchesTagsOrTermsOrDescription(field: SchemaField, filterText: string, ); } +function matchesBusinessAttributesProperties(field: SchemaField, filterText: string, entityRegistry: EntityRegistry) { + if (!field.schemaFieldEntity?.businessAttributes) return false; + const businessAttributeProperties = + field.schemaFieldEntity?.businessAttributes?.businessAttribute?.businessAttribute?.properties; + return ( + businessAttributeProperties?.description?.toLocaleLowerCase().includes(filterText) || + businessAttributeProperties?.name?.toLocaleLowerCase().includes(filterText) || + businessAttributeProperties?.glossaryTerms?.terms?.find((termAssociation) => + entityRegistry + .getDisplayName(EntityType.GlossaryTerm, termAssociation.term) + .toLocaleLowerCase() + .includes(filterText), + ) || + businessAttributeProperties?.tags?.tags?.find((tagAssociation) => + entityRegistry.getDisplayName(EntityType.Tag, tagAssociation.tag).toLocaleLowerCase().includes(filterText), + ) + ); +} + // returns list of fieldPaths for fields that have Terms or Tags or Descriptions matching the filterText function getFilteredFieldPathsByMetadata(editableSchemaMetadata: any, entityRegistry, filterText) { return ( @@ -56,7 +75,8 @@ export function filterSchemaRows( if ( matchesFieldName(row.fieldPath, formattedFilterText) || matchesEditableTagsOrTermsOrDescription(row, filteredFieldPathsByEditableMetadata) || - matchesTagsOrTermsOrDescription(row, formattedFilterText, entityRegistry) // non-editable tags, terms and description + matchesTagsOrTermsOrDescription(row, formattedFilterText, entityRegistry) || // non-editable tags, terms and description + matchesBusinessAttributesProperties(row, formattedFilterText, entityRegistry) ) { finalFieldPaths.add(row.fieldPath); } @@ -65,7 +85,8 @@ export function filterSchemaRows( if ( matchesFieldName(fieldName, formattedFilterText) || matchesEditableTagsOrTermsOrDescription(row, filteredFieldPathsByEditableMetadata) || - matchesTagsOrTermsOrDescription(row, formattedFilterText, entityRegistry) // non-editable tags, terms and description + matchesTagsOrTermsOrDescription(row, formattedFilterText, entityRegistry) || // non-editable tags, terms and description + matchesBusinessAttributesProperties(row, formattedFilterText, entityRegistry) ) { // if we match specifically on this field (not just its parent), add and expand all parents splitFieldPath.reduce((previous, current) => { From 67d711605505adc5fe18bad71cd6baabeea48323 Mon Sep 17 00:00:00 2001 From: sid-acryl <155424659+sid-acryl@users.noreply.github.com> Date: Tue, 1 Oct 2024 23:56:00 +0530 Subject: [PATCH 2/4] fix(ingest/lookml): missing lineage for looker template -- if prod (#11426) --- .../source/looker/looker_dataclasses.py | 21 ++++++++++++++++--- .../source/looker/looker_file_loader.py | 10 ++++----- .../source/looker/looker_template_language.py | 20 +++++++++++++++++- .../source/looker/lookml_concept_context.py | 5 +++-- .../ingestion/source/looker/lookml_source.py | 17 ++++++++++++--- 
.../ingestion/source/looker/view_upstream.py | 10 +++++++-- .../tests/integration/lookml/test_lookml.py | 9 ++++++-- 7 files changed, 73 insertions(+), 19 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_dataclasses.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_dataclasses.py index adaa3c4875450..7e23079156b62 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_dataclasses.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_dataclasses.py @@ -4,11 +4,14 @@ from dataclasses import dataclass from typing import Dict, List, Optional, Set -from datahub.ingestion.source.looker.lkml_patched import load_lkml from datahub.ingestion.source.looker.looker_connection import LookerConnectionDefinition +from datahub.ingestion.source.looker.looker_template_language import ( + load_and_preprocess_file, +) from datahub.ingestion.source.looker.lookml_config import ( _BASE_PROJECT_NAME, _EXPLORE_FILE_EXTENSION, + LookMLSourceConfig, LookMLSourceReport, ) @@ -43,6 +46,7 @@ def from_looker_dict( root_project_name: Optional[str], base_projects_folders: Dict[str, pathlib.Path], path: str, + source_config: LookMLSourceConfig, reporter: LookMLSourceReport, ) -> "LookerModel": logger.debug(f"Loading model from {path}") @@ -54,6 +58,7 @@ def from_looker_dict( root_project_name, base_projects_folders, path, + source_config, reporter, seen_so_far=set(), traversal_path=pathlib.Path(path).stem, @@ -68,7 +73,10 @@ def from_looker_dict( ] for included_file in explore_files: try: - parsed = load_lkml(included_file) + parsed = load_and_preprocess_file( + path=included_file, + source_config=source_config, + ) included_explores = parsed.get("explores", []) explores.extend(included_explores) except Exception as e: @@ -94,6 +102,7 @@ def resolve_includes( root_project_name: Optional[str], base_projects_folder: Dict[str, pathlib.Path], path: str, + source_config: LookMLSourceConfig, reporter: LookMLSourceReport, seen_so_far: Set[str], traversal_path: str = "", # a cosmetic parameter to aid debugging @@ -206,7 +215,10 @@ def resolve_includes( f"Will be loading {included_file}, traversed here via {traversal_path}" ) try: - parsed = load_lkml(included_file) + parsed = load_and_preprocess_file( + path=included_file, + source_config=source_config, + ) seen_so_far.add(included_file) if "includes" in parsed: # we have more includes to resolve! 
resolved.extend( @@ -216,6 +228,7 @@ def resolve_includes( root_project_name, base_projects_folder, included_file, + source_config, reporter, seen_so_far, traversal_path=traversal_path @@ -259,6 +272,7 @@ def from_looker_dict( root_project_name: Optional[str], base_projects_folder: Dict[str, pathlib.Path], raw_file_content: str, + source_config: LookMLSourceConfig, reporter: LookMLSourceReport, ) -> "LookerViewFile": logger.debug(f"Loading view file at {absolute_file_path}") @@ -272,6 +286,7 @@ def from_looker_dict( root_project_name, base_projects_folder, absolute_file_path, + source_config, reporter, seen_so_far=seen_so_far, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_file_loader.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_file_loader.py index 52ebcdde06a27..f894c96debc54 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_file_loader.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_file_loader.py @@ -3,11 +3,10 @@ from dataclasses import replace from typing import Dict, Optional -from datahub.ingestion.source.looker.lkml_patched import load_lkml from datahub.ingestion.source.looker.looker_config import LookerConnectionDefinition from datahub.ingestion.source.looker.looker_dataclasses import LookerViewFile from datahub.ingestion.source.looker.looker_template_language import ( - process_lookml_template_language, + load_and_preprocess_file, ) from datahub.ingestion.source.looker.lookml_config import ( _EXPLORE_FILE_EXTENSION, @@ -72,10 +71,8 @@ def _load_viewfile( try: logger.debug(f"Loading viewfile {path}") - parsed = load_lkml(path) - - process_lookml_template_language( - view_lkml_file_dict=parsed, + parsed = load_and_preprocess_file( + path=path, source_config=self.source_config, ) @@ -86,6 +83,7 @@ def _load_viewfile( root_project_name=self._root_project_name, base_projects_folder=self._base_projects_folder, raw_file_content=raw_file_content, + source_config=self.source_config, reporter=reporter, ) logger.debug(f"adding viewfile for path {path} to the cache") diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_template_language.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_template_language.py index 04f9ec081ee68..1e60c08fe00c2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_template_language.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_template_language.py @@ -1,12 +1,14 @@ import logging +import pathlib import re from abc import ABC, abstractmethod -from typing import Any, ClassVar, Dict, List, Optional, Set +from typing import Any, ClassVar, Dict, List, Optional, Set, Union from deepmerge import always_merger from liquid import Undefined from liquid.exceptions import LiquidSyntaxError +from datahub.ingestion.source.looker.lkml_patched import load_lkml from datahub.ingestion.source.looker.looker_constant import ( DATAHUB_TRANSFORMED_SQL, DATAHUB_TRANSFORMED_SQL_TABLE_NAME, @@ -390,6 +392,7 @@ def process_lookml_template_language( source_config: LookMLSourceConfig, view_lkml_file_dict: dict, ) -> None: + if "views" not in view_lkml_file_dict: return @@ -416,3 +419,18 @@ def process_lookml_template_language( ) view_lkml_file_dict["views"] = transformed_views + + +def load_and_preprocess_file( + path: Union[str, pathlib.Path], + source_config: LookMLSourceConfig, +) -> dict: + + parsed = load_lkml(path) + + process_lookml_template_language( + view_lkml_file_dict=parsed, + 
source_config=source_config, + ) + + return parsed diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py index bf24f4b84679b..ce4a242027e11 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py @@ -365,8 +365,9 @@ def sql_table_name(self) -> str: return sql_table_name.lower() def datahub_transformed_sql_table_name(self) -> str: - table_name: Optional[str] = self.raw_view.get( - "datahub_transformed_sql_table_name" + # This field might be present in parent view of current view + table_name: Optional[str] = self.get_including_extends( + field="datahub_transformed_sql_table_name" ) if not table_name: diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py index b00291caabbf6..e4d8dd19fb791 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py @@ -29,7 +29,6 @@ DatasetSubTypes, ) from datahub.ingestion.source.git.git_import import GitClone -from datahub.ingestion.source.looker.lkml_patched import load_lkml from datahub.ingestion.source.looker.looker_common import ( CORPUSER_DATAHUB, LookerExplore, @@ -45,6 +44,9 @@ get_connection_def_based_on_connection_string, ) from datahub.ingestion.source.looker.looker_lib_wrapper import LookerAPI +from datahub.ingestion.source.looker.looker_template_language import ( + load_and_preprocess_file, +) from datahub.ingestion.source.looker.looker_view_id_cache import ( LookerModel, LookerViewFileLoader, @@ -311,13 +313,19 @@ def __init__(self, config: LookMLSourceConfig, ctx: PipelineContext): def _load_model(self, path: str) -> LookerModel: logger.debug(f"Loading model from file {path}") - parsed = load_lkml(path) + + parsed = load_and_preprocess_file( + path=path, + source_config=self.source_config, + ) + looker_model = LookerModel.from_looker_dict( parsed, _BASE_PROJECT_NAME, self.source_config.project_name, self.base_projects_folder, path, + self.source_config, self.reporter, ) return looker_model @@ -495,7 +503,10 @@ def get_project_name(self, model_name: str) -> str: def get_manifest_if_present(self, folder: pathlib.Path) -> Optional[LookerManifest]: manifest_file = folder / "manifest.lkml" if manifest_file.exists(): - manifest_dict = load_lkml(manifest_file) + + manifest_dict = load_and_preprocess_file( + path=manifest_file, source_config=self.source_config + ) manifest = LookerManifest( project_name=manifest_dict.get("project_name"), diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index de1022b5482ce..057dbca428184 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -154,6 +154,7 @@ def _generate_fully_qualified_name( sql_table_name: str, connection_def: LookerConnectionDefinition, reporter: LookMLSourceReport, + view_name: str, ) -> str: """Returns a fully qualified dataset name, resolved through a connection definition. 
Input sql_table_name can be in three forms: table, db.table, db.schema.table""" @@ -192,7 +193,7 @@ def _generate_fully_qualified_name( reporter.report_warning( title="Malformed Table Name", message="Table name has more than 3 parts.", - context=f"Table Name: {sql_table_name}", + context=f"view-name: {view_name}, table-name: {sql_table_name}", ) return sql_table_name.lower() @@ -280,10 +281,13 @@ def __get_upstream_dataset_urn(self) -> List[Urn]: return [] if sql_parsing_result.debug_info.table_error is not None: + logger.debug( + f"view-name={self.view_context.name()}, sql_query={self.get_sql_query()}" + ) self.reporter.report_warning( title="Table Level Lineage Missing", message="Error in parsing derived sql", - context=f"View-name: {self.view_context.name()}", + context=f"view-name: {self.view_context.name()}, platform: {self.view_context.view_connection.platform}", exc=sql_parsing_result.debug_info.table_error, ) return [] @@ -530,6 +534,7 @@ def __get_upstream_dataset_urn(self) -> Urn: sql_table_name=self.view_context.datahub_transformed_sql_table_name(), connection_def=self.view_context.view_connection, reporter=self.view_context.reporter, + view_name=self.view_context.name(), ) self.upstream_dataset_urn = make_dataset_urn_with_platform_instance( @@ -586,6 +591,7 @@ def __get_upstream_dataset_urn(self) -> List[Urn]: self.view_context.datahub_transformed_sql_table_name(), self.view_context.view_connection, self.view_context.reporter, + self.view_context.name(), ), base_folder_path=self.view_context.base_folder_path, looker_view_id_cache=self.looker_view_id_cache, diff --git a/metadata-ingestion/tests/integration/lookml/test_lookml.py b/metadata-ingestion/tests/integration/lookml/test_lookml.py index a5d838cb16d73..e4eb564e3e86b 100644 --- a/metadata-ingestion/tests/integration/lookml/test_lookml.py +++ b/metadata-ingestion/tests/integration/lookml/test_lookml.py @@ -2,6 +2,7 @@ import pathlib from typing import Any, List from unittest import mock +from unittest.mock import MagicMock import pydantic import pytest @@ -14,13 +15,13 @@ from datahub.ingestion.source.file import read_metadata_file from datahub.ingestion.source.looker.looker_template_language import ( SpecialVariable, + load_and_preprocess_file, resolve_liquid_variable, ) from datahub.ingestion.source.looker.lookml_source import ( LookerModel, LookerRefinementResolver, LookMLSourceConfig, - load_lkml, ) from datahub.metadata.schema_classes import ( DatasetSnapshotClass, @@ -870,7 +871,11 @@ def test_manifest_parser(pytestconfig: pytest.Config) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" manifest_file = test_resources_dir / "lkml_manifest_samples/complex-manifest.lkml" - manifest = load_lkml(manifest_file) + manifest = load_and_preprocess_file( + path=manifest_file, + source_config=MagicMock(), + ) + assert manifest From 660fbf8e57cac5846095c1f318ac98fe7ad56b1c Mon Sep 17 00:00:00 2001 From: sagar-salvi-apptware <159135491+sagar-salvi-apptware@users.noreply.github.com> Date: Wed, 2 Oct 2024 00:09:07 +0530 Subject: [PATCH 3/4] fix(ingestion/transformer): Add container support for ownership and domains (#11375) --- .../docs/transformer/dataset_transformer.md | 62 ++ .../transformer/add_dataset_ownership.py | 59 +- .../ingestion/transformer/dataset_domain.py | 66 +- .../tests/unit/test_transform_dataset.py | 566 ++++++++++++++++++ 4 files changed, 750 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md 
b/metadata-ingestion/docs/transformer/dataset_transformer.md index 03a224bcf7da4..d48c6d2c1ab5b 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -197,9 +197,12 @@ transformers: | `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) | | `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. | | `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. | +| `is_container` | | bool | `false` | Whether to also consider a container or not. If true, then ownership will be attached to both the dataset and its container. | let’s suppose we’d like to append a series of users who we know to own a different dataset from a data source but aren't detected during normal ingestion. To do so, we can use the `pattern_add_dataset_ownership` module that’s included in the ingestion framework. This will match the pattern to `urn` of the dataset and assign the respective owners. +If the is_container field is set to true, the module will not only attach the ownerships to the matching datasets but will also find and attach containers associated with those datasets. This means that both the datasets and their containers will be associated with the specified owners. + The config, which we’d append to our ingestion recipe YAML, would look like this: ```yaml @@ -251,6 +254,35 @@ The config, which we’d append to our ingestion recipe YAML, would look like th ".*example2.*": ["urn:li:corpuser:username2"] ownership_type: "PRODUCER" ``` +- Add owner to dataset and its containers + ```yaml + transformers: + - type: "pattern_add_dataset_ownership" + config: + is_container: true + replace_existing: true # false is default behaviour + semantics: PATCH / OVERWRITE # Based on user + owner_pattern: + rules: + ".*example1.*": ["urn:li:corpuser:username1"] + ".*example2.*": ["urn:li:corpuser:username2"] + ownership_type: "PRODUCER" + ``` +⚠️ Warning: +When working with two datasets in the same container but with different owners, all owners will be added for that dataset containers. + +For example: +```yaml +transformers: + - type: "pattern_add_dataset_ownership" + config: + is_container: true + owner_pattern: + rules: + ".*example1.*": ["urn:li:corpuser:username1"] + ".*example2.*": ["urn:li:corpuser:username2"] +``` +If example1 and example2 are in the same container, then both urns urn:li:corpuser:username1 and urn:li:corpuser:username2 will be added for respective dataset containers. ## Simple Remove Dataset ownership If we wanted to clear existing owners sent by ingestion source we can use the `simple_remove_dataset_ownership` transformer which removes all owners sent by the ingestion source. @@ -1074,10 +1106,13 @@ transformers: | `domain_pattern` | ✅ | map[regx, list[union[urn, str]] | | dataset urn with regular expression and list of simple domain name or domain urn need to be apply on matching dataset urn. | | `replace_existing` | | boolean | `false` | Whether to remove domains from entity sent by ingestion source. | | `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. | +| `is_container` | | bool | `false` | Whether to also consider a container or not. If true, then domains will be attached to both the dataset and its container. | Let’s suppose we’d like to append a series of domain to specific datasets. 
To do so, we can use the pattern_add_dataset_domain transformer that’s included in the ingestion framework. This will match the regex pattern to urn of the dataset and assign the respective domain urns given in the array.
+If the is_container field is set to true, the module will not only attach the domains to the matching datasets but will also find and attach the containers associated with those datasets. This means that both the datasets and their containers will be associated with the specified domains.
+
 The config, which we’d append to our ingestion recipe YAML, would look like this:
 Here we can set domain list to either urn (i.e. urn:li:domain:hr) or simple domain name (i.e. hr)
 in both of the cases domain should be provisioned on DataHub GMS
@@ -1129,6 +1164,33 @@ in both of the cases domain should be provisioned on DataHub GMS
            'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.t.*': ["urn:li:domain:finance"]
   ```
+- Add domains to dataset and its containers
+  ```yaml
+  transformers:
+    - type: "pattern_add_dataset_domain"
+      config:
+        is_container: true
+        semantics: PATCH / OVERWRITE # Based on user
+        domain_pattern:
+          rules:
+            'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.n.*': ["hr"]
+            'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.t.*': ["urn:li:domain:finance"]
+  ```
+⚠️ Warning:
+When two datasets in the same container match different domains, all of the matched domains will be added to that shared container.
+
+For example:
+```yaml
+transformers:
+  - type: "pattern_add_dataset_domain"
+    config:
+      is_container: true
+      domain_pattern:
+        rules:
+          ".*example1.*": ["hr"]
+          ".*example2.*": ["urn:li:domain:finance"]
+```
+If example1 and example2 are in the same container, then both the hr and finance domains will be added to that container.
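+
+Both `pattern_add_dataset_ownership` and `pattern_add_dataset_domain` resolve containers the same way: at the end of the stream, the transformer reads the `browsePathsV2` aspect of each matched dataset from DataHub, collects every `urn:li:container:` entry found in the path, and then emits the owners or domains against those container urns. The snippet below is a simplified, illustrative sketch of that lookup only; the helper name `resolve_container_urns` is invented for this example, and it assumes a connected `DataHubGraph` client.
+
+```python
+from typing import List
+
+from datahub.ingestion.graph.client import DataHubGraph
+from datahub.metadata.schema_classes import BrowsePathsV2Class
+
+
+def resolve_container_urns(graph: DataHubGraph, dataset_urn: str) -> List[str]:
+    """Collect the container urns that appear in a dataset's browse path.
+
+    Illustrative only: the transformers perform this lookup inside
+    handle_end_of_stream() for every dataset that matched a pattern rule.
+    """
+    browse_paths = graph.get_aspect(dataset_urn, BrowsePathsV2Class)
+    if not browse_paths:
+        # A dataset without a browsePathsV2 aspect has no containers to update.
+        return []
+    return [
+        entry.urn
+        for entry in browse_paths.path
+        if entry.urn and entry.urn.startswith("urn:li:container:")
+    ]
+```
+The collected container urns are then aggregated across all matched datasets, which is why two datasets in the same container end up contributing all of their owners or domains to it.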
## Domain Mapping Based on Tags diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py index 5112a443768db..54be2e5fac1e3 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py @@ -1,4 +1,5 @@ -from typing import Callable, List, Optional, cast +import logging +from typing import Callable, Dict, List, Optional, Union, cast import datahub.emitter.mce_builder as builder from datahub.configuration.common import ( @@ -9,16 +10,22 @@ ) from datahub.configuration.import_resolver import pydantic_resolve_key from datahub.emitter.mce_builder import Aspect +from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.graph.client import DataHubGraph from datahub.ingestion.transformer.dataset_transformer import ( DatasetOwnershipTransformer, ) from datahub.metadata.schema_classes import ( + BrowsePathsV2Class, + MetadataChangeProposalClass, OwnerClass, OwnershipClass, OwnershipTypeClass, ) +from datahub.specific.dashboard import DashboardPatchBuilder + +logger = logging.getLogger(__name__) class AddDatasetOwnershipConfig(TransformerSemanticsConfigModel): @@ -27,6 +34,8 @@ class AddDatasetOwnershipConfig(TransformerSemanticsConfigModel): _resolve_owner_fn = pydantic_resolve_key("get_owners_to_add") + is_container: bool = False + class AddDatasetOwnership(DatasetOwnershipTransformer): """Transformer that adds owners to datasets according to a callback function.""" @@ -70,6 +79,52 @@ def _merge_with_server_ownership( return mce_ownership + def handle_end_of_stream( + self, + ) -> List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]: + if not self.config.is_container: + return [] + + logger.debug("Generating Ownership for containers") + ownership_container_mapping: Dict[str, List[OwnerClass]] = {} + for entity_urn, data_ownerships in ( + (urn, self.config.get_owners_to_add(urn)) for urn in self.entity_map.keys() + ): + if not data_ownerships: + continue + + assert self.ctx.graph + browse_paths = self.ctx.graph.get_aspect(entity_urn, BrowsePathsV2Class) + if not browse_paths: + continue + + for path in browse_paths.path: + container_urn = path.urn + + if not container_urn or not container_urn.startswith( + "urn:li:container:" + ): + continue + + if container_urn not in ownership_container_mapping: + ownership_container_mapping[container_urn] = data_ownerships + else: + ownership_container_mapping[container_urn] = list( + ownership_container_mapping[container_urn] + data_ownerships + ) + + mcps: List[ + Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass] + ] = [] + + for urn, owners in ownership_container_mapping.items(): + patch_builder = DashboardPatchBuilder(urn) + for owner in owners: + patch_builder.add_owner(owner) + mcps.extend(list(patch_builder.build())) + + return mcps + def transform_aspect( self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] ) -> Optional[Aspect]: @@ -147,6 +202,7 @@ def create( class PatternDatasetOwnershipConfig(DatasetOwnershipBaseConfig): owner_pattern: KeyValuePattern = KeyValuePattern.all() default_actor: str = builder.make_user_urn("etl") + is_container: bool = False class PatternAddDatasetOwnership(AddDatasetOwnership): @@ -169,6 +225,7 @@ def __init__(self, config: PatternDatasetOwnershipConfig, ctx: 
PipelineContext): default_actor=config.default_actor, semantics=config.semantics, replace_existing=config.replace_existing, + is_container=config.is_container, ) super().__init__(generic_config, ctx) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py index 82dd21bbdd1d1..6a83824815265 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py @@ -1,4 +1,5 @@ -from typing import Callable, List, Optional, Union, cast +import logging +from typing import Callable, Dict, List, Optional, Sequence, Union, cast from datahub.configuration.common import ( ConfigurationError, @@ -8,12 +9,19 @@ ) from datahub.configuration.import_resolver import pydantic_resolve_key from datahub.emitter.mce_builder import Aspect +from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.graph.client import DataHubGraph from datahub.ingestion.transformer.dataset_transformer import DatasetDomainTransformer -from datahub.metadata.schema_classes import DomainsClass +from datahub.metadata.schema_classes import ( + BrowsePathsV2Class, + DomainsClass, + MetadataChangeProposalClass, +) from datahub.utilities.registries.domain_registry import DomainRegistry +logger = logging.getLogger(__name__) + class AddDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel): get_domains_to_add: Union[ @@ -23,6 +31,8 @@ class AddDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel): _resolve_domain_fn = pydantic_resolve_key("get_domains_to_add") + is_container: bool = False + class SimpleDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel): domains: List[str] @@ -30,6 +40,7 @@ class SimpleDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel): class PatternDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel): domain_pattern: KeyValuePattern = KeyValuePattern.all() + is_container: bool = False class AddDatasetDomain(DatasetDomainTransformer): @@ -90,6 +101,56 @@ def _merge_with_server_domains( return mce_domain + def handle_end_of_stream( + self, + ) -> Sequence[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]: + domain_mcps: List[MetadataChangeProposalWrapper] = [] + container_domain_mapping: Dict[str, List[str]] = {} + + logger.debug("Generating Domains for containers") + + if not self.config.is_container: + return domain_mcps + + for entity_urn, domain_to_add in ( + (urn, self.config.get_domains_to_add(urn)) for urn in self.entity_map.keys() + ): + if not domain_to_add or not domain_to_add.domains: + continue + + assert self.ctx.graph + browse_paths = self.ctx.graph.get_aspect(entity_urn, BrowsePathsV2Class) + if not browse_paths: + continue + + for path in browse_paths.path: + container_urn = path.urn + + if not container_urn or not container_urn.startswith( + "urn:li:container:" + ): + continue + + if container_urn not in container_domain_mapping: + container_domain_mapping[container_urn] = domain_to_add.domains + else: + container_domain_mapping[container_urn] = list( + set( + container_domain_mapping[container_urn] + + domain_to_add.domains + ) + ) + + for urn, domains in container_domain_mapping.items(): + domain_mcps.append( + MetadataChangeProposalWrapper( + entityUrn=urn, + aspect=DomainsClass(domains=domains), + ) + ) + + return domain_mcps + def transform_aspect( self, entity_urn: str, 
aspect_name: str, aspect: Optional[Aspect] ) -> Optional[Aspect]: @@ -156,6 +217,7 @@ def resolve_domain(domain_urn: str) -> DomainsClass: get_domains_to_add=resolve_domain, semantics=config.semantics, replace_existing=config.replace_existing, + is_container=config.is_container, ) super().__init__(generic_config, ctx) diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 506bfd9c12674..46c6390b184d3 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -1105,6 +1105,354 @@ def test_pattern_dataset_ownership_with_invalid_type_transformation(mock_time): ) +def test_pattern_container_and_dataset_ownership_transformation( + mock_time, mock_datahub_graph +): + def fake_get_aspect( + entity_urn: str, + aspect_type: Type[models.BrowsePathsV2Class], + version: int = 0, + ) -> Optional[models.BrowsePathsV2Class]: + return models.BrowsePathsV2Class( + path=[ + models.BrowsePathEntryClass( + id="container_1", urn="urn:li:container:container_1" + ), + models.BrowsePathEntryClass( + id="container_2", urn="urn:li:container:container_2" + ), + ] + ) + + pipeline_context = PipelineContext( + run_id="test_pattern_container_and_dataset_ownership_transformation" + ) + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore + + # No owner aspect for the first dataset + no_owner_aspect = models.MetadataChangeEventClass( + proposedSnapshot=models.DatasetSnapshotClass( + urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)", + aspects=[models.StatusClass(removed=False)], + ), + ) + # Dataset with an existing owner + with_owner_aspect = models.MetadataChangeEventClass( + proposedSnapshot=models.DatasetSnapshotClass( + urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)", + aspects=[ + models.OwnershipClass( + owners=[ + models.OwnerClass( + owner=builder.make_user_urn("fake_owner"), + type=models.OwnershipTypeClass.DATAOWNER, + ), + ], + lastModified=models.AuditStampClass( + time=1625266033123, actor="urn:li:corpuser:datahub" + ), + ) + ], + ), + ) + + # Not a dataset, should be ignored + not_a_dataset = models.MetadataChangeEventClass( + proposedSnapshot=models.DataJobSnapshotClass( + urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)", + aspects=[ + models.DataJobInfoClass( + name="User Deletions", + description="Constructs the fct_users_deleted from logging_events", + type=models.AzkabanJobTypeClass.SQL, + ) + ], + ) + ) + + inputs = [ + no_owner_aspect, + with_owner_aspect, + not_a_dataset, + EndOfStream(), + ] + + # Initialize the transformer with container support + transformer = PatternAddDatasetOwnership.create( + { + "owner_pattern": { + "rules": { + ".*example1.*": [builder.make_user_urn("person1")], + ".*example2.*": [builder.make_user_urn("person2")], + } + }, + "ownership_type": "DATAOWNER", + "is_container": True, # Enable container ownership handling + }, + pipeline_context, + ) + + outputs = list( + transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs]) + ) + + assert len(outputs) == len(inputs) + 3 + + # Check the first entry. 
+ assert inputs[0] == outputs[0].record + + # Check the ownership for the first dataset (example1) + first_ownership_aspect = outputs[3].record.aspect + assert first_ownership_aspect + assert len(first_ownership_aspect.owners) == 1 + assert all( + [ + owner.type == models.OwnershipTypeClass.DATAOWNER + for owner in first_ownership_aspect.owners + ] + ) + + # Check the ownership for the second dataset (example2) + second_ownership_aspect = builder.get_aspect_if_available( + outputs[1].record, models.OwnershipClass + ) + assert second_ownership_aspect + assert len(second_ownership_aspect.owners) == 2 # One existing + one new + assert all( + [ + owner.type == models.OwnershipTypeClass.DATAOWNER + for owner in second_ownership_aspect.owners + ] + ) + + # Check container ownerships + for i in range(2): + container_ownership_aspect = outputs[i + 4].record.aspect + assert container_ownership_aspect + ownership = json.loads(container_ownership_aspect.value.decode("utf-8")) + assert len(ownership) == 2 + assert ownership[0]["value"]["owner"] == builder.make_user_urn("person1") + assert ownership[1]["value"]["owner"] == builder.make_user_urn("person2") + + # Verify that the third input (not a dataset) is unchanged + assert inputs[2] == outputs[2].record + + +def test_pattern_container_and_dataset_ownership_with_no_container( + mock_time, mock_datahub_graph +): + def fake_get_aspect( + entity_urn: str, + aspect_type: Type[models.BrowsePathsV2Class], + version: int = 0, + ) -> Optional[models.BrowsePathsV2Class]: + return None + + pipeline_context = PipelineContext( + run_id="test_pattern_container_and_dataset_ownership_with_no_container" + ) + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore + + # No owner aspect for the first dataset + no_owner_aspect = models.MetadataChangeEventClass( + proposedSnapshot=models.DatasetSnapshotClass( + urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)", + aspects=[ + models.StatusClass(removed=False), + models.BrowsePathsV2Class( + path=[ + models.BrowsePathEntryClass( + id="container_1", urn="urn:li:container:container_1" + ), + models.BrowsePathEntryClass( + id="container_2", urn="urn:li:container:container_2" + ), + ] + ), + ], + ), + ) + # Dataset with an existing owner + with_owner_aspect = models.MetadataChangeEventClass( + proposedSnapshot=models.DatasetSnapshotClass( + urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)", + aspects=[ + models.OwnershipClass( + owners=[ + models.OwnerClass( + owner=builder.make_user_urn("fake_owner"), + type=models.OwnershipTypeClass.DATAOWNER, + ), + ], + lastModified=models.AuditStampClass( + time=1625266033123, actor="urn:li:corpuser:datahub" + ), + ), + models.BrowsePathsV2Class( + path=[ + models.BrowsePathEntryClass( + id="container_1", urn="urn:li:container:container_1" + ), + models.BrowsePathEntryClass( + id="container_2", urn="urn:li:container:container_2" + ), + ] + ), + ], + ), + ) + + inputs = [ + no_owner_aspect, + with_owner_aspect, + EndOfStream(), + ] + + # Initialize the transformer with container support + transformer = PatternAddDatasetOwnership.create( + { + "owner_pattern": { + "rules": { + ".*example1.*": [builder.make_user_urn("person1")], + ".*example2.*": [builder.make_user_urn("person2")], + } + }, + "ownership_type": "DATAOWNER", + "is_container": True, # Enable container ownership handling + }, + pipeline_context, + ) + + outputs = list( + transformer.transform([RecordEnvelope(input, 
metadata={}) for input in inputs]) + ) + + assert len(outputs) == len(inputs) + 1 + + # Check the ownership for the first dataset (example1) + first_ownership_aspect = outputs[2].record.aspect + assert first_ownership_aspect + assert len(first_ownership_aspect.owners) == 1 + assert all( + [ + owner.type == models.OwnershipTypeClass.DATAOWNER + for owner in first_ownership_aspect.owners + ] + ) + + # Check the ownership for the second dataset (example2) + second_ownership_aspect = builder.get_aspect_if_available( + outputs[1].record, models.OwnershipClass + ) + assert second_ownership_aspect + assert len(second_ownership_aspect.owners) == 2 # One existing + one new + assert all( + [ + owner.type == models.OwnershipTypeClass.DATAOWNER + for owner in second_ownership_aspect.owners + ] + ) + + +def test_pattern_container_and_dataset_ownership_with_no_match( + mock_time, mock_datahub_graph +): + def fake_get_aspect( + entity_urn: str, + aspect_type: Type[models.BrowsePathsV2Class], + version: int = 0, + ) -> models.BrowsePathsV2Class: + return models.BrowsePathsV2Class( + path=[ + models.BrowsePathEntryClass( + id="container_1", urn="urn:li:container:container_1" + ) + ] + ) + + pipeline_context = PipelineContext( + run_id="test_pattern_container_and_dataset_ownership_with_no_match" + ) + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore + + # No owner aspect for the first dataset + no_owner_aspect = models.MetadataChangeEventClass( + proposedSnapshot=models.DatasetSnapshotClass( + urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)", + aspects=[ + models.StatusClass(removed=False), + ], + ), + ) + # Dataset with an existing owner + with_owner_aspect = models.MetadataChangeEventClass( + proposedSnapshot=models.DatasetSnapshotClass( + urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)", + aspects=[ + models.OwnershipClass( + owners=[ + models.OwnerClass( + owner=builder.make_user_urn("fake_owner"), + type=models.OwnershipTypeClass.DATAOWNER, + ), + ], + lastModified=models.AuditStampClass( + time=1625266033123, actor="urn:li:corpuser:datahub" + ), + ) + ], + ), + ) + + inputs = [ + no_owner_aspect, + with_owner_aspect, + EndOfStream(), + ] + + # Initialize the transformer with container support + transformer = PatternAddDatasetOwnership.create( + { + "owner_pattern": { + "rules": { + ".*example3.*": [builder.make_user_urn("person1")], + ".*example4.*": [builder.make_user_urn("person2")], + } + }, + "ownership_type": "DATAOWNER", + "is_container": True, # Enable container ownership handling + }, + pipeline_context, + ) + + outputs = list( + transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs]) + ) + + assert len(outputs) == len(inputs) + 1 + + # Check the ownership for the first dataset (example1) + first_ownership_aspect = outputs[2].record.aspect + assert first_ownership_aspect + assert builder.make_user_urn("person1") not in first_ownership_aspect.owners + assert builder.make_user_urn("person2") not in first_ownership_aspect.owners + + # Check the ownership for the second dataset (example2) + second_ownership_aspect = builder.get_aspect_if_available( + outputs[1].record, models.OwnershipClass + ) + assert second_ownership_aspect + assert len(second_ownership_aspect.owners) == 1 + assert builder.make_user_urn("person1") not in second_ownership_aspect.owners + assert builder.make_user_urn("person2") not in second_ownership_aspect.owners + assert ( + 
builder.make_user_urn("fake_owner") == second_ownership_aspect.owners[0].owner + ) + + def gen_owners( owners: List[str], ownership_type: Union[ @@ -2435,6 +2783,224 @@ def fake_ownership_class(entity_urn: str) -> models.OwnershipClass: assert server_owner in owner_urns +def test_pattern_container_and_dataset_domain_transformation(mock_datahub_graph): + datahub_domain = builder.make_domain_urn("datahubproject.io") + acryl_domain = builder.make_domain_urn("acryl_domain") + server_domain = builder.make_domain_urn("server_domain") + + def fake_get_aspect( + entity_urn: str, + aspect_type: Type[models.BrowsePathsV2Class], + version: int = 0, + ) -> models.BrowsePathsV2Class: + return models.BrowsePathsV2Class( + path=[ + models.BrowsePathEntryClass( + id="container_1", urn="urn:li:container:container_1" + ), + models.BrowsePathEntryClass( + id="container_2", urn="urn:li:container:container_2" + ), + ] + ) + + pipeline_context = PipelineContext( + run_id="test_pattern_container_and_dataset_domain_transformation" + ) + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore + + with_domain_aspect = make_generic_dataset_mcp( + aspect=models.DomainsClass(domains=[datahub_domain]), aspect_name="domains" + ) + no_domain_aspect = make_generic_dataset_mcp( + entity_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)" + ) + + # Not a dataset, should be ignored + not_a_dataset = models.MetadataChangeEventClass( + proposedSnapshot=models.DataJobSnapshotClass( + urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)", + aspects=[ + models.DataJobInfoClass( + name="User Deletions", + description="Constructs the fct_users_deleted from logging_events", + type=models.AzkabanJobTypeClass.SQL, + ) + ], + ) + ) + + inputs = [ + with_domain_aspect, + no_domain_aspect, + not_a_dataset, + EndOfStream(), + ] + + # Initialize the transformer with container support for domains + transformer = PatternAddDatasetDomain.create( + { + "domain_pattern": { + "rules": { + ".*example1.*": [acryl_domain, server_domain], + ".*example2.*": [server_domain], + } + }, + "is_container": True, # Enable container domain handling + }, + pipeline_context, + ) + + outputs = list( + transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs]) + ) + + assert ( + len(outputs) == len(inputs) + 3 + ) # MCPs for the dataset without domains and the containers + + first_domain_aspect = outputs[0].record.aspect + assert first_domain_aspect + assert len(first_domain_aspect.domains) == 3 + assert all( + domain in first_domain_aspect.domains + for domain in [datahub_domain, acryl_domain, server_domain] + ) + + second_domain_aspect = outputs[3].record.aspect + assert second_domain_aspect + assert len(second_domain_aspect.domains) == 1 + assert server_domain in second_domain_aspect.domains + + # Verify that the third input (not a dataset) is unchanged + assert inputs[2] == outputs[2].record + + # Verify conainer 1 and container 2 should contain all domains + container_1 = outputs[4].record.aspect + assert len(container_1.domains) == 2 + assert acryl_domain in container_1.domains + assert server_domain in container_1.domains + + container_2 = outputs[5].record.aspect + assert len(container_2.domains) == 2 + assert acryl_domain in container_2.domains + assert server_domain in container_2.domains + + +def test_pattern_container_and_dataset_domain_transformation_with_no_container( + mock_datahub_graph, +): + datahub_domain = 
builder.make_domain_urn("datahubproject.io") + acryl_domain = builder.make_domain_urn("acryl_domain") + server_domain = builder.make_domain_urn("server_domain") + + def fake_get_aspect( + entity_urn: str, + aspect_type: Type[models.BrowsePathsV2Class], + version: int = 0, + ) -> Optional[models.BrowsePathsV2Class]: + return None + + pipeline_context = PipelineContext( + run_id="test_pattern_container_and_dataset_domain_transformation_with_no_container" + ) + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore + + with_domain_aspect = make_generic_dataset_mcp( + aspect=models.DomainsClass(domains=[datahub_domain]), aspect_name="domains" + ) + no_domain_aspect = make_generic_dataset_mcp( + entity_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)" + ) + + inputs = [ + with_domain_aspect, + no_domain_aspect, + EndOfStream(), + ] + + # Initialize the transformer with container support for domains + transformer = PatternAddDatasetDomain.create( + { + "domain_pattern": { + "rules": { + ".*example1.*": [acryl_domain, server_domain], + ".*example2.*": [server_domain], + } + }, + "is_container": True, # Enable container domain handling + }, + pipeline_context, + ) + + outputs = list( + transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs]) + ) + + assert len(outputs) == len(inputs) + 1 + + first_domain_aspect = outputs[0].record.aspect + assert first_domain_aspect + assert len(first_domain_aspect.domains) == 3 + assert all( + domain in first_domain_aspect.domains + for domain in [datahub_domain, acryl_domain, server_domain] + ) + + second_domain_aspect = outputs[2].record.aspect + assert second_domain_aspect + assert len(second_domain_aspect.domains) == 1 + assert server_domain in second_domain_aspect.domains + + +def test_pattern_add_container_dataset_domain_no_match(mock_datahub_graph): + acryl_domain = builder.make_domain_urn("acryl.io") + datahub_domain = builder.make_domain_urn("datahubproject.io") + pattern = "urn:li:dataset:\\(urn:li:dataPlatform:invalid,.*" + + pipeline_context: PipelineContext = PipelineContext( + run_id="test_simple_add_dataset_domain" + ) + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig) + + def fake_get_aspect( + entity_urn: str, + aspect_type: Type[models.BrowsePathsV2Class], + version: int = 0, + ) -> models.BrowsePathsV2Class: + return models.BrowsePathsV2Class( + path=[ + models.BrowsePathEntryClass( + id="container_1", urn="urn:li:container:container_1" + ) + ] + ) + + pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore + + output = run_dataset_transformer_pipeline( + transformer_type=PatternAddDatasetDomain, + aspect=models.DomainsClass(domains=[datahub_domain]), + config={ + "replace_existing": True, + "domain_pattern": {"rules": {pattern: [acryl_domain]}}, + "is_container": True, + }, + pipeline_context=pipeline_context, + ) + + assert len(output) == 2 + assert output[0] is not None + assert output[0].record is not None + assert isinstance(output[0].record, MetadataChangeProposalWrapper) + assert output[0].record.aspect is not None + assert isinstance(output[0].record.aspect, models.DomainsClass) + transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) + assert len(transformed_aspect.domains) == 0 + + def run_pattern_dataset_schema_terms_transformation_semantics( semantics: TransformerSemantics, mock_datahub_graph: Callable[[DatahubClientConfig], DataHubGraph], From 
e1514d5e8eb2cbb90b31bc9963d8de9d0d45b089 Mon Sep 17 00:00:00 2001 From: skrydal Date: Tue, 1 Oct 2024 21:51:00 +0200 Subject: [PATCH 4/4] fix(ingestion/nifi): Improve nifi lineage extraction performance (#11490) --- .../src/datahub/ingestion/source/nifi.py | 187 +++++++++++++----- .../tests/unit/test_nifi_source.py | 5 +- 2 files changed, 142 insertions(+), 50 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/nifi.py b/metadata-ingestion/src/datahub/ingestion/source/nifi.py index 52dce3a8b7599..25781cd2f1dcc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/nifi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/nifi.py @@ -2,10 +2,11 @@ import logging import ssl import time +from collections import defaultdict from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone from enum import Enum -from typing import Callable, Dict, Iterable, List, Optional, Tuple, Union +from typing import Callable, Dict, Iterable, List, Optional, Set, Union from urllib.parse import urljoin import requests @@ -196,6 +197,75 @@ def validator_site_url(cls, site_url: str) -> str: return site_url +class BidirectionalComponentGraph: + def __init__(self): + self._outgoing: Dict[str, Set[str]] = defaultdict(set) + self._incoming: Dict[str, Set[str]] = defaultdict(set) + # this will not count duplicates/removal of non-existing connections correctly - it is only there for a quick check + self._connections_cnt = 0 + + def add_connection(self, from_component: str, to_component: str) -> None: + # this is sanity check + outgoing_duplicated = to_component in self._outgoing[from_component] + incoming_duplicated = from_component in self._incoming[to_component] + + self._outgoing[from_component].add(to_component) + self._incoming[to_component].add(from_component) + self._connections_cnt += 1 + + if outgoing_duplicated or incoming_duplicated: + logger.warning( + f"Somehow we attempted to add a connection between 2 components which already existed! Duplicated incoming: {incoming_duplicated}, duplicated outgoing: {outgoing_duplicated}. 
Connection from component: {from_component} to component: {to_component}" + ) + + def remove_connection(self, from_component: str, to_component: str) -> None: + self._outgoing[from_component].discard(to_component) + self._incoming[to_component].discard(from_component) + self._connections_cnt -= 1 + + def get_outgoing_connections(self, component: str) -> Set[str]: + return self._outgoing[component] + + def get_incoming_connections(self, component: str) -> Set[str]: + return self._incoming[component] + + def delete_component(self, component: str) -> None: + logger.debug(f"Deleting component with id: {component}") + incoming = self._incoming[component] + logger.debug( + f"Recognized {len(incoming)} incoming connections to the component" + ) + outgoing = self._outgoing[component] + logger.debug( + f"Recognized {len(outgoing)} outgoing connections from the component" + ) + + for i in incoming: + for o in outgoing: + self.add_connection(i, o) + + for i in incoming: + self._outgoing[i].remove(component) + for o in outgoing: + self._incoming[o].remove(component) + + added_connections_cnt = len(incoming) * len(outgoing) + deleted_connections_cnt = len(incoming) + len(outgoing) + logger.debug( + f"Deleted {deleted_connections_cnt} connections and added {added_connections_cnt}" + ) + + del self._outgoing[component] + del self._incoming[component] + + # for performance reasons we are not using `remove_connection` function when deleting an entire component, + # therefor we need to adjust the estimated count + self._connections_cnt -= deleted_connections_cnt + + def __len__(self): + return self._connections_cnt + + TOKEN_ENDPOINT = "access/token" KERBEROS_TOKEN_ENDPOINT = "access/kerberos" ABOUT_ENDPOINT = "flow/about" @@ -360,7 +430,9 @@ class NifiFlow: root_process_group: NifiProcessGroup components: Dict[str, NifiComponent] = field(default_factory=dict) remotely_accessible_ports: Dict[str, NifiComponent] = field(default_factory=dict) - connections: List[Tuple[str, str]] = field(default_factory=list) + connections: BidirectionalComponentGraph = field( + default_factory=BidirectionalComponentGraph + ) processGroups: Dict[str, NifiProcessGroup] = field(default_factory=dict) remoteProcessGroups: Dict[str, NifiRemoteProcessGroup] = field(default_factory=dict) remote_ports: Dict[str, NifiComponent] = field(default_factory=dict) @@ -416,10 +488,15 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source": def get_report(self) -> SourceReport: return self.report - def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901 + def update_flow( + self, pg_flow_dto: Dict, recursion_level: int = 0 + ) -> None: # noqa: C901 """ Update self.nifi_flow with contents of the input process group `pg_flow_dto` """ + logger.debug( + f"Updating flow with pg_flow_dto {pg_flow_dto.get('breadcrumb', {}).get('breadcrumb', {}).get('id')}, recursion level: {recursion_level}" + ) breadcrumb_dto = pg_flow_dto.get("breadcrumb", {}).get("breadcrumb", {}) nifi_pg = NifiProcessGroup( breadcrumb_dto.get("id"), @@ -433,6 +510,7 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901 flow_dto = pg_flow_dto.get("flow", {}) + logger.debug(f"Processing {len(flow_dto.get('processors', []))} processors") for processor in flow_dto.get("processors", []): component = processor.get("component") self.nifi_flow.components[component.get("id")] = NifiComponent( @@ -445,6 +523,7 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901 comments=component.get("config", {}).get("comments"), 
status=component.get("status", {}).get("runStatus"), ) + logger.debug(f"Processing {len(flow_dto.get('funnels', []))} funnels") for funnel in flow_dto.get("funnels", []): component = funnel.get("component") self.nifi_flow.components[component.get("id")] = NifiComponent( @@ -458,13 +537,15 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901 ) logger.debug(f"Adding funnel {component.get('id')}") + logger.debug(f"Processing {len(flow_dto.get('connections', []))} connections") for connection in flow_dto.get("connections", []): # Exclude self - recursive relationships if connection.get("sourceId") != connection.get("destinationId"): - self.nifi_flow.connections.append( - (connection.get("sourceId"), connection.get("destinationId")) + self.nifi_flow.connections.add_connection( + connection.get("sourceId"), connection.get("destinationId") ) + logger.debug(f"Processing {len(flow_dto.get('inputPorts', []))} inputPorts") for inputPort in flow_dto.get("inputPorts", []): component = inputPort.get("component") if inputPort.get("allowRemoteAccess"): @@ -492,6 +573,7 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901 ) logger.debug(f"Adding port {component.get('id')}") + logger.debug(f"Processing {len(flow_dto.get('outputPorts', []))} outputPorts") for outputPort in flow_dto.get("outputPorts", []): component = outputPort.get("component") if outputPort.get("allowRemoteAccess"): @@ -519,6 +601,9 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901 ) logger.debug(f"Adding report port {component.get('id')}") + logger.debug( + f"Processing {len(flow_dto.get('remoteProcessGroups', []))} remoteProcessGroups" + ) for rpg in flow_dto.get("remoteProcessGroups", []): rpg_component = rpg.get("component", {}) remote_ports = {} @@ -564,7 +649,13 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901 self.nifi_flow.components.update(remote_ports) self.nifi_flow.remoteProcessGroups[nifi_rpg.id] = nifi_rpg + logger.debug( + f"Processing {len(flow_dto.get('processGroups', []))} processGroups" + ) for pg in flow_dto.get("processGroups", []): + logger.debug( + f"Retrieving process group: {pg.get('id')} while updating flow for {pg_flow_dto.get('breadcrumb', {}).get('breadcrumb', {}).get('id')}" + ) pg_response = self.session.get( url=urljoin(self.rest_api_base_url, PG_ENDPOINT) + pg.get("id") ) @@ -578,11 +669,24 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901 pg_flow_dto = pg_response.json().get("processGroupFlow", {}) - self.update_flow(pg_flow_dto) + self.update_flow(pg_flow_dto, recursion_level=recursion_level + 1) def update_flow_keep_only_ingress_egress(self): components_to_del: List[NifiComponent] = [] - for component in self.nifi_flow.components.values(): + components = self.nifi_flow.components.values() + logger.debug( + f"Processing {len(components)} components for keep only ingress/egress" + ) + logger.debug( + f"All the connections recognized: {len(self.nifi_flow.connections)}" + ) + for index, component in enumerate(components, start=1): + logger.debug( + f"Processing {index}th component for ingress/egress pruning. 
Component id: {component.id}, name: {component.name}, type: {component.type}" + ) + logger.debug( + f"Current amount of connections: {len(self.nifi_flow.connections)}" + ) if ( component.nifi_type is NifiType.PROCESSOR and component.type @@ -592,47 +696,28 @@ def update_flow_keep_only_ingress_egress(self): NifiType.REMOTE_INPUT_PORT, NifiType.REMOTE_OUTPUT_PORT, ]: + self.nifi_flow.connections.delete_component(component.id) components_to_del.append(component) - incoming = list( - filter(lambda x: x[1] == component.id, self.nifi_flow.connections) - ) - outgoing = list( - filter(lambda x: x[0] == component.id, self.nifi_flow.connections) - ) - # Create new connections from incoming to outgoing - for i in incoming: - for j in outgoing: - self.nifi_flow.connections.append((i[0], j[1])) - - # Remove older connections, as we already created - # new connections bypassing component to be deleted - - for i in incoming: - self.nifi_flow.connections.remove(i) - for j in outgoing: - self.nifi_flow.connections.remove(j) - - for c in components_to_del: - if c.nifi_type is NifiType.PROCESSOR and ( - c.name.startswith("Get") - or c.name.startswith("List") - or c.name.startswith("Fetch") - or c.name.startswith("Put") + + for component in components_to_del: + if component.nifi_type is NifiType.PROCESSOR and component.name.startswith( + ("Get", "List", "Fetch", "Put") ): self.report.warning( - f"Dropping NiFi Processor of type {c.type}, id {c.id}, name {c.name} from lineage view. \ + f"Dropping NiFi Processor of type {component.type}, id {component.id}, name {component.name} from lineage view. \ This is likely an Ingress or Egress node which may be reading to/writing from external datasets \ However not currently supported in datahub", self.config.site_url, ) else: logger.debug( - f"Dropping NiFi Component of type {c.type}, id {c.id}, name {c.name} from lineage view." + f"Dropping NiFi Component of type {component.type}, id {component.id}, name {component.name} from lineage view." 
) - del self.nifi_flow.components[c.id] + del self.nifi_flow.components[component.id] def create_nifi_flow(self): + logger.debug(f"Retrieving NIFI info from {ABOUT_ENDPOINT}") about_response = self.session.get( url=urljoin(self.rest_api_base_url, ABOUT_ENDPOINT) ) @@ -646,6 +731,8 @@ def create_nifi_flow(self): ) else: logger.warning("Failed to fetch version for nifi") + logger.debug(f"Retrieved nifi version: {nifi_version}") + logger.debug(f"Retrieving cluster info from {CLUSTER_ENDPOINT}") cluster_response = self.session.get( url=urljoin(self.rest_api_base_url, CLUSTER_ENDPOINT) ) @@ -654,8 +741,10 @@ def create_nifi_flow(self): clustered = ( cluster_response.json().get("clusterSummary", {}).get("clustered") ) + logger.debug(f"Retrieved cluster summary: {clustered}") else: logger.warning("Failed to fetch cluster summary for flow") + logger.debug("Retrieving ROOT Process Group") pg_response = self.session.get( url=urljoin(self.rest_api_base_url, PG_ENDPOINT) + "root" ) @@ -695,7 +784,7 @@ def fetch_provenance_events( if provenance_response.ok: provenance = provenance_response.json().get("provenance", {}) provenance_uri = provenance.get("uri") - + logger.debug(f"Retrieving provenance uri: {provenance_uri}") provenance_response = self.session.get(provenance_uri) if provenance_response.ok: provenance = provenance_response.json().get("provenance", {}) @@ -734,7 +823,9 @@ def fetch_provenance_events( total = provenance.get("results", {}).get("total") totalCount = provenance.get("results", {}).get("totalCount") + logger.debug(f"Retrieved {totalCount} of {total}") if total != str(totalCount): + logger.debug("Trying to retrieve more events for the same processor") yield from self.fetch_provenance_events( processor, eventType, startDate, oldest_event_time ) @@ -800,6 +891,7 @@ def submit_provenance_query(self, processor, eventType, startDate, endDate): return provenance_response def delete_provenance(self, provenance_uri): + logger.debug(f"Deleting provenance with uri: {provenance_uri}") delete_response = self.session.delete(provenance_uri) if not delete_response.ok: logger.error("failed to delete provenance ", provenance_uri) @@ -821,12 +913,8 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 job_name = component.name job_urn = builder.make_data_job_urn_with_flow(flow_urn, component.id) - incoming = list( - filter(lambda x: x[1] == component.id, self.nifi_flow.connections) - ) - outgoing = list( - filter(lambda x: x[0] == component.id, self.nifi_flow.connections) - ) + incoming = self.nifi_flow.connections.get_incoming_connections(component.id) + outgoing = self.nifi_flow.connections.get_outgoing_connections(component.id) inputJobs = set() jobProperties = None @@ -864,8 +952,7 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 datasetProperties=dataset.dataset_properties, ) - for edge in incoming: - incoming_from = edge[0] + for incoming_from in incoming: if incoming_from in self.nifi_flow.remotely_accessible_ports.keys(): dataset_name = f"{self.config.site_name}.{self.nifi_flow.remotely_accessible_ports[incoming_from].name}" dataset_urn = builder.make_dataset_urn( @@ -882,8 +969,7 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 builder.make_data_job_urn_with_flow(flow_urn, incoming_from) ) - for edge in outgoing: - outgoing_to = edge[1] + for outgoing_to in outgoing: if outgoing_to in self.nifi_flow.remotely_accessible_ports.keys(): dataset_name = 
f"{self.config.site_name}.{self.nifi_flow.remotely_accessible_ports[outgoing_to].name}" dataset_urn = builder.make_dataset_urn( @@ -977,14 +1063,19 @@ def make_flow_urn(self) -> str: ) def process_provenance_events(self): + logger.debug("Starting processing of provenance events") startDate = datetime.now(timezone.utc) - timedelta( days=self.config.provenance_days ) eventAnalyzer = NifiProcessorProvenanceEventAnalyzer() eventAnalyzer.env = self.config.env - - for component in self.nifi_flow.components.values(): + components = self.nifi_flow.components.values() + logger.debug(f"Processing {len(components)} components") + for component in components: + logger.debug( + f"Processing provenance events for component id: {component.id} name: {component.name}" + ) if component.nifi_type is NifiType.PROCESSOR: eventType = eventAnalyzer.KNOWN_INGRESS_EGRESS_PROCESORS[component.type] events = self.fetch_provenance_events(component, eventType, startDate) diff --git a/metadata-ingestion/tests/unit/test_nifi_source.py b/metadata-ingestion/tests/unit/test_nifi_source.py index 9e8bf64261ffa..30a0855d44f34 100644 --- a/metadata-ingestion/tests/unit/test_nifi_source.py +++ b/metadata-ingestion/tests/unit/test_nifi_source.py @@ -6,6 +6,7 @@ from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.nifi import ( + BidirectionalComponentGraph, NifiComponent, NifiFlow, NifiProcessGroup, @@ -55,7 +56,7 @@ def test_nifi_s3_provenance_event(): ) }, remotely_accessible_ports={}, - connections=[], + connections=BidirectionalComponentGraph(), processGroups={ "803ebb92-017d-1000-2961-4bdaa27a3ba0": NifiProcessGroup( id="803ebb92-017d-1000-2961-4bdaa27a3ba0", @@ -126,7 +127,7 @@ def test_nifi_s3_provenance_event(): ) }, remotely_accessible_ports={}, - connections=[], + connections=BidirectionalComponentGraph(), processGroups={ "803ebb92-017d-1000-2961-4bdaa27a3ba0": NifiProcessGroup( id="803ebb92-017d-1000-2961-4bdaa27a3ba0",