docs(bigquery): add changelog and unittest for profiling limits #5407

Merged: 4 commits, Jul 19, 2022
6 changes: 6 additions & 0 deletions docs/how/updating-datahub.md
@@ -8,7 +8,13 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- The `should_overwrite` flag in `csv-enricher` has been replaced with `write_semantics` to match the format used for other sources. See the [documentation](https://datahubproject.io/docs/generated/ingestion/sources/csv/) for more details
- Closing an authorization hole in tag creation by adding a Platform Privilege called `Create Tags`. This is assigned to the `datahub` root user, along
with the default All Users policy. Notice: you may need to add this privilege (or `Manage Tags`) to existing users that need the ability to create tags on the platform.
- #5329 The following profiling config parameters are now supported in `BigQuery`:
- `profiling.profile_if_updated_since_days` (default=1)
- `profiling.profile_table_size_limit` (default=1GB)
- `profiling.profile_table_row_limit` (default=50000)

Set the above parameters to `null` if you want the older behaviour.
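As a quick reference, here is a minimal sketch of setting (or disabling) these limits on a `BigQueryConfig`, mirroring the unit tests added in this PR. The import path is assumed from the module touched by this diff, and the values are illustrative only:

```python
from datahub.ingestion.source.sql.bigquery import BigQueryConfig  # assumed import path

# Illustrative limits; profile_table_size_limit is in GB, matching the
# ROUND(size_bytes/POW(10,9),2) predicate generated by the source.
config = BigQueryConfig.parse_obj(
    {
        "profiling": {
            "profile_if_updated_since_days": 1,  # only tables modified within the last day
            "profile_table_size_limit": 5,       # skip tables larger than 5 GB
            "profile_table_row_limit": 50000,    # skip tables with more rows than this
        }
    }
)

# Setting any of the three to None (null in a YAML recipe) drops that
# constraint; setting all three to None restores the older behaviour of
# profiling every table.
no_limits = BigQueryConfig.parse_obj(
    {
        "profiling": {
            "profile_if_updated_since_days": None,
            "profile_table_size_limit": None,
            "profile_table_row_limit": None,
        }
    }
)
```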

### Potential Downtime

### Deprecations
67 changes: 37 additions & 30 deletions metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
@@ -166,6 +166,7 @@
where REGEXP_CONTAINS(table_name, r'^\\d{{{date_length}}}$')
""".strip()


# The existing implementation of this method can be found here:
# https://github.com/googleapis/python-bigquery-sqlalchemy/blob/main/sqlalchemy_bigquery/base.py#L1018-L1025.
# The existing implementation does not use the schema parameter and hence
@@ -442,12 +443,21 @@ def _make_gcp_logging_client(
else:
return [GCPLoggingClient(**client_options)]

def generate_profile_candidates(
self,
inspector: Inspector,
threshold_time: Optional[datetime.datetime],
schema: str,
) -> Optional[List[str]]:
@staticmethod
def get_all_schema_tables_query(schema: str) -> str:
base_query = (
f"SELECT "
f"table_id, "
f"size_bytes, "
f"last_modified_time, "
f"row_count, "
f"FROM {schema}.__TABLES__"
)
return base_query

def generate_profile_candidate_query(
self, threshold_time: Optional[datetime.datetime], schema: str
) -> str:
row_condition = (
f"row_count<{self.config.profiling.profile_table_row_limit} and "
if self.config.profiling.profile_table_row_limit
@@ -466,52 +476,49 @@ def generate_profile_candidates(
c = f"{row_condition}{size_condition}{time_condition}"
profile_clause = c if c == "" else f" WHERE {c}"[:-4]
if profile_clause == "":
return None
return ""
query = f"{self.get_all_schema_tables_query(schema)}{profile_clause}"
logger.debug(f"Profiling via {query}")
return query

def generate_profile_candidates(
self,
inspector: Inspector,
threshold_time: Optional[datetime.datetime],
schema: str,
) -> Optional[List[str]]:
storage_project_id = self.get_multiproject_project_id(inspector)
exec_project_id = self.get_multiproject_project_id(
inspector, run_on_compute=True
)
_client: BigQueryClient = BigQueryClient(project=exec_project_id)

full_schema_name = f"{storage_project_id}.{schema}"
# Reading all tables' metadata to report
base_query = (
f"SELECT "
f"table_id, "
f"size_bytes, "
f"last_modified_time, "
f"row_count, "
f"FROM {storage_project_id}.{schema}.__TABLES__"
)
all_tables = _client.query(base_query)
all_tables = _client.query(self.get_all_schema_tables_query(full_schema_name))
report_tables: List[str] = [
"table_id, size_bytes, last_modified_time, row_count"
]
for table_row in all_tables:
report_tables.append(
f"{table_row.table_id}, {table_row.size_bytes}, {table_row.last_modified_time}, {table_row.row_count}"
)
report_key = f"{self._get_project_id(inspector)}.{schema}"
report_key = f"{self._get_project_id(inspector)}.{full_schema_name}"
self.report.table_metadata[report_key] = report_tables

query = self.generate_profile_candidate_query(threshold_time, full_schema_name)
self.report.profile_table_selection_criteria[report_key] = (
"no constraint" if profile_clause == "" else profile_clause.lstrip(" WHERE")
"no constraint" if query == "" else query.split(" WHERE")[1]
)
if query == "":
return None

# reading filtered tables. TODO: remove this call and apply local filtering on above query results.
query = (
f"SELECT "
f"table_id, "
f"size_bytes, "
f"last_modified_time, "
f"row_count, "
f"FROM {storage_project_id}.{schema}.__TABLES__"
f"{profile_clause}"
)
logger.debug(f"Profiling via {query}")
query_job = _client.query(query)
_profile_candidates = []
for row in query_job:
_profile_candidates.append(
self.get_identifier(
schema=schema,
schema=full_schema_name,
entity=row.table_id,
inspector=inspector,
)
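The trickiest part of the refactor above is the clause assembly in `generate_profile_candidate_query`: each active limit contributes a `<condition> and ` fragment, and the final `[:-4]` slice strips the dangling `and `. Below is a simplified, standalone sketch of that logic — not the actual method — with the millisecond conversion for `last_modified_time` inferred from the expected strings in the new tests, since that part of the source is collapsed in this diff:

```python
from datetime import datetime
from typing import Optional


def build_profile_candidate_query(
    schema: str,
    row_limit: Optional[int] = 50000,
    size_limit_gb: Optional[int] = 5,
    threshold_time: Optional[datetime] = None,
) -> str:
    # Each configured limit contributes "<condition> and "; unset limits contribute "".
    row = f"row_count<{row_limit} and " if row_limit else ""
    size = f"ROUND(size_bytes/POW(10,9),2)<{size_limit_gb} and " if size_limit_gb else ""
    time = (
        # last_modified_time is compared in epoch milliseconds (assumed from the
        # 1648876349000 values in the tests below).
        f"last_modified_time>={round(threshold_time.timestamp() * 1000)} and "
        if threshold_time
        else ""
    )
    conditions = f"{row}{size}{time}"
    # f" WHERE {conditions}"[:-4] trims the trailing "and " left by the last fragment.
    profile_clause = conditions if conditions == "" else f" WHERE {conditions}"[:-4]
    if profile_clause == "":
        return ""  # no constraints -> the caller returns None and profiles everything
    return (
        "SELECT table_id, size_bytes, last_modified_time, row_count, "
        f"FROM {schema}.__TABLES__{profile_clause}"
    )


# Reproduces the expected string in test_bq_get_profile_candidate_query_all_params:
print(
    build_profile_candidate_query(
        "dataset_foo", threshold_time=datetime.fromtimestamp(1648876349)
    )
)
```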
161 changes: 156 additions & 5 deletions metadata-ingestion/tests/unit/test_bigquery_source.py
@@ -1,5 +1,6 @@
import json
import os
from datetime import datetime

import pytest

@@ -10,7 +11,6 @@


def test_bigquery_uri():

config = BigQueryConfig.parse_obj(
{
"project_id": "test-project",
@@ -20,7 +20,6 @@ def test_bigquery_uri():


def test_bigquery_uri_with_credential():

expected_credential_json = {
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
@@ -67,7 +66,6 @@ def test_bigquery_uri_with_credential():


def test_simple_upstream_table_generation():

a: BigQueryTableRef = BigQueryTableRef(
project="test-project", dataset="test-dataset", table="a"
)
@@ -97,7 +95,6 @@ def test_error_on_missing_config():


def test_upstream_table_generation_with_temporary_table_without_temp_upstream():

a: BigQueryTableRef = BigQueryTableRef(
project="test-project", dataset="test-dataset", table="a"
)
@@ -143,7 +140,6 @@ def test_upstream_table_generation_with_temporary_table_with_temp_upstream():


def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstream():

a: BigQueryTableRef = BigQueryTableRef(
project="test-project", dataset="test-dataset", table="a"
)
@@ -173,3 +169,158 @@ def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstream():
}
upstreams = source.get_upstream_tables(str(a), [])
assert list(upstreams).sort() == [c, e].sort()


def test_bq_get_profile_candidate_query_all_params():
config = BigQueryConfig.parse_obj(
{
"profiling": {
"profile_if_updated_since_days": 1,
"profile_table_size_limit": 5,
"profile_table_row_limit": 50000,
}
}
)
source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
threshold_time = datetime.fromtimestamp(1648876349)
expected_query = (
"SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE "
"row_count<50000 and ROUND(size_bytes/POW(10,9),2)<5 and last_modified_time>=1648876349000 "
)
query = source.generate_profile_candidate_query(threshold_time, "dataset_foo")
assert query == expected_query


def test_bq_get_profile_candidate_query_no_day_limit():
config = BigQueryConfig.parse_obj(
{
"profiling": {
"profile_if_updated_since_days": None,
"profile_table_size_limit": 5,
"profile_table_row_limit": 50000,
}
}
)
source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
expected_query = (
"SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE "
"row_count<50000 and ROUND(size_bytes/POW(10,9),2)<5 "
)
query = source.generate_profile_candidate_query(None, "dataset_foo")
assert query == expected_query


def test_bq_get_profile_candidate_query_no_size_limit():
config = BigQueryConfig.parse_obj(
{
"profiling": {
"profile_if_updated_since_days": 1,
"profile_table_size_limit": None,
"profile_table_row_limit": 50000,
}
}
)
source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
threshold_time = datetime.fromtimestamp(1648876349)
expected_query = (
"SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE "
"row_count<50000 and last_modified_time>=1648876349000 "
)
query = source.generate_profile_candidate_query(threshold_time, "dataset_foo")
assert query == expected_query


def test_bq_get_profile_candidate_query_no_row_limit():
config = BigQueryConfig.parse_obj(
{
"profiling": {
"profile_if_updated_since_days": 1,
"profile_table_size_limit": 5,
"profile_table_row_limit": None,
}
}
)
source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
threshold_time = datetime.fromtimestamp(1648876349)
expected_query = (
"SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE "
"ROUND(size_bytes/POW(10,9),2)<5 and last_modified_time>=1648876349000 "
)
query = source.generate_profile_candidate_query(threshold_time, "dataset_foo")
assert query == expected_query


def test_bq_get_profile_candidate_query_all_null():

config = BigQueryConfig.parse_obj(
{
"profiling": {
"profile_if_updated_since_days": None,
"profile_table_size_limit": None,
"profile_table_row_limit": None,
}
}
)
source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
expected_query = ""
query = source.generate_profile_candidate_query(None, "dataset_foo")
assert query == expected_query


def test_bq_get_profile_candidate_query_only_row():
config = BigQueryConfig.parse_obj(
{
"profiling": {
"profile_if_updated_since_days": None,
"profile_table_size_limit": None,
"profile_table_row_limit": 50000,
}
}
)
source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
expected_query = (
"SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE "
"row_count<50000 "
)
query = source.generate_profile_candidate_query(None, "dataset_foo")
assert query == expected_query


def test_bq_get_profile_candidate_query_only_days():
config = BigQueryConfig.parse_obj(
{
"profiling": {
"profile_if_updated_since_days": 1,
"profile_table_size_limit": None,
"profile_table_row_limit": None,
}
}
)
source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
threshold_time = datetime.fromtimestamp(1648876349)
expected_query = (
"SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE "
"last_modified_time>=1648876349000 "
)
query = source.generate_profile_candidate_query(threshold_time, "dataset_foo")
assert query == expected_query


def test_bq_get_profile_candidate_query_only_size():

config = BigQueryConfig.parse_obj(
{
"profiling": {
"profile_if_updated_since_days": None,
"profile_table_size_limit": 5,
"profile_table_row_limit": None,
}
}
)
source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
expected_query = (
"SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE "
"ROUND(size_bytes/POW(10,9),2)<5 "
)
query = source.generate_profile_candidate_query(None, "dataset_foo")
assert query == expected_query
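To exercise just the new cases locally, a pytest keyword filter against the file added in this diff should work, e.g. `pytest metadata-ingestion/tests/unit/test_bigquery_source.py -k profile_candidate_query`.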