feat(ingest): bigquery - option to set on behalf project (#6660)
treff7es authored Dec 8, 2022
1 parent b219f08 commit 729e486
Showing 4 changed files with 123 additions and 71 deletions.
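In short: BigQuery client construction moves out of BigqueryV2Source into a shared get_bigquery_client helper in datahub.ingestion.source.bigquery_v2.common, and a new project_on_behalf config option is threaded through it so that query jobs can be billed to a different project than the ones being ingested. A minimal sketch of the new flow, assuming application-default GCP credentials are available ("billing-project" is a placeholder value):

from google.cloud import bigquery

from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
from datahub.ingestion.source.bigquery_v2.common import get_bigquery_client

# Jobs issued by this client run in (and are billed to) "billing-project",
# regardless of which projects' metadata is being ingested.
config = BigQueryV2Config.parse_obj({"project_on_behalf": "billing-project"})

# Equivalent to bigquery.Client("billing-project", **config.extra_client_options).
client: bigquery.Client = get_bigquery_client(config)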
@@ -55,6 +55,7 @@
BigqueryTable,
BigqueryView,
)
from datahub.ingestion.source.bigquery_v2.common import get_bigquery_client
from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor
from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler
from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor
@@ -228,10 +229,6 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "BigqueryV2Source":
config = BigQueryV2Config.parse_obj(config_dict)
return cls(ctx, config)

def get_bigquery_client(self) -> bigquery.Client:
client_options = self.config.extra_client_options
return bigquery.Client(**client_options)

@staticmethod
def connectivity_test(client: bigquery.Client) -> CapabilityReport:
ret = client.query("select 1")
@@ -244,12 +241,12 @@ def connectivity_test(client: bigquery.Client) -> CapabilityReport:

@staticmethod
def metada_read_capability_test(
project_ids: List[str], profiling_enabled: bool
project_ids: List[str], config: BigQueryV2Config
) -> CapabilityReport:
for project_id in project_ids:
try:
logger.info((f"Metadata read capability test for project {project_id}"))
client: bigquery.Client = bigquery.Client(project_id)
client: bigquery.Client = get_bigquery_client(config)
assert client
result = BigQueryDataDictionary.get_datasets_for_project_id(
client, project_id, 10
@@ -264,7 +261,7 @@
project_id=project_id,
dataset_name=result[0].name,
tables={},
with_data_read_permission=profiling_enabled,
with_data_read_permission=config.profiling.enabled,
)
if len(tables) == 0:
return CapabilityReport(
@@ -333,7 +330,7 @@ def test_connection(config_dict: dict) -> TestConnectionReport:
pydantic.Extra.allow
) # we are okay with extra fields during this stage
connection_conf = BigQueryV2Config.parse_obj(config_dict)
client: bigquery.Client = bigquery.Client()
client: bigquery.Client = get_bigquery_client(connection_conf)
assert client

test_report.basic_connectivity = BigqueryV2Source.connectivity_test(client)
@@ -350,7 +347,7 @@ def test_connection(config_dict: dict) -> TestConnectionReport:
project_ids.append(project.project_id)

metada_read_capability = BigqueryV2Source.metada_read_capability_test(
project_ids, connection_conf.profiling.enabled
project_ids, connection_conf
)
if SourceCapability.SCHEMA_METADATA not in _report:
_report[SourceCapability.SCHEMA_METADATA] = metada_read_capability
@@ -493,7 +490,7 @@ def add_table_to_dataset_container(

def get_workunits(self) -> Iterable[WorkUnit]:
logger.info("Getting projects")
conn: bigquery.Client = self.get_bigquery_client()
conn: bigquery.Client = get_bigquery_client(self.config)
self.add_config_to_report()

projects: List[BigqueryProject]
@@ -503,12 +500,26 @@ def get_workunits(self) -> Iterable[WorkUnit]:
)
projects = [project]
else:
projects = BigQueryDataDictionary.get_projects(conn)
if len(projects) == 0:
logger.warning(
"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account."
try:
projects = BigQueryDataDictionary.get_projects(conn)
if len(projects) == 0:
logger.error(
"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account."
)
self.report.report_failure(
"metadata-extraction",
"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account.",
)
return
except Exception as e:
logger.error(
f"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account. The error was: {e}"
)
self.report.report_failure(
"metadata-extraction",
f"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account. The error was: {e}",
)
return
return None

for project_id in projects:
if not self.config.project_id_pattern.allowed(project_id.id):
@@ -543,8 +554,13 @@ def _process_project(
BigQueryDataDictionary.get_datasets_for_project_id(conn, project_id)
)
except Exception as e:
logger.error(
f"Unable to get datasets for project {project_id}, skipping. The error was: {e}"
error_message = f"Unable to get datasets for project {project_id}, skipping. The error was: {e}"
if self.config.profiling.enabled:
error_message = f"Unable to get datasets for project {project_id}, skipping. Does your service account has bigquery.datasets.get permission? The error was: {e}"
logger.error(error_message)
self.report.report_failure(
"metadata-extraction",
f"{project_id} - {error_message}",
)
return None

@@ -565,60 +581,66 @@
try:
yield from self._process_schema(conn, project_id, bigquery_dataset)
except Exception as e:
error_message = f"Unable to get tables for dataset {bigquery_dataset.name} in project {project_id}, skipping. Does your service account has bigquery.tables.list, bigquery.routines.get, bigquery.routines.list permission? The error was: {e}"
if self.config.profiling.enabled:
error_message = f"Unable to get tables for dataset {bigquery_dataset.name} in project {project_id}, skipping. Does your service account has bigquery.tables.list, bigquery.routines.get, bigquery.routines.list permission, bigquery.tables.getData permission? The error was: {e}"

trace = traceback.format_exc()
logger.error(trace)
logger.error(
f"Unable to get tables for dataset {bigquery_dataset.name} in project {project_id}, skipping. The error was: {e}"
logger.error(error_message)
self.report.report_failure(
"metadata-extraction",
f"{project_id}.{bigquery_dataset.name} - {error_message}",
)
continue

if self.config.include_table_lineage:
logger.info(f"Generate lineage for {project_id}")
for dataset in self.db_tables[project_id]:
for table in self.db_tables[project_id][dataset]:
dataset_urn = self.gen_dataset_urn(dataset, project_id, table.name)
lineage_info = self.lineage_extractor.get_upstream_lineage_info(
project_id=project_id,
dataset_name=dataset,
table=table,
platform=self.platform,
)
if lineage_info:
yield from self.gen_lineage(dataset_urn, lineage_info)

for dataset in self.db_views[project_id]:
for view in self.db_views[project_id][dataset]:
dataset_urn = self.gen_dataset_urn(dataset, project_id, view.name)
lineage_info = self.lineage_extractor.get_upstream_lineage_info(
project_id=project_id,
dataset_name=dataset,
table=view,
platform=self.platform,
)
yield from self.gen_lineage(dataset_urn, lineage_info)
yield from self.generate_lineage(project_id)

if self.config.include_usage_statistics:
logger.info(f"Generate usage for {project_id}")
tables: Dict[str, List[str]] = {}

for dataset in self.db_tables[project_id]:
yield from self.generate_usage_statistics(project_id)

def generate_lineage(self, project_id: str) -> Iterable[MetadataWorkUnit]:
logger.info(f"Generate lineage for {project_id}")
for dataset in self.db_tables[project_id]:
for table in self.db_tables[project_id][dataset]:
dataset_urn = self.gen_dataset_urn(dataset, project_id, table.name)
lineage_info = self.lineage_extractor.get_upstream_lineage_info(
project_id=project_id,
dataset_name=dataset,
table=table,
platform=self.platform,
)
if lineage_info:
yield from self.gen_lineage(dataset_urn, lineage_info)
for dataset in self.db_views[project_id]:
for view in self.db_views[project_id][dataset]:
dataset_urn = self.gen_dataset_urn(dataset, project_id, view.name)
lineage_info = self.lineage_extractor.get_upstream_lineage_info(
project_id=project_id,
dataset_name=dataset,
table=view,
platform=self.platform,
)
yield from self.gen_lineage(dataset_urn, lineage_info)

def generate_usage_statistics(self, project_id: str) -> Iterable[MetadataWorkUnit]:
logger.info(f"Generate usage for {project_id}")
tables: Dict[str, List[str]] = {}
for dataset in self.db_tables[project_id]:
tables[dataset] = [
table.name for table in self.db_tables[project_id][dataset]
]
for dataset in self.db_views[project_id]:
if not tables[dataset]:
tables[dataset] = [
table.name for table in self.db_tables[project_id][dataset]
table.name for table in self.db_views[project_id][dataset]
]

for dataset in self.db_views[project_id]:
if not tables[dataset]:
tables[dataset] = [
table.name for table in self.db_views[project_id][dataset]
]
else:
tables[dataset].extend(
[table.name for table in self.db_views[project_id][dataset]]
)

yield from self.usage_extractor.generate_usage_for_project(
project_id, tables
)
else:
tables[dataset].extend(
[table.name for table in self.db_views[project_id][dataset]]
)
yield from self.usage_extractor.generate_usage_for_project(project_id, tables)

def _process_schema(
self, conn: bigquery.Client, project_id: str, bigquery_dataset: BigqueryDataset
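The dict merge in generate_usage_statistics above is easy to misread, so here is a self-contained sketch of what tables ends up holding, using hypothetical dataset and table names. (The refactor also fixes the old loop, which re-read db_tables instead of db_views; note that tables[dataset] still assumes every dataset with views also appears in db_tables, so a views-only dataset would raise a KeyError.)

from typing import Dict, List

# Hypothetical per-project state mirroring self.db_tables / self.db_views,
# reduced to plain name lists for the sketch.
db_tables: Dict[str, List[str]] = {"sales": ["orders"], "ops": []}
db_views: Dict[str, List[str]] = {"sales": ["orders_v"], "ops": ["health_v"]}

tables: Dict[str, List[str]] = {}
for dataset, names in db_tables.items():
    tables[dataset] = list(names)
for dataset, names in db_views.items():
    if not tables[dataset]:  # dataset had no tables: use its views
        tables[dataset] = list(names)
    else:                    # dataset had tables: append its views
        tables[dataset].extend(names)

assert tables == {"sales": ["orders", "orders_v"], "ops": ["health_v"]}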
@@ -66,6 +66,12 @@ class BigQueryV2Config(BigQueryConfig, LineageConfig):
default=None,
description="[deprecated] Use project_id_pattern instead. You can use this property if you only want to ingest one project and don't want to give project resourcemanager.projects.list to your service account",
)

project_on_behalf: Optional[str] = Field(
default=None,
description="[Advanced] The BigQuery project in which queries are executed. Will be passed when creating a job. If not passed, falls back to the project associated with the service account..",
)

storage_project_id: None = Field(default=None, hidden_from_schema=True)

lineage_use_sql_parser: bool = Field(
@@ -126,3 +132,12 @@ def backward_compatibility_configs_set(cls, values: Dict) -> Dict:

def get_table_pattern(self, pattern: List[str]) -> str:
return "|".join(pattern) if self.table_pattern else ""

# TODO: remove run_on_compute when the legacy bigquery source is deprecated
def get_sql_alchemy_url(self, run_on_compute: bool = False) -> str:
if self.project_on_behalf:
return f"bigquery://{self.project_on_behalf}"
# When project_id is not set, we will attempt to detect the project ID
# based on the credentials or environment variables.
# See https://github.com/mxmzdlv/pybigquery#authentication.
return "bigquery://"
@@ -1,7 +1,10 @@
from typing import Any, Dict, Optional

from google.cloud import bigquery
from google.cloud.logging_v2.client import Client as GCPLoggingClient

from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config

BQ_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
BQ_DATE_SHARD_FORMAT = "%Y%m%d"

@@ -17,3 +20,17 @@ def _make_gcp_logging_client(
return GCPLoggingClient(**client_options, project=project_id)
else:
return GCPLoggingClient(**client_options)


def get_bigquery_client(config: BigQueryV2Config) -> bigquery.Client:
client_options = config.extra_client_options
return bigquery.Client(config.project_on_behalf, **client_options)


def get_sql_alchemy_url(config: BigQueryV2Config) -> str:
if config.project_on_behalf:
return f"bigquery://{config.project_on_behalf}"
# When project_id is not set, we will attempt to detect the project ID
# based on the credentials or environment variables.
# See https://github.com/mxmzdlv/pybigquery#authentication.
return "bigquery://"
@@ -275,7 +275,8 @@ def _get_bigquery_log_entries_via_exported_bigquery_audit_metadata(
e,
)
self.report.report_failure(
f"{client.project}", f"unable to retrieve log entries {e}"
"lineage-extraction",
f"{client.project} - unable to retrieve log entries {e}",
)

def _get_exported_bigquery_audit_metadata(
@@ -367,7 +368,8 @@ def _get_bigquery_log_entries_via_gcp_logging(
e,
)
self.report.report_failure(
f"{client.project}", f"unable to retrive log entrires {e}"
"usage-extraction",
f"{client.project} - unable to retrive log entrires {e}",
)

def _generate_filter(self, audit_templates: Dict[str, str]) -> str:
@@ -622,10 +624,8 @@ def _parse_bigquery_log_entries(
self.report.num_query_events += 1

if event is None:
self.error(
logger,
f"{entry.log_name}-{entry.insert_id}",
f"Unable to parse {type(entry)} missing read {missing_query_entry}, missing query {missing_query_entry} missing v2 {missing_query_entry_v2} for {entry}",
logger.warning(
f"Unable to parse {type(entry)} missing read {missing_query_entry}, missing query {missing_query_entry} missing v2 {missing_query_entry_v2} for {entry}"
)
else:
yield event
@@ -664,10 +664,8 @@ def _parse_exported_bigquery_audit_metadata(
else:
self.error(
logger,
f"{audit_metadata['logName']}-{audit_metadata['insertId']}",
f"Unable to parse audit metadata missing "
f"QueryEvent keys:{str(missing_query_event_exported_audit)},"
f" ReadEvent keys: {str(missing_read_event_exported_audit)} for {audit_metadata}",
"usage-extraction",
f"{audit_metadata['logName']}-{audit_metadata['insertId']} Unable to parse audit metadata missing QueryEvent keys:{str(missing_query_event_exported_audit)} ReadEvent keys: {str(missing_read_event_exported_audit)} for {audit_metadata}",
)

def error(self, log: logging.Logger, key: str, reason: str) -> Any:
