From b67aacd28d09c8dcddcd7d77f2fdcb8b25ea9170 Mon Sep 17 00:00:00 2001
From: Mugdha Hardikar
Date: Wed, 6 Jul 2022 15:29:26 +0530
Subject: [PATCH] feat(bigquery): support size, rowcount, lastmodified based
 table selection for profiling (#5329)

Co-authored-by: Shirshanka Das
---
 .../ingestion/source/ge_profiling_config.py   | 12 ++++-
 .../datahub/ingestion/source/sql/bigquery.py  | 52 +++++++++++++++++++
 .../datahub/ingestion/source/sql/snowflake.py | 18 +++++--
 .../ingestion/source/sql/sql_common.py        | 25 ++++++---
 4 files changed, 93 insertions(+), 14 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
index 2dd2f0efd654ba..c58ed771c72887 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
@@ -86,7 +86,17 @@ class GEProfilingConfig(ConfigModel):
 
     profile_if_updated_since_days: Optional[pydantic.PositiveFloat] = Field(
         default=1,
-        description="Profile table only if it has been updated since these many number of days. `None` implies profile all tables. Only Snowflake supports this.",
+        description="Profile table only if it has been updated within this many days. If set to `null`, no last-modified-time constraint is applied. Supported only in `Snowflake` and `BigQuery`.",
+    )
+
+    profile_table_size_limit: Optional[int] = Field(
+        default=1,
+        description="Profile tables only if their size is less than the specified limit, in GB. If set to `null`, no size limit is applied. Supported only in `BigQuery`.",
+    )
+
+    profile_table_row_limit: Optional[int] = Field(
+        default=50000,
+        description="Profile tables only if their row count is less than the specified limit. If set to `null`, no row-count limit is applied. Supported only in `BigQuery`.",
     )
 
     # The default of (5 * cpu_count) is adopted from the default max_workers
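The three new knobs above combine as independent filters: a table is profiled only if it passes every limit that is set, and `null` disables the corresponding check. A minimal, runnable sketch of the combined predicate (the `ProfilingLimits` dataclass and `should_profile` helper are illustrative stand-ins, not part of this patch):

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class ProfilingLimits:
    """Illustrative stand-in for the three GEProfilingConfig fields above."""

    profile_if_updated_since_days: Optional[float] = 1
    profile_table_size_limit: Optional[int] = 1  # GB
    profile_table_row_limit: Optional[int] = 50000


def should_profile(
    limits: ProfilingLimits, size_gb: float, row_count: int, last_modified: datetime
) -> bool:
    # A limit set to None disables that constraint entirely.
    if limits.profile_table_size_limit is not None:
        if size_gb >= limits.profile_table_size_limit:
            return False
    if limits.profile_table_row_limit is not None:
        if row_count >= limits.profile_table_row_limit:
            return False
    if limits.profile_if_updated_since_days is not None:
        threshold = datetime.now(timezone.utc) - timedelta(
            days=limits.profile_if_updated_since_days
        )
        if last_modified < threshold:
            return False
    return True
```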
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
index 37163437e490c9..21aad90e8d8271 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
@@ -433,6 +433,58 @@ def _make_gcp_logging_client(
         else:
             return [GCPLoggingClient(**client_options)]
 
+    def generate_profile_candidates(
+        self,
+        inspector: Inspector,
+        threshold_time: Optional[datetime.datetime],
+        schema: str,
+    ) -> Optional[List[str]]:
+        row_condition = (
+            f"row_count<{self.config.profiling.profile_table_row_limit} and "
+            if self.config.profiling.profile_table_row_limit
+            else ""
+        )
+        size_condition = (
+            f"ROUND(size_bytes/POW(10,9),2)<{self.config.profiling.profile_table_size_limit} and "
+            if self.config.profiling.profile_table_size_limit
+            else ""
+        )
+        time_condition = (
+            f"last_modified_time>={round(threshold_time.timestamp() * 1000)} and "
+            if threshold_time
+            else ""
+        )
+        c = f"{row_condition}{size_condition}{time_condition}"
+        profile_clause = c if c == "" else f" WHERE {c}"[:-4]  # strip the trailing 'and '
+        if profile_clause == "":
+            return None
+        project_id = self.get_db_name(inspector)
+        _client: BigQueryClient = BigQueryClient(project=project_id)
+        query = (
+            f"SELECT "
+            f"table_id, "
+            f"size_bytes, "
+            f"last_modified_time, "
+            f"row_count "
+            f"FROM {schema}.__TABLES__"
+            f"{profile_clause}"
+        )
+        logger.debug(f"Profiling via {query}")
+        query_job = _client.query(query)
+        _profile_candidates = []
+        for row in query_job:
+            _profile_candidates.append(
+                self.get_identifier(
+                    schema=schema,
+                    entity=row.table_id,
+                    inspector=inspector,
+                )
+            )
+        logger.debug(
+            f"Generated profiling candidates for {schema}: {_profile_candidates}"
+        )
+        return _profile_candidates
+
     def _get_bigquery_log_entries(
         self,
         clients: List[GCPLoggingClient],
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py b/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py
index 92c082af24cb28..872f9a02e2239d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py
@@ -72,6 +72,7 @@ def __init__(self, config: SnowflakeConfig, ctx: PipelineContext):
         self.report: SnowflakeReport = SnowflakeReport()
         self.config: SnowflakeConfig = config
         self.provision_role_in_progress: bool = False
+        self.profile_candidates: Dict[str, List[str]] = {}
 
     @classmethod
     def create(cls, config_dict, ctx):
@@ -754,11 +755,18 @@ def _is_dataset_allowed(
         return True
 
     def generate_profile_candidates(
-        self, inspector: Inspector, threshold_time: datetime
-    ) -> List[str]:
+        self, inspector: Inspector, threshold_time: Optional[datetime], schema: str
+    ) -> Optional[List[str]]:
+        if threshold_time is None:
+            return None
+        db_name = self.current_database
+        if self.profile_candidates.get(db_name) is not None:
+            # snowflake profile candidates are available at database level,
+            # no need to regenerate for every schema
+            return self.profile_candidates[db_name]
         self.report.profile_if_updated_since = threshold_time
         _profile_candidates = []
-
+        logger.debug(f"Generating profiling candidates for db {db_name}")
         db_rows = inspector.engine.execute(
             text(
                 """
@@ -771,7 +779,6 @@ def generate_profile_candidates(
             )
         )
 
-        db_name = self.current_database
        for db_row in db_rows:
             _profile_candidates.append(
                 self.get_identifier(
@@ -780,8 +787,9 @@ def generate_profile_candidates(
                     inspector=inspector,
                 ).lower()
             )
-        logger.debug(f"Generating profiling candidates for db {db_name}")
 
+        self.report.profile_candidates[db_name] = _profile_candidates
+        self.profile_candidates[db_name] = _profile_candidates
         return _profile_candidates
 
     # Stateful Ingestion specific overrides
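The `WHERE` clause in the BigQuery hunk above is assembled by string concatenation, with the `[:-4]` slice trimming the dangling `and ` token. An isolated, runnable sketch of the same assembly (`build_profile_clause` is a hypothetical helper, not in the patch):

```python
from typing import Optional


def build_profile_clause(
    row_limit: Optional[int],
    size_limit_gb: Optional[int],
    threshold_millis: Optional[int],
) -> str:
    """Mirrors the condition assembly in the BigQuery override above."""
    conditions = ""
    if row_limit:
        conditions += f"row_count<{row_limit} and "
    if size_limit_gb:
        conditions += f"ROUND(size_bytes/POW(10,9),2)<{size_limit_gb} and "
    if threshold_millis:
        conditions += f"last_modified_time>={threshold_millis} and "
    # The [:-4] slice drops the trailing "and " left by the last condition.
    return "" if conditions == "" else f" WHERE {conditions}"[:-4]


# With the defaults (row limit 50000, size limit 1 GB, no time threshold):
assert build_profile_clause(50000, 1, None) == (
    " WHERE row_count<50000 and ROUND(size_bytes/POW(10,9),2)<1 "
)
```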
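Also worth noting in the Snowflake hunks: candidates are computed once per database and then served from `self.profile_candidates` for every subsequent schema, so the `information_schema` query runs only once. A minimal sketch of that memoization pattern in isolation (the `CandidateCache` class and `_query_candidates` stub are hypothetical):

```python
from typing import Dict, List


class CandidateCache:
    """Mimics the per-database caching added to the Snowflake source."""

    def __init__(self) -> None:
        self.profile_candidates: Dict[str, List[str]] = {}

    def candidates_for(self, db_name: str) -> List[str]:
        cached = self.profile_candidates.get(db_name)
        if cached is not None:
            # Candidates are resolved at database level, so schema-level
            # calls after the first one skip the warehouse query.
            return cached
        computed = self._query_candidates(db_name)
        self.profile_candidates[db_name] = computed
        return computed

    def _query_candidates(self, db_name: str) -> List[str]:
        # Stand-in for the information_schema.tables query in the real source.
        return [f"{db_name}.public.orders".lower()]
```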
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
index f950abd3157a79..034445bd853c55 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
@@ -1336,7 +1336,10 @@ def is_table_partitioned(
 
     # Override if needed
     def generate_profile_candidates(
-        self, inspector: Inspector, threshold_time: datetime.datetime
+        self,
+        inspector: Inspector,
+        threshold_time: Optional[datetime.datetime],
+        schema: str,
     ) -> Optional[List[str]]:
         raise NotImplementedError()
 
@@ -1366,15 +1369,21 @@ def loop_profiler_requests(
         tables_seen: Set[str] = set()
         profile_candidates = None  # Default value if profile candidates not available.
 
-        if sql_config.profiling.profile_if_updated_since_days is not None:
+        if (
+            sql_config.profiling.profile_if_updated_since_days is not None
+            or sql_config.profiling.profile_table_size_limit is not None
+            or sql_config.profiling.profile_table_row_limit is not None
+        ):
             try:
-                threshold_time: datetime.datetime = datetime.datetime.now(
-                    datetime.timezone.utc
-                ) - datetime.timedelta(
-                    sql_config.profiling.profile_if_updated_since_days  # type:ignore
-                )
+                threshold_time: Optional[datetime.datetime] = None
+                if sql_config.profiling.profile_if_updated_since_days is not None:
+                    threshold_time = datetime.datetime.now(
+                        datetime.timezone.utc
+                    ) - datetime.timedelta(
+                        sql_config.profiling.profile_if_updated_since_days
+                    )
                 profile_candidates = self.generate_profile_candidates(
-                    inspector, threshold_time
+                    inspector, threshold_time, schema
                 )
             except NotImplementedError:
                 logger.debug("Source does not support generating profile candidates.")
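Finally, the reworked gate in `loop_profiler_requests` triggers candidate generation when any of the three limits is configured, while the time threshold stays `None` unless `profile_if_updated_since_days` is set. A condensed sketch of that control flow (both function names are illustrative):

```python
import datetime
from typing import Optional


def limits_configured(
    updated_since_days: Optional[float],
    size_limit_gb: Optional[int],
    row_limit: Optional[int],
) -> bool:
    # Candidate generation runs if any one of the three limits is set.
    return (
        updated_since_days is not None
        or size_limit_gb is not None
        or row_limit is not None
    )


def threshold_from_days(days: Optional[float]) -> Optional[datetime.datetime]:
    # None propagates to the source, which then skips the last-modified
    # filter but may still apply size and row-count limits.
    if days is None:
        return None
    return datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days)
```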