From 187d9f48167b86f8c5a825bf08b82d19642c083d Mon Sep 17 00:00:00 2001 From: treff7es Date: Wed, 29 Jun 2022 12:42:09 +0200 Subject: [PATCH 1/3] Fixing partitioned table profiling in BQ Adding support for date tables' profiling in BQ --- .../ingestion/source/ge_data_profiler.py | 16 ++++++++++++--- .../datahub/ingestion/source/sql/bigquery.py | 20 ++++++++++++++----- .../ingestion/source/usage/bigquery_usage.py | 8 +++++--- 3 files changed, 33 insertions(+), 11 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index a486b3bb545aa..e94c4fd3aea38 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -11,6 +11,7 @@ from great_expectations import __version__ as ge_version +from datahub.configuration.common import ConfigurationError from datahub.telemetry import stats, telemetry # Fun compatibility hack! GE version 0.13.44 broke compatibility with SQLAlchemy 1.3.24. 
@@ -872,7 +873,15 @@ def _generate_single_profile( ge_config["schema"] = temp_table_db if self.config.bigquery_temp_table_schema: - bigquery_temp_table = f"{temp_table_db}.{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}" + num_parts = self.config.bigquery_temp_table_schema.split(".") + if len(num_parts) == 1: + bigquery_temp_table = f"{temp_table_db}.{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}" + elif len(num_parts) == 2: + bigquery_temp_table = f"{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}" + else: + raise ConfigurationError( + f"bigquery_temp_table_schema should be either project.dataset or dataset format but it was: {self.config.bigquery_temp_table_schema}" + ) else: assert table table_parts = table.split(".") @@ -970,12 +979,13 @@ def _get_ge_dataset( if platform is not None and platform == "bigquery": # This is done as GE makes the name as DATASET.TABLE # but we want it to be PROJECT.DATASET.TABLE instead for multi-project setups - logger.debug(f"Setting table name to be {pretty_name}") - batch._table = sa.text(pretty_name) name_parts = pretty_name.split(".") if len(name_parts) != 3: logger.error( f"Unexpected {pretty_name} while profiling. Should have 3 parts but has {len(name_parts)} parts." 
) + if len(str(batch._table).split(".")) == 2: + batch._table = sa.text(f"{name_parts[0]}.{str(batch._table)}") + logger.debug(f"Setting table name to be {batch._table}") return batch diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py index cbbf3e561398a..f9a5106ff0857 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py @@ -160,6 +160,11 @@ WHERE table_id LIKE '{table}%' """.strip() +BQ_GET_LATEST_DATE_TABLE = """ +SELECT MAX(table_name) as max_shard +FROM `{project_id}.{schema}.INFORMATION_SCHEMA.TABLES` +where REGEXP_CONTAINS(table_name, r'^\d{{{date_length}}}$') +""".strip() # The existing implementation of this method can be found here: # https://github.com/googleapis/python-bigquery-sqlalchemy/blob/main/sqlalchemy_bigquery/base.py#L1018-L1025. @@ -707,11 +712,16 @@ def is_latest_shard(self, project_id: str, schema: str, table: str) -> bool: engine = self._get_engine(for_run_sql=True) if f"{project_id}.{schema}.{table_name}" not in self.maximum_shard_ids: with engine.connect() as con: - sql = BQ_GET_LATEST_SHARD.format( - project_id=project_id, - schema=schema, - table=table_name, - ) + if table_name is not None: + sql = BQ_GET_LATEST_SHARD.format( + project_id=project_id, + schema=schema, + table=table_name, + ) + else: + sql = BQ_GET_LATEST_DATE_TABLE.format( + project_id=project_id, schema=schema, date_length=len(shard) + ) result = con.execute(sql) for row in result: diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py index 2fe25679a9ab1..5f49d06bb0aeb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py @@ -281,14 +281,16 @@ def remove_extras(self, 
sharded_table_regex: str) -> "BigQueryTableRef": if matches: table_name = matches.group(1) if matches: - logger.debug( - f"Found sharded table {self.table}. Using {table_name} as the table name." - ) if not table_name: logger.debug( f"Using dataset id {self.dataset} as table name because table only contains date value {self.table}" ) table_name = self.dataset + + logger.debug( + f"Found sharded table {self.table}. Using {table_name} as the table name." + ) + return BigQueryTableRef(self.project, self.dataset, table_name) # Handle table snapshots. From 779c8aad312f6849bc682da2e4705abdeb248d5f Mon Sep 17 00:00:00 2001 From: treff7es Date: Wed, 29 Jun 2022 12:52:33 +0200 Subject: [PATCH 2/3] Adding some additional comment --- .../src/datahub/ingestion/source/ge_data_profiler.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index e94c4fd3aea38..c2dd7c0f15d59 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -874,6 +874,7 @@ def _generate_single_profile( if self.config.bigquery_temp_table_schema: num_parts = self.config.bigquery_temp_table_schema.split(".") + # If we only have 1 part that means the project_id is missing from the table name and we add it if len(num_parts) == 1: bigquery_temp_table = f"{temp_table_db}.{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}" elif len(num_parts) == 2: @@ -984,6 +985,8 @@ def _get_ge_dataset( logger.error( f"Unexpected {pretty_name} while profiling. Should have 3 parts but has {len(name_parts)} parts." 
) + # If we only have two parts that means the project_id is missing from the table name and we add it + # Temp tables have 3 parts while normal tables only have 2 parts + if len(str(batch._table).split(".")) == 2: + batch._table = sa.text(f"{name_parts[0]}.{str(batch._table)}") + logger.debug(f"Setting table name to be {batch._table}") return batch From 3a56c64e53c9615577e490fa32caccb8d4df8886 Mon Sep 17 00:00:00 2001 From: treff7es Date: Wed, 29 Jun 2022 13:15:04 +0200 Subject: [PATCH 3/3] Fixing invalid escape character --- metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py index f9a5106ff0857..8aea67cc72bba 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py @@ -163,7 +163,7 @@ BQ_GET_LATEST_DATE_TABLE = """ SELECT MAX(table_name) as max_shard FROM `{project_id}.{schema}.INFORMATION_SCHEMA.TABLES` -where REGEXP_CONTAINS(table_name, r'^\d{{{date_length}}}$') +where REGEXP_CONTAINS(table_name, r'^\\d{{{date_length}}}$') """.strip() # The existing implementation of this method can be found here: