From 237216d6466faa63d886b52e6ff04bb4a6a52327 Mon Sep 17 00:00:00 2001 From: alex-magno Date: Thu, 23 Feb 2023 13:46:35 +0000 Subject: [PATCH 1/4] fix(ingest/dbt): introduce lowercase urn option --- .../datahub/ingestion/source/dbt/dbt_common.py | 14 +++++++++++++- .../datahub/ingestion/source/dbt/dbt_core.py | 17 +++++++++++++++-- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index 10c1e76b1ec98..29727ca1140a0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -281,6 +281,10 @@ class DBTCommonConfig(StatefulIngestionConfigBase, LineageConfig): stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field( default=None, description="DBT Stateful Ingestion Config." ) + convert_urns_to_lowercase: bool = Field( + default=False, + description="When enabled, converts all URNs to lowercase to ensure cross-platform compatibility." + ) @validator("target_platform") def validate_target_platform_value(cls, target_platform: str) -> str: @@ -394,9 +398,10 @@ def get_urn( target_platform: str, env: str, data_platform_instance: Optional[str], + convert_urns_to_lowercase: bool, ) -> str: db_fqn = self.get_db_fqn() - if target_platform != DBT_PLATFORM: + if (target_platform != DBT_PLATFORM) and convert_urns_to_lowercase: db_fqn = db_fqn.lower() return mce_builder.make_dataset_urn_with_platform_instance( platform=target_platform, @@ -439,6 +444,7 @@ def get_upstreams( environment: str, platform_instance: Optional[str], legacy_skip_source_lineage: Optional[bool], + convert_urns_to_lowercase: bool, ) -> List[str]: upstream_urns = [] @@ -470,6 +476,7 @@ def get_upstreams( platform_value, environment, platform_instance_value, + convert_urns_to_lowercase, ) ) return upstream_urns @@ -733,6 +740,7 @@ def create_test_entity_mcps( environment=self.config.env, platform_instance=None, legacy_skip_source_lineage=self.config.backcompat_skip_source_on_lineage_edge, + convert_urns_to_lowercase=self.config.convert_urns_to_lowercase, ) for upstream_urn in sorted(upstream_urns): @@ -975,6 +983,7 @@ def create_platform_mces( mce_platform, self.config.env, mce_platform_instance, + self.config.convert_urns_to_lowercase, ) if not self.config.entities_enabled.can_emit_node_type(node.node_type): logger.debug( @@ -1033,6 +1042,7 @@ def create_platform_mces( DBT_PLATFORM, self.config.env, self.config.platform_instance, + self.config.convert_urns_to_lowercase, ) upstreams_lineage_class = get_upstream_lineage([upstream_dbt_urn]) if self.config.incremental_lineage: @@ -1368,6 +1378,7 @@ def _create_lineage_aspect_for_dbt_node( self.config.env, self.config.platform_instance, self.config.backcompat_skip_source_on_lineage_edge, + self.config.convert_urns_to_lowercase, ) # if a node is of type source in dbt, its upstream lineage should have the corresponding table/view @@ -1378,6 +1389,7 @@ def _create_lineage_aspect_for_dbt_node( self.config.target_platform, self.config.env, self.config.target_platform_instance, + self.config.convert_urns_to_lowercase, ) ) if upstream_urns: diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py index d4492bbf1e911..e6f70ab8ae045 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -90,7 +90,10 @@ def aws_connection_needed_if_s3_uris_present( def get_columns( - catalog_node: dict, manifest_node: dict, tag_prefix: str + catalog_node: dict, + manifest_node: dict, + tag_prefix: str, + convert_urns_to_lowercase: bool, ) -> List[DBTColumn]: columns = [] @@ -109,6 +112,9 @@ def get_columns( tags = manifest_column.get("tags", []) tags = [tag_prefix + tag for tag in tags] + if convert_urns_to_lowercase: + catalog_column["name"] = catalog_column["name"].lower() + dbtCol = DBTColumn( name=catalog_column["name"], comment=catalog_column.get("comment", ""), @@ -129,6 +135,7 @@ def extract_dbt_entities( manifest_adapter: str, use_identifiers: bool, tag_prefix: str, + convert_urns_to_lowercase: bool, report: DBTSourceReport, ) -> List[DBTNode]: sources_by_id = {x["unique_id"]: x for x in sources_results} @@ -217,6 +224,11 @@ def extract_dbt_entities( kw_args=kw_args, ) + if convert_urns_to_lowercase: + name = name.lower() + manifest_node["database"] = manifest_node["database"].lower() + manifest_node["schema"] = manifest_node["schema"].lower() + dbtNode = DBTNode( dbt_name=key, dbt_adapter=manifest_adapter, @@ -256,7 +268,7 @@ def extract_dbt_entities( logger.debug(f"Loading schema info for {dbtNode.dbt_name}") if catalog_node is not None: # We already have done the reporting for catalog_node being None above. - dbtNode.columns = get_columns(catalog_node, manifest_node, tag_prefix) + dbtNode.columns = get_columns(catalog_node, manifest_node, tag_prefix, convert_urns_to_lowercase) else: dbtNode.columns = [] @@ -438,6 +450,7 @@ def loadManifestAndCatalog( manifest_adapter, self.config.use_identifiers, self.config.tag_prefix, + self.config.convert_urns_to_lowercase, self.report, ) From 528702fcad4a30c93f774f11592f79cef6a8cd91 Mon Sep 17 00:00:00 2001 From: alex-magno Date: Wed, 15 Mar 2023 12:52:42 +0000 Subject: [PATCH 2/4] fix(ingest/dbt): add covnert_columns_urns_to_lowercase option --- .../datahub/ingestion/source/dbt/dbt_cloud.py | 16 +++++++++-- .../ingestion/source/dbt/dbt_common.py | 15 +++------- .../datahub/ingestion/source/dbt/dbt_core.py | 28 +++++++++++-------- 3 files changed, 34 insertions(+), 25 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py index 4ff6021f2cfca..208c5f69a7498 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py @@ -322,7 +322,11 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode: if "columns" in node: # columns will be empty for ephemeral models columns = [ - self._parse_into_dbt_column(column) + self._parse_into_dbt_column( + column, + self.config.convert_column_urns_to_lowercase, + self.config.target_platform, + ) for column in sorted(node["columns"], key=lambda c: c["index"]) ] @@ -401,7 +405,15 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode: test_result=test_result, ) - def _parse_into_dbt_column(self, column: Dict) -> DBTColumn: + def _parse_into_dbt_column( + self, + column: Dict, + convert_column_urns_to_lowercase: str, + target_platform: str, + ) -> DBTColumn: + if convert_column_urns_to_lowercase or target_platform.lower() == "snowflake": + column["name"] = column["name"].lower() + return DBTColumn( name=column["name"], comment=column.get("comment", ""), diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index 1159e72b28505..35a8421a01648 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -279,9 +279,10 @@ class DBTCommonConfig( stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field( default=None, description="DBT Stateful Ingestion Config." ) - convert_urns_to_lowercase: bool = Field( + convert_column_urns_to_lowercase: bool = Field( default=False, - description="When enabled, converts all URNs to lowercase to ensure cross-platform compatibility." + description="When enabled, converts column URNs to lowercase to ensure cross-platform compatibility. Columns are automatically " + "converted to lowercase if target platform is Snowflake.", ) @validator("target_platform") @@ -396,10 +397,9 @@ def get_urn( target_platform: str, env: str, data_platform_instance: Optional[str], - convert_urns_to_lowercase: bool, ) -> str: db_fqn = self.get_db_fqn() - if (target_platform != DBT_PLATFORM) and convert_urns_to_lowercase: + if target_platform != DBT_PLATFORM: db_fqn = db_fqn.lower() return mce_builder.make_dataset_urn_with_platform_instance( platform=target_platform, @@ -441,7 +441,6 @@ def get_upstreams( target_platform_instance: Optional[str], environment: str, platform_instance: Optional[str], - convert_urns_to_lowercase: bool, ) -> List[str]: upstream_urns = [] @@ -470,7 +469,6 @@ def get_upstreams( platform_value, environment, platform_instance_value, - convert_urns_to_lowercase, ) ) return upstream_urns @@ -736,7 +734,6 @@ def create_test_entity_mcps( target_platform_instance=self.config.target_platform_instance, environment=self.config.env, platform_instance=None, - convert_urns_to_lowercase=self.config.convert_urns_to_lowercase, ) for upstream_urn in sorted(upstream_urns): @@ -979,7 +976,6 @@ def create_platform_mces( mce_platform, self.config.env, mce_platform_instance, - self.config.convert_urns_to_lowercase, ) if not self.config.entities_enabled.can_emit_node_type(node.node_type): logger.debug( @@ -1038,7 +1034,6 @@ def create_platform_mces( DBT_PLATFORM, self.config.env, self.config.platform_instance, - self.config.convert_urns_to_lowercase, ) upstreams_lineage_class = get_upstream_lineage([upstream_dbt_urn]) if self.config.incremental_lineage: @@ -1373,7 +1368,6 @@ def _create_lineage_aspect_for_dbt_node( self.config.target_platform_instance, self.config.env, self.config.platform_instance, - self.config.convert_urns_to_lowercase, ) # if a node is of type source in dbt, its upstream lineage should have the corresponding table/view @@ -1384,7 +1378,6 @@ def _create_lineage_aspect_for_dbt_node( self.config.target_platform, self.config.env, self.config.target_platform_instance, - self.config.convert_urns_to_lowercase, ) ) if upstream_urns: diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py index a13af523c6a6c..92ac6001ab93e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -90,10 +90,11 @@ def aws_connection_needed_if_s3_uris_present( def get_columns( - catalog_node: dict, - manifest_node: dict, + catalog_node: dict, + manifest_node: dict, tag_prefix: str, - convert_urns_to_lowercase: bool, + convert_column_urns_to_lowercase: bool, + target_platform: str, ) -> List[DBTColumn]: columns = [] @@ -112,7 +113,7 @@ def get_columns( tags = manifest_column.get("tags", []) tags = [tag_prefix + tag for tag in tags] - if convert_urns_to_lowercase: + if convert_column_urns_to_lowercase or target_platform.lower() == "snowflake": catalog_column["name"] = catalog_column["name"].lower() dbtCol = DBTColumn( @@ -135,7 +136,8 @@ def extract_dbt_entities( manifest_adapter: str, use_identifiers: bool, tag_prefix: str, - convert_urns_to_lowercase: bool, + convert_column_urns_to_lowercase: bool, + target_platform: str, report: DBTSourceReport, ) -> List[DBTNode]: sources_by_id = {x["unique_id"]: x for x in sources_results} @@ -224,11 +226,6 @@ def extract_dbt_entities( kw_args=kw_args, ) - if convert_urns_to_lowercase: - name = name.lower() - manifest_node["database"] = manifest_node["database"].lower() - manifest_node["schema"] = manifest_node["schema"].lower() - dbtNode = DBTNode( dbt_name=key, dbt_adapter=manifest_adapter, @@ -268,7 +265,13 @@ def extract_dbt_entities( logger.debug(f"Loading schema info for {dbtNode.dbt_name}") if catalog_node is not None: # We already have done the reporting for catalog_node being None above. - dbtNode.columns = get_columns(catalog_node, manifest_node, tag_prefix, convert_urns_to_lowercase) + dbtNode.columns = get_columns( + catalog_node, + manifest_node, + tag_prefix, + convert_column_urns_to_lowercase, + target_platform, + ) else: dbtNode.columns = [] @@ -450,7 +453,8 @@ def loadManifestAndCatalog( manifest_adapter, self.config.use_identifiers, self.config.tag_prefix, - self.config.convert_urns_to_lowercase, + self.config.convert_column_urns_to_lowercase, + self.config.target_platform, self.report, ) From dfc70f416199873dd6ef6b4546ada28da0773d11 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 17 Mar 2023 13:24:04 -0700 Subject: [PATCH 3/4] simplify + add test --- .../datahub/ingestion/source/dbt/dbt_cloud.py | 11 +------- .../ingestion/source/dbt/dbt_common.py | 16 +++++++++--- .../datahub/ingestion/source/dbt/dbt_core.py | 11 -------- .../tests/unit/test_dbt_source.py | 25 +++++++++++++++++++ 4 files changed, 39 insertions(+), 24 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py index 208c5f69a7498..6a4fa7332850e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py @@ -322,11 +322,7 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode: if "columns" in node: # columns will be empty for ephemeral models columns = [ - self._parse_into_dbt_column( - column, - self.config.convert_column_urns_to_lowercase, - self.config.target_platform, - ) + self._parse_into_dbt_column(column) for column in sorted(node["columns"], key=lambda c: c["index"]) ] @@ -408,12 +404,7 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode: def _parse_into_dbt_column( self, column: Dict, - convert_column_urns_to_lowercase: str, - target_platform: str, ) -> DBTColumn: - if convert_column_urns_to_lowercase or target_platform.lower() == "snowflake": - column["name"] = column["name"].lower() - return DBTColumn( name=column["name"], comment=column.get("comment", ""), diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index 35a8421a01648..97fef76fadb5b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -281,8 +281,8 @@ class DBTCommonConfig( ) convert_column_urns_to_lowercase: bool = Field( default=False, - description="When enabled, converts column URNs to lowercase to ensure cross-platform compatibility. Columns are automatically " - "converted to lowercase if target platform is Snowflake.", + description="When enabled, converts column URNs to lowercase to ensure cross-platform compatibility. " + "If `target_platform` is Snowflake, the default is True.", ) @validator("target_platform") @@ -294,6 +294,12 @@ def validate_target_platform_value(cls, target_platform: str) -> str: ) return target_platform + @root_validator(pre=True) + def set_convert_column_urns_to_lowercase_default_for_snowflake(cls, values: dict) -> dict: + if values.get("target_platform", "").lower() == "snowflake": + values.setdefault("convert_column_urns_to_lowercase", True) + return values + @validator("write_semantics") def validate_write_semantics(cls, write_semantics: str) -> str: if write_semantics.lower() not in {"patch", "override"}: @@ -1258,8 +1264,12 @@ def get_schema_metadata( if meta_aspects.get(Constants.ADD_TERM_OPERATION): glossaryTerms = meta_aspects.get(Constants.ADD_TERM_OPERATION) + field_name = column.name + if self.config.convert_column_urns_to_lowercase: + field_name = field_name.lower() + field = SchemaField( - fieldPath=column.name, + fieldPath=field_name, nativeDataType=column.data_type, type=get_column_type( report, node.dbt_name, column.data_type, node.dbt_adapter diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py index 92ac6001ab93e..6f6c45617aa83 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -93,8 +93,6 @@ def get_columns( catalog_node: dict, manifest_node: dict, tag_prefix: str, - convert_column_urns_to_lowercase: bool, - target_platform: str, ) -> List[DBTColumn]: columns = [] @@ -113,9 +111,6 @@ def get_columns( tags = manifest_column.get("tags", []) tags = [tag_prefix + tag for tag in tags] - if convert_column_urns_to_lowercase or target_platform.lower() == "snowflake": - catalog_column["name"] = catalog_column["name"].lower() - dbtCol = DBTColumn( name=catalog_column["name"], comment=catalog_column.get("comment", ""), @@ -136,8 +131,6 @@ def extract_dbt_entities( manifest_adapter: str, use_identifiers: bool, tag_prefix: str, - convert_column_urns_to_lowercase: bool, - target_platform: str, report: DBTSourceReport, ) -> List[DBTNode]: sources_by_id = {x["unique_id"]: x for x in sources_results} @@ -269,8 +262,6 @@ def extract_dbt_entities( catalog_node, manifest_node, tag_prefix, - convert_column_urns_to_lowercase, - target_platform, ) else: @@ -453,8 +444,6 @@ def loadManifestAndCatalog( manifest_adapter, self.config.use_identifiers, self.config.tag_prefix, - self.config.convert_column_urns_to_lowercase, - self.config.target_platform, self.report, ) diff --git a/metadata-ingestion/tests/unit/test_dbt_source.py b/metadata-ingestion/tests/unit/test_dbt_source.py index ca822795c6e00..08e526ac6bfda 100644 --- a/metadata-ingestion/tests/unit/test_dbt_source.py +++ b/metadata-ingestion/tests/unit/test_dbt_source.py @@ -197,6 +197,31 @@ def test_dbt_entity_emission_configuration(): DBTCoreConfig.parse_obj(config_dict) +def test_default_convert_column_urns_to_lowercase(): + config_dict = { + "manifest_path": "dummy_path", + "catalog_path": "dummy_path", + "target_platform": "dummy_platform", + "entities_enabled": {"models": "Yes", "seeds": "Only"}, + } + + config = DBTCoreConfig.parse_obj({**config_dict}) + assert config.convert_column_urns_to_lowercase is False + + config = DBTCoreConfig.parse_obj({**config_dict, "target_platform": "snowflake"}) + assert config.convert_column_urns_to_lowercase is True + + # Check that we respect the user's setting if provided. + config = DBTCoreConfig.parse_obj( + { + **config_dict, + "convert_column_urns_to_lowercase": False, + "target_platform": "snowflake", + } + ) + assert config.convert_column_urns_to_lowercase is False + + def test_dbt_entity_emission_configuration_helpers(): config_dict = { "manifest_path": "dummy_path", From bdeec0c0fdf93cbc5f23f0d722b0747aaa10a9b4 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 17 Mar 2023 15:32:57 -0700 Subject: [PATCH 4/4] fix lint --- .../src/datahub/ingestion/source/dbt/dbt_common.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index 97fef76fadb5b..836508a054218 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -295,7 +295,9 @@ def validate_target_platform_value(cls, target_platform: str) -> str: return target_platform @root_validator(pre=True) - def set_convert_column_urns_to_lowercase_default_for_snowflake(cls, values: dict) -> dict: + def set_convert_column_urns_to_lowercase_default_for_snowflake( + cls, values: dict + ) -> dict: if values.get("target_platform", "").lower() == "snowflake": values.setdefault("convert_column_urns_to_lowercase", True) return values