Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): support size, rowcount, lastmodified based table selection for profiling #5329

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,17 @@ class GEProfilingConfig(ConfigModel):

profile_if_updated_since_days: Optional[pydantic.PositiveFloat] = Field(
default=1,
description="Profile table only if it has been updated since these many number of days. `None` implies profile all tables. Only Snowflake supports this.",
description="Profile table only if it has been updated within the specified number of days. If set to `null`, no constraint on last modified time for tables to profile. Supported only in `Snowflake` and `BigQuery`.",
)

# Upper bound (in GB) on table size for profiling eligibility.
# `None` disables the size filter. Only honored by the BigQuery source.
profile_table_size_limit: Optional[int] = Field(
    default=1,
    # Fix user-facing typo: "less then" -> "less than"; clarify the GB unit.
    description="Profile tables only if their size is less than the specified size in GB. If set to `null`, there is no limit on the size of tables to profile. Supported only in `BigQuery`",
)

# Upper bound on row count for profiling eligibility.
# `None` disables the row-count filter. Only honored by the BigQuery source.
profile_table_row_limit: Optional[int] = Field(
    default=50000,
    # Fix user-facing typo: "less then" -> "less than".
    description="Profile tables only if their row count is less than the specified count. If set to `null`, there is no limit on the row count of tables to profile. Supported only in `BigQuery`",
)

# The default of (5 * cpu_count) is adopted from the default max_workers
Expand Down
52 changes: 52 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,58 @@ def _make_gcp_logging_client(
else:
return [GCPLoggingClient(**client_options)]

def generate_profile_candidates(
    self,
    inspector: Inspector,
    threshold_time: Optional[datetime.datetime],
    schema: str,
) -> Optional[List[str]]:
    """Return identifiers of tables in ``schema`` that satisfy the configured
    profiling limits (row count, size, last-modified time).

    Returns ``None`` when no limit is configured at all, which callers treat
    as "every table is a candidate".

    :param inspector: SQLAlchemy inspector used to resolve the project id
        and to build table identifiers.
    :param threshold_time: only tables modified at/after this time qualify;
        ``None`` disables the time filter.
    :param schema: BigQuery dataset whose ``__TABLES__`` meta-table is queried.
    """
    conditions = []
    # Use `is not None` so an explicit limit of 0 is honored; a bare
    # truthiness check would silently ignore a configured 0 limit.
    if self.config.profiling.profile_table_row_limit is not None:
        conditions.append(
            f"row_count<{self.config.profiling.profile_table_row_limit}"
        )
    if self.config.profiling.profile_table_size_limit is not None:
        # size_bytes is converted to GB (rounded to 2 dp) before comparison.
        conditions.append(
            "ROUND(size_bytes/POW(10,9),2)"
            f"<{self.config.profiling.profile_table_size_limit}"
        )
    if threshold_time is not None:
        # BigQuery's __TABLES__ stores last_modified_time in epoch millis.
        conditions.append(
            f"last_modified_time>={round(threshold_time.timestamp() * 1000)}"
        )
    if not conditions:
        # No filters configured: signal "no restriction" to the caller.
        return None
    # Join with " and " instead of appending "and " per condition and
    # slicing it back off (`f" WHERE {c}"[:-4]`), which was fragile.
    profile_clause = " WHERE " + " and ".join(conditions)
    project_id = self.get_db_name(inspector)
    _client: BigQueryClient = BigQueryClient(project=project_id)
    # __TABLES__ is BigQuery's per-dataset meta-table with table stats.
    # NOTE: the original SELECT list had a trailing comma before FROM
    # ("row_count, FROM ..."), which is invalid in many SQL dialects.
    query = (
        f"SELECT table_id, size_bytes, last_modified_time, row_count "
        f"FROM {schema}.__TABLES__"
        f"{profile_clause}"
    )
    logger.debug(f"Profiling via {query}")
    query_job = _client.query(query)
    _profile_candidates = []
    for row in query_job:
        _profile_candidates.append(
            self.get_identifier(
                schema=schema,
                entity=row.table_id,
                inspector=inspector,
            )
        )
    logger.debug(
        f"Generated profiling candidates for {schema}: {_profile_candidates}"
    )
    return _profile_candidates

def _get_bigquery_log_entries(
self,
clients: List[GCPLoggingClient],
Expand Down
18 changes: 13 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def __init__(self, config: SnowflakeConfig, ctx: PipelineContext):
self.report: SnowflakeReport = SnowflakeReport()
self.config: SnowflakeConfig = config
self.provision_role_in_progress: bool = False
self.profile_candidates: Dict[str, List[str]] = {}

@classmethod
def create(cls, config_dict, ctx):
Expand Down Expand Up @@ -754,11 +755,18 @@ def _is_dataset_allowed(
return True

def generate_profile_candidates(
self, inspector: Inspector, threshold_time: datetime
) -> List[str]:
self, inspector: Inspector, threshold_time: Optional[datetime], schema: str
) -> Optional[List[str]]:
if threshold_time is None:
return None
db_name = self.current_database
if self.profile_candidates.get(db_name) is not None:
# snowflake profile candidates are available at database level,
# no need to regenerate for every schema
return self.profile_candidates[db_name]
self.report.profile_if_updated_since = threshold_time
_profile_candidates = []

logger.debug(f"Generating profiling candidates for db {db_name}")
db_rows = inspector.engine.execute(
text(
"""
Expand All @@ -771,7 +779,6 @@ def generate_profile_candidates(
)
)

db_name = self.current_database
for db_row in db_rows:
_profile_candidates.append(
self.get_identifier(
Expand All @@ -780,8 +787,9 @@ def generate_profile_candidates(
inspector=inspector,
).lower()
)
logger.debug(f"Generating profiling candidates for db {db_name}")

self.report.profile_candidates[db_name] = _profile_candidates
self.profile_candidates[db_name] = _profile_candidates
return _profile_candidates

# Stateful Ingestion specific overrides
Expand Down
25 changes: 17 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1336,7 +1336,10 @@ def is_table_partitioned(

# Override if needed
def generate_profile_candidates(
    self,
    inspector: Inspector,
    threshold_time: Optional[datetime.datetime],
    schema: str,
) -> Optional[List[str]]:
    """Return table identifiers eligible for profiling, or ``None``.

    Base implementation is intentionally a stub: sources that can pre-filter
    tables (e.g. by last-modified time, size, or row count) override this.
    Callers are expected to catch ``NotImplementedError`` and fall back to
    profiling without a candidate list.

    :param inspector: SQLAlchemy inspector for the current connection.
    :param threshold_time: only tables modified at/after this time should
        qualify; ``None`` means no time constraint.
    :param schema: schema whose tables are being considered.
    :raises NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError()

Expand Down Expand Up @@ -1366,15 +1369,21 @@ def loop_profiler_requests(

tables_seen: Set[str] = set()
profile_candidates = None  # Default value if profile candidates not available.
if (
    sql_config.profiling.profile_if_updated_since_days is not None
    or sql_config.profiling.profile_table_size_limit is not None
    # BUG FIX: was `is None`, which enabled candidate generation when the
    # row limit was *unset* and skipped it when one was configured (e.g.
    # since_days=None, size_limit=None, row_limit=50000 generated nothing).
    or sql_config.profiling.profile_table_row_limit is not None
):
    try:
        # threshold_time stays None unless a last-modified window is set;
        # size/row limits alone still trigger candidate generation.
        threshold_time: Optional[datetime.datetime] = None
        if sql_config.profiling.profile_if_updated_since_days is not None:
            threshold_time = datetime.datetime.now(
                datetime.timezone.utc
            ) - datetime.timedelta(
                sql_config.profiling.profile_if_updated_since_days
            )
        profile_candidates = self.generate_profile_candidates(
            inspector, threshold_time, schema
        )
    except NotImplementedError:
        # Source has no pre-filtering support; profile without a candidate list.
        logger.debug("Source does not support generating profile candidates.")
Expand Down