diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py
index 4ff6021f2cfca2..6a4fa7332850eb 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py
@@ -401,7 +401,10 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode:
             test_result=test_result,
         )
 
-    def _parse_into_dbt_column(self, column: Dict) -> DBTColumn:
+    def _parse_into_dbt_column(
+        self,
+        column: Dict,
+    ) -> DBTColumn:
         return DBTColumn(
             name=column["name"],
             comment=column.get("comment", ""),
diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
index 51e822abdc1a33..836508a0542184 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
@@ -279,6 +279,11 @@ class DBTCommonConfig(
     stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
         default=None, description="DBT Stateful Ingestion Config."
     )
+    convert_column_urns_to_lowercase: bool = Field(
+        default=False,
+        description="When enabled, converts column URNs to lowercase to ensure cross-platform compatibility. "
+        "If `target_platform` is Snowflake, the default is True.",
+    )
 
     @validator("target_platform")
     def validate_target_platform_value(cls, target_platform: str) -> str:
@@ -289,6 +294,14 @@ def validate_target_platform_value(cls, target_platform: str) -> str:
         )
         return target_platform
 
+    @root_validator(pre=True)
+    def set_convert_column_urns_to_lowercase_default_for_snowflake(
+        cls, values: dict
+    ) -> dict:
+        if values.get("target_platform", "").lower() == "snowflake":
+            values.setdefault("convert_column_urns_to_lowercase", True)
+        return values
+
     @validator("write_semantics")
     def validate_write_semantics(cls, write_semantics: str) -> str:
         if write_semantics.lower() not in {"patch", "override"}:
@@ -1253,8 +1266,12 @@ def get_schema_metadata(
             if meta_aspects.get(Constants.ADD_TERM_OPERATION):
                 glossaryTerms = meta_aspects.get(Constants.ADD_TERM_OPERATION)
 
+            field_name = column.name
+            if self.config.convert_column_urns_to_lowercase:
+                field_name = field_name.lower()
+
             field = SchemaField(
-                fieldPath=column.name,
+                fieldPath=field_name,
                 nativeDataType=column.data_type,
                 type=get_column_type(
                     report, node.dbt_name, column.data_type, node.dbt_adapter
diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
index 8a88f11411720c..6f6c45617aa836 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
@@ -90,7 +90,9 @@ def aws_connection_needed_if_s3_uris_present(
 
 
 def get_columns(
-    catalog_node: dict, manifest_node: dict, tag_prefix: str
+    catalog_node: dict,
+    manifest_node: dict,
+    tag_prefix: str,
 ) -> List[DBTColumn]:
     columns = []
 
@@ -256,7 +258,11 @@ def extract_dbt_entities(
             logger.debug(f"Loading schema info for {dbtNode.dbt_name}")
             if catalog_node is not None:
                 # We already have done the reporting for catalog_node being None above.
-                dbtNode.columns = get_columns(catalog_node, manifest_node, tag_prefix)
+                dbtNode.columns = get_columns(
+                    catalog_node,
+                    manifest_node,
+                    tag_prefix,
+                )
             else:
                 dbtNode.columns = []
 
diff --git a/metadata-ingestion/tests/unit/test_dbt_source.py b/metadata-ingestion/tests/unit/test_dbt_source.py
index ca822795c6e007..08e526ac6bfdae 100644
--- a/metadata-ingestion/tests/unit/test_dbt_source.py
+++ b/metadata-ingestion/tests/unit/test_dbt_source.py
@@ -197,6 +197,31 @@ def test_dbt_entity_emission_configuration():
     DBTCoreConfig.parse_obj(config_dict)
 
 
+def test_default_convert_column_urns_to_lowercase():
+    config_dict = {
+        "manifest_path": "dummy_path",
+        "catalog_path": "dummy_path",
+        "target_platform": "dummy_platform",
+        "entities_enabled": {"models": "Yes", "seeds": "Only"},
+    }
+
+    config = DBTCoreConfig.parse_obj({**config_dict})
+    assert config.convert_column_urns_to_lowercase is False
+
+    config = DBTCoreConfig.parse_obj({**config_dict, "target_platform": "snowflake"})
+    assert config.convert_column_urns_to_lowercase is True
+
+    # Check that we respect the user's setting if provided.
+    config = DBTCoreConfig.parse_obj(
+        {
+            **config_dict,
+            "convert_column_urns_to_lowercase": False,
+            "target_platform": "snowflake",
+        }
+    )
+    assert config.convert_column_urns_to_lowercase is False
+
+
 def test_dbt_entity_emission_configuration_helpers():
     config_dict = {
         "manifest_path": "dummy_path",
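Reviewer note: the behavioural core of this patch is the fieldPath lowercasing in get_schema_metadata, guarded by the new convert_column_urns_to_lowercase option; the pre root validator defaults it to True only when target_platform is "snowflake". Below is a minimal standalone sketch of that logic, not part of the patch itself; the helper name resolve_field_path is illustrative only, the real code inlines this before building the SchemaField as shown in the dbt_common.py hunk.

# Sketch only: mirrors the inline logic added ahead of the SchemaField construction.
def resolve_field_path(column_name: str, convert_column_urns_to_lowercase: bool) -> str:
    # When the option is enabled (the default for target_platform == "snowflake"),
    # the column name is lowercased before being used as the SchemaField fieldPath.
    return column_name.lower() if convert_column_urns_to_lowercase else column_name


assert resolve_field_path("ORDER_ID", convert_column_urns_to_lowercase=True) == "order_id"
assert resolve_field_path("ORDER_ID", convert_column_urns_to_lowercase=False) == "ORDER_ID"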