feat(bigquery): support size, rowcount, lastmodified based table selection for profiling (datahub-project#5329)

Co-authored-by: Shirshanka Das <[email protected]>
2 people authored and maggiehays committed Aug 1, 2022
1 parent c56541b commit b67aacd
Showing 4 changed files with 93 additions and 14 deletions.
@@ -86,7 +86,17 @@ class GEProfilingConfig(ConfigModel):

     profile_if_updated_since_days: Optional[pydantic.PositiveFloat] = Field(
         default=1,
-        description="Profile table only if it has been updated since these many number of days. `None` implies profile all tables. Only Snowflake supports this.",
+        description="Profile table only if it has been updated within the specified number of days. If set to `null`, no last-modified-time constraint is applied. Supported only in `Snowflake` and `BigQuery`.",
     )

+    profile_table_size_limit: Optional[int] = Field(
+        default=1,
+        description="Profile tables only if their size is less than the specified number of GBs. If set to `null`, no size limit is applied. Supported only in `BigQuery`.",
+    )
+
+    profile_table_row_limit: Optional[int] = Field(
+        default=50000,
+        description="Profile tables only if their row count is less than the specified count. If set to `null`, no row-count limit is applied. Supported only in `BigQuery`.",
+    )
+
     # The default of (5 * cpu_count) is adopted from the default max_workers
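For orientation, here is a minimal, self-contained sketch of how the three knobs combine. `ProfilingLimits` is a hypothetical stand-in for `GEProfilingConfig` (only the field names and defaults are taken from the diff above); setting any field to `None` (YAML `null`) lifts that constraint.

from typing import Optional

import pydantic


class ProfilingLimits(pydantic.BaseModel):
    # Stand-in for GEProfilingConfig; defaults copied from the diff above.
    profile_if_updated_since_days: Optional[float] = 1
    profile_table_size_limit: Optional[int] = 1  # gigabytes
    profile_table_row_limit: Optional[int] = 50000


limits = ProfilingLimits(profile_table_size_limit=None)  # lift the size cap
assert limits.profile_table_row_limit == 50000  # row-count cap still applies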
52 changes: 52 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
@@ -433,6 +433,58 @@ def _make_gcp_logging_client(
         else:
             return [GCPLoggingClient(**client_options)]

+    def generate_profile_candidates(
+        self,
+        inspector: Inspector,
+        threshold_time: Optional[datetime.datetime],
+        schema: str,
+    ) -> Optional[List[str]]:
+        row_condition = (
+            f"row_count<{self.config.profiling.profile_table_row_limit} and "
+            if self.config.profiling.profile_table_row_limit
+            else ""
+        )
+        size_condition = (
+            f"ROUND(size_bytes/POW(10,9),2)<{self.config.profiling.profile_table_size_limit} and "
+            if self.config.profiling.profile_table_size_limit
+            else ""
+        )
+        time_condition = (
+            f"last_modified_time>={round(threshold_time.timestamp() * 1000)} and "
+            if threshold_time
+            else ""
+        )
+        c = f"{row_condition}{size_condition}{time_condition}"
+        profile_clause = c if c == "" else f" WHERE {c}"[:-4]
+        if profile_clause == "":
+            return None
+        project_id = self.get_db_name(inspector)
+        _client: BigQueryClient = BigQueryClient(project=project_id)
+        query = (
+            f"SELECT "
+            f"table_id, "
+            f"size_bytes, "
+            f"last_modified_time, "
+            f"row_count, "
+            f"FROM {schema}.__TABLES__"
+            f"{profile_clause}"
+        )
+        logger.debug(f"Profiling via {query}")
+        query_job = _client.query(query)
+        _profile_candidates = []
+        for row in query_job:
+            _profile_candidates.append(
+                self.get_identifier(
+                    schema=schema,
+                    entity=row.table_id,
+                    inspector=inspector,
+                )
+            )
+        logger.debug(
+            f"Generated profiling candidates for {schema}: {_profile_candidates}"
+        )
+        return _profile_candidates
+
     def _get_bigquery_log_entries(
         self,
         clients: List[GCPLoggingClient],
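To make the string assembly above concrete, the following standalone sketch replays the clause-building logic with the default limits and a hypothetical dataset name (no BigQuery client involved). Note that GoogleSQL accepts the trailing comma in the SELECT list, and the `[:-4]` slice strips the dangling `and `.

row_condition = "row_count<50000 and "                   # profile_table_row_limit=50000
size_condition = "ROUND(size_bytes/POW(10,9),2)<1 and "  # profile_table_size_limit=1
time_condition = ""                                      # threshold_time is None

c = f"{row_condition}{size_condition}{time_condition}"
profile_clause = c if c == "" else f" WHERE {c}"[:-4]    # drops the trailing "and "

schema = "my_dataset"  # hypothetical dataset name
query = (
    "SELECT table_id, size_bytes, last_modified_time, row_count, "
    f"FROM {schema}.__TABLES__{profile_clause}"
)
print(query)
# SELECT table_id, size_bytes, last_modified_time, row_count, FROM my_dataset.__TABLES__ WHERE row_count<50000 and ROUND(size_bytes/POW(10,9),2)<1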
18 changes: 13 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py
@@ -72,6 +72,7 @@ def __init__(self, config: SnowflakeConfig, ctx: PipelineContext):
         self.report: SnowflakeReport = SnowflakeReport()
         self.config: SnowflakeConfig = config
         self.provision_role_in_progress: bool = False
+        self.profile_candidates: Dict[str, List[str]] = {}

     @classmethod
     def create(cls, config_dict, ctx):
@@ -754,11 +755,18 @@ def _is_dataset_allowed(
         return True

     def generate_profile_candidates(
-        self, inspector: Inspector, threshold_time: datetime
-    ) -> List[str]:
+        self, inspector: Inspector, threshold_time: Optional[datetime], schema: str
+    ) -> Optional[List[str]]:
+        if threshold_time is None:
+            return None
+        db_name = self.current_database
+        if self.profile_candidates.get(db_name) is not None:
+            # snowflake profile candidates are available at database level,
+            # no need to regenerate for every schema
+            return self.profile_candidates[db_name]
         self.report.profile_if_updated_since = threshold_time
         _profile_candidates = []
-
+        logger.debug(f"Generating profiling candidates for db {db_name}")
         db_rows = inspector.engine.execute(
             text(
                 """
@@ -771,7 +779,6 @@ def generate_profile_candidates(
             )
         )

-        db_name = self.current_database
         for db_row in db_rows:
             _profile_candidates.append(
                 self.get_identifier(
@@ -780,8 +787,9 @@ def generate_profile_candidates(
                     inspector=inspector,
                 ).lower()
             )
-        logger.debug(f"Generating profiling candidates for db {db_name}")
+
         self.report.profile_candidates[db_name] = _profile_candidates
+        self.profile_candidates[db_name] = _profile_candidates
         return _profile_candidates

     # Stateful Ingestion specific overrides
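The early return and the cache write added above amount to a per-database memoization: one warehouse query per database, reused for every schema in it. A generic, self-contained sketch of the pattern (hypothetical class and query, not the Snowflake source):

from typing import Dict, List


class CandidateCache:
    def __init__(self) -> None:
        self.profile_candidates: Dict[str, List[str]] = {}

    def generate(self, db_name: str, schema: str) -> List[str]:
        cached = self.profile_candidates.get(db_name)
        if cached is not None:
            return cached  # later schemas in the same db hit the cache
        candidates = self._expensive_query(db_name)  # one query per database
        self.profile_candidates[db_name] = candidates
        return candidates

    def _expensive_query(self, db_name: str) -> List[str]:
        return [f"{db_name}.public.orders"]  # placeholder result


cache = CandidateCache()
assert cache.generate("analytics", "public") is cache.generate("analytics", "sales")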
25 changes: 17 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
@@ -1336,7 +1336,10 @@ def is_table_partitioned(

     # Override if needed
     def generate_profile_candidates(
-        self, inspector: Inspector, threshold_time: datetime.datetime
+        self,
+        inspector: Inspector,
+        threshold_time: Optional[datetime.datetime],
+        schema: str,
     ) -> Optional[List[str]]:
         raise NotImplementedError()
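A hedged sketch of the override contract for this hook: a hypothetical subclass (names assumed, not from the commit) returns fully-qualified candidate tables, or `None` when no filtering applies. Sources that leave the hook unimplemented raise `NotImplementedError`, which the caller below catches.

import datetime
from typing import List, Optional


class MySQLAlchemySource:  # stand-in for a SQLAlchemySource subclass
    def generate_profile_candidates(
        self,
        inspector: object,  # sqlalchemy Inspector in the real signature
        threshold_time: Optional[datetime.datetime],
        schema: str,
    ) -> Optional[List[str]]:
        # Return fully-qualified table names to profile, or None for "no filter".
        if threshold_time is None:
            return None
        return [f"{schema}.events"]  # placeholder candidate


source = MySQLAlchemySource()
cutoff = datetime.datetime.now(datetime.timezone.utc)
print(source.generate_profile_candidates(None, cutoff, "public"))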

@@ -1366,15 +1369,21 @@ def loop_profiler_requests(

         tables_seen: Set[str] = set()
         profile_candidates = None  # Default value if profile candidates not available.
-        if sql_config.profiling.profile_if_updated_since_days is not None:
+        if (
+            sql_config.profiling.profile_if_updated_since_days is not None
+            or sql_config.profiling.profile_table_size_limit is not None
+            or sql_config.profiling.profile_table_row_limit is not None
+        ):
             try:
-                threshold_time: datetime.datetime = datetime.datetime.now(
-                    datetime.timezone.utc
-                ) - datetime.timedelta(
-                    sql_config.profiling.profile_if_updated_since_days  # type:ignore
-                )
+                threshold_time: Optional[datetime.datetime] = None
+                if sql_config.profiling.profile_if_updated_since_days is not None:
+                    threshold_time = datetime.datetime.now(
+                        datetime.timezone.utc
+                    ) - datetime.timedelta(
+                        sql_config.profiling.profile_if_updated_since_days
+                    )
                 profile_candidates = self.generate_profile_candidates(
-                    inspector, threshold_time
+                    inspector, threshold_time, schema
                 )
             except NotImplementedError:
                 logger.debug("Source does not support generating profile candidates.")
