Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingestion): profiling - Fixing partitioned table profiling in BQ #5283

Merged
merged 5 commits into from
Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from great_expectations import __version__ as ge_version

from datahub.configuration.common import ConfigurationError
from datahub.telemetry import stats, telemetry

# Fun compatibility hack! GE version 0.13.44 broke compatibility with SQLAlchemy 1.3.24.
Expand Down Expand Up @@ -872,7 +873,15 @@ def _generate_single_profile(
ge_config["schema"] = temp_table_db

if self.config.bigquery_temp_table_schema:
bigquery_temp_table = f"{temp_table_db}.{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}"
num_parts = self.config.bigquery_temp_table_schema.split(".")
if len(num_parts) == 1:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add comments on the PR for the special cases added? It is not clear why we have these special cases

bigquery_temp_table = f"{temp_table_db}.{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}"
elif len(num_parts) == 2:
bigquery_temp_table = f"{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}"
else:
raise ConfigurationError(
f"bigquery_temp_table_schema should be either project.dataset or dataset format but it was: {self.config.bigquery_temp_table_schema}"
)
else:
assert table
table_parts = table.split(".")
Expand Down Expand Up @@ -970,12 +979,13 @@ def _get_ge_dataset(
if platform is not None and platform == "bigquery":
# This is done as GE makes the name as DATASET.TABLE
# but we want it to be PROJECT.DATASET.TABLE instead for multi-project setups
logger.debug(f"Setting table name to be {pretty_name}")
batch._table = sa.text(pretty_name)
name_parts = pretty_name.split(".")
if len(name_parts) != 3:
logger.error(
f"Unexpected {pretty_name} while profiling. Should have 3 parts but has {len(name_parts)} parts."
)
if len(str(batch._table).split(".")) == 2:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add comments on the PR for the special cases added? It is not clear why we have this special case

batch._table = sa.text(f"{name_parts[0]}.{str(batch._table)}")
logger.debug(f"Setting table name to be {batch._table}")

return batch
20 changes: 15 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@
WHERE table_id LIKE '{table}%'
""".strip()

BQ_GET_LATEST_DATE_TABLE = """
SELECT MAX(table_name) as max_shard
FROM `{project_id}.{schema}.INFORMATION_SCHEMA.TABLES`
where REGEXP_CONTAINS(table_name, r'^\d{{{date_length}}}$')
""".strip()

# The existing implementation of this method can be found here:
# https://github.com/googleapis/python-bigquery-sqlalchemy/blob/main/sqlalchemy_bigquery/base.py#L1018-L1025.
Expand Down Expand Up @@ -707,11 +712,16 @@ def is_latest_shard(self, project_id: str, schema: str, table: str) -> bool:
engine = self._get_engine(for_run_sql=True)
if f"{project_id}.{schema}.{table_name}" not in self.maximum_shard_ids:
with engine.connect() as con:
sql = BQ_GET_LATEST_SHARD.format(
project_id=project_id,
schema=schema,
table=table_name,
)
if table_name is not None:
sql = BQ_GET_LATEST_SHARD.format(
project_id=project_id,
schema=schema,
table=table_name,
)
else:
sql = BQ_GET_LATEST_DATE_TABLE.format(
project_id=project_id, schema=schema, date_length=len(shard)
)

result = con.execute(sql)
for row in result:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,14 +281,16 @@ def remove_extras(self, sharded_table_regex: str) -> "BigQueryTableRef":
if matches:
table_name = matches.group(1)
if matches:
logger.debug(
f"Found sharded table {self.table}. Using {table_name} as the table name."
)
if not table_name:
logger.debug(
f"Using dataset id {self.dataset} as table name because table only contains date value {self.table}"
)
table_name = self.dataset

logger.debug(
f"Found sharded table {self.table}. Using {table_name} as the table name."
)

return BigQueryTableRef(self.project, self.dataset, table_name)

# Handle table snapshots.
Expand Down