Skip to content

Commit

Permalink
Ability to add custom facet in OpenLineage events (#38982)
Browse files Browse the repository at this point in the history
* Ability to add custom facet in OpenLineage events

* Update airflow/providers/openlineage/provider.yaml

Co-authored-by: Elad Kalif <[email protected]>

* Update airflow/providers/openlineage/provider.yaml

Co-authored-by: Kacper Muda <[email protected]>

* Adding None type hint for the custom facet function

* Fix a test after rebase

* Removed the legacy OPENLINEAGE_ configs format for OPENLINEAGE_CUSTOM_FACET_FUNCTIONS

* Duplicate facet key check

* Update airflow/providers/openlineage/utils/utils.py

Co-authored-by: Kacper Muda <[email protected]>

* Update airflow/providers/openlineage/utils/utils.py

Co-authored-by: Kacper Muda <[email protected]>

* Fixes after rebase

* Adding user docs for custom_facet_functions

* Rename custom_facet_functions as custom_run_facets

* Increment version for custom_run_facets feature

* Enrich example with access to operator and return value as None.

* Add try-except for custom facet function execution

* Fix the typing for the custom facet function return type

* Documentation: funcs are executed only for START events

* Fix the typing for the custom facet function return type

* Fixes after pre-commit hook checks

* Adding start_date to test DAGs for 2.7 compatibility tests

* Removing an out-of-scope __init__ file added by pre-commit check

---------

Co-authored-by: Anandhi <[email protected]>
Co-authored-by: Elad Kalif <[email protected]>
Co-authored-by: Kacper Muda <[email protected]>
  • Loading branch information
4 people authored Jul 22, 2024
1 parent 05c39b4 commit e30f810
Show file tree
Hide file tree
Showing 8 changed files with 443 additions and 5 deletions.
11 changes: 11 additions & 0 deletions airflow/providers/openlineage/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@ def custom_extractors() -> set[str]:
return set(extractor.strip() for extractor in option.split(";") if extractor.strip())


@cache
def custom_run_facets() -> set[str]:
    """
    [openlineage] custom_run_facets.

    Parse the semicolon separated option value into the set of its non-empty,
    whitespace-stripped entries; a missing option falls back to an empty string.
    """
    raw_option = conf.get(_CONFIG_SECTION, "custom_run_facets", fallback="")
    # Strip every entry once, then keep only the non-empty ones; building a set
    # collapses entries that are listed more than once.
    stripped_paths = (path.strip() for path in raw_option.split(";"))
    return {path for path in stripped_paths if path}


@cache
def namespace() -> str:
"""[openlineage] namespace."""
Expand Down
7 changes: 7 additions & 0 deletions airflow/providers/openlineage/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ config:
example: full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
default: ~
version_added: ~
custom_run_facets:
description: |
Register custom run facet functions by passing a string of semicolon separated full import paths.
type: string
example: full.path.to.custom_facet_function;full.path.to.another_custom_facet_function
default: ''
version_added: 1.10.0
config_path:
description: |
Specify the path to the YAML configuration file.
Expand Down
38 changes: 37 additions & 1 deletion airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from contextlib import redirect_stdout, suppress
from functools import wraps
from io import StringIO
from typing import TYPE_CHECKING, Any, Iterable
from typing import TYPE_CHECKING, Any, Callable, Iterable

import attrs
from deprecated import deprecated
Expand Down Expand Up @@ -79,11 +79,47 @@ def get_job_name(task: TaskInstance) -> str:


def get_custom_facets(task_instance: TaskInstance | None = None) -> dict[str, Any]:
    """
    Collect the custom run facets for a task instance.

    Adds the ``airflow_mappedTask`` facet when the task instance has a ``map_index`` other
    than -1, then merges in the dict returned by each function registered through
    ``conf.custom_run_facets()``. A function that cannot be imported, that raises, or that
    returns anything other than a non-empty dict is skipped. When two sources use the same
    facet key, the value merged later replaces the earlier one.

    :param task_instance: passed unchanged to every registered custom facet function.
    :return: mapping of facet key to facet.
    """
    from airflow.providers.openlineage.extractors.manager import try_import_from_string

    custom_facets = {}
    # check for -1 comes from SmartSensor compatibility with dynamic task mapping
    # this comes from Airflow code
    if getattr(task_instance, "map_index", -1) != -1:
        custom_facets["airflow_mappedTask"] = AirflowMappedTaskRunFacet.from_task_instance(task_instance)

    # Append custom run facets by executing the custom_run_facet functions.
    for func_path in conf.custom_run_facets():
        try:
            func: Callable[[Any], dict] | None = try_import_from_string(func_path)
            if not func:
                log.warning(
                    "OpenLineage is unable to import custom facet function `%s`; will ignore it.",
                    func_path,
                )
                continue

            facet: dict[str, dict[Any, Any]] | None = func(task_instance)
            # Nothing to merge for None, empty, or non-dict return values.
            if not facet or not isinstance(facet, dict):
                continue

            duplicate_facet_keys = [key for key in facet.keys() if key in custom_facets]
            if duplicate_facet_keys:
                log.warning(
                    "Duplicate OpenLineage custom facets key(s) found: `%s` from function `%s`; "
                    "this will overwrite the previous value.",
                    ", ".join(duplicate_facet_keys),
                    func_path,
                )
            log.debug(
                "Adding OpenLineage custom facet with key(s): `%s` from function `%s`.",
                tuple(facet),
                func_path,
            )
            custom_facets.update(facet)
        except Exception as exc:
            # A failing user function must not break the rest of the facet collection.
            log.warning(
                "Error processing custom facet function `%s`; will ignore it. Error was: %s: %s",
                func_path,
                type(exc).__name__,
                exc,
            )
    return custom_facets


Expand Down
89 changes: 87 additions & 2 deletions docs/apache-airflow-providers-openlineage/guides/developer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -446,15 +446,100 @@ Conversion from Airflow Table entity to OpenLineage Dataset is made in the follo

.. _custom_facets:openlineage:

Custom facets
Custom Facets
=============
To learn more about facets in OpenLineage, please refer to `facet documentation <https://openlineage.io/docs/spec/facets/>`_.
Also check out `available Facets <https://github.com/OpenLineage/OpenLineage/blob/main/client/python/openlineage/client/facet.py>`_
Also check out `available facets <https://github.com/OpenLineage/OpenLineage/blob/main/client/python/openlineage/client/facet.py>`_

The OpenLineage spec might not contain all the facets you need to write your extractor,
in which case you will have to make your own `custom facets <https://openlineage.io/docs/spec/facets/custom-facets>`_.
More on creating custom facets can be found `here <https://openlineage.io/blog/extending-with-facets/>`_.

Custom Run Facets
=================

You can inject your own custom facets in the lineage event's run facet using the ``custom_run_facets`` Airflow configuration.

Steps to be taken:

1. Write a function that returns the custom facet. You can write as many custom facet functions as needed.
2. Register the functions using the ``custom_run_facets`` Airflow configuration.

Once done, Airflow OpenLineage listener will automatically execute these functions during the lineage event generation
and append their return values to the run facet in the lineage event.

Writing a custom facet function
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

- **Input arguments:** The function should accept the ``TaskInstance`` as an input argument.
- **Function body:** Perform the logic needed to generate the custom facet. The custom facet should inherit from ``BaseFacet`` so that ``_producer`` and ``_schemaURL`` are automatically added to the facet.
- **Return value:** The custom facet to be added to the lineage event. Return type should be ``dict[str, dict]`` or ``None``. You may choose to return ``None``, if you do not want to add custom facets for certain criteria.

**Example custom facet function**

.. code-block:: python
import attrs
from airflow.models import TaskInstance
from openlineage.client.facet import BaseFacet
@attrs.define(slots=False)
class MyCustomRunFacet(BaseFacet):
    """Example custom run facet; every field is a plain string."""

    # Fixed label chosen by the facet author.
    name: str
    # State of the task instance at the time the facet is built.
    jobState: str
    # Identifiers derived from the DAG id and the task id.
    uniqueName: str
    displayName: str
    dagId: str
    taskId: str
    # Fixed label chosen by the facet author.
    cluster: str
def get_my_custom_facet(task_instance: TaskInstance) -> dict[str, dict] | None:
operator_name = task_instance.task.operator_name
if operator_name == "BashOperator":
return
job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
return {
"additional_run_facet": attrs.asdict(
MyCustomRunFacet(
name="test-lineage-namespace",
jobState=task_instance.state,
uniqueName=job_unique_name,
displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
dagId=task_instance.dag_id,
taskId=task_instance.task_id,
cluster="TEST",
)
)
}
Register the custom facet functions
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Use the ``custom_run_facets`` Airflow configuration to register the custom run facet functions by passing
a string of semicolon separated full import paths to the functions.

.. code-block:: ini
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
custom_run_facets = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function
``AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS`` environment variable is an equivalent.

.. code-block:: ini
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'
.. note::

- The custom facet functions are only executed at the start of the TaskInstance and added to the OpenLineage START event.
- Duplicate functions, if registered, will be executed only once.
- When duplicate custom facet keys are returned by different functions, the facet returned by the last processed function will be added to the lineage event.

.. _job_hierarchy:openlineage:

Job Hierarchy
Expand Down
20 changes: 19 additions & 1 deletion docs/apache-airflow-providers-openlineage/guides/user.rst
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ serializing only a few known attributes, we exclude certain non-serializable ele
Custom Extractors
^^^^^^^^^^^^^^^^^

If you use :ref:`custom Extractors <custom_extractors:openlineage>` feature, register the extractors by passing
To use the :ref:`custom Extractors <custom_extractors:openlineage>` feature, register the extractors by passing
a string of semicolon separated Airflow Operators full import paths to ``extractors`` option in Airflow configuration.

.. code-block:: ini
Expand All @@ -286,6 +286,24 @@ a string of semicolon separated Airflow Operators full import paths to ``extract
AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'
Custom Run Facets
^^^^^^^^^^^^^^^^^

To inject :ref:`custom run facets <custom_facets:openlineage>`, register the custom run facet functions by passing
a string of semicolon separated full import paths to the ``custom_run_facets`` option in Airflow configuration.

.. code-block:: ini
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
custom_run_facets = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function
``AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS`` environment variable is an equivalent.

.. code-block:: ini
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'
Enabling OpenLineage on DAG/task level
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
26 changes: 26 additions & 0 deletions tests/providers/openlineage/test_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
_is_true,
config_path,
custom_extractors,
custom_run_facets,
dag_state_change_process_pool_size,
disabled_operators,
execution_timeout,
Expand All @@ -41,6 +42,7 @@
_CONFIG_SECTION = "openlineage"
_VAR_CONFIG_PATH = "OPENLINEAGE_CONFIG"
_CONFIG_OPTION_CONFIG_PATH = "config_path"
_CONFIG_OPTION_CUSTOM_RUN_FACETS = "custom_run_facets"
_VAR_DISABLE_SOURCE_CODE = "OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE"
_CONFIG_OPTION_DISABLE_SOURCE_CODE = "disable_source_code"
_CONFIG_OPTION_DISABLED_FOR_OPERATORS = "disabled_for_operators"
Expand Down Expand Up @@ -255,6 +257,30 @@ def test_extractors_do_not_fail_if_conf_option_missing():
assert custom_extractors() == set()


@conf_vars({})
def test_custom_run_facets_not_set():
    """With no configuration overrides applied, no custom run facet functions are returned."""
    registered = custom_run_facets()
    assert registered == set()


def test_custom_run_facets_with_no_values():
    """Both a ``None`` and an empty-string option value yield an empty set."""
    option_key = (_CONFIG_SECTION, _CONFIG_OPTION_CUSTOM_RUN_FACETS)
    for empty_value in (None, ""):
        with conf_vars({option_key: empty_value}):
            assert custom_run_facets() == set()


@conf_vars(
    {
        (_CONFIG_SECTION, _CONFIG_OPTION_CUSTOM_RUN_FACETS): (
            " tests.my_function;; tests.my_function ; my_function_2; "
        ),
    }
)
def test_custom_run_facets():
    """Surrounding whitespace, empty entries and duplicate entries are dropped from the option."""
    expected = {"tests.my_function", "my_function_2"}
    assert custom_run_facets() == expected


@env_vars({_VAR_NAMESPACE: "my_custom_namespace"})
@conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_NAMESPACE): None})
def test_namespace_legacy_env_var_is_used_when_no_conf_option_set():
Expand Down
87 changes: 87 additions & 0 deletions tests/providers/openlineage/utils/custom_facet_fixture.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

import attrs
from openlineage.client.facet import BaseFacet

if TYPE_CHECKING:
from airflow.models import TaskInstance


@attrs.define(slots=False)
class MyCustomRunFacet(BaseFacet):
    """Custom run facet built by the fixture functions in this module; all fields are strings."""

    # Fixed label supplied by the fixture functions.
    name: str
    # Filled from the task instance state.
    jobState: str
    # Identifiers derived from the DAG id and the task id.
    uniqueName: str
    displayName: str
    dagId: str
    taskId: str
    # Fixed label supplied by the fixture functions.
    cluster: str


def get_additional_test_facet(task_instance: TaskInstance) -> dict[str, dict] | None:
    """
    Return the ``additional_run_facet`` facet for a task instance.

    Yields ``None`` when the attached task's operator name is ``BashOperator``; a task
    instance without an attached task still gets the facet.
    """
    task = task_instance.task
    if task and task.operator_name == "BashOperator":
        return None

    facet = MyCustomRunFacet(
        name="test-lineage-namespace",
        jobState=task_instance.state,
        uniqueName=f"TEST.{task_instance.dag_id}.{task_instance.task_id}",
        displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
        dagId=task_instance.dag_id,
        taskId=task_instance.task_id,
        cluster="TEST",
    )
    return {"additional_run_facet": attrs.asdict(facet)}


def get_duplicate_test_facet_key(task_instance: TaskInstance):
    """Return a facet under the ``additional_run_facet`` key, the key ``get_additional_test_facet`` also uses."""
    facet = MyCustomRunFacet(
        name="test-lineage-namespace",
        jobState=task_instance.state,
        uniqueName=f"TEST.{task_instance.dag_id}.{task_instance.task_id}",
        displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
        dagId=task_instance.dag_id,
        taskId=task_instance.task_id,
        cluster="TEST",
    )
    serialized_facet = attrs.asdict(facet)
    return {"additional_run_facet": serialized_facet}


def get_another_test_facet(task_instance: TaskInstance):
    """Return a fixed facet under the ``another_run_facet`` key; the task instance is not used."""
    facet_payload = {"name": "another-lineage-namespace"}
    return {"another_run_facet": facet_payload}


def return_type_is_not_dict(task_instance: TaskInstance):
    """Return a plain string rather than a dict; the task instance is not used."""
    not_a_dict = "return type is not dict"
    return not_a_dict


def get_custom_facet_throws_exception(task_instance: TaskInstance):
    """Always raise, to exercise error handling around custom facet functions.

    :param task_instance: unused.
    :raises Exception: on every call.
    """
    raise Exception("fake exception from custom facet function")
Loading

0 comments on commit e30f810

Please sign in to comment.