Skip to content

Commit

Permalink
openlineage: extend custom_run_facets to also be executed on complete…
Browse files Browse the repository at this point in the history
… and fail (apache#40953)

Signed-off-by: Kacper Muda <[email protected]>
  • Loading branch information
kacpermuda authored and sunank200 committed Jul 24, 2024
1 parent 22bd176 commit 0486811
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 174 deletions.
24 changes: 16 additions & 8 deletions airflow/providers/openlineage/plugins/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ def complete_task(
parent_run_id: str | None,
end_time: str,
task: OperatorLineage,
run_facets: dict[str, RunFacet] | None = None, # Custom run facets
) -> RunEvent:
"""
Emit openlineage event of type COMPLETE.
Expand All @@ -254,7 +255,11 @@ def complete_task(
:param parent_run_id: identifier of job spawning this task
:param end_time: time of task completion
:param task: metadata container with information extracted from operator
:param run_facets: custom run facets
"""
run_facets = run_facets or {}
if task:
run_facets = {**task.run_facets, **run_facets}
event = RunEvent(
eventType=RunState.COMPLETE,
eventTime=end_time,
Expand All @@ -263,7 +268,7 @@ def complete_task(
job_name=job_name,
parent_job_name=parent_job_name,
parent_run_id=parent_run_id,
run_facets=task.run_facets,
run_facets=run_facets,
),
job=self._build_job(job_name, job_type=_JOB_TYPE_TASK, job_facets=task.job_facets),
inputs=task.inputs,
Expand All @@ -280,6 +285,7 @@ def fail_task(
parent_run_id: str | None,
end_time: str,
task: OperatorLineage,
run_facets: dict[str, RunFacet] | None = None, # Custom run facets
error: str | BaseException | None = None,
) -> RunEvent:
"""
Expand All @@ -292,20 +298,22 @@ def fail_task(
:param parent_run_id: identifier of job spawning this task
:param end_time: time of task completion
:param task: metadata container with information extracted from operator
:param run_facets: custom run facets
:param error: error
"""
error_facet = {}
run_facets = run_facets or {}
if task:
run_facets = {**task.run_facets, **run_facets}

if error:
stack_trace = None
if isinstance(error, BaseException) and error.__traceback__:
import traceback

stack_trace = "\\n".join(traceback.format_exception(type(error), error, error.__traceback__))
error_facet = {
"errorMessage": error_message_run.ErrorMessageRunFacet(
message=str(error), programmingLanguage="python", stackTrace=stack_trace
)
}
run_facets["errorMessage"] = error_message_run.ErrorMessageRunFacet(
message=str(error), programmingLanguage="python", stackTrace=stack_trace
)

event = RunEvent(
eventType=RunState.FAIL,
Expand All @@ -315,7 +323,7 @@ def fail_task(
job_name=job_name,
parent_job_name=parent_job_name,
parent_run_id=parent_run_id,
run_facets={**task.run_facets, **error_facet},
run_facets=run_facets,
),
job=self._build_job(job_name, job_type=_JOB_TYPE_TASK, job_facets=task.job_facets),
inputs=task.inputs,
Expand Down
10 changes: 7 additions & 3 deletions airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,24 @@
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState
from airflow.providers.openlineage.utils.utils import (
get_airflow_job_facet,
get_airflow_mapped_task_facet,
get_airflow_run_facet,
get_custom_facets,
get_job_name,
get_user_provided_run_facets,
is_operator_disabled,
is_selective_lineage_enabled,
print_warning,
)
from airflow.settings import configure_orm
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.state import TaskInstanceState
from airflow.utils.timeout import timeout

if TYPE_CHECKING:
from sqlalchemy.orm import Session

from airflow.models import DagRun, TaskInstance
from airflow.utils.state import TaskInstanceState

_openlineage_listener: OpenLineageListener | None = None
_IS_AIRFLOW_2_10_OR_HIGHER = Version(Version(AIRFLOW_VERSION).base_version) >= Version("2.10.0")
Expand Down Expand Up @@ -163,7 +164,8 @@ def on_running():
owners=dag.owner.split(", "),
task=task_metadata,
run_facets={
**get_custom_facets(task_instance),
**get_user_provided_run_facets(task_instance, TaskInstanceState.RUNNING),
**get_airflow_mapped_task_facet(task_instance),
**get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid),
},
)
Expand Down Expand Up @@ -233,6 +235,7 @@ def on_success():
parent_run_id=parent_run_id,
end_time=end_date.isoformat(),
task=task_metadata,
run_facets=get_user_provided_run_facets(task_instance, TaskInstanceState.SUCCESS),
)
Stats.gauge(
f"ol.event.size.{event_type}.{operator_name}",
Expand Down Expand Up @@ -327,6 +330,7 @@ def on_failure():
parent_run_id=parent_run_id,
end_time=end_date.isoformat(),
task=task_metadata,
run_facets=get_user_provided_run_facets(task_instance, TaskInstanceState.FAILED),
error=error,
)
Stats.gauge(
Expand Down
29 changes: 17 additions & 12 deletions airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@
from airflow.utils.module_loading import import_string

if TYPE_CHECKING:
from openlineage.client.event_v2 import Dataset as OpenLineageDataset
from openlineage.client.facet_v2 import RunFacet
from openlineage.client.run import Dataset as OpenLineageDataset

from airflow.models import DagRun, TaskInstance
from airflow.utils.state import TaskInstanceState


log = logging.getLogger(__name__)
Expand All @@ -81,28 +82,32 @@ def get_job_name(task: TaskInstance) -> str:
return f"{task.dag_id}.{task.task_id}"


def get_custom_facets(task_instance: TaskInstance | None = None) -> dict[str, Any]:
from airflow.providers.openlineage.extractors.manager import try_import_from_string

custom_facets = {}
def get_airflow_mapped_task_facet(task_instance: TaskInstance) -> dict[str, Any]:
# check for -1 comes from SmartSensor compatibility with dynamic task mapping
# this comes from Airflow code
if hasattr(task_instance, "map_index") and getattr(task_instance, "map_index") != -1:
custom_facets["airflow_mappedTask"] = AirflowMappedTaskRunFacet.from_task_instance(task_instance)
return {"airflow_mappedTask": AirflowMappedTaskRunFacet.from_task_instance(task_instance)}
return {}


def get_user_provided_run_facets(ti: TaskInstance, ti_state: TaskInstanceState) -> dict[str, RunFacet]:
custom_facets = {}

# Append custom run facets by executing the custom_run_facet functions.
for custom_facet_func in conf.custom_run_facets():
try:
func: Callable[[Any], dict] | None = try_import_from_string(custom_facet_func)
func: Callable[[TaskInstance, TaskInstanceState], dict[str, RunFacet]] | None = (
try_import_from_string(custom_facet_func)
)
if not func:
log.warning(
"OpenLineage is unable to import custom facet function `%s`; will ignore it.",
custom_facet_func,
)
continue
facet: dict[str, dict[Any, Any]] | None = func(task_instance)
if facet and isinstance(facet, dict):
duplicate_facet_keys = [facet_key for facet_key in facet.keys() if facet_key in custom_facets]
facets: dict[str, RunFacet] | None = func(ti, ti_state)
if facets and isinstance(facets, dict):
duplicate_facet_keys = [facet_key for facet_key in facets if facet_key in custom_facets]
if duplicate_facet_keys:
log.warning(
"Duplicate OpenLineage custom facets key(s) found: `%s` from function `%s`; "
Expand All @@ -112,10 +117,10 @@ def get_custom_facets(task_instance: TaskInstance | None = None) -> dict[str, An
)
log.debug(
"Adding OpenLineage custom facet with key(s): `%s` from function `%s`.",
tuple(facet),
tuple(facets),
custom_facet_func,
)
custom_facets.update(facet)
custom_facets.update(facets)
except Exception as exc:
log.warning(
"Error processing custom facet function `%s`; will ignore it. Error was: %s: %s",
Expand Down
81 changes: 40 additions & 41 deletions docs/apache-airflow-providers-openlineage/guides/developer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ Instead of returning complete OpenLineage event, the provider defines ``Operator
class OperatorLineage:
inputs: list[Dataset] = Factory(list)
outputs: list[Dataset] = Factory(list)
run_facets: dict[str, BaseFacet] = Factory(dict)
run_facets: dict[str, RunFacet] = Factory(dict)
job_facets: dict[str, BaseFacet] = Factory(dict)
OpenLineage integration itself takes care to enrich it with things like general Airflow facets, proper event time and type, creating proper OpenLineage RunEvent.
Expand Down Expand Up @@ -214,11 +214,11 @@ Both methods return ``OperatorLineage`` structure:
inputs: list[Dataset] = Factory(list)
outputs: list[Dataset] = Factory(list)
run_facets: dict[str, BaseFacet] = Factory(dict)
run_facets: dict[str, RunFacet] = Factory(dict)
job_facets: dict[str, BaseFacet] = Factory(dict)
Inputs and outputs are lists of plain OpenLineage datasets (`openlineage.client.run.Dataset`).
Inputs and outputs are lists of plain OpenLineage datasets (`openlineage.client.event_v2.Dataset`).

``run_facets`` and ``job_facets`` are dictionaries of optional RunFacets and JobFacets that would be attached to the job - for example,
you might want to attach ``SqlJobFacet`` if your Operator is executing SQL.
Expand Down Expand Up @@ -303,23 +303,20 @@ like extracting column level lineage and inputs/outputs from SQL query with SQL

.. code-block:: python
from airflow.models.baseoperator import BaseOperator
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.providers.common.compat.openlineage.facet import (
BaseFacet,
Dataset,
ExternalQueryRunFacet,
SQLJobFacet,
)
from airflow.models.baseoperator import BaseOperator
from airflow.providers.openlineage.extractors.base import BaseExtractor
class ExampleOperator(BaseOperator):
def __init__(self, query, bq_table_reference, s3_path) -> None:
self.bq_table_reference = bq_table_reference
self.s3_path = s3_path
self.s3_file_name = s3_file_name
self.query = query
self._job_id = None
def execute(self, context) -> Any:
Expand All @@ -334,20 +331,20 @@ like extracting column level lineage and inputs/outputs from SQL query with SQL
def _execute_extraction(self) -> OperatorLineage:
"""Define what we know before Operator's extract is called."""
return OperatorLineage(
inputs=[Dataset(namespace="bigquery", name=self.bq_table_reference)],
outputs=[Dataset(namespace=self.s3_path, name=self.s3_file_name)],
inputs=[Dataset(namespace="bigquery", name=self.operator.bq_table_reference)],
outputs=[Dataset(namespace=self.operator.s3_path, name=self.operator.s3_file_name)],
job_facets={
"sql": SQLJobFacet(
query="EXPORT INTO ... OPTIONS(FORMAT=csv, SEP=';' ...) AS SELECT * FROM ... "
)
},
)
def extract_on_complete(self) -> OperatorLineage:
def extract_on_complete(self, task_instance) -> OperatorLineage:
"""Add what we received after Operator's extract call."""
lineage_metadata = self.extract()
lineage_metadata.run_facets = {
"parent": ExternalQueryRunFacet(externalQueryId=self._job_id, source="bigquery")
"parent": ExternalQueryRunFacet(externalQueryId=task_instance.task._job_id, source="bigquery")
}
return lineage_metadata
Expand Down Expand Up @@ -454,42 +451,38 @@ Custom Facets
=============
To learn more about facets in OpenLineage, please refer to `facet documentation <https://openlineage.io/docs/spec/facets/>`_.
Also check out `available facets <https://github.com/OpenLineage/OpenLineage/blob/main/client/python/openlineage/client/facet.py>`_
and a blog post about `extending with facets <https://openlineage.io/blog/extending-with-facets/>`_.

The OpenLineage spec might not contain all the facets you need to write your extractor,
in which case you will have to make your own `custom facets <https://openlineage.io/docs/spec/facets/custom-facets>`_.
More on creating custom facets can be found `here <https://openlineage.io/blog/extending-with-facets/>`_.

Custom Run Facets
=================

You can inject your own custom facets in the lineage event's run facet using the ``custom_run_facets`` Airflow configuration.
You can also inject your own custom facets in the lineage event's run facet using the ``custom_run_facets`` Airflow configuration.

Steps to be taken,

1. Write a function that returns the custom facet. You can write as many custom facet functions as needed.
1. Write a function that returns the custom facets. You can write as many custom facet functions as needed.
2. Register the functions using the ``custom_run_facets`` Airflow configuration.

Once done, Airflow OpenLineage listener will automatically execute these functions during the lineage event generation
and append their return values to the run facet in the lineage event.
Airflow OpenLineage listener will automatically execute these functions during the lineage event generation and append their return values to the run facet in the lineage event.

Writing a custom facet function
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

- **Input arguments:** The function should accept the ``TaskInstance`` as an input argument.
- **Function body:** Perform the logic needed to generate the custom facet. The custom facet should inherit from the ``BaseFacet`` for the ``_producer`` and ``_schemaURL`` to be automatically added for the facet.
- **Return value:** The custom facet to be added to the lineage event. Return type should be ``dict[str, dict]`` or ``None``. You may choose to return ``None``, if you do not want to add custom facets for certain criteria.
- **Input arguments:** The function should accept two input arguments: ``TaskInstance`` and ``TaskInstanceState``.
- **Function body:** Perform the logic needed to generate the custom facets. The custom facets must inherit from the ``RunFacet`` for the ``_producer`` and ``_schemaURL`` to be automatically added for the facet.
- **Return value:** The custom facets to be added to the lineage event. Return type should be ``dict[str, RunFacet]`` or ``None``. You may choose to return ``None``, if you do not want to add custom facets for certain criteria.

**Example custom facet function**

.. code-block:: python
import attrs
from airflow.models import TaskInstance
from airflow.providers.common.compat.openlineage.facet import BaseFacet
from airflow.models.taskinstance import TaskInstance, TaskInstanceState
from airflow.providers.common.compat.openlineage.facet import RunFacet
@attrs.define(slots=False)
class MyCustomRunFacet(BaseFacet):
class MyCustomRunFacet(RunFacet):
"""Define a custom facet."""
name: str
Expand All @@ -499,24 +492,29 @@ Writing a custom facet function
dagId: str
taskId: str
cluster: str
custom_metadata: dict
def get_my_custom_facet(task_instance: TaskInstance) -> dict[str, dict] | None:
def get_my_custom_facet(
task_instance: TaskInstance, ti_state: TaskInstanceState
) -> dict[str, RunFacet] | None:
operator_name = task_instance.task.operator_name
custom_metadata = {}
if operator_name == "BashOperator":
return
return None
if ti_state == TaskInstanceState.FAILED:
custom_metadata["custom_key_failed"] = "custom_value"
job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
return {
"additional_run_facet": attrs.asdict(
MyCustomRunFacet(
name="test-lineage-namespace",
jobState=task_instance.state,
uniqueName=job_unique_name,
displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
dagId=task_instance.dag_id,
taskId=task_instance.task_id,
cluster="TEST",
)
"additional_run_facet": MyCustomRunFacet(
name="test-lineage-namespace",
jobState=task_instance.state,
uniqueName=job_unique_name,
displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
dagId=task_instance.dag_id,
taskId=task_instance.task_id,
cluster="TEST",
custom_metadata=custom_metadata,
)
}
Expand All @@ -540,9 +538,10 @@ a string of semicolon separated full import path to the functions.
.. note::

- The custom facet functions are only executed at the start of the TaskInstance and added to the OpenLineage START event.
- Duplicate functions if registered, will be executed only once.
- When duplicate custom facet keys are returned by different functions, the last processed function will be added to the lineage event.
- The custom facet functions are executed both at the START and COMPLETE/FAIL of the TaskInstance and added to the corresponding OpenLineage event.
- When creating conditions on TaskInstance state, you should use second argument provided (``TaskInstanceState``) that will contain the state the task should be in. This may vary from ti.current_state() as the OpenLineage listener may get called before the TaskInstance's state is updated in Airflow database.
- When path to a single function is registered more than once, it will still be executed only once.
- When duplicate custom facet keys are returned by multiple functions registered, the result of random function result will be added to the lineage event. Please avoid using duplicate facet keys as it can produce unexpected behaviour.

.. _job_hierarchy:openlineage:

Expand Down
Loading

0 comments on commit 0486811

Please sign in to comment.