diff --git a/airflow/providers/openlineage/conf.py b/airflow/providers/openlineage/conf.py index 8b35056e47fe4f..562b673ed5ed5b 100644 --- a/airflow/providers/openlineage/conf.py +++ b/airflow/providers/openlineage/conf.py @@ -86,6 +86,17 @@ def custom_extractors() -> set[str]: return set(extractor.strip() for extractor in option.split(";") if extractor.strip()) +@cache +def custom_run_facets() -> set[str]: + """[openlineage] custom_run_facets.""" + option = conf.get(_CONFIG_SECTION, "custom_run_facets", fallback="") + return set( + custom_facet_function.strip() + for custom_facet_function in option.split(";") + if custom_facet_function.strip() + ) + + @cache def namespace() -> str: """[openlineage] namespace.""" diff --git a/airflow/providers/openlineage/provider.yaml b/airflow/providers/openlineage/provider.yaml index cfa001fdbaae57..733b0474184860 100644 --- a/airflow/providers/openlineage/provider.yaml +++ b/airflow/providers/openlineage/provider.yaml @@ -105,6 +105,13 @@ config: example: full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass default: ~ version_added: ~ + custom_run_facets: + description: | + Register custom run facet functions by passing a string of semicolon separated full import paths. + type: string + example: full.path.to.custom_facet_function;full.path.to.another_custom_facet_function + default: '' + version_added: 1.10.0 config_path: description: | Specify the path to the YAML configuration file. diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py index 0484d11f534b47..0689ea39774e4c 100644 --- a/airflow/providers/openlineage/utils/utils.py +++ b/airflow/providers/openlineage/utils/utils.py @@ -24,7 +24,7 @@ from contextlib import redirect_stdout, suppress from functools import wraps from io import StringIO -from typing import TYPE_CHECKING, Any, Iterable +from typing import TYPE_CHECKING, Any, Callable, Iterable import attrs from deprecated import deprecated @@ -79,11 +79,47 @@ def get_job_name(task: TaskInstance) -> str: def get_custom_facets(task_instance: TaskInstance | None = None) -> dict[str, Any]: + from airflow.providers.openlineage.extractors.manager import try_import_from_string + custom_facets = {} # check for -1 comes from SmartSensor compatibility with dynamic task mapping # this comes from Airflow code if hasattr(task_instance, "map_index") and getattr(task_instance, "map_index") != -1: custom_facets["airflow_mappedTask"] = AirflowMappedTaskRunFacet.from_task_instance(task_instance) + + # Append custom run facets by executing the custom_run_facet functions. + for custom_facet_func in conf.custom_run_facets(): + try: + func: Callable[[Any], dict] | None = try_import_from_string(custom_facet_func) + if not func: + log.warning( + "OpenLineage is unable to import custom facet function `%s`; will ignore it.", + custom_facet_func, + ) + continue + facet: dict[str, dict[Any, Any]] | None = func(task_instance) + if facet and isinstance(facet, dict): + duplicate_facet_keys = [facet_key for facet_key in facet.keys() if facet_key in custom_facets] + if duplicate_facet_keys: + log.warning( + "Duplicate OpenLineage custom facets key(s) found: `%s` from function `%s`; " + "this will overwrite the previous value.", + ", ".join(duplicate_facet_keys), + custom_facet_func, + ) + log.debug( + "Adding OpenLineage custom facet with key(s): `%s` from function `%s`.", + tuple(facet), + custom_facet_func, + ) + custom_facets.update(facet) + except Exception as exc: + log.warning( + "Error processing custom facet function `%s`; will ignore it. Error was: %s: %s", + custom_facet_func, + type(exc).__name__, + exc, + ) return custom_facets diff --git a/docs/apache-airflow-providers-openlineage/guides/developer.rst b/docs/apache-airflow-providers-openlineage/guides/developer.rst index 2ce4ef74931f68..86f57ac3e113ec 100644 --- a/docs/apache-airflow-providers-openlineage/guides/developer.rst +++ b/docs/apache-airflow-providers-openlineage/guides/developer.rst @@ -446,15 +446,100 @@ Conversion from Airflow Table entity to OpenLineage Dataset is made in the follo .. _custom_facets:openlineage: -Custom facets +Custom Facets ============= To learn more about facets in OpenLineage, please refer to `facet documentation `_. -Also check out `available Facets `_ +Also check out `available facets `_ The OpenLineage spec might not contain all the facets you need to write your extractor, in which case you will have to make your own `custom facets `_. More on creating custom facets can be found `here `_. +Custom Run Facets +================= + +You can inject your own custom facets in the lineage event's run facet using the ``custom_run_facets`` Airflow configuration. + +Steps to be taken, + +1. Write a function that returns the custom facet. You can write as many custom facet functions as needed. +2. Register the functions using the ``custom_run_facets`` Airflow configuration. + +Once done, Airflow OpenLineage listener will automatically execute these functions during the lineage event generation +and append their return values to the run facet in the lineage event. + +Writing a custom facet function +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +- **Input arguments:** The function should accept the ``TaskInstance`` as an input argument. +- **Function body:** Perform the logic needed to generate the custom facet. The custom facet should inherit from the ``BaseFacet`` for the ``_producer`` and ``_schemaURL`` to be automatically added for the facet. +- **Return value:** The custom facet to be added to the lineage event. Return type should be ``dict[str, dict]`` or ``None``. You may choose to return ``None``, if you do not want to add custom facets for certain criteria. + +**Example custom facet function** + +.. code-block:: python + + import attrs + from airflow.models import TaskInstance + from openlineage.client.facet import BaseFacet + + + @attrs.define(slots=False) + class MyCustomRunFacet(BaseFacet): + """Define a custom facet.""" + + name: str + jobState: str + uniqueName: str + displayName: str + dagId: str + taskId: str + cluster: str + + + def get_my_custom_facet(task_instance: TaskInstance) -> dict[str, dict] | None: + operator_name = task_instance.task.operator_name + if operator_name == "BashOperator": + return + job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}" + return { + "additional_run_facet": attrs.asdict( + MyCustomRunFacet( + name="test-lineage-namespace", + jobState=task_instance.state, + uniqueName=job_unique_name, + displayName=f"{task_instance.dag_id}.{task_instance.task_id}", + dagId=task_instance.dag_id, + taskId=task_instance.task_id, + cluster="TEST", + ) + ) + } + +Register the custom facet functions +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Use the ``custom_run_facets`` Airflow configuration to register the custom run facet functions by passing +a string of semicolon separated full import path to the functions. + +.. code-block:: ini + + [openlineage] + transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"} + custom_run_facets = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function + +``AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS`` environment variable is an equivalent. + +.. code-block:: ini + + AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function' + +.. note:: + + - The custom facet functions are only executed at the start of the TaskInstance and added to the OpenLineage START event. + - Duplicate functions if registered, will be executed only once. + - When duplicate custom facet keys are returned by different functions, the last processed function will be added to the lineage event. + .. _job_hierarchy:openlineage: Job Hierarchy diff --git a/docs/apache-airflow-providers-openlineage/guides/user.rst b/docs/apache-airflow-providers-openlineage/guides/user.rst index 437da6d0fac19a..19b8ef9d789e9f 100644 --- a/docs/apache-airflow-providers-openlineage/guides/user.rst +++ b/docs/apache-airflow-providers-openlineage/guides/user.rst @@ -271,7 +271,7 @@ serializing only a few known attributes, we exclude certain non-serializable ele Custom Extractors ^^^^^^^^^^^^^^^^^ -If you use :ref:`custom Extractors ` feature, register the extractors by passing +To use :ref:`custom Extractors ` feature, register the extractors by passing a string of semicolon separated Airflow Operators full import paths to ``extractors`` option in Airflow configuration. .. code-block:: ini @@ -286,6 +286,24 @@ a string of semicolon separated Airflow Operators full import paths to ``extract AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass' +Custom Run Facets +^^^^^^^^^^^^^^^^^ + +To inject :ref:`custom run facets `, register the custom run facet functions by passing +a string of semicolon separated full import paths to ``custom_run_facets`` option in Airflow configuration. + +.. code-block:: ini + + [openlineage] + transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"} + custom_run_facets = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function + +``AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS`` environment variable is an equivalent. + +.. code-block:: ini + + AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function' + Enabling OpenLineage on DAG/task level ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/tests/providers/openlineage/test_conf.py b/tests/providers/openlineage/test_conf.py index 3ee606dbdeea26..7eeea35db73aa3 100644 --- a/tests/providers/openlineage/test_conf.py +++ b/tests/providers/openlineage/test_conf.py @@ -26,6 +26,7 @@ _is_true, config_path, custom_extractors, + custom_run_facets, dag_state_change_process_pool_size, disabled_operators, execution_timeout, @@ -41,6 +42,7 @@ _CONFIG_SECTION = "openlineage" _VAR_CONFIG_PATH = "OPENLINEAGE_CONFIG" _CONFIG_OPTION_CONFIG_PATH = "config_path" +_CONFIG_OPTION_CUSTOM_RUN_FACETS = "custom_run_facets" _VAR_DISABLE_SOURCE_CODE = "OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE" _CONFIG_OPTION_DISABLE_SOURCE_CODE = "disable_source_code" _CONFIG_OPTION_DISABLED_FOR_OPERATORS = "disabled_for_operators" @@ -255,6 +257,30 @@ def test_extractors_do_not_fail_if_conf_option_missing(): assert custom_extractors() == set() +@conf_vars(dict()) +def test_custom_run_facets_not_set(): + assert custom_run_facets() == set() + + +def test_custom_run_facets_with_no_values(): + with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_CUSTOM_RUN_FACETS): None}): + assert custom_run_facets() == set() + with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_CUSTOM_RUN_FACETS): ""}): + assert custom_run_facets() == set() + + +@conf_vars( + { + ( + _CONFIG_SECTION, + _CONFIG_OPTION_CUSTOM_RUN_FACETS, + ): " tests.my_function;; tests.my_function ; my_function_2; ", + } +) +def test_custom_run_facets(): + assert custom_run_facets() == {"tests.my_function", "my_function_2"} + + @env_vars({_VAR_NAMESPACE: "my_custom_namespace"}) @conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_NAMESPACE): None}) def test_namespace_legacy_env_var_is_used_when_no_conf_option_set(): diff --git a/tests/providers/openlineage/utils/custom_facet_fixture.py b/tests/providers/openlineage/utils/custom_facet_fixture.py new file mode 100644 index 00000000000000..5a051218e2e9d7 --- /dev/null +++ b/tests/providers/openlineage/utils/custom_facet_fixture.py @@ -0,0 +1,87 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +import attrs +from openlineage.client.facet import BaseFacet + +if TYPE_CHECKING: + from airflow.models import TaskInstance + + +@attrs.define(slots=False) +class MyCustomRunFacet(BaseFacet): + """Define a custom run facet.""" + + name: str + jobState: str + uniqueName: str + displayName: str + dagId: str + taskId: str + cluster: str + + +def get_additional_test_facet(task_instance: TaskInstance) -> dict[str, dict] | None: + operator_name = task_instance.task.operator_name if task_instance.task else None + if operator_name == "BashOperator": + return None + job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}" + return { + "additional_run_facet": attrs.asdict( + MyCustomRunFacet( + name="test-lineage-namespace", + jobState=task_instance.state, + uniqueName=job_unique_name, + displayName=f"{task_instance.dag_id}.{task_instance.task_id}", + dagId=task_instance.dag_id, + taskId=task_instance.task_id, + cluster="TEST", + ) + ) + } + + +def get_duplicate_test_facet_key(task_instance: TaskInstance): + job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}" + return { + "additional_run_facet": attrs.asdict( + MyCustomRunFacet( + name="test-lineage-namespace", + jobState=task_instance.state, + uniqueName=job_unique_name, + displayName=f"{task_instance.dag_id}.{task_instance.task_id}", + dagId=task_instance.dag_id, + taskId=task_instance.task_id, + cluster="TEST", + ) + ) + } + + +def get_another_test_facet(task_instance: TaskInstance): + return {"another_run_facet": {"name": "another-lineage-namespace"}} + + +def return_type_is_not_dict(task_instance: TaskInstance): + return "return type is not dict" + + +def get_custom_facet_throws_exception(task_instance: TaskInstance): + raise Exception("fake exception from custom fcet function") diff --git a/tests/providers/openlineage/utils/test_utils.py b/tests/providers/openlineage/utils/test_utils.py index 381743141a07ad..d3a9d89445ffa6 100644 --- a/tests/providers/openlineage/utils/test_utils.py +++ b/tests/providers/openlineage/utils/test_utils.py @@ -18,12 +18,13 @@ from __future__ import annotations import datetime -from unittest.mock import MagicMock +from unittest.mock import ANY, MagicMock, patch from airflow import DAG from airflow.decorators import task from airflow.models.baseoperator import BaseOperator from airflow.models.mappedoperator import MappedOperator +from airflow.models.taskinstance import TaskInstance from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.operators.python import PythonOperator @@ -34,6 +35,7 @@ _get_tasks_details, _safe_get_dag_tree_view, get_airflow_job_facet, + get_custom_facets, get_fully_qualified_class_name, get_job_name, get_operator_class, @@ -482,3 +484,169 @@ def test_get_task_groups_details_nested(): def test_get_task_groups_details_no_task_groups(): assert _get_task_groups_details(DAG("test_dag", start_date=datetime.datetime(2024, 6, 1))) == {} + + +@patch("airflow.providers.openlineage.conf.custom_run_facets", return_value=set()) +def test_get_custom_facets_with_no_function_definition(mock_custom_facet_funcs): + sample_ti = TaskInstance( + task=EmptyOperator( + task_id="test-task", dag=DAG("test-dag", start_date=datetime.datetime(2024, 7, 1)) + ), + state="running", + ) + result = get_custom_facets(sample_ti) + assert result == {} + + +@patch( + "airflow.providers.openlineage.conf.custom_run_facets", + return_value={"tests.providers.openlineage.utils.custom_facet_fixture.get_additional_test_facet"}, +) +def test_get_custom_facets_with_function_definition(mock_custom_facet_funcs): + sample_ti = TaskInstance( + task=EmptyOperator( + task_id="test-task", dag=DAG("test-dag", start_date=datetime.datetime(2024, 7, 1)) + ), + state="running", + ) + result = get_custom_facets(sample_ti) + assert result == { + "additional_run_facet": { + "_producer": ANY, + "_schemaURL": ANY, + "name": "test-lineage-namespace", + "jobState": "running", + "uniqueName": "TEST.test-dag.test-task", + "displayName": "test-dag.test-task", + "dagId": "test-dag", + "taskId": "test-task", + "cluster": "TEST", + } + } + + +@patch( + "airflow.providers.openlineage.conf.custom_run_facets", + return_value={ + "tests.providers.openlineage.utils.custom_facet_fixture.get_additional_test_facet", + }, +) +def test_get_custom_facets_with_return_value_as_none(mock_custom_facet_funcs): + sample_ti = TaskInstance( + task=BashOperator( + task_id="test-task", + bash_command="exit 0;", + dag=DAG("test-dag", start_date=datetime.datetime(2024, 7, 1)), + ), + state="running", + ) + result = get_custom_facets(sample_ti) + assert result == {} + + +@patch( + "airflow.providers.openlineage.conf.custom_run_facets", + return_value={ + "invalid_function", + "tests.providers.openlineage.utils.custom_facet_fixture.get_additional_test_facet", + "tests.providers.openlineage.utils.custom_facet_fixture.return_type_is_not_dict", + "tests.providers.openlineage.utils.custom_facet_fixture.get_another_test_facet", + }, +) +def test_get_custom_facets_with_multiple_function_definition(mock_custom_facet_funcs): + sample_ti = TaskInstance( + task=EmptyOperator( + task_id="test-task", dag=DAG("test-dag", start_date=datetime.datetime(2024, 7, 1)) + ), + state="running", + ) + result = get_custom_facets(sample_ti) + assert result == { + "additional_run_facet": { + "_producer": ANY, + "_schemaURL": ANY, + "name": "test-lineage-namespace", + "jobState": "running", + "uniqueName": "TEST.test-dag.test-task", + "displayName": "test-dag.test-task", + "dagId": "test-dag", + "taskId": "test-task", + "cluster": "TEST", + }, + "another_run_facet": {"name": "another-lineage-namespace"}, + } + + +@patch( + "airflow.providers.openlineage.conf.custom_run_facets", + return_value={ + "tests.providers.openlineage.utils.custom_facet_fixture.get_additional_test_facet", + "tests.providers.openlineage.utils.custom_facet_fixture.get_duplicate_test_facet_key", + }, +) +def test_get_custom_facets_with_duplicate_facet_keys(mock_custom_facet_funcs): + sample_ti = TaskInstance( + task=EmptyOperator( + task_id="test-task", dag=DAG("test-dag", start_date=datetime.datetime(2024, 7, 1)) + ), + state="running", + ) + result = get_custom_facets(sample_ti) + assert result == { + "additional_run_facet": { + "_producer": ANY, + "_schemaURL": ANY, + "name": "test-lineage-namespace", + "jobState": "running", + "uniqueName": "TEST.test-dag.test-task", + "displayName": "test-dag.test-task", + "dagId": "test-dag", + "taskId": "test-task", + "cluster": "TEST", + } + } + + +@patch( + "airflow.providers.openlineage.conf.custom_run_facets", + return_value={"invalid_function"}, +) +def test_get_custom_facets_with_invalid_function_definition(mock_custom_facet_funcs): + sample_ti = TaskInstance( + task=EmptyOperator( + task_id="test-task", dag=DAG("test-dag", start_date=datetime.datetime(2024, 7, 1)) + ), + state="running", + ) + result = get_custom_facets(sample_ti) + assert result == {} + + +@patch( + "airflow.providers.openlineage.conf.custom_run_facets", + return_value={"tests.providers.openlineage.utils.custom_facet_fixture.return_type_is_not_dict"}, +) +def test_get_custom_facets_with_wrong_return_type_function(mock_custom_facet_funcs): + sample_ti = TaskInstance( + task=EmptyOperator( + task_id="test-task", dag=DAG("test-dag", start_date=datetime.datetime(2024, 7, 1)) + ), + state="running", + ) + result = get_custom_facets(sample_ti) + assert result == {} + + +@patch( + "airflow.providers.openlineage.conf.custom_run_facets", + return_value={"tests.providers.openlineage.utils.custom_facet_fixture.get_custom_facet_throws_exception"}, +) +def test_get_custom_facets_with_exception(mock_custom_facet_funcs): + sample_ti = TaskInstance( + task=EmptyOperator( + task_id="test-task", dag=DAG("test-dag", start_date=datetime.datetime(2024, 7, 1)) + ), + state="running", + ) + result = get_custom_facets(sample_ti) + assert result == {}