Skip to content

Commit

Permalink
Remove openlineage.common dependencies in Google and Snowflake prov…
Browse files Browse the repository at this point in the history
…iders.

Signed-off-by: Jakub Dardzinski <[email protected]>
  • Loading branch information
JDarDagran committed May 16, 2024
1 parent d4a5f4e commit d86cc04
Show file tree
Hide file tree
Showing 13 changed files with 787 additions and 106 deletions.
63 changes: 1 addition & 62 deletions airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
BigQueryValueCheckTrigger,
)
from airflow.providers.google.cloud.utils.bigquery import convert_job_id
from airflow.providers.google.cloud.utils.openlineage import _BigQueryOpenLineageMixin
from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID
from airflow.utils.helpers import exactly_one

Expand Down Expand Up @@ -141,68 +142,6 @@ def get_db_hook(self: BigQueryCheckOperator) -> BigQueryHook: # type:ignore[mis
)


class _BigQueryOpenLineageMixin:
def get_openlineage_facets_on_complete(self, task_instance):
"""
Retrieve OpenLineage data for a COMPLETE BigQuery job.
This method retrieves statistics for the specified job_ids using the BigQueryDatasetsProvider.
It calls BigQuery API, retrieving input and output dataset info from it, as well as run-level
usage statistics.
Run facets should contain:
- ExternalQueryRunFacet
- BigQueryJobRunFacet
Job facets should contain:
- SqlJobFacet if operator has self.sql
Input datasets should contain facets:
- DataSourceDatasetFacet
- SchemaDatasetFacet
Output datasets should contain facets:
- DataSourceDatasetFacet
- SchemaDatasetFacet
- OutputStatisticsOutputDatasetFacet
"""
from openlineage.client.facet import SqlJobFacet
from openlineage.common.provider.bigquery import BigQueryDatasetsProvider

from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.providers.openlineage.utils.utils import normalize_sql

if not self.job_id:
return OperatorLineage()

client = self.hook.get_client(project_id=self.hook.project_id)
job_ids = self.job_id
if isinstance(self.job_id, str):
job_ids = [self.job_id]
inputs, outputs, run_facets = {}, {}, {}
for job_id in job_ids:
stats = BigQueryDatasetsProvider(client=client).get_facets(job_id=job_id)
for input in stats.inputs:
input = input.to_openlineage_dataset()
inputs[input.name] = input
if stats.output:
output = stats.output.to_openlineage_dataset()
outputs[output.name] = output
for key, value in stats.run_facets.items():
run_facets[key] = value

job_facets = {}
if hasattr(self, "sql"):
job_facets["sql"] = SqlJobFacet(query=normalize_sql(self.sql))

return OperatorLineage(
inputs=list(inputs.values()),
outputs=list(outputs.values()),
run_facets=run_facets,
job_facets=job_facets,
)


class _BigQueryOperatorsEncryptionConfigurationMixin:
"""A class to handle the configuration for BigQueryHook.insert_job method."""

Expand Down
Loading

0 comments on commit d86cc04

Please sign in to comment.