fix(ingestion/bigquery): Add lineage extraction for BigQuery with GCS source (#11442)
sagar-salvi-apptware authored Oct 29, 2024
1 parent a11ac8d commit bb63cbd
Showing 8 changed files with 507 additions and 2 deletions.
17 changes: 17 additions & 0 deletions metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml
Original file line number Diff line number Diff line change
@@ -41,6 +41,23 @@ source:
# deny:
# - "*.*.*"
#storage_project_id: project-id-1234567
## Lineage with GCS Source
# include_column_lineage_with_gcs: true/false
# gcs_lineage_config:
# path_specs:
# - include: "gs://my-bucket/foo/tests/bar.avro"
# - include: "gs://my-bucket/foo/tests/*.*"
# - include: "gs://my-bucket/foo/tests/{table}/*.avro"
# - include: "gs://my-bucket/foo/tests/{table}/*/*.avro"
# - include: "gs://my-bucket/foo/tests/{table}/*.*"
# - include: "gs://my-bucket/{dept}/tests/{table}/*.avro"
# - include: "gs://my-bucket/{dept}/tests/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.avro"
# - include: "gs://my-bucket/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.avro"
# - include: "gs://my-bucket/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.*"
# - include: "gs://my-bucket/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.*"
# - include: "gs://my-bucket/*/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.*"
# strip_urls: false


## see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:
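The {table}, {dept}, and {partition...} tokens in each include pattern let the connector map a concrete GCS object path back to a logical table before emitting lineage. The following small, self-contained Python sketch only illustrates the placeholder-to-regex matching idea; the helper name is hypothetical and this is not DataHub's actual PathSpec implementation.

# Illustrative sketch (not the PathSpec class used by the connector): match an
# include pattern containing a {table} placeholder against a GCS URI and recover
# the table name. Assumes simple glob semantics for "*" and "*.*".
import re
from typing import Optional

def table_from_path_spec(include: str, uri: str) -> Optional[str]:
    # Escape literal characters, then swap placeholders and globs for regex pieces.
    pattern = re.escape(include)
    pattern = pattern.replace(r"\{table\}", r"(?P<table>[^/]+)")
    pattern = pattern.replace(r"\{dept\}", r"[^/]+")
    pattern = pattern.replace(r"\*\.\*", r"[^/]+\.[^/]+")
    pattern = pattern.replace(r"\*", r"[^/]+")
    match = re.fullmatch(pattern, uri)
    return match.group("table") if match else None

print(
    table_from_path_spec(
        "gs://my-bucket/foo/tests/{table}/*.avro",
        "gs://my-bucket/foo/tests/orders/part-000.avro",
    )
)  # -> orders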
Original file line number Diff line number Diff line change
@@ -309,6 +309,16 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.bq_schema_extractor.table_refs,
)

# Lineage from BigQuery external tables to GCS
if (
self.config.include_table_lineage
and self.bq_schema_extractor.external_tables
):
for dataset_urn, table in self.bq_schema_extractor.external_tables.items():
yield from self.lineage_extractor.gen_lineage_workunits_for_external_table(
dataset_urn, table.ddl, graph=self.ctx.graph
)

def get_report(self) -> BigQueryV2Report:
return self.report

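For each external table collected by the schema extractor, the loop above hands the table's DDL to gen_lineage_workunits_for_external_table. That method is not part of this excerpt; the simplified sketch below shows the general idea under the assumption that gs:// URIs are pulled out of the CREATE EXTERNAL TABLE DDL and emitted as upstream lineage. The real method also honors gcs_lineage_config.path_specs, column-level lineage, and the graph argument shown above, and the helper name and URN naming here are made up for illustration.

# Simplified sketch only; not the method added by this commit.
import re
from typing import Iterable, Optional

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

def gen_external_table_lineage(
    dataset_urn: str, ddl: Optional[str]
) -> Iterable[MetadataWorkUnit]:
    if not ddl:
        return
    # Pull the gs:// URIs out of the CREATE EXTERNAL TABLE ... OPTIONS(uris=[...]) DDL.
    gcs_uris = re.findall(r"gs://[^'\"\s]+", ddl)
    upstreams = [
        UpstreamClass(
            # Naive URN naming for illustration; the connector resolves GCS dataset
            # names via path_specs rather than using the raw URI.
            dataset=make_dataset_urn("gcs", uri.replace("gs://", "", 1), env="PROD"),
            type=DatasetLineageTypeClass.COPY,
        )
        for uri in gcs_uris
    ]
    if upstreams:
        yield MetadataChangeProposalWrapper(
            entityUrn=dataset_urn,
            aspect=UpstreamLineageClass(upstreams=upstreams),
        ).as_workunit()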
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
from datahub.ingestion.glossary.classification_mixin import (
ClassificationSourceConfigMixin,
)
from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, SQLFilterConfig
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulLineageConfigMixin,
@@ -206,6 +207,39 @@ def get_sql_alchemy_url(self) -> str:
return "bigquery://"


class GcsLineageProviderConfig(ConfigModel):
"""
Any source that produces gcs lineage from/to Datasets should inherit this class.
"""

path_specs: List[PathSpec] = Field(
default=[],
description="List of PathSpec. See below the details about PathSpec",
)

strip_urls: bool = Field(
default=True,
description="Strip filename from gcs url. It only applies if path_specs are not specified.",
)

ignore_non_path_spec_path: bool = Field(
default=False,
description="Ignore paths that are not match in path_specs. It only applies if path_specs are specified.",
)


class GcsDatasetLineageProviderConfigBase(ConfigModel):
"""
Any source that produces gcs lineage from/to Datasets should inherit this class.
This is needed to group all lineage-related configs under the `gcs_lineage_config` config property.
"""

gcs_lineage_config: GcsLineageProviderConfig = Field(
default=GcsLineageProviderConfig(),
description="Common config for gcs lineage generation",
)


class BigQueryFilterConfig(SQLFilterConfig):
project_ids: List[str] = Field(
default_factory=list,
@@ -328,6 +362,7 @@ class BigQueryIdentifierConfig(


class BigQueryV2Config(
GcsDatasetLineageProviderConfigBase,
BigQueryConnectionConfig,
BigQueryBaseConfig,
BigQueryFilterConfig,
@@ -473,6 +508,11 @@ def have_table_data_read_permission(self) -> bool:
description="Option to enable/disable lineage generation. Is enabled by default.",
)

include_column_lineage_with_gcs: bool = Field(
default=True,
description="When enabled, column-level lineage will be extracted from the gcs.",
)

max_query_duration: timedelta = Field(
default=timedelta(minutes=15),
description="Correction to pad start_time and end_time with. For handling the case where the read happens within our time range but the query completion event is delayed and happens after the configured end time.",
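Because BigQueryV2Config now inherits from GcsDatasetLineageProviderConfigBase, the GCS lineage options sit under a dedicated gcs_lineage_config key rather than at the top level of the source config. A minimal sketch of that nesting follows; it assumes the new classes live alongside BigQueryV2Config in datahub.ingestion.source.bigquery_v2.bigquery_config and that ConfigModel exposes pydantic's parse_obj, and PathSpec validation may restrict which include patterns are accepted.

# Sketch only: exercises the nested gcs_lineage_config grouping added above.
# Module path and parse_obj usage are assumptions based on where BigQueryV2Config is defined.
from datahub.ingestion.source.bigquery_v2.bigquery_config import (
    GcsDatasetLineageProviderConfigBase,
)

raw = {
    "gcs_lineage_config": {
        "path_specs": [{"include": "gs://my-bucket/foo/tests/{table}/*.avro"}],
        "strip_urls": False,
    }
}

config = GcsDatasetLineageProviderConfigBase.parse_obj(raw)
print(config.gcs_lineage_config.strip_urls)             # False
print(config.gcs_lineage_config.path_specs[0].include)  # gs://my-bucket/foo/tests/{table}/*.avro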
Original file line number Diff line number Diff line change
@@ -157,6 +157,8 @@ class BigQueryV2Report(
num_filtered_query_events: int = 0
num_usage_query_hash_collisions: int = 0
num_operational_stats_workunits_emitted: int = 0
num_lineage_dropped_gcs_path: int = 0

snapshots_scanned: int = 0

# view lineage
@@ -185,6 +187,7 @@ class BigQueryV2Report(
usage_start_time: Optional[datetime] = None
usage_end_time: Optional[datetime] = None
stateful_usage_ingestion_enabled: bool = False
num_skipped_external_table_lineage: int = 0

queries_extractor: Optional[BigQueryQueriesExtractorReport] = None

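Both new counters surface in the ingestion report so dropped GCS paths and skipped external tables are visible after a run. A rough sketch of where increments like these could happen follows; the actual call sites are in the lineage extractor, which this excerpt does not show, and the helper names are made up.

# Hypothetical helpers, for illustration only: the real increments happen inside
# the lineage extraction code paths added by this commit.
def tally_gcs_path(report, matched_a_path_spec: bool) -> None:
    if not matched_a_path_spec:
        # GCS path matched none of the configured path_specs; drop it from lineage.
        report.num_lineage_dropped_gcs_path += 1

def tally_external_table(report, ddl_has_gcs_uri: bool) -> None:
    if not ddl_has_gcs_uri:
        # No usable gs:// URI could be pulled from the table DDL; skip its lineage.
        report.num_skipped_external_table_lineage += 1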
Original file line number Diff line number Diff line change
@@ -121,6 +121,7 @@ class BigqueryTable(BaseTable):
columns_ignore_from_profiling: List[str] = field(default_factory=list)
external: bool = False
constraints: List[BigqueryTableConstraint] = field(default_factory=list)
table_type: Optional[str] = None


@dataclass
@@ -377,6 +378,7 @@ def _make_bigquery_table(
return BigqueryTable(
name=table.table_name,
created=table.created,
table_type=table.table_type,
last_altered=(
datetime.fromtimestamp(
table.get("last_altered") / 1000, tz=timezone.utc
Original file line number Diff line number Diff line change
@@ -204,6 +204,11 @@ def __init__(
self.view_definitions: FileBackedDict[str] = FileBackedDict()
# Maps snapshot ref -> Snapshot
self.snapshots_by_ref: FileBackedDict[BigqueryTableSnapshot] = FileBackedDict()
# External BigQuery tables, keyed by dataset URN
self.external_tables: Dict[str, BigqueryTable] = defaultdict()
self.bq_external_table_pattern = (
r".*create\s+external\s+table\s+`?(?:project_id\.)?.*`?"
)

bq_project = (
self.config.project_on_behalf
@@ -957,6 +962,15 @@ def gen_dataset_workunits(
project_id, dataset_name, table.name
)

# Register external tables for BigQuery-to-GCS lineage extraction
if (
isinstance(table, BigqueryTable)
and table.table_type == "EXTERNAL"
and table.ddl is not None
and re.search(self.bq_external_table_pattern, table.ddl, re.IGNORECASE)
):
self.external_tables[dataset_urn] = table

status = Status(removed=False)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=status
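Detection of external tables above relies on two signals: table.table_type == "EXTERNAL" from the BigQuery metadata and a CREATE EXTERNAL TABLE statement in the table DDL. A standalone check of the regex against a made-up DDL string:

# Standalone check of the external-table detection regex added above; the DDL
# text is an invented example, not output from BigQuery.
import re

BQ_EXTERNAL_TABLE_PATTERN = r".*create\s+external\s+table\s+`?(?:project_id\.)?.*`?"

sample_ddl = """
CREATE EXTERNAL TABLE `my-project.my_dataset.events`
OPTIONS (
  format = 'AVRO',
  uris = ['gs://my-bucket/foo/tests/events/*.avro']
)
"""

is_external_ddl = bool(re.search(BQ_EXTERNAL_TABLE_PATTERN, sample_ddl, re.IGNORECASE))
print(is_external_ddl)  # True: this table would be registered in external_tables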
