Adding user docs for custom_facet_functions
Anandhi committed Jul 3, 2024
1 parent 27a55c6 commit 0d63f59
Showing 2 changed files with 101 additions and 0 deletions.
83 changes: 83 additions & 0 deletions docs/apache-airflow-providers-openlineage/guides/developer.rst
@@ -455,6 +455,89 @@ The OpenLineage spec might not contain all the facets you need to write your ext
in which case you will have to make your own `custom facets <https://openlineage.io/docs/spec/facets/custom-facets>`_.
More on creating custom facets can be found `here <https://openlineage.io/blog/extending-with-facets/>`_.

Adding custom Facets in the Lineage event
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
You can inject your own custom facets into the run facets of the lineage event using the ``custom_facet_functions`` Airflow configuration.

Follow these steps:

1. Write a function that returns the custom facet. You can write as many custom facet functions as needed.
2. Register the function using the ``custom_facet_functions`` Airflow configuration.

Once registered, the Airflow OpenLineage listener automatically executes these functions during lineage event generation
and appends their return values to the ``run facet`` in the lineage event.

Writing a custom facet function
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

- **Input arguments:** The function should accept the ``TaskInstance`` as an input argument.
- **Function body:** Perform the logic needed to generate the custom facet. The custom facet should inherit from ``BaseFacet`` so that ``_producer`` and ``_schemaURL`` are added to the facet automatically.
- **Return value:** The custom facet to be appended to the ``run facet``. The return type should be ``None`` or ``dict``.

The ``dict`` values returned by the custom facet functions are appended to the ``run facet`` in the lineage events generated by the Airflow OpenLineage listener.
You may return ``None`` or ``{}`` if you do not want to add custom facets in certain cases.
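
The return-value rules above can be sketched as a function that adds a facet only for some tasks. This is a minimal illustration, not part of the provider: the function name, the ``etl_`` naming criterion, the facet key, and the field names are all hypothetical, and a ``SimpleNamespace`` stands in for a real ``TaskInstance`` so the snippet runs without Airflow. It returns a plain ``dict`` for brevity, so ``_producer`` and ``_schemaURL`` are not added here; a class inheriting from ``BaseFacet`` would be needed for that.

```python
from types import SimpleNamespace


def get_team_facet(task_instance):
    """Return a run facet only for DAGs whose ID starts with ``etl_``."""
    # Hypothetical criterion: skip every DAG outside the "etl_" naming scheme.
    if not task_instance.dag_id.startswith("etl_"):
        return None  # nothing is appended to the run facet for this task

    # The top-level key is the name under which the facet is appended.
    return {
        "team_facet": {
            "team": "data-platform",
            "jobName": f"{task_instance.dag_id}.{task_instance.task_id}",
        }
    }


# Stand-ins for TaskInstance objects (assumption: only dag_id and task_id are read).
etl_task = SimpleNamespace(dag_id="etl_orders", task_id="load")
other_task = SimpleNamespace(dag_id="reporting", task_id="render")

print(get_team_facet(etl_task))
print(get_team_facet(other_task))
```

Returning ``None`` early keeps the "no facet" case explicit and avoids building a facet that would be discarded anyway.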

**Example custom facet function**

.. code-block:: python

    import attrs
    from airflow.models import TaskInstance
    from openlineage.client.facet import BaseFacet


    @attrs.define(slots=False)
    class MyCustomRunFacet(BaseFacet):
        """Define a custom run facet."""

        name: str
        jobState: str
        uniqueName: str
        displayName: str
        dagId: str
        taskId: str
        cluster: str


    def get_my_custom_facet(task_instance: TaskInstance):
        # Build the facet fields from the task instance.
        job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
        # The returned dict is appended to the run facet under the given key.
        return {
            "additional_run_facet": attrs.asdict(
                MyCustomRunFacet(
                    name="test-lineage-namespace",
                    jobState=task_instance.state,
                    uniqueName=job_unique_name,
                    displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
                    dagId=task_instance.dag_id,
                    taskId=task_instance.task_id,
                    cluster="TEST",
                )
            )
        }

Register the custom facet functions
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Use the ``custom_facet_functions`` Airflow configuration to register the custom facet functions by passing
a string of semicolon-separated full import paths to the functions.

.. code-block:: ini

    [openlineage]
    transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
    custom_facet_functions = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function

The ``AIRFLOW__OPENLINEAGE__CUSTOM_FACET_FUNCTIONS`` environment variable is an equivalent.

.. code-block:: ini

    AIRFLOW__OPENLINEAGE__CUSTOM_FACET_FUNCTIONS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'

.. note::

    - If the same function is registered more than once, it is executed only once.
    - When different functions return the same custom facet key, the value from the last processed function is added to the run facet.
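
The two rules in the note can be illustrated with a small stand-alone sketch. It is not the provider's implementation: ``collect_run_facets``, the two facet functions, and the ``shared_key`` name are hypothetical, and the sketch assumes that functions are processed in the order they are registered.

```python
from types import SimpleNamespace

calls = []  # records which functions actually ran


def first_facet(task_instance):
    calls.append("first_facet")
    return {"shared_key": {"source": "first_facet"}}


def second_facet(task_instance):
    calls.append("second_facet")
    return {"shared_key": {"source": "second_facet"}}


def collect_run_facets(functions, task_instance):
    """Illustrative only: mimic the documented de-duplication and override rules."""
    run_facets = {}
    # dict.fromkeys drops repeated functions while keeping their first-seen order.
    for func in dict.fromkeys(functions):
        facet = func(task_instance)
        if facet:  # None and {} contribute nothing
            run_facets.update(facet)  # a later function overwrites an existing key
    return run_facets


# Stand-in for a TaskInstance (assumption: the functions above do not read it).
task = SimpleNamespace(dag_id="example_dag", task_id="example_task")
merged = collect_run_facets([first_facet, first_facet, second_facet], task)

print(calls)
print(merged)
```

In this sketch ``first_facet`` runs once even though it is listed twice, and the value under ``shared_key`` comes from ``second_facet``. Using distinct facet keys per function avoids relying on processing order.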

.. _job_hierarchy:openlineage:

Job Hierarchy
18 changes: 18 additions & 0 deletions docs/apache-airflow-providers-openlineage/guides/user.rst
@@ -264,6 +264,24 @@ a string of semicolon separated Airflow Operators full import paths to ``extract
    AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'

Custom Facets
^^^^^^^^^^^^^

If you use the :ref:`custom Facets <custom_facets:openlineage>` feature, register the custom facet functions by passing
a string of semicolon-separated full import paths to the ``custom_facet_functions`` option in the Airflow configuration.

.. code-block:: ini

    [openlineage]
    transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
    custom_facet_functions = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function

The ``AIRFLOW__OPENLINEAGE__CUSTOM_FACET_FUNCTIONS`` environment variable is an equivalent.

.. code-block:: ini

    AIRFLOW__OPENLINEAGE__CUSTOM_FACET_FUNCTIONS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'

Enabling OpenLineage on DAG/task level
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
