fix(ingestion/bigquery-gcs-lineage): Add lineage extraction for BigQuery with GCS source #11442

Merged
17 changes: 17 additions & 0 deletions metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml
Expand Up @@ -41,6 +41,23 @@ source:
# deny:
# - "*.*.*"
#storage_project_id: project-id-1234567
## Lineage with GCS Source
# include_column_lineage_with_gcs: true/false
# gcs_lineage_config:
# path_specs:
# - include: "gs://my-bucket/foo/tests/bar.avro"
# - include: "gs://my-bucket/foo/tests/*.*"
# - include: "gs://my-bucket/foo/tests/{table}/*.avro"
# - include: "gs://my-bucket/foo/tests/{table}/*/*.avro"
# - include: "gs://my-bucket/foo/tests/{table}/*.*"
# - include: "gs://my-bucket/{dept}/tests/{table}/*.avro"
# - include: "gs://my-bucket/{dept}/tests/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.avro"
# - include: "gs://my-bucket/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.avro"
# - include: "gs://my-bucket/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.*"
# - include: "gs://my-bucket/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.*"
# - include: "gs://my-bucket/*/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.*"
# strip_urls: false


## see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:
Expand Down
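
The path_spec patterns in the recipe above drive how a concrete GCS object path is resolved to a table for lineage. As a rough illustration of what the {table} placeholder buys you, the sketch below re-implements the placeholder matching with a plain regex; it is not DataHub's PathSpec logic, and the bucket and file names are made up.

import re
from typing import Optional

def table_from_path_spec(include: str, uri: str) -> Optional[str]:
    # Escape the literal parts of the include pattern, then turn "{table}" into a
    # named capture group and "*" into a single-path-segment wildcard.
    pattern = re.escape(include)
    pattern = pattern.replace(r"\{table\}", r"(?P<table>[^/]+)")
    pattern = pattern.replace(r"\*", r"[^/]+")
    match = re.fullmatch(pattern, uri)
    return match.group("table") if match else None

# "orders" is extracted as the table name for this made-up object path.
print(table_from_path_spec(
    "gs://my-bucket/foo/tests/{table}/*.avro",
    "gs://my-bucket/foo/tests/orders/part-0000.avro",
))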
Expand Up @@ -308,6 +308,16 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.bq_schema_extractor.table_refs,
)

# Lineage from BigQuery external tables to GCS
if (
self.config.include_table_lineage
and self.bq_schema_extractor.external_tables
):
for dataset_urn, table in self.bq_schema_extractor.external_tables.items():
yield from self.lineage_extractor.gen_lineage_workunits_for_external_table(
dataset_urn, table.ddl, graph=self.ctx.graph
)

def get_report(self) -> BigQueryV2Report:
return self.report

Expand Down
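
The hook above only wires the collected external tables into the lineage extractor; gen_lineage_workunits_for_external_table itself is not part of this view. For intuition, a BigQuery external table's DDL carries its storage locations in an OPTIONS(uris = [...]) clause, so the extractor has to pull GCS URIs out of text along these lines. This is a hypothetical helper with a made-up DDL, not the PR's implementation.

import re
from typing import List

def extract_gcs_uris_from_ddl(ddl: str) -> List[str]:
    # BigQuery external-table DDL lists its source files in OPTIONS(uris = [...]).
    match = re.search(r"uris\s*=\s*\[(.*?)\]", ddl, re.IGNORECASE | re.DOTALL)
    if not match:
        return []
    return [u.strip().strip("'\"") for u in match.group(1).split(",") if u.strip()]

ddl = """
CREATE EXTERNAL TABLE `my-project.my_dataset.events`
OPTIONS (
  format = 'AVRO',
  uris = ['gs://my-bucket/foo/tests/events/*.avro']
)
"""
print(extract_gcs_uris_from_ddl(ddl))  # ['gs://my-bucket/foo/tests/events/*.avro']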
Expand Up @@ -21,6 +21,7 @@
from datahub.ingestion.glossary.classification_mixin import (
ClassificationSourceConfigMixin,
)
from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, SQLFilterConfig
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulLineageConfigMixin,
Expand Down Expand Up @@ -206,6 +207,39 @@ def get_sql_alchemy_url(self) -> str:
return "bigquery://"


class GcsLineageProviderConfig(ConfigModel):
"""
Any source that produces GCS lineage from/to datasets should inherit this class.
"""

path_specs: List[PathSpec] = Field(
default=[],
description="List of PathSpec. See below the details about PathSpec",
)

strip_urls: bool = Field(
default=True,
description="Strip filename from gcs url. It only applies if path_specs are not specified.",
)

ignore_non_path_spec_path: bool = Field(
default=False,
description="Ignore paths that are not match in path_specs. It only applies if path_specs are specified.",
)


class GcsDatasetLineageProviderConfigBase(ConfigModel):
"""
Any source that produces GCS lineage from/to datasets should inherit this class.
This is needed to group all lineage-related configs under the `gcs_lineage_config` config property.
"""

gcs_lineage_config: GcsLineageProviderConfig = Field(
default=GcsLineageProviderConfig(),
description="Common config for gcs lineage generation",
)


class BigQueryFilterConfig(SQLFilterConfig):
project_ids: List[str] = Field(
default_factory=list,
Expand Down Expand Up @@ -328,6 +362,7 @@ class BigQueryIdentifierConfig(


class BigQueryV2Config(
GcsDatasetLineageProviderConfigBase,
BigQueryConnectionConfig,
BigQueryBaseConfig,
BigQueryFilterConfig,
Expand Down Expand Up @@ -473,6 +508,11 @@ def have_table_data_read_permission(self) -> bool:
description="Option to enable/disable lineage generation. Is enabled by default.",
)

include_column_lineage_with_gcs: bool = Field(
default=True,
description="When enabled, column-level lineage will be extracted from the gcs.",
)

max_query_duration: timedelta = Field(
default=timedelta(minutes=15),
description="Correction to pad start_time and end_time with. For handling the case where the read happens within our time range but the query completion event is delayed and happens after the configured end time.",
Expand Down
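
Because BigQueryV2Config now inherits GcsDatasetLineageProviderConfigBase, the recipe's gcs_lineage_config block maps straight onto these pydantic models. Below is a minimal sketch of parsing just that block on its own; the import path is assumed from the config file shown in this diff, and PathSpec's own validators may restrict which include patterns are accepted.

from datahub.ingestion.source.bigquery_v2.bigquery_config import (
    GcsDatasetLineageProviderConfigBase,
)

# Parse the gcs_lineage_config portion of a recipe into the new models.
config = GcsDatasetLineageProviderConfigBase.parse_obj(
    {
        "gcs_lineage_config": {
            "path_specs": [
                {"include": "gs://my-bucket/foo/tests/{table}/*.avro"},
            ],
            "strip_urls": False,
        }
    }
)
print(config.gcs_lineage_config.path_specs[0].include)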
Expand Up @@ -157,6 +157,8 @@ class BigQueryV2Report(
num_filtered_query_events: int = 0
num_usage_query_hash_collisions: int = 0
num_operational_stats_workunits_emitted: int = 0
num_lineage_dropped_gcs_path: int = 0

snapshots_scanned: int = 0

# view lineage
Expand Down Expand Up @@ -185,6 +187,7 @@ class BigQueryV2Report(
usage_start_time: Optional[datetime] = None
usage_end_time: Optional[datetime] = None
stateful_usage_ingestion_enabled: bool = False
num_skipped_external_table_lineage: int = 0

queries_extractor: Optional[BigQueryQueriesExtractorReport] = None

Expand Down
Expand Up @@ -121,6 +121,7 @@ class BigqueryTable(BaseTable):
columns_ignore_from_profiling: List[str] = field(default_factory=list)
external: bool = False
constraints: List[BigqueryTableConstraint] = field(default_factory=list)
table_type: Optional[str] = None


@dataclass
Expand Down Expand Up @@ -377,6 +378,7 @@ def _make_bigquery_table(
return BigqueryTable(
name=table.table_name,
created=table.created,
table_type=table.table_type,
last_altered=(
datetime.fromtimestamp(
table.get("last_altered") / 1000, tz=timezone.utc
Expand Down
Expand Up @@ -204,6 +204,11 @@ def __init__(
self.view_definitions: FileBackedDict[str] = FileBackedDict()
# Maps snapshot ref -> Snapshot
self.snapshots_by_ref: FileBackedDict[BigqueryTableSnapshot] = FileBackedDict()
# Maps dataset URN -> external BigqueryTable, collected for BigQuery-to-GCS lineage
self.external_tables: Dict[str, BigqueryTable] = defaultdict()
self.bq_external_table_pattern = (
r".*create\s+external\s+table\s+`?(?:project_id\.)?.*`?"
)

bq_project = (
self.config.project_on_behalf
Expand Down Expand Up @@ -957,6 +962,15 @@ def gen_dataset_workunits(
project_id, dataset_name, table.name
)

# Collect external tables for BigQuery-to-GCS lineage extraction
if (
isinstance(table, BigqueryTable)
and table.table_type == "EXTERNAL"
and table.ddl is not None
and re.search(self.bq_external_table_pattern, table.ddl, re.IGNORECASE)
):
self.external_tables[dataset_urn] = table

status = Status(removed=False)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=status
Expand Down
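
As a quick sanity check of the detection above, the bq_external_table_pattern regex fires on CREATE EXTERNAL TABLE DDL but not on a regular CREATE TABLE statement; the sample DDL strings below are made up for illustration.

import re

bq_external_table_pattern = r".*create\s+external\s+table\s+`?(?:project_id\.)?.*`?"

external_ddl = (
    "CREATE EXTERNAL TABLE `my-project.my_dataset.events` "
    "OPTIONS(format = 'AVRO', uris = ['gs://my-bucket/events/*.avro'])"
)
managed_ddl = "CREATE TABLE `my-project.my_dataset.events` (id INT64)"

print(bool(re.search(bq_external_table_pattern, external_ddl, re.IGNORECASE)))  # True
print(bool(re.search(bq_external_table_pattern, managed_ddl, re.IGNORECASE)))   # False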