[Integration][Airflow] Support OL Datasets in manual lineage inputs/outputs #1015

Merged: 9 commits, Aug 17, 2022
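In short: Airflow's manual lineage inlets/outlets can now carry OpenLineage Dataset objects directly, alongside airflow.lineage.entities.Table; entities of any other type are ignored. A minimal usage sketch (the operator, task id, and dataset names are illustrative and not from this PR; import paths assume Airflow 2.x):

    from airflow.operators.bash import BashOperator
    from airflow.lineage.entities import Table
    from openlineage.client.run import Dataset

    # Both inlet entries below end up as OpenLineage inputs on the task's lineage events:
    # the Dataset is passed through as-is, the Table is converted by the new converter.
    task = BashOperator(
        task_id="load",                 # illustrative task id
        bash_command="exit 0",
        inlets=[
            Dataset(namespace="c1", name="d1.t0", facets={}),
            Table(database="d1", cluster="c1", name="t1"),
        ],
        outlets=[Table(database="d1", cluster="c1", name="t2")],
    )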
@@ -1,5 +1,5 @@
 import os
-from typing import Optional, List
+from typing import Optional, List, Dict
 
 from openlineage.airflow.extractors.base import BaseExtractor, TaskMetadata
 from openlineage.airflow.facets import UnknownOperatorAttributeRunFacet, UnknownOperatorInstance
@@ -17,20 +17,23 @@ def get_operator_classnames(cls) -> List[str]:
return ["BashOperator"]

def extract(self) -> Optional[TaskMetadata]:
if os.environ.get(
collect_source = os.environ.get(
"OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", "True"
).lower() in ('true', '1', 't'):
return None
).lower() not in ('true', '1', 't')

return TaskMetadata(
name=f"{self.operator.dag_id}.{self.operator.task_id}",
job_facets={
job_facet: Dict = {}
if collect_source:
job_facet = {
"sourceCode": SourceCodeJobFacet(
"bash",
# We're on worker and should have access to DAG files
self.operator.bash_command
)
},
}

return TaskMetadata(
name=f"{self.operator.dag_id}.{self.operator.task_id}",
job_facets=job_facet,
run_facets={

# The BashOperator is recorded as an "unknownSource" even though we have an
Expand Down
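The behavioral change here (and, further down, the same change in the PythonOperator extractor): extract() no longer bails out with None when source-code collection is disabled, which is the default; it always returns TaskMetadata and only attaches the sourceCode job facet when OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE is set to something other than 'true', '1', or 't'. A rough sketch mirroring the updated tests below (the BashExtractor import path is an assumption; it is not shown in this diff):

    import os
    from unittest.mock import patch

    from airflow.operators.bash import BashOperator
    # Assumed module path; only the class body appears in the diff above.
    from openlineage.airflow.extractors.bash_extractor import BashExtractor

    extractor = BashExtractor(BashOperator(task_id="taskid", bash_command="exit 0"))

    # Default: source-code collection is disabled, but metadata is still returned
    # (previously extract() returned None here).
    assert "sourceCode" not in extractor.extract().job_facets

    # Opting in via the environment variable attaches the sourceCode facet.
    with patch.dict(os.environ, {"OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE": "False"}):
        assert "sourceCode" in extractor.extract().job_facets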
integration/airflow/openlineage/airflow/extractors/converters.py
@@ -2,9 +2,14 @@
 from airflow.lineage.entities import Table
 
 
-def table_to_dataset(table: Table):
-    return Dataset(
-        namespace=f"{table.cluster}",
-        name=f"{table.database}.{table.name}",
-        facets={},
-    )
+def convert_to_dataset(obj):
+    if isinstance(obj, Dataset):
+        return obj
+    elif isinstance(obj, Table):
+        return Dataset(
+            namespace=f"{obj.cluster}",
+            name=f"{obj.database}.{obj.name}",
+            facets={},
+        )
+    else:
+        return None
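For reference, a quick sketch of how the new converter behaves (the File and Table entities come from the tests further down; the assertions are inferred from the function above):

    from airflow.lineage.entities import File, Table
    from openlineage.client.run import Dataset
    from openlineage.airflow.extractors.converters import convert_to_dataset

    # OpenLineage Datasets pass through untouched.
    assert convert_to_dataset(Dataset(namespace="c", name="db.table1", facets={})).name == "db.table1"

    # Airflow Tables are mapped: namespace <- cluster, name <- "<database>.<name>".
    assert convert_to_dataset(Table(database="db", cluster="c", name="table1")).namespace == "c"

    # Anything else (e.g. a File) yields None, which the extractor manager then skips.
    assert convert_to_dataset(File(url="http://test")) is None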
integration/airflow/openlineage/airflow/extractors/manager.py (15 changes: 9 additions & 6 deletions)
@@ -109,11 +109,14 @@ def extract_inlets_and_outlets(
         inlets: List,
         outlets: List,
     ):
-        from airflow.lineage.entities import Table
-        from openlineage.airflow.extractors.converters import table_to_dataset
+        from openlineage.airflow.extractors.converters import convert_to_dataset
 
         self.log.debug("Manually extracting lineage metadata from inlets and outlets")
-        task_metadata.inputs = [table_to_dataset(t) for t in inlets
-                                if isinstance(t, Table)]
-        task_metadata.outputs = [table_to_dataset(t) for t in outlets
-                                 if isinstance(t, Table)]
+        for i in inlets:
+            d = convert_to_dataset(i)
+            if d:
+                task_metadata.inputs.append(d)
+        for o in outlets:
+            d = convert_to_dataset(o)
+            if d:
+                task_metadata.outputs.append(d)
@@ -22,11 +22,9 @@ def get_operator_classnames(cls) -> List[str]:
return ["PythonOperator"]

def extract(self) -> Optional[TaskMetadata]:
collect_source = True
if os.environ.get(
collect_source = os.environ.get(
"OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", "True"
).lower() in ('true', '1', 't'):
collect_source = False
).lower() not in ('true', '1', 't')

source_code = self.get_source_code(self.operator.python_callable)
job_facet: Dict = {}
Expand Down
integration/airflow/tests/extractors/test_bash_extractor.py (6 changes: 3 additions & 3 deletions)
@@ -14,7 +14,7 @@
 def test_extract_operator_bash_command_disables_without_env():
     operator = BashOperator(task_id='taskid', bash_command="exit 0")
     extractor = BashExtractor(operator)
-    assert extractor.extract() is None
+    assert 'sourceCode' not in extractor.extract().job_facets
 
 
 @patch.dict(os.environ, {"OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE": "False"})
@@ -26,7 +26,7 @@ def test_extract_operator_bash_command_enables_on_true():
 
 def test_extract_dag_bash_command_disabled_without_env():
     extractor = BashExtractor(bash_task)
-    assert extractor.extract() is None
+    assert 'sourceCode' not in extractor.extract().job_facets
 
 
 @patch.dict(os.environ, {"OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE": "False"})
@@ -39,7 +39,7 @@ def test_extract_dag_bash_command_enables_on_true():
 @patch.dict(os.environ, {"OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE": "True"})
 def test_extract_dag_bash_command_env_disables_on_true():
     extractor = BashExtractor(bash_task)
-    assert extractor.extract() is None
+    assert 'sourceCode' not in extractor.extract().job_facets
 
 
 @patch.dict(os.environ, {"OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE": "asdftgeragdsfgawef"})
integration/airflow/tests/extractors/test_converters.py (23 changes: 21 additions & 2 deletions)
@@ -12,15 +12,34 @@
     reason="requires AIRFLOW_VERSION to be higher than 2.0",
 )
 def test_table_to_dataset_conversion():
-    from openlineage.airflow.extractors.converters import table_to_dataset
+    from openlineage.airflow.extractors.converters import convert_to_dataset
     from airflow.lineage.entities import Table
     t = Table(
         database="db",
         cluster="c",
         name="table1",
     )
 
-    d = table_to_dataset(t)
+    d = convert_to_dataset(t)
 
     assert d.namespace == "c"
     assert d.name == "db.table1"
+
+
+@pytest.mark.skipif(
+    parse_version(AIRFLOW_VERSION) < parse_version("2.0.0"),
+    reason="requires AIRFLOW_VERSION to be higher than 2.0",
+)
+def test_dataset_to_dataset_conversion():
+    from openlineage.airflow.extractors.converters import convert_to_dataset
+    from openlineage.client.run import Dataset
+    t = Dataset(
+        namespace="c",
+        name="db.table1",
+        facets={},
+    )
+
+    d = convert_to_dataset(t)
+
+    assert d.namespace == "c"
+    assert d.name == "db.table1"
integration/airflow/tests/extractors/test_extractor_manager.py (53 changes: 44 additions & 9 deletions)
@@ -84,14 +84,16 @@ def test_extracting_inlets_and_outlets():
     from openlineage.client.run import Dataset
 
     metadata = TaskMetadata(name="fake-name", job_facets={})
-    inlets = [Table(database="d1", cluster="c1", name="t1")]
+    inlets = [Dataset(namespace="c1", name="d1.t0", facets={}),
+              Table(database="d1", cluster="c1", name="t1")]
     outlets = [Table(database="d1", cluster="c1", name="t2")]
 
     manager = ExtractorManager()
     manager.extract_inlets_and_outlets(metadata, inlets, outlets)
 
-    assert len(metadata.inputs) == 1 and len(metadata.outputs) == 1
+    assert len(metadata.inputs) == 2 and len(metadata.outputs) == 1
     assert isinstance(metadata.inputs[0], Dataset)
+    assert isinstance(metadata.inputs[1], Dataset)
     assert isinstance(metadata.outputs[0], Dataset)
 
 
@@ -107,18 +109,44 @@ def test_extraction_from_inlets_and_outlets_without_extractor():
 
     task = FakeOperator(
         task_id="task",
-        inlets=[Table(database="d1", cluster="c1", name="t1")],
+        inlets=[Dataset(namespace="c1", name="d1.t0", facets={}),
+                Table(database="d1", cluster="c1", name="t1")],
         outlets=[Table(database="d1", cluster="c1", name="t2")],
     )
 
     manager = ExtractorManager()
 
     metadata = manager.extract_metadata(dagrun, task)
-    assert len(metadata.inputs) == 1 and len(metadata.outputs) == 1
+    assert len(metadata.inputs) == 2 and len(metadata.outputs) == 1
     assert isinstance(metadata.inputs[0], Dataset)
+    assert isinstance(metadata.inputs[1], Dataset)
     assert isinstance(metadata.outputs[0], Dataset)
 
 
+@pytest.mark.skipif(
+    parse_version(AIRFLOW_VERSION) < parse_version("2.0.0"),
+    reason="requires AIRFLOW_VERSION to be higher than 2.0",
+)
+def test_extraction_from_inlets_and_outlets_ignores_unhandled_types():
+    from airflow.lineage.entities import Table, File
+    from openlineage.client.run import Dataset
+
+    dagrun = MagicMock()
+
+    task = FakeOperator(
+        task_id="task",
+        inlets=[Dataset(namespace="c1", name="d1.t0", facets={}),
+                File(url="http://test"), Table(database="d1", cluster="c1", name="t1")],
+        outlets=[Table(database="d1", cluster="c1", name="t2"), File(url="http://test")],
+    )
+
+    manager = ExtractorManager()
+
+    metadata = manager.extract_metadata(dagrun, task)
+    # The File objects from inlets and outlets should not be converted
+    assert len(metadata.inputs) == 2 and len(metadata.outputs) == 1
+
+
 @pytest.mark.skipif(
     parse_version(AIRFLOW_VERSION) < parse_version("2.0.0"),
     reason="requires AIRFLOW_VERSION to be higher than 2.0",
@@ -131,19 +159,25 @@ def test_fake_extractor_extracts_from_inlets_and_outlets():
 
     task = FakeOperator(
         task_id="task",
-        inlets=[Table(database="d1", cluster="c1", name="t1")],
-        outlets=[Table(database="d1", cluster="c1", name="t2")],
+        inlets=[Dataset(namespace="c1", name="d1.t0", facets={}),
+                Table(database="d1", cluster="c1", name="t1")],
+        outlets=[Table(database="d1", cluster="c1", name="t2"),
+                 Dataset(namespace="c1", name="d1.t3", facets={})],
     )
 
     manager = ExtractorManager()
     manager.add_extractor(FakeOperator.__name__, FakeExtractor)
 
     metadata = manager.extract_metadata(dagrun, task)
-    assert len(metadata.inputs) == 1 and len(metadata.outputs) == 1
+    assert len(metadata.inputs) == 2 and len(metadata.outputs) == 2
     assert isinstance(metadata.inputs[0], Dataset)
+    assert isinstance(metadata.inputs[1], Dataset)
     assert isinstance(metadata.outputs[0], Dataset)
-    assert metadata.inputs[0].name == "d1.t1"
+    assert isinstance(metadata.outputs[1], Dataset)
+    assert metadata.inputs[0].name == "d1.t0"
+    assert metadata.inputs[1].name == "d1.t1"
     assert metadata.outputs[0].name == "d1.t2"
+    assert metadata.outputs[1].name == "d1.t3"
 
 
 @pytest.mark.skipif(
@@ -158,7 +192,8 @@ def test_fake_extractor_extracts_and_discards_inlets_and_outlets():
 
     task = FakeOperator(
         task_id="task",
-        inlets=[Table(database="d1", cluster="c1", name="t1")],
+        inlets=[Dataset(namespace="c1", name="d1.t0", facets={}),
+                Table(database="d1", cluster="c1", name="t1")],
         outlets=[Table(database="d1", cluster="c1", name="t2")],
     )
 