From 5d5d91b4319584b7a76f61fdf9cecef2d8beadcd Mon Sep 17 00:00:00 2001 From: MugdhaHardikar-GSLab Date: Thu, 14 Jul 2022 18:35:17 +0530 Subject: [PATCH 1/2] fix(bigquery-usage): fix dataset name for sharded table --- .../ingestion/source/usage/bigquery_usage.py | 22 ++++++++----------- .../tests/unit/test_bigquery_usage_source.py | 6 +++++ 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py index c3d990affb9ef..b141407b67414 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py @@ -272,20 +272,16 @@ def is_temporary_table(self, prefix: str) -> bool: def remove_extras(self, sharded_table_regex: str) -> "BigQueryTableRef": # Handle partitioned and sharded tables. table_name: Optional[str] = None + shortened_table_name = self.table + # if table name ends in _* or * then we strip it as that represents a query on a sharded table + shortened_table_name = shortened_table_name.removesuffix("_*") + shortened_table_name = shortened_table_name.removesuffix("*") - # if table name ends in _* then we strip it as that represents a query on a sharded table - if self.table.endswith("_*"): - table_name = self.table[:-2] - logger.debug( - f"Found query on sharded table {self.table}. Using {table_name} as the table name." - ) - return BigQueryTableRef(self.project, self.dataset, table_name) - - matches = re.match(sharded_table_regex, self.table) + matches = re.match(sharded_table_regex, shortened_table_name) if matches: table_name = matches.group(2) else: - matches = PARTITION_SUMMARY_REGEXP.match(self.table) + matches = PARTITION_SUMMARY_REGEXP.match(shortened_table_name) if matches: table_name = matches.group(1) if matches: @@ -302,7 +298,7 @@ def remove_extras(self, sharded_table_regex: str) -> "BigQueryTableRef": return BigQueryTableRef(self.project, self.dataset, table_name) # Handle table snapshots. - matches = SNAPSHOT_TABLE_REGEX.match(self.table) + matches = SNAPSHOT_TABLE_REGEX.match(shortened_table_name) if matches: table_name = matches.group(1) logger.debug( @@ -312,14 +308,14 @@ def remove_extras(self, sharded_table_regex: str) -> "BigQueryTableRef": # Handle exceptions invalid_chars_in_table_name: List[str] = [ - c for c in {"$", "@"} if c in self.table + c for c in {"$", "@"} if c in shortened_table_name ] if invalid_chars_in_table_name: raise ValueError( f"Cannot handle {self} - poorly formatted table name, contains {invalid_chars_in_table_name}" ) - return self + return BigQueryTableRef(self.project, self.dataset, shortened_table_name) def __str__(self) -> str: return f"projects/{self.project}/datasets/{self.dataset}/tables/{self.table}" diff --git a/metadata-ingestion/tests/unit/test_bigquery_usage_source.py b/metadata-ingestion/tests/unit/test_bigquery_usage_source.py index 7f2b7f761b10c..94e0cc2475266 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_usage_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_usage_source.py @@ -194,3 +194,9 @@ def test_bigquery_ref_extra_removal(): assert new_table_ref.table == "foo" assert new_table_ref.project == table_ref.project assert new_table_ref.dataset == table_ref.dataset + + table_ref = BigQueryTableRef("project-1234", "dataset-4567", "foo_2016*") + new_table_ref = table_ref.remove_extras(_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX) + assert new_table_ref.table == "foo" + assert new_table_ref.project == table_ref.project + assert new_table_ref.dataset == table_ref.dataset From fe91b5fe04ec1762663b64e9019eadfbfef9ab49 Mon Sep 17 00:00:00 2001 From: MugdhaHardikar-GSLab Date: Tue, 19 Jul 2022 10:43:57 +0530 Subject: [PATCH 2/2] support python3.6 --- .../ingestion/source/usage/bigquery_usage.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py index b141407b67414..538e760f8ff6d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py @@ -156,7 +156,6 @@ """.strip(), } - OPERATION_STATEMENT_TYPES = { "INSERT": OperationTypeClass.INSERT, "UPDATE": OperationTypeClass.UPDATE, @@ -269,13 +268,19 @@ def is_temporary_table(self, prefix: str) -> bool: # Temporary tables will have a dataset that begins with an underscore. return self.dataset.startswith(prefix) + @staticmethod + def remove_suffix(input_string, suffix): + if suffix and input_string.endswith(suffix): + return input_string[: -len(suffix)] + return input_string + def remove_extras(self, sharded_table_regex: str) -> "BigQueryTableRef": # Handle partitioned and sharded tables. table_name: Optional[str] = None shortened_table_name = self.table # if table name ends in _* or * then we strip it as that represents a query on a sharded table - shortened_table_name = shortened_table_name.removesuffix("_*") - shortened_table_name = shortened_table_name.removesuffix("*") + shortened_table_name = self.remove_suffix(shortened_table_name, "_*") + shortened_table_name = self.remove_suffix(shortened_table_name, "*") matches = re.match(sharded_table_regex, shortened_table_name) if matches: @@ -1142,7 +1147,7 @@ def _create_operation_aspect_work_unit( aspect=operation_aspect, ) return MetadataWorkUnit( - id=f"{datetime.fromtimestamp(last_updated_timestamp/1000).isoformat()}-operation-aspect-{destination_table}", + id=f"{datetime.fromtimestamp(last_updated_timestamp / 1000).isoformat()}-operation-aspect-{destination_table}", mcp=mcp, )