diff --git a/metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml b/metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml index 86f4898d9d502..4210d0599a215 100644 --- a/metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml +++ b/metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml @@ -41,6 +41,23 @@ source: # deny: # - "*.*.*" #storage_project_id: project-id-1234567 + ## Lineage with GCS Source + # include_column_lineage_with_gcs: true/false + # gcs_lineage_config: + # path_specs: + # - include: "gs://my-bucket/foo/tests/bar.avro" + # - include: "gs://my-bucket/foo/tests/*.*" + # - include: "gs://my-bucket/foo/tests/{table}/*.avro" + # - include: "gs://my-bucket/foo/tests/{table}/*/*.avro" + # - include: "gs://my-bucket/foo/tests/{table}/*.*" + # - include: "gs://my-bucket/{dept}/tests/{table}/*.avro" + # - include: "gs://my-bucket/{dept}/tests/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.avro" + # - include: "gs://my-bucket/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.avro" + # - include: "gs://my-bucket/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.*" + # - include: "gs://my-bucket/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.*" + # - include: "gs://my-bucket/*/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.*" + # strip_urls: false + ## see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation sink: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index a1bbb9dd6b0b9..76c2fbf48ccab 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -309,6 +309,16 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: self.bq_schema_extractor.table_refs, ) + # Lineage BQ to GCS + if ( + self.config.include_table_lineage + and self.bq_schema_extractor.external_tables + ): + for dataset_urn, table in self.bq_schema_extractor.external_tables.items(): + yield from self.lineage_extractor.gen_lineage_workunits_for_external_table( + dataset_urn, table.ddl, graph=self.ctx.graph + ) + def get_report(self) -> BigQueryV2Report: return self.report diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index 331b583423093..ad293c702a520 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -21,6 +21,7 @@ from datahub.ingestion.glossary.classification_mixin import ( ClassificationSourceConfigMixin, ) +from datahub.ingestion.source.data_lake_common.path_spec import PathSpec from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, SQLFilterConfig from datahub.ingestion.source.state.stateful_ingestion_base import ( StatefulLineageConfigMixin, @@ -206,6 +207,39 @@ def get_sql_alchemy_url(self) -> str: return "bigquery://" +class GcsLineageProviderConfig(ConfigModel): + """ + Any source that produces gcs lineage from/to Datasets should inherit this class. + """ + + path_specs: List[PathSpec] = Field( + default=[], + description="List of PathSpec. 
See the PathSpec documentation below for details.",
+    )
+
+    strip_urls: bool = Field(
+        default=True,
+        description="Strip the filename from the GCS URL. Applies only if path_specs are not specified.",
+    )
+
+    ignore_non_path_spec_path: bool = Field(
+        default=False,
+        description="Ignore paths that do not match any path_spec. Applies only if path_specs are specified.",
+    )
+
+
+class GcsDatasetLineageProviderConfigBase(ConfigModel):
+    """
+    Any source that produces gcs lineage from/to Datasets should inherit this class.
+    This is needed to group all lineage-related configs under the `gcs_lineage_config` config property.
+    """
+
+    gcs_lineage_config: GcsLineageProviderConfig = Field(
+        default=GcsLineageProviderConfig(),
+        description="Common config for gcs lineage generation",
+    )
+
+
 class BigQueryFilterConfig(SQLFilterConfig):
     project_ids: List[str] = Field(
         default_factory=list,
@@ -328,6 +362,7 @@ class BigQueryIdentifierConfig(


 class BigQueryV2Config(
+    GcsDatasetLineageProviderConfigBase,
     BigQueryConnectionConfig,
     BigQueryBaseConfig,
     BigQueryFilterConfig,
@@ -473,6 +508,11 @@ def have_table_data_read_permission(self) -> bool:
         description="Option to enable/disable lineage generation. Is enabled by default.",
     )

+    include_column_lineage_with_gcs: bool = Field(
+        default=True,
+        description="When enabled, column-level lineage will be extracted for GCS-backed external tables.",
+    )
+
     max_query_duration: timedelta = Field(
         default=timedelta(minutes=15),
         description="Correction to pad start_time and end_time with. For handling the case where the read happens within our time range but the query completion event is delayed and happens after the configured end time.",
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py
index 88c6eb1885f3b..7e8b2931282ff 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py
@@ -157,6 +157,8 @@ class BigQueryV2Report(
     num_filtered_query_events: int = 0
     num_usage_query_hash_collisions: int = 0
     num_operational_stats_workunits_emitted: int = 0
+    num_lineage_dropped_gcs_path: int = 0
+
     snapshots_scanned: int = 0

     # view lineage
@@ -185,6 +187,7 @@ class BigQueryV2Report(
     usage_start_time: Optional[datetime] = None
     usage_end_time: Optional[datetime] = None
     stateful_usage_ingestion_enabled: bool = False
+    num_skipped_external_table_lineage: int = 0

     queries_extractor: Optional[BigQueryQueriesExtractorReport] = None

diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
index 6361d5f266cb7..4f18c22c108a6 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
@@ -121,6 +121,7 @@ class BigqueryTable(BaseTable):
     columns_ignore_from_profiling: List[str] = field(default_factory=list)
     external: bool = False
     constraints: List[BigqueryTableConstraint] = field(default_factory=list)
+    table_type: Optional[str] = None


 @dataclass
@@ -377,6 +378,7 @@ def _make_bigquery_table(
         return BigqueryTable(
             name=table.table_name,
             created=table.created,
+            table_type=table.table_type,
             last_altered=(
                 datetime.fromtimestamp(
                     table.get("last_altered") / 1000, tz=timezone.utc
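
The recipe block shown at the top of this diff is parsed straight into these models. A minimal, illustrative sketch of that mapping, with made-up project and bucket names, assuming the pydantic v1 parse_obj helper exposed by ConfigModel:

from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config

config = BigQueryV2Config.parse_obj(
    {
        "project_ids": ["my-project"],  # hypothetical project
        "include_column_lineage_with_gcs": True,
        "gcs_lineage_config": {
            "path_specs": [{"include": "gs://my-bucket/foo/tests/{table}/*.avro"}],
            "strip_urls": False,
        },
    }
)
# Nested dicts are coerced into GcsLineageProviderConfig / PathSpec instances.
assert config.gcs_lineage_config.path_specs[0].include.endswith("*.avro")
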
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py
index dc53e2f74959e..907e5c12e99a1 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py
@@ -204,6 +204,11 @@ def __init__(
         self.view_definitions: FileBackedDict[str] = FileBackedDict()
         # Maps snapshot ref -> Snapshot
         self.snapshots_by_ref: FileBackedDict[BigqueryTableSnapshot] = FileBackedDict()
+        # Maps dataset urn -> external BQ table, for BigQuery-to-GCS lineage
+        self.external_tables: Dict[str, BigqueryTable] = {}
+        self.bq_external_table_pattern = (
+            r".*create\s+external\s+table\s+`?(?:project_id\.)?.*`?"
+        )

         bq_project = (
             self.config.project_on_behalf
@@ -957,6 +962,15 @@ def gen_dataset_workunits(
             project_id, dataset_name, table.name
         )

+        # Added for BigQuery-to-GCS lineage extraction
+        if (
+            isinstance(table, BigqueryTable)
+            and table.table_type == "EXTERNAL"
+            and table.ddl is not None
+            and re.search(self.bq_external_table_pattern, table.ddl, re.IGNORECASE)
+        ):
+            self.external_tables[dataset_urn] = table
+
         status = Status(removed=False)
         yield MetadataChangeProposalWrapper(
             entityUrn=dataset_urn, aspect=status
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
index c9d0738bea7dc..b542992a7924a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
@@ -1,6 +1,8 @@
 import collections
 import itertools
+import json
 import logging
+import re
 from dataclasses import dataclass
 from datetime import datetime, timezone
 from typing import (
@@ -15,17 +17,20 @@
     Tuple,
     Union,
 )
+from urllib.parse import urlparse

 import humanfriendly
 import sqlglot
 from google.cloud.datacatalog import lineage_v1
 from google.cloud.logging_v2.client import Client as GCPLoggingClient

+from datahub.api.entities.dataset.dataset import Dataset
 from datahub.configuration.pattern_utils import is_schema_allowed
 from datahub.emitter import mce_builder
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.source_helpers import auto_workunit
 from datahub.ingestion.api.workunit import MetadataWorkUnit
+from datahub.ingestion.graph.client import DataHubGraph
 from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
     AuditLogEntry,
     BigQueryAuditMetadata,
@@ -51,16 +56,19 @@
     BQ_FILTER_RULE_TEMPLATE_V2_LINEAGE,
     bigquery_audit_metadata_query_template_lineage,
 )
+from datahub.ingestion.source.gcs import gcs_utils
 from datahub.ingestion.source.state.redundant_run_skip_handler import (
     RedundantLineageRunSkipHandler,
 )
 from datahub.ingestion.source_report.ingestion_stage import LINEAGE_EXTRACTION
+from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaMetadata
 from datahub.metadata.schema_classes import (
     AuditStampClass,
     DatasetLineageTypeClass,
     FineGrainedLineageClass,
     FineGrainedLineageDownstreamTypeClass,
     FineGrainedLineageUpstreamTypeClass,
+    SchemaMetadataClass,
     UpstreamClass,
     UpstreamLineageClass,
 )
@@ -247,6 +255,7 @@ def __init__(
             format_queries=True,
         )
         self.report.sql_aggregator = self.aggregator.report
+        self.gcs_uris_regex = re.compile(r"uris=\[([^\]]+)\]")

     def get_time_window(self) -> Tuple[datetime, datetime]:
         if self.redundant_run_skip_handler:
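
A quick illustration of how the two regexes introduced here cooperate on a typical external-table DDL. The DDL string is fabricated; note that the json.loads step only succeeds because the URIs are double-quoted:

import json
import re

ddl = (
    "CREATE EXTERNAL TABLE `my_project.my_dataset.my_table` "
    'OPTIONS(format="PARQUET", uris=["gs://bigquery_data/customer1/*.parquet"])'
)
# Detection regex from bigquery_schema_gen.py: is this an external-table DDL?
assert re.search(
    r".*create\s+external\s+table\s+`?(?:project_id\.)?.*`?", ddl, re.IGNORECASE
)
# Extraction regex from lineage.py: pull out the bracketed URI list ...
match = re.compile(r"uris=\[([^\]]+)\]").search(ddl)
assert match
# ... and parse it as a JSON array of source URIs.
assert json.loads(f"[{match.group(1)}]") == ["gs://bigquery_data/customer1/*.parquet"]
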
@@ -918,3 +927,190 @@ def test_capability(self, project_id: str) -> None:
     def report_status(self, step: str, status: bool) -> None:
         if self.redundant_run_skip_handler:
             self.redundant_run_skip_handler.report_current_run_status(step, status)
+
+    def gen_lineage_workunits_for_external_table(
+        self,
+        dataset_urn: str,
+        ddl: Optional[str],
+        graph: Optional[DataHubGraph] = None,
+    ) -> Iterable[MetadataWorkUnit]:
+
+        if not ddl:
+            return
+
+        # Expect URIs in `uris=[""]` format
+        uris_match = self.gcs_uris_regex.search(ddl)
+        if not uris_match:
+            self.report.num_skipped_external_table_lineage += 1
+            logger.warning(f"Unable to parse GCS URI from the provided DDL {ddl}.")
+            return
+
+        uris_str = uris_match.group(1)
+        try:
+            source_uris = json.loads(f"[{uris_str}]")
+        except json.JSONDecodeError as e:
+            self.report.num_skipped_external_table_lineage += 1
+            logger.warning(
+                f"Failed to parse the source URIs as JSON: {e}. The field value was: {uris_str}"
+            )
+            return
+
+        lineage_info = self.get_lineage_for_external_table(
+            dataset_urn=dataset_urn,
+            source_uris=source_uris,
+            graph=graph,
+        )
+
+        if lineage_info:
+            yield MetadataChangeProposalWrapper(
+                entityUrn=dataset_urn, aspect=lineage_info
+            ).as_workunit()
+
+    def get_lineage_for_external_table(
+        self,
+        dataset_urn: str,
+        source_uris: List[str],
+        graph: Optional[DataHubGraph] = None,
+    ) -> Optional[UpstreamLineageClass]:
+
+        upstreams_list: List[UpstreamClass] = []
+        fine_grained_lineages: List[FineGrainedLineageClass] = []
+        gcs_urns: Set[str] = set()
+
+        for source_uri in source_uris:
+            # Check that the source URI has the gs:// prefix;
+            # right now we only support GCS lineage.
+            if not gcs_utils.is_gcs_uri(source_uri):
+                continue
+            gcs_path = self._get_gcs_path(source_uri)
+
+            if gcs_path is None:
+                continue
+
+            path = gcs_utils.strip_gcs_prefix(gcs_path)
+            urn = mce_builder.make_dataset_urn_with_platform_instance(
+                platform="gcs",
+                name=path,
+                env=self.config.env,
+                platform_instance=self.config.platform_instance,
+            )
+            gcs_urns.add(urn)
+
+        upstreams_list.extend(
+            [
+                UpstreamClass(
+                    dataset=source_dataset_urn,
+                    type=DatasetLineageTypeClass.COPY,
+                )
+                for source_dataset_urn in gcs_urns
+            ]
+        )
+
+        if not upstreams_list:
+            return None
+
+        if self.config.include_column_lineage_with_gcs:
+            assert graph
+            schema_metadata: Optional[SchemaMetadataClass] = graph.get_schema_metadata(
+                dataset_urn
+            )
+            for gcs_dataset_urn in gcs_urns:
+                schema_metadata_for_gcs: Optional[
+                    SchemaMetadataClass
+                ] = graph.get_schema_metadata(gcs_dataset_urn)
+                if schema_metadata and schema_metadata_for_gcs:
+                    fine_grained_lineage = self.get_fine_grained_lineages_with_gcs(
+                        dataset_urn,
+                        gcs_dataset_urn,
+                        schema_metadata,
+                        schema_metadata_for_gcs,
+                    )
+                    if not fine_grained_lineage:
+                        logger.warning(
+                            f"Failed to retrieve fine-grained lineage for dataset {dataset_urn} and GCS {gcs_dataset_urn}. "
+                            f"Check schema metadata: {schema_metadata} and GCS metadata: {schema_metadata_for_gcs}."
+                        )
+                        continue
+
+                    fine_grained_lineages.extend(fine_grained_lineage)
+
+        upstream_lineage = UpstreamLineageClass(
+            upstreams=upstreams_list, fineGrainedLineages=fine_grained_lineages or None
+        )
+        return upstream_lineage
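
A standalone sketch of the URN construction performed above. The bucket path is illustrative, assuming gcs_utils.strip_gcs_prefix simply drops the gs:// scheme:

from datahub.emitter import mce_builder
from datahub.ingestion.source.gcs import gcs_utils

path = gcs_utils.strip_gcs_prefix("gs://bigquery_data/customer1")
urn = mce_builder.make_dataset_urn_with_platform_instance(
    platform="gcs",
    name=path,  # "bigquery_data/customer1"
    platform_instance=None,
    env="PROD",
)
assert urn == "urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer1,PROD)"
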
+
+    def _get_gcs_path(self, path: str) -> Optional[str]:
+        if self.config.gcs_lineage_config:
+            for path_spec in self.config.gcs_lineage_config.path_specs:
+                if not path_spec.allowed(path):
+                    continue
+                _, table_path = path_spec.extract_table_name_and_path(path)
+                return table_path
+
+            if (
+                self.config.gcs_lineage_config.ignore_non_path_spec_path
+                and len(self.config.gcs_lineage_config.path_specs) > 0
+            ):
+                self.report.num_lineage_dropped_gcs_path += 1
+                logger.debug(
+                    f"Skipping gcs path {path} as it does not match any path spec."
+                )
+                return None
+
+            if self.config.gcs_lineage_config.strip_urls:
+                if "/" in urlparse(path).path:
+                    return str(path.rsplit("/", 1)[0])
+
+        return path
+
+    def get_fine_grained_lineages_with_gcs(
+        self,
+        dataset_urn: str,
+        gcs_dataset_urn: str,
+        schema_metadata: SchemaMetadata,
+        schema_metadata_for_gcs: SchemaMetadata,
+    ) -> Optional[List[FineGrainedLineageClass]]:
+        def simplify_field_path(field_path):
+            return Dataset._simplify_field_path(field_path)
+
+        if schema_metadata and schema_metadata_for_gcs:
+            fine_grained_lineages: List[FineGrainedLineageClass] = []
+            for field in schema_metadata.fields:
+                field_path_v1 = simplify_field_path(field.fieldPath)
+                matching_gcs_field = next(
+                    (
+                        f
+                        for f in schema_metadata_for_gcs.fields
+                        if simplify_field_path(f.fieldPath) == field_path_v1
+                    ),
+                    None,
+                )
+                if matching_gcs_field:
+                    fine_grained_lineages.append(
+                        FineGrainedLineageClass(
+                            downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
+                            downstreams=[
+                                mce_builder.make_schema_field_urn(
+                                    dataset_urn, field_path_v1
+                                )
+                            ],
+                            upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
+                            upstreams=[
+                                mce_builder.make_schema_field_urn(
+                                    gcs_dataset_urn,
+                                    simplify_field_path(matching_gcs_field.fieldPath),
+                                )
+                            ],
+                        )
+                    )
+            return fine_grained_lineages
+        return None
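
The column matching in get_fine_grained_lineages_with_gcs hinges on Dataset._simplify_field_path collapsing v2 field paths to plain column names. The v2 path below is a made-up example and _simplify_field_path is a private helper, so treat this as the assumed behavior rather than a documented contract:

from datahub.api.entities.dataset.dataset import Dataset

v2_path = "[version=2.0].[type=struct].[type=long].age"
assert Dataset._simplify_field_path(v2_path) == "age"  # assumed simplification
assert Dataset._simplify_field_path("age") == "age"  # v1 paths pass through
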
diff --git a/metadata-ingestion/tests/unit/bigquery/test_bigquery_lineage.py b/metadata-ingestion/tests/unit/bigquery/test_bigquery_lineage.py
index 7456f2fd1d91c..415977b0f8467 100644
--- a/metadata-ingestion/tests/unit/bigquery/test_bigquery_lineage.py
+++ b/metadata-ingestion/tests/unit/bigquery/test_bigquery_lineage.py
@@ -1,19 +1,25 @@
 import datetime
-from typing import Dict, List, Set
+from typing import Dict, List, Optional, Set

 import pytest

+import datahub.metadata.schema_classes as models
+from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
     BigQueryTableRef,
     QueryEvent,
 )
-from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
+from datahub.ingestion.source.bigquery_v2.bigquery_config import (
+    BigQueryV2Config,
+    GcsLineageProviderConfig,
+)
 from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
 from datahub.ingestion.source.bigquery_v2.common import BigQueryIdentifierBuilder
 from datahub.ingestion.source.bigquery_v2.lineage import (
     BigqueryLineageExtractor,
     LineageEdge,
 )
+from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
 from datahub.sql_parsing.schema_resolver import SchemaResolver


@@ -135,3 +141,220 @@ def test_column_level_lineage(lineage_entries: List[QueryEvent]) -> None:
         upstream_lineage.fineGrainedLineages
         and len(upstream_lineage.fineGrainedLineages) == 2
     )
+
+
+def test_lineage_for_external_bq_table(mock_datahub_graph_instance):
+
+    pipeline_context = PipelineContext(run_id="bq_gcs_lineage")
+    pipeline_context.graph = mock_datahub_graph_instance
+
+    def fake_schema_metadata(entity_urn: str) -> models.SchemaMetadataClass:
+        return models.SchemaMetadataClass(
+            schemaName="sample_schema",
+            platform="urn:li:dataPlatform:gcs",  # note: the platform must be an urn
+            version=0,
+            hash="",
+            platformSchema=models.OtherSchemaClass(
+                rawSchema="__insert raw schema here__"
+            ),
+            fields=[
+                models.SchemaFieldClass(
+                    fieldPath="age",
+                    type=models.SchemaFieldDataTypeClass(type=models.NumberTypeClass()),
+                    nativeDataType="int",
+                ),
+                models.SchemaFieldClass(
+                    fieldPath="firstname",
+                    type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
+                    nativeDataType="VARCHAR(100)",
+                ),
+                models.SchemaFieldClass(
+                    fieldPath="lastname",
+                    type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
+                    nativeDataType="VARCHAR(100)",
+                ),
+            ],
+        )
+
+    pipeline_context.graph.get_schema_metadata = fake_schema_metadata  # type: ignore
+    path_specs: List[PathSpec] = [
+        PathSpec(include="gs://bigquery_data/{table}/*.parquet"),
+        PathSpec(include="gs://bigquery_data/customer3/{table}/*.parquet"),
+    ]
+    gcs_lineage_config: GcsLineageProviderConfig = GcsLineageProviderConfig(
+        path_specs=path_specs
+    )
+
+    config = BigQueryV2Config(
+        include_table_lineage=True,
+        include_column_lineage_with_gcs=True,
+        gcs_lineage_config=gcs_lineage_config,
+    )
+
+    report = BigQueryV2Report()
+    extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(
+        config,
+        report,
+        schema_resolver=SchemaResolver(platform="bigquery"),
+        identifiers=BigQueryIdentifierBuilder(config, report),
+    )
+
+    upstream_lineage = extractor.get_lineage_for_external_table(
+        dataset_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.my_dataset.my_table,PROD)",
+        source_uris=[
+            "gs://bigquery_data/customer1/*.parquet",
+            "gs://bigquery_data/customer2/*.parquet",
+            "gs://bigquery_data/customer3/my_table/*.parquet",
+        ],
+        graph=pipeline_context.graph,
+    )
+
+    expected_schema_field_urns = [
+        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer1,PROD),age)",
+        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer1,PROD),firstname)",
+        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer1,PROD),lastname)",
+        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer2,PROD),age)",
+        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer2,PROD),firstname)",
+        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer2,PROD),lastname)",
+        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer3/my_table,PROD),age)",
+        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer3/my_table,PROD),firstname)",
+        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer3/my_table,PROD),lastname)",
+    ]
+    assert upstream_lineage
+    assert len(upstream_lineage.upstreams) == 3
+    assert (
+        upstream_lineage.fineGrainedLineages
+        and len(upstream_lineage.fineGrainedLineages) == 9
+    )
+    # Extract the upstream column URNs from the fine-grained lineage entries
+    actual_schema_field_urns = [
+        fine_grained_lineage.upstreams[0]
+        for fine_grained_lineage in upstream_lineage.fineGrainedLineages
+        if fine_grained_lineage.upstreams
+    ]
+    assert all(
+        urn in actual_schema_field_urns for urn in expected_schema_field_urns
+    ), "Some expected column URNs are missing from fine grained lineage."
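
These tests rely on a mock_datahub_graph_instance fixture that is not part of this diff. One plausible shape for it, assuming a plain MagicMock over DataHubGraph in conftest.py is sufficient:

from unittest import mock

import pytest

from datahub.ingestion.graph.client import DataHubGraph


@pytest.fixture
def mock_datahub_graph_instance():
    # Hypothetical fixture sketch: the tests monkeypatch get_schema_metadata anyway.
    return mock.MagicMock(spec=DataHubGraph)
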
+ + +def test_lineage_for_external_bq_table_no_column_lineage(mock_datahub_graph_instance): + + pipeline_context = PipelineContext(run_id="bq_gcs_lineage") + pipeline_context.graph = mock_datahub_graph_instance + + def fake_schema_metadata(entity_urn: str) -> Optional[models.SchemaMetadataClass]: + return None + + pipeline_context.graph.get_schema_metadata = fake_schema_metadata # type: ignore + path_specs: List[PathSpec] = [ + PathSpec(include="gs://bigquery_data/{table}/*.parquet"), + PathSpec(include="gs://bigquery_data/customer3/{table}/*.parquet"), + ] + gcs_lineage_config: GcsLineageProviderConfig = GcsLineageProviderConfig( + path_specs=path_specs + ) + + config = BigQueryV2Config( + include_table_lineage=True, + include_column_lineage_with_gcs=True, + gcs_lineage_config=gcs_lineage_config, + ) + + report = BigQueryV2Report() + extractor: BigqueryLineageExtractor = BigqueryLineageExtractor( + config, + report, + schema_resolver=SchemaResolver(platform="bigquery"), + identifiers=BigQueryIdentifierBuilder(config, report), + ) + + upstream_lineage = extractor.get_lineage_for_external_table( + dataset_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.my_dataset.my_table,PROD)", + source_uris=[ + "gs://bigquery_data/customer1/*.parquet", + "gs://bigquery_data/customer2/*.parquet", + "gs://bigquery_data/customer3/my_table/*.parquet", + ], + graph=pipeline_context.graph, + ) + + expected_dataset_urns = [ + "urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer1,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer2,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:gcs,bigquery_data/customer3/my_table,PROD)", + ] + assert upstream_lineage + assert len(upstream_lineage.upstreams) == 3 + # Extracting dataset URNs from upstream_lineage.upstreams + actual_dataset_urns = [upstream.dataset for upstream in upstream_lineage.upstreams] + assert all( + urn in actual_dataset_urns for urn in expected_dataset_urns + ), "Some expected dataset URNs are missing from upstream lineage." 
+ assert upstream_lineage.fineGrainedLineages is None + + +def test_lineage_for_external_table_with_non_gcs_uri(mock_datahub_graph_instance): + pipeline_context = PipelineContext(run_id="non_gcs_lineage") + pipeline_context.graph = mock_datahub_graph_instance + + config = BigQueryV2Config( + include_table_lineage=True, + include_column_lineage_with_gcs=False, # Column lineage disabled for simplicity + ) + report = BigQueryV2Report() + extractor: BigqueryLineageExtractor = BigqueryLineageExtractor( + config, + report, + schema_resolver=SchemaResolver(platform="bigquery"), + identifiers=BigQueryIdentifierBuilder(config, report), + ) + + upstream_lineage = extractor.get_lineage_for_external_table( + dataset_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.my_dataset.my_table,PROD)", + source_uris=[ + "https://some_non_gcs_path/customer1/file.csv", + "https://another_path/file.txt", + ], + graph=pipeline_context.graph, + ) + + assert upstream_lineage is None + + +def test_lineage_for_external_table_path_not_matching_specs( + mock_datahub_graph_instance, +): + pipeline_context = PipelineContext(run_id="path_not_matching_lineage") + pipeline_context.graph = mock_datahub_graph_instance + + path_specs: List[PathSpec] = [ + PathSpec(include="gs://different_data/db2/db3/{table}/*.parquet"), + ] + gcs_lineage_config: GcsLineageProviderConfig = GcsLineageProviderConfig( + path_specs=path_specs, ignore_non_path_spec_path=True + ) + config = BigQueryV2Config( + include_table_lineage=True, + include_column_lineage_with_gcs=False, + gcs_lineage_config=gcs_lineage_config, + ) + + report = BigQueryV2Report() + extractor: BigqueryLineageExtractor = BigqueryLineageExtractor( + config, + report, + schema_resolver=SchemaResolver(platform="bigquery"), + identifiers=BigQueryIdentifierBuilder(config, report), + ) + + upstream_lineage = extractor.get_lineage_for_external_table( + dataset_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.my_dataset.my_table,PROD)", + source_uris=[ + "gs://bigquery_data/customer1/*.parquet", + "gs://bigquery_data/customer2/*.parquet", + ], + graph=pipeline_context.graph, + ) + + assert upstream_lineage is None
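
To round out the picture, a sketch of driving the new DDL-to-lineage path end to end with column lineage disabled, following the same construction pattern as the tests above. The DDL is fabricated and no graph is needed:

from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
from datahub.ingestion.source.bigquery_v2.common import BigQueryIdentifierBuilder
from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor
from datahub.sql_parsing.schema_resolver import SchemaResolver

config = BigQueryV2Config(include_column_lineage_with_gcs=False)
report = BigQueryV2Report()
extractor = BigqueryLineageExtractor(
    config,
    report,
    schema_resolver=SchemaResolver(platform="bigquery"),
    identifiers=BigQueryIdentifierBuilder(config, report),
)
ddl = (
    "CREATE EXTERNAL TABLE `p.d.t` "
    'OPTIONS(uris=["gs://bigquery_data/customer1/*.parquet"])'
)
# With no path_specs configured, strip_urls trims the filename/glob segment,
# yielding an upstream GCS dataset at gs://bigquery_data/customer1.
workunits = list(
    extractor.gen_lineage_workunits_for_external_table(
        "urn:li:dataset:(urn:li:dataPlatform:bigquery,p.d.t,PROD)", ddl
    )
)
assert len(workunits) == 1  # a single UpstreamLineage aspect for the table
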