Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/dbt): introduce lowercase column urn option #7418

Merged
merged 6 commits into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,10 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode:
test_result=test_result,
)

def _parse_into_dbt_column(self, column: Dict) -> DBTColumn:
def _parse_into_dbt_column(
self,
column: Dict,
) -> DBTColumn:
return DBTColumn(
name=column["name"],
comment=column.get("comment", ""),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,11 @@ class DBTCommonConfig(
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
default=None, description="DBT Stateful Ingestion Config."
)
convert_column_urns_to_lowercase: bool = Field(
default=False,
description="When enabled, converts column URNs to lowercase to ensure cross-platform compatibility. "
"If `target_platform` is Snowflake, the default is True.",
)

@validator("target_platform")
def validate_target_platform_value(cls, target_platform: str) -> str:
Expand All @@ -289,6 +294,14 @@ def validate_target_platform_value(cls, target_platform: str) -> str:
)
return target_platform

@root_validator(pre=True)
def set_convert_column_urns_to_lowercase_default_for_snowflake(
    cls, values: dict
) -> dict:
    """Default `convert_column_urns_to_lowercase` to True for Snowflake targets.

    This validator is registered with ``pre=True``, so it receives the raw input
    mapping. At that point ``target_platform`` may be missing, ``None``, or not
    a string at all; such values are left untouched here instead of crashing
    with an ``AttributeError`` on ``.lower()``.

    Args:
        values: Raw configuration mapping supplied by the caller.

    Returns:
        The same mapping object. ``convert_column_urns_to_lowercase`` is set to
        True only when ``target_platform`` equals "snowflake" (compared
        case-insensitively) and the key was not already present.
    """
    target_platform = values.get("target_platform")

    # Only strings can be lowercased; anything else is passed through as-is.
    if isinstance(target_platform, str) and target_platform.lower() == "snowflake":
        # setdefault preserves an explicit user choice, including an explicit False.
        values.setdefault("convert_column_urns_to_lowercase", True)
    return values

@validator("write_semantics")
def validate_write_semantics(cls, write_semantics: str) -> str:
if write_semantics.lower() not in {"patch", "override"}:
Expand Down Expand Up @@ -1253,8 +1266,12 @@ def get_schema_metadata(
if meta_aspects.get(Constants.ADD_TERM_OPERATION):
glossaryTerms = meta_aspects.get(Constants.ADD_TERM_OPERATION)

field_name = column.name
if self.config.convert_column_urns_to_lowercase:
field_name = field_name.lower()

field = SchemaField(
fieldPath=column.name,
fieldPath=field_name,
nativeDataType=column.data_type,
type=get_column_type(
report, node.dbt_name, column.data_type, node.dbt_adapter
Expand Down
10 changes: 8 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ def aws_connection_needed_if_s3_uris_present(


def get_columns(
catalog_node: dict, manifest_node: dict, tag_prefix: str
catalog_node: dict,
manifest_node: dict,
tag_prefix: str,
) -> List[DBTColumn]:
columns = []

Expand Down Expand Up @@ -256,7 +258,11 @@ def extract_dbt_entities(
logger.debug(f"Loading schema info for {dbtNode.dbt_name}")
if catalog_node is not None:
# We already have done the reporting for catalog_node being None above.
dbtNode.columns = get_columns(catalog_node, manifest_node, tag_prefix)
dbtNode.columns = get_columns(
catalog_node,
manifest_node,
tag_prefix,
)

else:
dbtNode.columns = []
Expand Down
25 changes: 25 additions & 0 deletions metadata-ingestion/tests/unit/test_dbt_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,31 @@ def test_dbt_entity_emission_configuration():
DBTCoreConfig.parse_obj(config_dict)


def test_default_convert_column_urns_to_lowercase():
    """Check how `convert_column_urns_to_lowercase` is defaulted per target platform."""
    base_config = {
        "manifest_path": "dummy_path",
        "catalog_path": "dummy_path",
        "target_platform": "dummy_platform",
        "entities_enabled": {"models": "Yes", "seeds": "Only"},
    }

    # A non-Snowflake platform with the option unset keeps it disabled.
    generic = DBTCoreConfig.parse_obj(dict(base_config))
    assert generic.convert_column_urns_to_lowercase is False

    # Targeting Snowflake without setting the option turns it on.
    snowflake_config = dict(base_config)
    snowflake_config["target_platform"] = "snowflake"
    implicit = DBTCoreConfig.parse_obj(snowflake_config)
    assert implicit.convert_column_urns_to_lowercase is True

    # Check that we respect the user's setting if provided.
    explicit_config = dict(base_config)
    explicit_config["convert_column_urns_to_lowercase"] = False
    explicit_config["target_platform"] = "snowflake"
    explicit = DBTCoreConfig.parse_obj(explicit_config)
    assert explicit.convert_column_urns_to_lowercase is False


def test_dbt_entity_emission_configuration_helpers():
config_dict = {
"manifest_path": "dummy_path",
Expand Down