docs(bigquery): add changelog and unittest for profiling limits (data…
MugdhaHardikar-GSLab authored and maggiehays committed Aug 1, 2022
1 parent cfd2437 commit dc52079
Showing 3 changed files with 199 additions and 35 deletions.
6 changes: 6 additions & 0 deletions docs/how/updating-datahub.md
@@ -18,7 +18,13 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- The `should_overwrite` flag in `csv-enricher` has been replaced with `write_semantics` to match the format used for other sources. See the [documentation](https://datahubproject.io/docs/generated/ingestion/sources/csv/) for more details
- Closed an authorization hole in tag creation by adding a Platform Privilege called `Create Tags`. This is assigned to the `datahub` root user, along
with the default All Users policy. Notice: you may need to add this privilege (or `Manage Tags`) to existing users that need the ability to create tags on the platform.
- #5329 The following profiling config parameters are now supported in `BigQuery`:
- profiling.profile_if_updated_since_days (default=1)
- profiling.profile_table_size_limit (default=1GB)
- profiling.profile_table_row_limit (default=50000)

Set the above parameters to `null` if you want the older behaviour. An illustrative configuration sketch is shown below.
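
For illustration (not part of the committed changelog), a minimal Python sketch of enabling these limits when constructing the source programmatically, mirroring how the new unit tests below build the config; import paths are inferred from the module layout in this diff:

```python
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.sql.bigquery import BigQueryConfig, BigQuerySource

# Profile only tables updated within the last day, under 5 GB, and with fewer
# than 50000 rows; set any of the three limit keys to None to drop that check.
config = BigQueryConfig.parse_obj(
    {
        "profiling": {
            "enabled": True,
            "profile_if_updated_since_days": 1,
            "profile_table_size_limit": 5,
            "profile_table_row_limit": 50000,
        }
    }
)
source = BigQuerySource(config=config, ctx=PipelineContext(run_id="profiling-limits-example"))
```

The same `profiling.*` keys can be set in a recipe's source config; the values above are examples, not the defaults.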

### Potential Downtime

### Deprecations
67 changes: 37 additions & 30 deletions metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
@@ -166,6 +166,7 @@
where REGEXP_CONTAINS(table_name, r'^\\d{{{date_length}}}$')
""".strip()


# The existing implementation of this method can be found here:
# https://github.com/googleapis/python-bigquery-sqlalchemy/blob/main/sqlalchemy_bigquery/base.py#L1018-L1025.
# The existing implementation does not use the schema parameter and hence
@@ -442,12 +443,21 @@ def _make_gcp_logging_client(
else:
return [GCPLoggingClient(**client_options)]

def generate_profile_candidates(
self,
inspector: Inspector,
threshold_time: Optional[datetime.datetime],
schema: str,
) -> Optional[List[str]]:
@staticmethod
def get_all_schema_tables_query(schema: str) -> str:
base_query = (
f"SELECT "
f"table_id, "
f"size_bytes, "
f"last_modified_time, "
f"row_count, "
f"FROM {schema}.__TABLES__"
)
return base_query

def generate_profile_candidate_query(
self, threshold_time: Optional[datetime.datetime], schema: str
) -> str:
row_condition = (
f"row_count<{self.config.profiling.profile_table_row_limit} and "
if self.config.profiling.profile_table_row_limit
@@ -466,52 +476,49 @@ def generate_profile_candidates(
c = f"{row_condition}{size_condition}{time_condition}"
profile_clause = c if c == "" else f" WHERE {c}"[:-4]
if profile_clause == "":
return None
return ""
query = f"{self.get_all_schema_tables_query(schema)}{profile_clause}"
logger.debug(f"Profiling via {query}")
return query

def generate_profile_candidates(
self,
inspector: Inspector,
threshold_time: Optional[datetime.datetime],
schema: str,
) -> Optional[List[str]]:
storage_project_id = self.get_multiproject_project_id(inspector)
exec_project_id = self.get_multiproject_project_id(
inspector, run_on_compute=True
)
_client: BigQueryClient = BigQueryClient(project=exec_project_id)

full_schema_name = f"{storage_project_id}.{schema}"
# Reading all tables' metadata to report
base_query = (
f"SELECT "
f"table_id, "
f"size_bytes, "
f"last_modified_time, "
f"row_count, "
f"FROM {storage_project_id}.{schema}.__TABLES__"
)
all_tables = _client.query(base_query)
all_tables = _client.query(self.get_all_schema_tables_query(full_schema_name))
report_tables: List[str] = [
"table_id, size_bytes, last_modified_time, row_count"
]
for table_row in all_tables:
report_tables.append(
f"{table_row.table_id}, {table_row.size_bytes}, {table_row.last_modified_time}, {table_row.row_count}"
)
report_key = f"{self._get_project_id(inspector)}.{schema}"
report_key = f"{self._get_project_id(inspector)}.{full_schema_name}"
self.report.table_metadata[report_key] = report_tables

query = self.generate_profile_candidate_query(threshold_time, full_schema_name)
self.report.profile_table_selection_criteria[report_key] = (
"no constraint" if profile_clause == "" else profile_clause.lstrip(" WHERE")
"no constraint" if query == "" else query.split(" WHERE")[1]
)
if query == "":
return None

# reading filtered tables. TODO: remove this call and apply local filtering on above query results.
query = (
f"SELECT "
f"table_id, "
f"size_bytes, "
f"last_modified_time, "
f"row_count, "
f"FROM {storage_project_id}.{schema}.__TABLES__"
f"{profile_clause}"
)
logger.debug(f"Profiling via {query}")
query_job = _client.query(query)
_profile_candidates = []
for row in query_job:
_profile_candidates.append(
self.get_identifier(
schema=schema,
schema=full_schema_name,
entity=row.table_id,
inspector=inspector,
)
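As an aside (not part of the diff), a sketch of how the refactored helpers compose, assuming an already-configured `BigQuerySource` instance named `source`; the schema string is a hypothetical `<storage-project>.<dataset>` name of the kind `generate_profile_candidates` builds:

```python
from datetime import datetime, timedelta

from datahub.ingestion.source.sql.bigquery import BigQuerySource

schema = "storage-project.dataset_foo"  # hypothetical "<project>.<dataset>" schema

days = source.config.profiling.profile_if_updated_since_days
threshold_time = datetime.now() - timedelta(days=days) if days else None

# Unfiltered metadata query over the dataset's __TABLES__ view.
base_query = BigQuerySource.get_all_schema_tables_query(schema)

# The same query with the configured row/size/last-modified limits appended,
# or "" when all three limits are null.
candidate_query = source.generate_profile_candidate_query(threshold_time, schema)
```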
161 changes: 156 additions & 5 deletions metadata-ingestion/tests/unit/test_bigquery_source.py
@@ -1,5 +1,6 @@
import json
import os
from datetime import datetime

import pytest

@@ -10,7 +11,6 @@


def test_bigquery_uri():

config = BigQueryConfig.parse_obj(
{
"project_id": "test-project",
@@ -20,7 +20,6 @@ def test_bigquery_uri():


def test_bigquery_uri_with_credential():

expected_credential_json = {
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
@@ -67,7 +66,6 @@ def test_bigquery_uri_with_credential():


def test_simple_upstream_table_generation():

a: BigQueryTableRef = BigQueryTableRef(
project="test-project", dataset="test-dataset", table="a"
)
@@ -97,7 +95,6 @@ def test_error_on_missing_config():


def test_upstream_table_generation_with_temporary_table_without_temp_upstream():

a: BigQueryTableRef = BigQueryTableRef(
project="test-project", dataset="test-dataset", table="a"
)
@@ -143,7 +140,6 @@ def test_upstream_table_generation_with_temporary_table_with_temp_upstream():


def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstream():

a: BigQueryTableRef = BigQueryTableRef(
project="test-project", dataset="test-dataset", table="a"
)
@@ -173,3 +169,158 @@ def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstream():
}
upstreams = source.get_upstream_tables(str(a), [])
assert list(upstreams).sort() == [c, e].sort()


def test_bq_get_profile_candidate_query_all_params():
config = BigQueryConfig.parse_obj(
{
"profiling": {
"profile_if_updated_since_days": 1,
"profile_table_size_limit": 5,
"profile_table_row_limit": 50000,
}
}
)
source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
threshold_time = datetime.fromtimestamp(1648876349)
expected_query = (
"SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE "
"row_count<50000 and ROUND(size_bytes/POW(10,9),2)<5 and last_modified_time>=1648876349000 "
)
query = source.generate_profile_candidate_query(threshold_time, "dataset_foo")
assert query == expected_query


def test_bq_get_profile_candidate_query_no_day_limit():
config = BigQueryConfig.parse_obj(
{
"profiling": {
"profile_if_updated_since_days": None,
"profile_table_size_limit": 5,
"profile_table_row_limit": 50000,
}
}
)
source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
expected_query = (
"SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE "
"row_count<50000 and ROUND(size_bytes/POW(10,9),2)<5 "
)
query = source.generate_profile_candidate_query(None, "dataset_foo")
assert query == expected_query


def test_bq_get_profile_candidate_query_no_size_limit():
config = BigQueryConfig.parse_obj(
{
"profiling": {
"profile_if_updated_since_days": 1,
"profile_table_size_limit": None,
"profile_table_row_limit": 50000,
}
}
)
source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
threshold_time = datetime.fromtimestamp(1648876349)
expected_query = (
"SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE "
"row_count<50000 and last_modified_time>=1648876349000 "
)
query = source.generate_profile_candidate_query(threshold_time, "dataset_foo")
assert query == expected_query


def test_bq_get_profile_candidate_query_no_row_limit():
config = BigQueryConfig.parse_obj(
{
"profiling": {
"profile_if_updated_since_days": 1,
"profile_table_size_limit": 5,
"profile_table_row_limit": None,
}
}
)
source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
threshold_time = datetime.fromtimestamp(1648876349)
expected_query = (
"SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE "
"ROUND(size_bytes/POW(10,9),2)<5 and last_modified_time>=1648876349000 "
)
query = source.generate_profile_candidate_query(threshold_time, "dataset_foo")
assert query == expected_query


def test_bq_get_profile_candidate_query_all_null():

config = BigQueryConfig.parse_obj(
{
"profiling": {
"profile_if_updated_since_days": None,
"profile_table_size_limit": None,
"profile_table_row_limit": None,
}
}
)
source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
expected_query = ""
query = source.generate_profile_candidate_query(None, "dataset_foo")
assert query == expected_query


def test_bq_get_profile_candidate_query_only_row():
config = BigQueryConfig.parse_obj(
{
"profiling": {
"profile_if_updated_since_days": None,
"profile_table_size_limit": None,
"profile_table_row_limit": 50000,
}
}
)
source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
expected_query = (
"SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE "
"row_count<50000 "
)
query = source.generate_profile_candidate_query(None, "dataset_foo")
assert query == expected_query


def test_bq_get_profile_candidate_query_only_days():
config = BigQueryConfig.parse_obj(
{
"profiling": {
"profile_if_updated_since_days": 1,
"profile_table_size_limit": None,
"profile_table_row_limit": None,
}
}
)
source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
threshold_time = datetime.fromtimestamp(1648876349)
expected_query = (
"SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE "
"last_modified_time>=1648876349000 "
)
query = source.generate_profile_candidate_query(threshold_time, "dataset_foo")
assert query == expected_query


def test_bq_get_profile_candidate_query_only_size():

config = BigQueryConfig.parse_obj(
{
"profiling": {
"profile_if_updated_since_days": None,
"profile_table_size_limit": 5,
"profile_table_row_limit": None,
}
}
)
source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
expected_query = (
"SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE "
"ROUND(size_bytes/POW(10,9),2)<5 "
)
query = source.generate_profile_candidate_query(None, "dataset_foo")
assert query == expected_query

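To run just the new profiling-limit tests locally (assuming a working `metadata-ingestion` development environment), something like the following should work:

```python
# Hypothetical invocation from the metadata-ingestion directory.
import pytest

pytest.main(["tests/unit/test_bigquery_source.py", "-k", "profile_candidate_query", "-v"])
```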