From 1e57de7e95bf7528d37a48aad82d07b9dce762cd Mon Sep 17 00:00:00 2001 From: treff7es Date: Sun, 12 Feb 2023 23:45:25 +0100 Subject: [PATCH 1/6] - Fix for not clearing bigquery table cache which caused to carry over tables from one project to the other - Fixing range partition profiling - Fixing peak memory report --- .../src/datahub/ingestion/run/pipeline.py | 10 +- .../ingestion/source/bigquery_v2/bigquery.py | 282 +++++++++--------- .../source/bigquery_v2/bigquery_schema.py | 43 ++- .../ingestion/source/bigquery_v2/profiler.py | 24 +- 4 files changed, 197 insertions(+), 162 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index e2a8157b0434c..111fdc1d53382 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -123,15 +123,15 @@ class CliReport(Report): py_version: str = sys.version py_exec_path: str = sys.executable os_details: str = platform.platform() - _peek_memory_usage: int = 0 + _peak_memory_usage: int = 0 def compute_stats(self) -> None: mem_usage = psutil.Process(os.getpid()).memory_info().rss - if self._peek_memory_usage < mem_usage: - self._peek_memory_usage = mem_usage - self.peek_memory_usage = humanfriendly.format_size(self._peek_memory_usage) + if self._peak_memory_usage < mem_usage: + self._peak_memory_usage = mem_usage + self.peak_memory_usage = humanfriendly.format_size(self._peak_memory_usage) - self.mem_info = humanfriendly.format_size(self._peek_memory_usage) + self.mem_info = humanfriendly.format_size(mem_usage) return super().compute_stats() diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 9eff33cf6db5d..dd8f8db1fc034 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -212,13 +212,6 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config): self.usage_extractor = BigQueryUsageExtractor(config, self.report) # Currently caching using instance variables - # TODO - rewrite cache for readability or use out of the box solution - self.db_tables: Dict[str, List[BigqueryTable]] = {} - self.db_views: Dict[str, List[BigqueryView]] = {} - - self.schema_columns: Dict[ - Tuple[str, str], Optional[Dict[str, List[BigqueryColumn]]] - ] = {} # Create and register the stateful ingestion use-case handler. self.stale_entity_removal_handler = StaleEntityRemovalHandler( @@ -534,6 +527,9 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: def _process_project( self, conn: bigquery.Client, bigquery_project: BigqueryProject ) -> Iterable[MetadataWorkUnit]: + db_tables: Dict[str, List[BigqueryTable]] = {} + db_views: Dict[str, List[BigqueryView]] = {} + project_id = bigquery_project.id yield from self.gen_project_id_containers(project_id) @@ -572,7 +568,10 @@ def _process_project( self.report.report_dropped(f"{bigquery_dataset.name}.*") continue try: - yield from self._process_schema(conn, project_id, bigquery_dataset) + # db_tables and db_views are populated in the this method + yield from self._process_schema( + conn, project_id, bigquery_dataset, db_tables, db_views + ) except Exception as e: error_message = f"Unable to get tables for dataset {bigquery_dataset.name} in project {project_id}, skipping. 
Does your service account has bigquery.tables.list, bigquery.routines.get, bigquery.routines.list permission? The error was: {e}" if self.config.profiling.enabled: @@ -608,7 +607,9 @@ def _process_project( end_time_millis=datetime_to_ts_millis(self.config.end_time), ) self.report.set_project_state(project_id, "Lineage Extraction") - yield from self.generate_lineage(project_id) + yield from self.generate_lineage( + project_id, db_tables=db_tables, db_views=db_views + ) if self.config.include_usage_statistics: if ( @@ -631,20 +632,27 @@ def _process_project( ) self.report.set_project_state(project_id, "Usage Extraction") - yield from self.generate_usage_statistics(project_id) + yield from self.generate_usage_statistics( + project_id, db_tables=db_tables, db_views=db_views + ) if self.config.profiling.enabled: logger.info(f"Starting profiling project {project_id}") self.report.set_project_state(project_id, "Profiling") yield from self.profiler.get_workunits( project_id=project_id, - tables=self.db_tables, + tables=db_tables, ) - def generate_lineage(self, project_id: str) -> Iterable[MetadataWorkUnit]: + def generate_lineage( + self, + project_id: str, + db_tables: Dict[str, List[BigqueryTable]], + db_views: Dict[str, List[BigqueryView]], + ) -> Iterable[MetadataWorkUnit]: logger.info(f"Generate lineage for {project_id}") - for dataset in self.db_tables: - for table in self.db_tables[dataset]: + for dataset in db_tables: + for table in db_tables[dataset]: dataset_urn = self.gen_dataset_urn(dataset, project_id, table.name) lineage_info = self.lineage_extractor.get_upstream_lineage_info( project_id=project_id, @@ -654,8 +662,8 @@ def generate_lineage(self, project_id: str) -> Iterable[MetadataWorkUnit]: ) if lineage_info: yield from self.gen_lineage(dataset_urn, lineage_info) - for dataset in self.db_views: - for view in self.db_views[dataset]: + for dataset in db_views.keys(): + for view in db_views[dataset]: dataset_urn = self.gen_dataset_urn(dataset, project_id, view.name) lineage_info = self.lineage_extractor.get_upstream_lineage_info( project_id=project_id, @@ -665,29 +673,39 @@ def generate_lineage(self, project_id: str) -> Iterable[MetadataWorkUnit]: ) yield from self.gen_lineage(dataset_urn, lineage_info) - def generate_usage_statistics(self, project_id: str) -> Iterable[MetadataWorkUnit]: + def generate_usage_statistics( + self, + project_id: str, + db_tables: Dict[str, List[BigqueryTable]], + db_views: Dict[str, List[BigqueryView]], + ) -> Iterable[MetadataWorkUnit]: logger.info(f"Generate usage for {project_id}") tables: Dict[str, List[str]] = defaultdict() - for dataset in self.db_tables.keys(): + for dataset in db_tables.keys(): tables[dataset] = [ BigqueryTableIdentifier( project_id, dataset, table.name ).get_table_name() - for table in self.db_tables[dataset] + for table in db_tables[dataset] ] - for dataset in self.db_views.keys(): + for dataset in db_views.keys(): tables[dataset].extend( [ BigqueryTableIdentifier( project_id, dataset, view.name ).get_table_name() - for view in self.db_views[dataset] + for view in db_views[dataset] ] ) yield from self.usage_extractor.generate_usage_for_project(project_id, tables) def _process_schema( - self, conn: bigquery.Client, project_id: str, bigquery_dataset: BigqueryDataset + self, + conn: bigquery.Client, + project_id: str, + bigquery_dataset: BigqueryDataset, + db_tables: Dict[str, List[BigqueryTable]], + db_views: Dict[str, List[BigqueryView]], ) -> Iterable[MetadataWorkUnit]: dataset_name = bigquery_dataset.name @@ -704,8 +722,11 @@ 
def _process_schema( ) if self.config.include_tables: - tables = self.get_tables_for_dataset(conn, project_id, dataset_name) - for table in tables: + db_tables[dataset_name] = self.get_tables_for_dataset( + conn, project_id, dataset_name + ) + + for table in db_tables[dataset_name]: table_columns = columns.get(table.name, []) if columns else [] yield from self._process_table( @@ -713,11 +734,11 @@ def _process_schema( ) if self.config.include_views: - bigquery_dataset.views = self.get_views_for_dataset( + db_views[dataset_name] = self.get_views_for_dataset( conn, project_id, dataset_name ) - for view in bigquery_dataset.views: + for view in db_views[dataset_name]: view_columns = columns.get(view.name, []) if columns else [] yield from self._process_view( view, view_columns, project_id, dataset_name @@ -768,12 +789,12 @@ def _process_table( ) # If table has time partitioning, set the data type of the partitioning field - if table.time_partitioning: - table.time_partitioning.column = next( + if table.partition_info: + table.partition_info.column = next( ( column for column in columns - if column.name == table.time_partitioning.field + if column.name == table.partition_info.field ), None, ) @@ -820,8 +841,8 @@ def gen_table_dataset_workunits( if table.expires: custom_properties["expiration_date"] = str(table.expires) - if table.time_partitioning: - custom_properties["time_partitioning"] = str(table.time_partitioning) + if table.partition_info: + custom_properties["partition_info"] = str(table.partition_info) if table.size_in_bytes: custom_properties["size_in_bytes"] = str(table.size_in_bytes) @@ -1096,106 +1117,74 @@ def get_tables_for_dataset( project_id: str, dataset_name: str, ) -> List[BigqueryTable]: - bigquery_tables: Optional[List[BigqueryTable]] = self.db_tables.get( - dataset_name, [] - ) + bigquery_tables: Optional[List[BigqueryTable]] = [] # In bigquery there is no way to query all tables in a Project id - if not bigquery_tables: - with PerfTimer() as timer: - bigquery_tables = [] - table_count: int = 0 - table_items: Dict[str, TableListItem] = {} - # Dict to store sharded table and the last seen max shard id - sharded_tables: Dict[str, TableListItem] = defaultdict() - # Partitions view throw exception if we try to query partition info for too many tables - # so we have to limit the number of tables we query partition info. - # The conn.list_tables returns table infos that information_schema doesn't contain and this - # way we can merge that info with the queried one. - # https://cloud.google.com/bigquery/docs/information-schema-partitions - for table in conn.list_tables(f"{project_id}.{dataset_name}"): - table_identifier = BigqueryTableIdentifier( - project_id=project_id, - dataset=dataset_name, - table=table.table_id, - ) + with PerfTimer() as timer: + bigquery_tables = [] + table_count: int = 0 + table_items: Dict[str, TableListItem] = {} + # Dict to store sharded table and the last seen max shard id + sharded_tables: Dict[str, TableListItem] = defaultdict() + # Partitions view throw exception if we try to query partition info for too many tables + # so we have to limit the number of tables we query partition info. + # The conn.list_tables returns table infos that information_schema doesn't contain and this + # way we can merge that info with the queried one. 
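# The hunk continues below with the actual batching against the INFORMATION_SCHEMA.PARTITIONS
# view. A minimal, self-contained sketch of that flush-every-N-tables pattern; fetch_metadata
# and key_fn are hypothetical stand-ins (not the real BigQueryDataDictionary API), and
# batch_size plays the role of config.number_of_datasets_process_in_batch.
from typing import Callable, Dict, Iterable, List, TypeVar

T = TypeVar("T")

def fetch_in_batches(
    items: Iterable[T],
    batch_size: int,
    fetch_metadata: Callable[[Dict[str, T]], List[dict]],
    key_fn: Callable[[T], str],
) -> List[dict]:
    """Accumulate items keyed by key_fn and flush them to fetch_metadata every batch_size items."""
    results: List[dict] = []
    pending: Dict[str, T] = {}
    for count, item in enumerate(items, start=1):
        pending[key_fn(item)] = item
        if count % batch_size == 0:
            results.extend(fetch_metadata(pending))
            pending.clear()
    if pending:  # flush the final, partially filled batch
        results.extend(fetch_metadata(pending))
    return results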
+ # https://cloud.google.com/bigquery/docs/information-schema-partitions + for table in conn.list_tables(f"{project_id}.{dataset_name}"): + table_identifier = BigqueryTableIdentifier( + project_id=project_id, + dataset=dataset_name, + table=table.table_id, + ) - _, shard = BigqueryTableIdentifier.get_table_and_shard( - table_identifier.raw_table_name() - ) - table_name = table_identifier.get_table_name().split(".")[-1] - - # For sharded tables we only process the latest shard - # which has the highest date in the table name. - # Sharded tables look like: table_20220120 - # We only has one special case where the table name is a date - # in this case we merge all these tables under dataset name as table name. - # For example some_dataset.20220110 will be turned to some_dataset.some_dataset - # It seems like there are some bigquery user who uses this way the tables. - if shard: - if not sharded_tables.get(table_identifier.get_table_name()): - # When table is only a shard we use dataset_name as table_name - sharded_tables[table_name] = table - continue - else: - stored_table_identifier = BigqueryTableIdentifier( - project_id=project_id, - dataset=dataset_name, - table=sharded_tables[table_name].table_id, - ) - ( - _, - stored_shard, - ) = BigqueryTableIdentifier.get_table_and_shard( - stored_table_identifier.raw_table_name() - ) - # When table is none, we use dataset_name as table_name - table_name = table_identifier.get_table_name().split(".")[ - -1 - ] - assert stored_shard - if stored_shard < shard: - sharded_tables[table_name] = table - continue + _, shard = BigqueryTableIdentifier.get_table_and_shard( + table_identifier.raw_table_name() + ) + table_name = table_identifier.get_table_name().split(".")[-1] + + # For sharded tables we only process the latest shard + # which has the highest date in the table name. + # Sharded tables look like: table_20220120 + # We only has one special case where the table name is a date + # in this case we merge all these tables under dataset name as table name. + # For example some_dataset.20220110 will be turned to some_dataset.some_dataset + # It seems like there are some bigquery user who uses this way the tables. + if shard: + if not sharded_tables.get(table_identifier.get_table_name()): + # When table is only a shard we use dataset_name as table_name + sharded_tables[table_name] = table + continue else: - table_count = table_count + 1 - table_items[table.table_id] = table - - if str(table_identifier).startswith( - self.config.temp_table_dataset_prefix - ): - logger.debug( - f"Dropping temporary table {table_identifier.table}" + stored_table_identifier = BigqueryTableIdentifier( + project_id=project_id, + dataset=dataset_name, + table=sharded_tables[table_name].table_id, ) - self.report.report_dropped(table_identifier.raw_table_name()) - continue - - if ( - table_count % self.config.number_of_datasets_process_in_batch - == 0 - ): - bigquery_tables.extend( - BigQueryDataDictionary.get_tables_for_dataset( - conn, - project_id, - dataset_name, - table_items, - with_data_read_permission=self.config.profiling.enabled, - ) + ( + _, + stored_shard, + ) = BigqueryTableIdentifier.get_table_and_shard( + stored_table_identifier.raw_table_name() ) - table_items.clear() - - # Sharded tables don't have partition keys, so it is safe to add to the list as - # it should not affect the number of tables will be touched in the partitions system view. 
- # Because we have the batched query of get_tables_for_dataset to makes sure - # we won't hit too many tables queried with partitions system view. - # The key in the map is the actual underlying table name and not the friendly name and - # that's why we need to get the actual table names and not the normalized ones. - table_items.update( - {value.table_id: value for value in sharded_tables.values()} - ) - - if table_items: + # When table is none, we use dataset_name as table_name + table_name = table_identifier.get_table_name().split(".")[-1] + assert stored_shard + if stored_shard < shard: + sharded_tables[table_name] = table + continue + else: + table_count = table_count + 1 + table_items[table.table_id] = table + + if str(table_identifier).startswith( + self.config.temp_table_dataset_prefix + ): + logger.debug(f"Dropping temporary table {table_identifier.table}") + self.report.report_dropped(table_identifier.raw_table_name()) + continue + + if table_count % self.config.number_of_datasets_process_in_batch == 0: bigquery_tables.extend( BigQueryDataDictionary.get_tables_for_dataset( conn, @@ -1205,17 +1194,34 @@ def get_tables_for_dataset( with_data_read_permission=self.config.profiling.enabled, ) ) + table_items.clear() + + # Sharded tables don't have partition keys, so it is safe to add to the list as + # it should not affect the number of tables will be touched in the partitions system view. + # Because we have the batched query of get_tables_for_dataset to makes sure + # we won't hit too many tables queried with partitions system view. + # The key in the map is the actual underlying table name and not the friendly name and + # that's why we need to get the actual table names and not the normalized ones. + table_items.update( + {value.table_id: value for value in sharded_tables.values()} + ) - self.db_tables[dataset_name] = bigquery_tables - - self.report.metadata_extraction_sec[ - f"{project_id}.{dataset_name}" - ] = round(timer.elapsed_seconds(), 2) + if table_items: + bigquery_tables.extend( + BigQueryDataDictionary.get_tables_for_dataset( + conn, + project_id, + dataset_name, + table_items, + with_data_read_permission=self.config.profiling.enabled, + ) + ) - return bigquery_tables + self.report.metadata_extraction_sec[f"{project_id}.{dataset_name}"] = round( + timer.elapsed_seconds(), 2 + ) - # Some schema may not have any table - return self.db_tables.get(dataset_name, []) + return bigquery_tables def get_views_for_dataset( self, @@ -1223,13 +1229,11 @@ def get_views_for_dataset( project_id: str, dataset_name: str, ) -> List[BigqueryView]: - views = self.db_views.get(dataset_name, []) + views: List[BigqueryView] = [] - if not views: - views = BigQueryDataDictionary.get_views_for_dataset( - conn, project_id, dataset_name, self.config.profiling.enabled - ) - self.db_views[dataset_name] = views + views = BigQueryDataDictionary.get_views_for_dataset( + conn, project_id, dataset_name, self.config.profiling.enabled + ) return views def add_config_to_report(self): diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index 48731cac18500..130f6e1bf02fa 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -2,7 +2,7 @@ from collections import defaultdict from dataclasses import dataclass, field from datetime import datetime, timezone -from typing 
import Dict, List, Optional, cast +from typing import Any, Dict, List, Optional, cast from google.cloud import bigquery from google.cloud.bigquery.table import ( @@ -24,6 +24,9 @@ class BigqueryColumn(BaseColumn): is_partition_column: bool +RANGE_PARTITION_NAME: str = "RANGE" + + @dataclass class PartitionInfo: field: str @@ -39,12 +42,40 @@ def from_time_partitioning( cls, time_partitioning: TimePartitioning ) -> "PartitionInfo": return cls( - field=time_partitioning.field, + field=time_partitioning.field + if time_partitioning.field + else "_PARTITIONTIME", type=time_partitioning.type_, expiration_ms=time_partitioning.expiration_ms, require_partition_filter=time_partitioning.require_partition_filter, ) + @classmethod + def from_range_partitioning( + cls, range_partitioning: Dict[str, Any] + ) -> Optional["PartitionInfo"]: + field: Optional[str] = range_partitioning.get("field") + if not field: + return None + + return cls( + field=field, + type="RANGE", + ) + + @classmethod + def from_table_info(cls, table_info: TableListItem) -> Optional["PartitionInfo"]: + RANGE_PARTITIONING_KEY: str = "rangePartitioning" + + if table_info.time_partitioning: + return PartitionInfo.from_time_partitioning(table_info.time_partitioning) + elif RANGE_PARTITIONING_KEY in table_info._properties: + return PartitionInfo.from_range_partitioning( + table_info._properties[RANGE_PARTITIONING_KEY] + ) + else: + return None + @dataclass class BigqueryTable(BaseTable): @@ -56,7 +87,7 @@ class BigqueryTable(BaseTable): max_shard_id: Optional[str] = None active_billable_bytes: Optional[int] = None long_term_billable_bytes: Optional[int] = None - time_partitioning: Optional[PartitionInfo] = None + partition_info: Optional[PartitionInfo] = None columns_ignore_from_profiling: List[str] = field(default_factory=list) @@ -365,10 +396,8 @@ def get_tables_for_dataset( ddl=table.ddl, expires=tables[table.table_name].expires if tables else None, labels=tables[table.table_name].labels if tables else None, - time_partitioning=PartitionInfo.from_time_partitioning( - tables[table.table_name].time_partitioning - ) - if tables and tables[table.table_name].time_partitioning + partition_info=PartitionInfo.from_table_info(tables[table.table_name]) + if tables else None, clustering_fields=tables[table.table_name].clustering_fields if tables diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py index 86ae0e7252de2..3af3c5acaa2a5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py @@ -11,7 +11,10 @@ from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report -from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigqueryTable +from datahub.ingestion.source.bigquery_v2.bigquery_schema import ( + RANGE_PARTITION_NAME, + BigqueryTable, +) from datahub.ingestion.source.ge_data_profiler import GEProfilerRequest from datahub.ingestion.source.sql.sql_generic_profiler import ( GenericProfiler, @@ -87,13 +90,13 @@ def generate_partition_profiler_query( f"generate partition profiler query for project: {project} schema: {schema} and table {table.name}, partition_datetime: {partition_datetime}" ) partition = table.max_partition_id - if 
partition: + if table.partition_info and partition: partition_where_clause: str - if not table.time_partitioning: - if table.time_partitioning and table.time_partitioning.column: + if table.partition_info.type == RANGE_PARTITION_NAME: + if table.partition_info and table.partition_info.column: partition_where_clause = ( - f"{table.time_partitioning.column.name} >= {partition}" + f"{table.partition_info.column.name} >= {partition}" ) else: logger.warning( @@ -120,18 +123,17 @@ def generate_partition_profiler_query( ] = partition return None, None - # ingestion time partitoned tables partition column is not in the schema, so we default to TIMESTAMP type - if not table.time_partitioning.column: + if not table.partition_info.column: logger.warning( f"Partitioned table {table.name} without partition column, it seems like a bug in our extraction" ) return None, None - if table.time_partitioning.type in ("HOUR", "DAY", "MONTH", "YEAR"): - partition_where_clause = f"{table.time_partitioning.column.data_type}(`{table.time_partitioning.field}`) BETWEEN {table.time_partitioning.column.data_type}('{partition_datetime}') AND {table.time_partitioning.column.data_type}('{upper_bound_partition_datetime}')" + if table.partition_info.type in ("HOUR", "DAY", "MONTH", "YEAR"): + partition_where_clause = f"{table.partition_info.column.data_type}(`{table.partition_info.field}`) BETWEEN {table.partition_info.column.data_type}('{partition_datetime}') AND {table.partition_info.column.data_type}('{upper_bound_partition_datetime}')" else: logger.warning( - f"Not supported partition type {table.time_partitioning.type}" + f"Not supported partition type {table.partition_info.type}" ) return None, None custom_sql = """ @@ -258,7 +260,7 @@ def get_bigquery_profile_request( project, dataset, table, self.config.profiling.partition_datetime ) - if partition is None and table.time_partitioning: + if partition is None and table.partition_info: self.report.report_warning( "profile skipped as partitioned table is empty or partition id was invalid", dataset_name, From a4da49b527c5cdad01b40615a822dc42e398e888 Mon Sep 17 00:00:00 2001 From: treff7es Date: Mon, 13 Feb 2023 11:30:21 +0100 Subject: [PATCH 2/6] PR review fixes Adding tests --- .../ingestion/source/bigquery_v2/bigquery.py | 28 +-- .../ingestion/source/bigquery_v2/profiler.py | 18 +- .../tests/unit/test_bigquery_profiler.py | 210 ++++++++++++++++++ .../tests/unit/test_bigquery_source.py | 85 ++++++- 4 files changed, 312 insertions(+), 29 deletions(-) create mode 100644 metadata-ingestion/tests/unit/test_bigquery_profiler.py diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index dd8f8db1fc034..acc4925985258 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -211,8 +211,6 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config): self.lineage_extractor = BigqueryLineageExtractor(config, self.report) self.usage_extractor = BigQueryUsageExtractor(config, self.report) - # Currently caching using instance variables - # Create and register the stateful ingestion use-case handler. 
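# The profiler hunks above assemble a partition filter by casting the partition column to its
# data type and bounding it between the selected partition and the next one. A simplified
# sketch of that shape — it assumes only HOUR/DAY granularity and naive datetime arithmetic,
# and build_partition_filter is an illustrative helper, not the source's API.
from datetime import datetime, timedelta
from typing import Optional

def build_partition_filter(
    partition_type: str,      # "HOUR" or "DAY" in this sketch
    column_name: str,         # e.g. "date", or the _PARTITIONTIME pseudo column
    column_data_type: str,    # e.g. "TIMESTAMP" or "DATE"
    lower_bound: datetime,
) -> Optional[str]:
    """Return a WHERE fragment selecting a single partition, or None for unsupported types."""
    deltas = {"HOUR": timedelta(hours=1), "DAY": timedelta(days=1)}
    if partition_type not in deltas:
        return None  # MONTH/YEAR need calendar-aware arithmetic, omitted here
    upper_bound = lower_bound + deltas[partition_type]
    return (
        f"{column_data_type}(`{column_name}`) "
        f"BETWEEN {column_data_type}('{lower_bound}') AND {column_data_type}('{upper_bound}')"
    )

# build_partition_filter("DAY", "date", "TIMESTAMP", datetime(2020, 1, 1)) produces
# "TIMESTAMP(`date`) BETWEEN TIMESTAMP('2020-01-01 00:00:00') AND TIMESTAMP('2020-01-02 00:00:00')"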
self.stale_entity_removal_handler = StaleEntityRemovalHandler( source=self, @@ -498,12 +496,14 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ) return except Exception as e: + trace = traceback.format_exc() logger.error( f"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account. The error was: {e}" ) + logger.error(trace) self.report.report_failure( "metadata-extraction", - f"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account. The error was: {e}", + f"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account. The error was: {e} Stacktrace: {trace}", ) return None @@ -572,6 +572,7 @@ def _process_project( yield from self._process_schema( conn, project_id, bigquery_dataset, db_tables, db_views ) + except Exception as e: error_message = f"Unable to get tables for dataset {bigquery_dataset.name} in project {project_id}, skipping. Does your service account has bigquery.tables.list, bigquery.routines.get, bigquery.routines.list permission? The error was: {e}" if self.config.profiling.enabled: @@ -651,7 +652,7 @@ def generate_lineage( db_views: Dict[str, List[BigqueryView]], ) -> Iterable[MetadataWorkUnit]: logger.info(f"Generate lineage for {project_id}") - for dataset in db_tables: + for dataset in db_tables.keys(): for table in db_tables[dataset]: dataset_urn = self.gen_dataset_urn(dataset, project_id, table.name) lineage_info = self.lineage_extractor.get_upstream_lineage_info( @@ -1143,15 +1144,15 @@ def get_tables_for_dataset( ) table_name = table_identifier.get_table_name().split(".")[-1] - # For sharded tables we only process the latest shard - # which has the highest date in the table name. # Sharded tables look like: table_20220120 - # We only has one special case where the table name is a date + # For sharded tables we only process the latest shard and ignore the rest + # to find the latest shard we iterate over the list of tables and store the maximum shard id + # We only has one special case where the table name is a date `20220110` # in this case we merge all these tables under dataset name as table name. # For example some_dataset.20220110 will be turned to some_dataset.some_dataset - # It seems like there are some bigquery user who uses this way the tables. + # It seems like there are some bigquery user who uses this non-standard way of sharding the tables. 
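# The comment block above describes keeping only the newest shard of a date-sharded table
# (e.g. table_20220120), with a special case for tables whose whole name is a date; the
# `if shard:` branch that follows implements it. A minimal standalone sketch of that selection;
# the regex is a simplification of what BigqueryTableIdentifier.get_table_and_shard parses.
import re
from typing import Dict, Iterable, Optional, Tuple

SHARD_SUFFIX = re.compile(r"^(?:(.+)_)?(\d{8})$")  # "table_20220120" or a bare "20220110"

def split_shard(table_id: str) -> Tuple[str, Optional[str]]:
    """Return (base_name, shard); shard is None for non-sharded tables."""
    match = SHARD_SUFFIX.match(table_id)
    if not match:
        return table_id, None
    base, shard = match.groups()
    return base or table_id, shard

def latest_shards(table_ids: Iterable[str]) -> Dict[str, str]:
    """Keep only the highest (newest) shard id seen for each base table name."""
    newest: Dict[str, str] = {}
    for table_id in table_ids:
        base, shard = split_shard(table_id)
        if shard is None:
            continue
        if base not in newest or split_shard(newest[base])[1] < shard:
            newest[base] = table_id
    return newest

# latest_shards(["t_20220101", "t_20220102", "plain_table"]) returns {"t": "t_20220102"},
# which mirrors what the test_table_processing_logic unit test later in this series asserts.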
if shard: - if not sharded_tables.get(table_identifier.get_table_name()): + if not sharded_tables.get(table_name): # When table is only a shard we use dataset_name as table_name sharded_tables[table_name] = table continue @@ -1168,7 +1169,6 @@ def get_tables_for_dataset( stored_table_identifier.raw_table_name() ) # When table is none, we use dataset_name as table_name - table_name = table_identifier.get_table_name().split(".")[-1] assert stored_shard if stored_shard < shard: sharded_tables[table_name] = table @@ -1217,11 +1217,11 @@ def get_tables_for_dataset( ) ) - self.report.metadata_extraction_sec[f"{project_id}.{dataset_name}"] = round( - timer.elapsed_seconds(), 2 - ) + self.report.metadata_extraction_sec[f"{project_id}.{dataset_name}"] = round( + timer.elapsed_seconds(), 2 + ) - return bigquery_tables + return bigquery_tables def get_views_for_dataset( self, diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py index 3af3c5acaa2a5..71afceb4d3261 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py @@ -79,7 +79,7 @@ def generate_partition_profiler_query( project: str, schema: str, table: BigqueryTable, - partition_datetime: Optional[datetime], + partition_datetime: Optional[datetime] = None, ) -> Tuple[Optional[str], Optional[str]]: """ Method returns partition id if table is partitioned or sharded and generate custom partition query for @@ -123,14 +123,16 @@ def generate_partition_profiler_query( ] = partition return None, None - if not table.partition_info.column: - logger.warning( - f"Partitioned table {table.name} without partition column, it seems like a bug in our extraction" - ) - return None, None - + partition_data_type: str = "TIMESTAMP" + # Ingestion time partitioned tables has a pseudo column called _PARTITIONTIME + # See more about this at + # https://cloud.google.com/bigquery/docs/partitioned-tables#ingestion_time + partition_column_name = "_PARTITIONTIME" + if table.partition_info.column: + partition_column_name = table.partition_info.column.name + partition_data_type = table.partition_info.column.data_type if table.partition_info.type in ("HOUR", "DAY", "MONTH", "YEAR"): - partition_where_clause = f"{table.partition_info.column.data_type}(`{table.partition_info.field}`) BETWEEN {table.partition_info.column.data_type}('{partition_datetime}') AND {table.partition_info.column.data_type}('{upper_bound_partition_datetime}')" + partition_where_clause = f"{partition_data_type}(`{partition_column_name}`) BETWEEN {partition_data_type}('{partition_datetime}') AND {partition_data_type}('{upper_bound_partition_datetime}')" else: logger.warning( f"Not supported partition type {table.partition_info.type}" diff --git a/metadata-ingestion/tests/unit/test_bigquery_profiler.py b/metadata-ingestion/tests/unit/test_bigquery_profiler.py new file mode 100644 index 0000000000000..b947832051c2d --- /dev/null +++ b/metadata-ingestion/tests/unit/test_bigquery_profiler.py @@ -0,0 +1,210 @@ +from datetime import datetime, timezone + +from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config +from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report +from datahub.ingestion.source.bigquery_v2.bigquery_schema import ( + BigqueryColumn, + BigqueryTable, + PartitionInfo, +) +from datahub.ingestion.source.bigquery_v2.profiler import 
BigqueryProfiler + + +def test_not_generate_partition_profiler_query_if_not_partitioned_sharded_table(): + profiler = BigqueryProfiler(config=BigQueryV2Config(), report=BigQueryV2Report()) + test_table = BigqueryTable( + name="test_table", + comment="test_comment", + rows_count=1, + size_in_bytes=1, + last_altered=datetime.now(timezone.utc), + created=datetime.now(timezone.utc), + ) + query = profiler.generate_partition_profiler_query( + project="test_project", + schema="test_dataset", + table=test_table, + partition_datetime=None, + ) + + assert query == (None, None) + + +def test_generate_day_partitioned_partition_profiler_query(): + column = BigqueryColumn( + name="date", + field_path="date", + ordinal_position=1, + data_type="TIMESTAMP", + is_partition_column=True, + comment=None, + is_nullable=False, + ) + partition_info = PartitionInfo(type="DAY", field="date", column=column) + profiler = BigqueryProfiler(config=BigQueryV2Config(), report=BigQueryV2Report()) + test_table = BigqueryTable( + name="test_table", + comment="test_comment", + rows_count=1, + size_in_bytes=1, + last_altered=datetime.now(timezone.utc), + created=datetime.now(timezone.utc), + partition_info=partition_info, + max_partition_id="20200101", + ) + query = profiler.generate_partition_profiler_query( + project="test_project", + schema="test_dataset", + table=test_table, + ) + expected_query = """ +SELECT + * +FROM + `test_project.test_dataset.test_table` +WHERE + TIMESTAMP(`date`) BETWEEN TIMESTAMP('2020-01-01 00:00:00') AND TIMESTAMP('2020-01-02 00:00:00') +""".strip() + + assert "20200101" == query[0] + assert query[1] + assert expected_query == query[1].strip() + + +# If partition time is passed in we force to use that time instead of the max partition id +def test_generate_day_partitioned_partition_profiler_query_with_set_partition_time(): + column = BigqueryColumn( + name="date", + field_path="date", + ordinal_position=1, + data_type="TIMESTAMP", + is_partition_column=True, + comment=None, + is_nullable=False, + ) + partition_info = PartitionInfo(type="DAY", field="date", column=column) + profiler = BigqueryProfiler(config=BigQueryV2Config(), report=BigQueryV2Report()) + test_table = BigqueryTable( + name="test_table", + comment="test_comment", + rows_count=1, + size_in_bytes=1, + last_altered=datetime.now(timezone.utc), + created=datetime.now(timezone.utc), + partition_info=partition_info, + max_partition_id="20200101", + ) + query = profiler.generate_partition_profiler_query( + project="test_project", + schema="test_dataset", + table=test_table, + ) + expected_query = """ +SELECT + * +FROM + `test_project.test_dataset.test_table` +WHERE + TIMESTAMP(`date`) BETWEEN TIMESTAMP('2020-01-01 00:00:00') AND TIMESTAMP('2020-01-02 00:00:00') +""".strip() + + assert "20200101" == query[0] + assert query[1] + assert expected_query == query[1].strip() + + +def test_generate_hour_partitioned_partition_profiler_query(): + column = BigqueryColumn( + name="partition_column", + field_path="partition_column", + ordinal_position=1, + data_type="TIMESTAMP", + is_partition_column=True, + comment=None, + is_nullable=False, + ) + partition_info = PartitionInfo(type="DAY", field="date", column=column) + profiler = BigqueryProfiler(config=BigQueryV2Config(), report=BigQueryV2Report()) + test_table = BigqueryTable( + name="test_table", + comment="test_comment", + rows_count=1, + size_in_bytes=1, + last_altered=datetime.now(timezone.utc), + created=datetime.now(timezone.utc), + partition_info=partition_info, + 
max_partition_id="2020010103", + ) + query = profiler.generate_partition_profiler_query( + project="test_project", + schema="test_dataset", + table=test_table, + partition_datetime=None, + ) + expected_query = """ +SELECT + * +FROM + `test_project.test_dataset.test_table` +WHERE + TIMESTAMP(`partition_column`) BETWEEN TIMESTAMP('2020-01-01 03:00:00') AND TIMESTAMP('2020-01-01 04:00:00') +""".strip() + + assert "2020010103" == query[0] + assert query[1] + assert expected_query == query[1].strip() + + +# Ingestion partitioned tables do not have partition column in the schema as it uses a psudo column _PARTITIONTIME to partition +def test_generate_ingestion_partitioned_partition_profiler_query(): + partition_info = PartitionInfo(type="DAY", field="date") + profiler = BigqueryProfiler(config=BigQueryV2Config(), report=BigQueryV2Report()) + test_table = BigqueryTable( + name="test_table", + comment="test_comment", + rows_count=1, + size_in_bytes=1, + last_altered=datetime.now(timezone.utc), + created=datetime.now(timezone.utc), + partition_info=partition_info, + max_partition_id="20200101", + ) + query = profiler.generate_partition_profiler_query( + project="test_project", + schema="test_dataset", + table=test_table, + ) + expected_query = """ +SELECT + * +FROM + `test_project.test_dataset.test_table` +WHERE + TIMESTAMP(`_PARTITIONTIME`) BETWEEN TIMESTAMP('2020-01-01 00:00:00') AND TIMESTAMP('2020-01-02 00:00:00') +""".strip() + + assert "20200101" == query[0] + assert query[1] + assert expected_query == query[1].strip() + + +def test_generate_sharded_table_profiler_query(): + profiler = BigqueryProfiler(config=BigQueryV2Config(), report=BigQueryV2Report()) + test_table = BigqueryTable( + name="my_sharded_table", + max_shard_id="20200101", + comment="test_comment", + rows_count=1, + size_in_bytes=1, + last_altered=datetime.now(timezone.utc), + created=datetime.now(timezone.utc), + ) + query = profiler.generate_partition_profiler_query( + project="test_project", + schema="test_dataset", + table=test_table, + ) + + assert "20200101" == query[0] + assert query[1] + assert query[1] is None diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 88efd3fc871e4..604270ebb3b8b 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -1,5 +1,9 @@ import json import os +from typing import Dict +from unittest.mock import patch + +from google.cloud.bigquery.table import TableListItem from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.bigquery_v2.bigquery import BigqueryV2Source @@ -89,7 +93,7 @@ def test_simple_upstream_table_generation(): } ) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) - source.lineage_extractor.lineage_metadata = {str(a): set([str(b)])} + source.lineage_extractor.lineage_metadata = {str(a): {str(b)}} upstreams = source.lineage_extractor.get_upstream_tables(str(a), []) assert list(upstreams) == [b] @@ -112,7 +116,7 @@ def test_upstream_table_generation_with_temporary_table_without_temp_upstream(): } ) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) - source.lineage_extractor.lineage_metadata = {str(a): set([str(b)])} + source.lineage_extractor.lineage_metadata = {str(a): {str(b)}} upstreams = source.lineage_extractor.get_upstream_tables(str(a), []) assert list(upstreams) == [] @@ -143,8 +147,8 @@ def 
test_upstream_table_generation_with_temporary_table_with_temp_upstream(): ) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) source.lineage_extractor.lineage_metadata = { - str(a): set([str(b)]), - str(b): set([str(c)]), + str(a): {str(b)}, + str(b): {str(c)}, } upstreams = source.lineage_extractor.get_upstream_tables(str(a), []) assert list(upstreams) == [c] @@ -184,9 +188,76 @@ def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstr ) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) source.lineage_extractor.lineage_metadata = { - str(a): set([str(b)]), - str(b): set([str(c), str(d)]), - str(d): set([str(e)]), + str(a): {str(b)}, + str(b): {str(c), str(d)}, + str(d): {str(e)}, } upstreams = source.lineage_extractor.get_upstream_tables(str(a), []) assert list(upstreams).sort() == [c, e].sort() + + +@patch( + "datahub.ingestion.source.bigquery_v2.bigquery_schema.BigQueryDataDictionary.get_tables_for_dataset" +) +@patch("google.cloud.bigquery.client.Client") +def test_table_processing_logic(client_mock, data_dictionary_mock): + config = BigQueryV2Config.parse_obj( + { + "project_id": "test-project", + } + ) + + tableListItems = [ + TableListItem( + { + "tableReference": { + "projectId": "test-project", + "datasetId": "test-dataset", + "tableId": "test-table", + } + } + ), + TableListItem( + { + "tableReference": { + "projectId": "test-project", + "datasetId": "test-dataset", + "tableId": "test-sharded-table_20220102", + } + } + ), + TableListItem( + { + "tableReference": { + "projectId": "test-project", + "datasetId": "test-dataset", + "tableId": "test-sharded-table_20210101", + } + } + ), + TableListItem( + { + "tableReference": { + "projectId": "test-project", + "datasetId": "test-dataset", + "tableId": "test-sharded-table_20220101", + } + } + ), + ] + + client_mock.list_tables.return_value = tableListItems + data_dictionary_mock.get_tables_for_dataset.return_value = None + + source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) + + _ = source.get_tables_for_dataset( + conn=client_mock, project_id="test-project", dataset_name="test-dataset" + ) + + assert data_dictionary_mock.call_count == 1 + tables: Dict[str, TableListItem] = data_dictionary_mock.call_args.args[ + 3 + ] # alternatively + for table in tables.keys(): + assert table in ["test-table", "test-sharded-table_20220102"] From 82f4e329946927c5b037dfce56c542883a436d87 Mon Sep 17 00:00:00 2001 From: treff7es Date: Mon, 13 Feb 2023 14:21:42 +0100 Subject: [PATCH 3/6] Fixing test --- metadata-ingestion/tests/unit/test_bigquery_profiler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/metadata-ingestion/tests/unit/test_bigquery_profiler.py b/metadata-ingestion/tests/unit/test_bigquery_profiler.py index b947832051c2d..a2aec8df93d09 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_profiler.py +++ b/metadata-ingestion/tests/unit/test_bigquery_profiler.py @@ -206,5 +206,4 @@ def test_generate_sharded_table_profiler_query(): ) assert "20200101" == query[0] - assert query[1] assert query[1] is None From 1aa953ab6b50e59f0eb31a867939a329bfc98b55 Mon Sep 17 00:00:00 2001 From: treff7es Date: Mon, 13 Feb 2023 15:20:11 +0100 Subject: [PATCH 4/6] Fixing error printing --- .../datahub/ingestion/source/bigquery_v2/usage.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py 
index ac1c690df26f4..806d8b7af59d8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py @@ -2,6 +2,7 @@ import logging import textwrap import time +import traceback from dataclasses import dataclass from datetime import datetime from typing import Any, Dict, Iterable, List, MutableMapping, Optional, Union, cast @@ -243,8 +244,9 @@ def generate_usage_for_project( yield from self.get_workunits(aggregated_info) except Exception as e: self.report.usage_failed_extraction.append(project_id) + trace = traceback.format_exc() logger.error( - f"Error getting usage for project {project_id} due to error {e}" + f"Error getting usage for project {project_id} due to error {e}, trace: {trace}" ) def _get_bigquery_log_entries_via_exported_bigquery_audit_metadata( @@ -270,8 +272,7 @@ def _get_bigquery_log_entries_via_exported_bigquery_audit_metadata( except Exception as e: logger.warning( - f"Encountered exception retrieving AuditLogEntries for project {client.project}", - e, + f"Encountered exception retrieving AuditLogEntries for project {client.project} - {e}" ) self.report.report_failure( "lineage-extraction", @@ -375,8 +376,7 @@ def _get_bigquery_log_entries_via_gcp_logging( except Exception as e: logger.warning( - f"Encountered exception retrieving AuditLogEntires for project {client.project}", - e, + f"Encountered exception retrieving AuditLogEntires for project {client.project} - {e}" ) self.report.report_failure( "usage-extraction", @@ -774,7 +774,7 @@ def _aggregate_enriched_read_events( str(event.read_event.resource), f"Failed to clean up resource, {e}" ) logger.warning( - f"Failed to process event {str(event.read_event.resource)}", e + f"Failed to process event {str(event.read_event.resource)} - {e}" ) return datasets From 55cba69863c14635e8c4eac0079573eaba481fca Mon Sep 17 00:00:00 2001 From: treff7es Date: Mon, 13 Feb 2023 16:09:13 +0100 Subject: [PATCH 5/6] Adding a 3.7 compatible arg check --- metadata-ingestion/tests/unit/test_bigquery_source.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 604270ebb3b8b..c79fa0950457a 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -256,7 +256,9 @@ def test_table_processing_logic(client_mock, data_dictionary_mock): ) assert data_dictionary_mock.call_count == 1 - tables: Dict[str, TableListItem] = data_dictionary_mock.call_args.args[ + + # args only available from python 3.8 and that's why call_args_list is sooo ugly + tables: Dict[str, TableListItem] = data_dictionary_mock.call_args_list[0][0][ 3 ] # alternatively for table in tables.keys(): From 9f0a32b69ba4ed099c312ee307c61695ec4ef44a Mon Sep 17 00:00:00 2001 From: treff7es Date: Mon, 13 Feb 2023 19:06:59 +0100 Subject: [PATCH 6/6] Lower memory usage of bigquery lineage extraction --- .../ingestion/source/bigquery_v2/bigquery.py | 6 ++- .../ingestion/source/bigquery_v2/lineage.py | 50 +++++++++++-------- .../tests/unit/test_bigquery_source.py | 24 ++++++--- 3 files changed, 50 insertions(+), 30 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index acc4925985258..654f684a31b48 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ 
b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -652,6 +652,7 @@ def generate_lineage( db_views: Dict[str, List[BigqueryView]], ) -> Iterable[MetadataWorkUnit]: logger.info(f"Generate lineage for {project_id}") + lineage = self.lineage_extractor.calculate_lineage_for_project(project_id) for dataset in db_tables.keys(): for table in db_tables[dataset]: dataset_urn = self.gen_dataset_urn(dataset, project_id, table.name) @@ -660,6 +661,7 @@ def generate_lineage( dataset_name=dataset, table=table, platform=self.platform, + lineage_metadata=lineage, ) if lineage_info: yield from self.gen_lineage(dataset_urn, lineage_info) @@ -671,8 +673,10 @@ def generate_lineage( dataset_name=dataset, table=view, platform=self.platform, + lineage_metadata=lineage, ) - yield from self.gen_lineage(dataset_urn, lineage_info) + if lineage_info: + yield from self.gen_lineage(dataset_urn, lineage_info) def generate_usage_statistics( self, diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index 9b4b3e861aed3..ae8233dcdec5b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -1,7 +1,6 @@ import collections import logging import textwrap -from collections import defaultdict from typing import Dict, Iterable, List, Optional, Set, Tuple, Union import humanfriendly @@ -82,7 +81,6 @@ class BigqueryLineageExtractor: def __init__(self, config: BigQueryV2Config, report: BigQueryV2Report): self.config = config self.report = report - self.lineage_metadata: Dict[str, Set[str]] = defaultdict(set) self.loaded_project_ids: List[str] = [] def error(self, log: logging.Logger, key: str, reason: str) -> None: @@ -663,10 +661,13 @@ def _compute_bigquery_lineage(self, project_id: str) -> Dict[str, Set[str]]: return lineage_metadata def get_upstream_tables( - self, bq_table: str, tables_seen: List[str] = [] + self, + bq_table: str, + lineage_metadata: Dict[str, Set[str]], + tables_seen: List[str] = [], ) -> Set[BigQueryTableRef]: upstreams: Set[BigQueryTableRef] = set() - for ref_table in self.lineage_metadata[str(bq_table)]: + for ref_table in lineage_metadata[str(bq_table)]: upstream_table = BigQueryTableRef.from_string_name(ref_table) if upstream_table.is_temporary_table( [self.config.temp_table_dataset_prefix] @@ -678,56 +679,63 @@ def get_upstream_tables( ) continue tables_seen.append(ref_table) - if ref_table in self.lineage_metadata: + if ref_table in lineage_metadata: upstreams = upstreams.union( - self.get_upstream_tables(ref_table, tables_seen=tables_seen) + self.get_upstream_tables( + ref_table, + lineage_metadata=lineage_metadata, + tables_seen=tables_seen, + ) ) else: upstreams.add(upstream_table) return upstreams + def calculate_lineage_for_project(self, project_id: str) -> Dict[str, Set[str]]: + with PerfTimer() as timer: + lineage = self._compute_bigquery_lineage(project_id) + + self.report.lineage_extraction_sec[project_id] = round( + timer.elapsed_seconds(), 2 + ) + + return lineage + def get_upstream_lineage_info( self, project_id: str, dataset_name: str, table: Union[BigqueryTable, BigqueryView], platform: str, + lineage_metadata: Dict[str, Set[str]], ) -> Optional[Tuple[UpstreamLineageClass, Dict[str, str]]]: table_identifier = BigqueryTableIdentifier(project_id, dataset_name, table.name) - if table_identifier.project_id not in self.loaded_project_ids: - with PerfTimer() as 
timer: - self.lineage_metadata.update( - self._compute_bigquery_lineage(table_identifier.project_id) - ) - self.report.lineage_extraction_sec[table_identifier.project_id] = round( - timer.elapsed_seconds(), 2 - ) - self.loaded_project_ids.append(table_identifier.project_id) - if self.config.lineage_parse_view_ddl and isinstance(table, BigqueryView): for table_id in self.parse_view_lineage(project_id, dataset_name, table): - if table_identifier.get_table_name() in self.lineage_metadata: - self.lineage_metadata[ + if table_identifier.get_table_name() in lineage_metadata: + lineage_metadata[ str( BigQueryTableRef(table_identifier).get_sanitized_table_ref() ) ].add(str(BigQueryTableRef(table_id).get_sanitized_table_ref())) else: - self.lineage_metadata[ + lineage_metadata[ str( BigQueryTableRef(table_identifier).get_sanitized_table_ref() ) ] = {str(BigQueryTableRef(table_id).get_sanitized_table_ref())} bq_table = BigQueryTableRef.from_bigquery_table(table_identifier) - if str(bq_table) in self.lineage_metadata: + if str(bq_table) in lineage_metadata: upstream_list: List[UpstreamClass] = [] # Sorting the list of upstream lineage events in order to avoid creating multiple aspects in backend # even if the lineage is same but the order is different. for upstream_table in sorted( - self.get_upstream_tables(str(bq_table), tables_seen=[]) + self.get_upstream_tables( + str(bq_table), lineage_metadata, tables_seen=[] + ) ): upstream_table_class = UpstreamClass( mce_builder.make_dataset_urn_with_platform_instance( diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index c79fa0950457a..5512d235cda5d 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -93,8 +93,10 @@ def test_simple_upstream_table_generation(): } ) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) - source.lineage_extractor.lineage_metadata = {str(a): {str(b)}} - upstreams = source.lineage_extractor.get_upstream_tables(str(a), []) + lineage_metadata = {str(a): {str(b)}} + upstreams = source.lineage_extractor.get_upstream_tables( + str(a), lineage_metadata, [] + ) assert list(upstreams) == [b] @@ -116,8 +118,10 @@ def test_upstream_table_generation_with_temporary_table_without_temp_upstream(): } ) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) - source.lineage_extractor.lineage_metadata = {str(a): {str(b)}} - upstreams = source.lineage_extractor.get_upstream_tables(str(a), []) + lineage_metadata = {str(a): {str(b)}} + upstreams = source.lineage_extractor.get_upstream_tables( + str(a), lineage_metadata, [] + ) assert list(upstreams) == [] @@ -146,11 +150,13 @@ def test_upstream_table_generation_with_temporary_table_with_temp_upstream(): } ) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) - source.lineage_extractor.lineage_metadata = { + lineage_metadata = { str(a): {str(b)}, str(b): {str(c)}, } - upstreams = source.lineage_extractor.get_upstream_tables(str(a), []) + upstreams = source.lineage_extractor.get_upstream_tables( + str(a), lineage_metadata, [] + ) assert list(upstreams) == [c] @@ -187,12 +193,14 @@ def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstr } ) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) - source.lineage_extractor.lineage_metadata = { + lineage_metadata = { str(a): {str(b)}, str(b): {str(c), str(d)}, str(d): {str(e)}, } - 
upstreams = source.lineage_extractor.get_upstream_tables(str(a), []) + upstreams = source.lineage_extractor.get_upstream_tables( + str(a), lineage_metadata, [] + ) assert list(upstreams).sort() == [c, e].sort()
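The final patch threads lineage_metadata through get_upstream_tables instead of reading it from instance state, and the tests above exercise the transitive walk through temporary tables. A simplified model of that traversal, not the source's exact implementation: plain strings and an is_temporary predicate stand in for BigQueryTableRef and the temp_table_dataset_prefix check, and the real method tracks already-seen tables in a list it passes along.

from typing import Callable, Dict, Optional, Set

def resolve_upstreams(
    table: str,
    lineage: Dict[str, Set[str]],
    is_temporary: Callable[[str], bool],
    seen: Optional[Set[str]] = None,
) -> Set[str]:
    """Walk the lineage map, expanding temporary tables into their own upstreams."""
    seen = set() if seen is None else seen
    upstreams: Set[str] = set()
    for ref in lineage.get(table, set()):
        if is_temporary(ref):
            if ref in seen:
                continue  # guard against cycles through temporary tables
            seen.add(ref)
            upstreams |= resolve_upstreams(ref, lineage, is_temporary, seen)
        else:
            upstreams.add(ref)
    return upstreams

# Mirrors the multiple-temp-upstream test above:
# resolve_upstreams("a", {"a": {"b"}, "b": {"c", "d"}, "d": {"e"}},
#                   is_temporary=lambda t: t in {"b", "d"}) == {"c", "e"}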