Skip to content

Commit

Permalink
Set slots to True for facets used in DagRun (#40972)
Browse files Browse the repository at this point in the history
state change level OpenLineage listener hooks.

Signed-off-by: Jakub Dardzinski <[email protected]>
  • Loading branch information
JDarDagran authored and sunank200 committed Jul 24, 2024
1 parent 5191f45 commit 8b86d00
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 4 deletions.
6 changes: 3 additions & 3 deletions airflow/providers/openlineage/plugins/facets.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def from_task_instance(cls, task_instance):
)


@define(slots=False)
@define(slots=True)
class AirflowJobFacet(JobFacet):
"""
Composite Airflow job facet.
Expand All @@ -70,7 +70,7 @@ class AirflowJobFacet(JobFacet):
tasks: dict


@define(slots=False)
@define(slots=True)
class AirflowStateRunFacet(RunFacet):
"""
Airflow facet providing state information.
Expand Down Expand Up @@ -100,7 +100,7 @@ class AirflowRunFacet(RunFacet):
taskUuid: str


@define(slots=False)
@define(slots=True)
class AirflowDagRunFacet(RunFacet):
"""Composite Airflow DAG run facet."""

Expand Down
66 changes: 65 additions & 1 deletion tests/providers/openlineage/plugins/test_facets.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,17 @@
# under the License.
from __future__ import annotations

from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, AirflowRunFacet
import pickle

import attrs
import pytest

from airflow.providers.openlineage.plugins.facets import (
AirflowDagRunFacet,
AirflowJobFacet,
AirflowRunFacet,
AirflowStateRunFacet,
)


def test_airflow_run_facet():
Expand Down Expand Up @@ -52,3 +62,57 @@ def test_airflow_dag_run_facet():

assert airflow_dag_run_facet.dag == dag
assert airflow_dag_run_facet.dagRun == dag_run


@pytest.mark.parametrize(
"instance",
[
pytest.param(
AirflowJobFacet(
taskTree={"task_0": {"section_1.task_3": {}}},
taskGroups={
"section_1": {
"parent_group": None,
"tooltip": "",
"ui_color": "CornflowerBlue",
"ui_fgcolor": "#000",
"ui_label": "section_1",
}
},
tasks={
"task_0": {
"operator": "airflow.operators.bash.BashOperator",
"task_group": None,
"emits_ol_events": True,
"ui_color": "#f0ede4",
"ui_fgcolor": "#000",
"ui_label": "task_0",
"is_setup": False,
"is_teardown": False,
}
},
),
id="AirflowJobFacet",
),
pytest.param(
AirflowStateRunFacet(dagRunState="SUCCESS", tasksState={"task_0": "SKIPPED"}),
id="AirflowStateRunFacet",
),
pytest.param(
AirflowDagRunFacet(
dag={
"timetable": {"delta": 86400.0},
"owner": "airflow",
"start_date": "2024-06-01T00:00:00+00:00",
},
dagRun={"conf": {}, "dag_id": "dag_id"},
),
id="AirflowDagRunFacet",
),
],
)
def test_facets_are_pickled_correctly(instance):
cls = instance.__class__
instance = pickle.loads(pickle.dumps(instance))
for field in attrs.fields(cls):
getattr(instance, field.name)

0 comments on commit 8b86d00

Please sign in to comment.