feat(ingest/GX): add urn lowercasing option for GX assertions (datahu…

mayurinehate authored Mar 7, 2023
1 parent 4f78de6 commit 406b11a
Showing 5 changed files with 1,532 additions and 361 deletions.
1 change: 1 addition & 0 deletions metadata-ingestion/integration_docs/great-expectations.md
@@ -49,6 +49,7 @@ This integration does not support
- `retry_max_times` (optional): Maximum times to retry if HTTP request fails. The delay between retries is increased exponentially.
- `extra_headers` (optional): Extra headers which will be added to the datahub request.
- `parse_table_names_from_sql` (defaults to false): The integration can use an SQL parser to try to parse the datasets being asserted. This parsing is disabled by default, but can be enabled by setting `parse_table_names_from_sql: True`. The parser is based on the [`sqllineage`](https://pypi.org/project/sqllineage/) package.
- `convert_urns_to_lowercase` (defaults to false): Whether to convert dataset urns to lowercase.
## Debugging
Set environment variable `DATAHUB_DEBUG` (default `false`) to `true` to enable debug logging for `DataHubValidationAction`.
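For orientation, the option documented above sits alongside the existing action parameters, so enabling it is a one-line addition to the `DataHubValidationAction` entry in a checkpoint's `action_list`. Below is a minimal sketch of such an entry in Python dict form; the `server_url` value and the `module_name` path are illustrative assumptions, not taken from this diff.

```python
# Sketch of a DataHubValidationAction entry in a GX checkpoint's action_list.
# The server_url value and module_name path are illustrative assumptions.
datahub_action = {
    "name": "datahub_action",
    "action": {
        "module_name": "datahub.integrations.great_expectations.action",
        "class_name": "DataHubValidationAction",
        "server_url": "http://localhost:8080",
        "parse_table_names_from_sql": True,
        # New in this commit: lowercase dataset names before building urns.
        "convert_urns_to_lowercase": True,
    },
}
```

With the flag left at its default of false, urns keep whatever casing the source database reports.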
@@ -87,6 +87,7 @@ def __init__(
extra_headers: Optional[Dict[str, str]] = None,
exclude_dbname: Optional[bool] = None,
parse_table_names_from_sql: bool = False,
convert_urns_to_lowercase: bool = False,
):
super().__init__(data_context)
self.server_url = server_url
@@ -101,6 +102,7 @@ def __init__(
self.extra_headers = extra_headers
self.exclude_dbname = exclude_dbname
self.parse_table_names_from_sql = parse_table_names_from_sql
self.convert_urns_to_lowercase = convert_urns_to_lowercase

def _run(
self,
@@ -593,6 +595,7 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
),
self.exclude_dbname,
self.platform_alias,
self.convert_urns_to_lowercase,
)
batchSpec = BatchSpec(
nativeBatchId=batch_identifier,
@@ -661,6 +664,7 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
),
self.exclude_dbname,
self.platform_alias,
self.convert_urns_to_lowercase,
)
dataset_partitions.append(
{
@@ -703,6 +707,7 @@ def make_dataset_urn_from_sqlalchemy_uri(
platform_instance=None,
exclude_dbname=None,
platform_alias=None,
convert_urns_to_lowercase=False,
):
data_platform = get_platform_from_sqlalchemy_uri(str(sqlalchemy_uri))
url_instance = make_url(sqlalchemy_uri)
@@ -777,6 +782,9 @@ def make_dataset_urn_from_sqlalchemy_uri(

dataset_name = f"{schema_name}.{table_name}"

if convert_urns_to_lowercase:
dataset_name = dataset_name.lower()

dataset_urn = builder.make_dataset_urn_with_platform_instance(
platform=data_platform if platform_alias is None else platform_alias,
name=dataset_name,
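The behavioural change itself is the small branch above: when the flag is set, the schema-qualified name is lowercased just before the urn is built. A standalone sketch of that final step, using made-up platform and table values and assuming `builder` refers to `datahub.emitter.mce_builder`, as is typical in DataHub ingestion code:

```python
import datahub.emitter.mce_builder as builder

# Illustrative values; in the action they are derived from the SQLAlchemy URI.
data_platform = "snowflake"
dataset_name = "PUBLIC.My_Table"
convert_urns_to_lowercase = True

# Mirrors the new branch added in make_dataset_urn_from_sqlalchemy_uri.
if convert_urns_to_lowercase:
    dataset_name = dataset_name.lower()

dataset_urn = builder.make_dataset_urn_with_platform_instance(
    platform=data_platform,
    name=dataset_name,  # "public.my_table" once the flag is applied
    platform_instance=None,
    env="PROD",
)
print(dataset_urn)
# urn:li:dataset:(urn:li:dataPlatform:snowflake,public.my_table,PROD)
```

Without the flag, the urn keeps the original casing, which is why mixed-case sources could previously produce assertion urns that did not match lowercased dataset urns emitted by other connectors.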