Skip to content

Commit

Permalink
Fixes after rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
Anandhi authored and eladkal committed Jun 30, 2024
1 parent 6406690 commit 1eea113
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 50 deletions.
2 changes: 1 addition & 1 deletion airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def get_custom_facets(task_instance: TaskInstance | None = None) -> dict[str, An
if not func:
log.warning("OpenLineage is unable to import custom facet function `%s`; will ignore it.", custom_facet_func)
continue
facets: dict[str, BaseFacet] = func(task_instance)
facet: dict[str, BaseFacet] = func(task_instance)
if facet and isinstance(facet, dict):
duplicate_facet_keys = [facet_key for facet_key in facet.keys() if facet_key in custom_facets]
if duplicate_facet_keys:
Expand Down
26 changes: 26 additions & 0 deletions tests/providers/openlineage/test_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
_safe_int_convert,
config_path,
custom_extractors,
custom_facet_functions,
dag_state_change_process_pool_size,
disabled_operators,
is_disabled,
Expand All @@ -39,6 +40,7 @@
_CONFIG_SECTION = "openlineage"
_VAR_CONFIG_PATH = "OPENLINEAGE_CONFIG"
_CONFIG_OPTION_CONFIG_PATH = "config_path"
_CONFIG_OPTION_CUSTOM_FACET_FUNCTIONS = "custom_facet_functions"
_VAR_DISABLE_SOURCE_CODE = "OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE"
_CONFIG_OPTION_DISABLE_SOURCE_CODE = "disable_source_code"
_CONFIG_OPTION_DISABLED_FOR_OPERATORS = "disabled_for_operators"
Expand Down Expand Up @@ -276,6 +278,30 @@ def test_extractors_do_not_fail_if_conf_option_missing():
assert custom_extractors() == set()


@conf_vars(dict())
def test_custom_facet_functions_not_set():
assert custom_facet_functions() == set()


def test_custom_facet_functions_with_no_values():
with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_CUSTOM_FACET_FUNCTIONS): None}):
assert custom_facet_functions() == set()
with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_CUSTOM_FACET_FUNCTIONS): ""}):
assert custom_facet_functions() == set()


@conf_vars(
{
(
_CONFIG_SECTION,
_CONFIG_OPTION_CUSTOM_FACET_FUNCTIONS,
): " tests.my_function;; tests.my_function ; my_function_2; ",
}
)
def test_custom_facet_functions():
assert custom_facet_functions() == {"tests.my_function", "my_function_2"}


@env_vars({_VAR_NAMESPACE: "my_custom_namespace"})
@conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_NAMESPACE): None})
def test_namespace_legacy_env_var_is_used_when_no_conf_option_set():
Expand Down
90 changes: 41 additions & 49 deletions tests/providers/openlineage/utils/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@
from __future__ import annotations

import datetime
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, patch, ANY

from airflow import DAG
from airflow.models.mappedoperator import MappedOperator
from airflow.models.taskinstance import TaskInstance
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION
from airflow.providers.openlineage.plugins.facets import AirflowJobFacet
from airflow.providers.openlineage.utils.utils import (
_get_parsed_dag_tree,
Expand All @@ -42,7 +41,6 @@
from airflow.serialization.serialized_objects import SerializedBaseOperator
from airflow.utils.task_group import TaskGroup
from tests.test_utils.mock_operators import MockOperator
from tests.test_utils.config import conf_vars


class CustomOperatorForTest(BashOperator):
Expand Down Expand Up @@ -463,8 +461,8 @@ def test_get_task_groups_details_no_task_groups():
assert _get_task_groups_details(DAG("test_dag", start_date=datetime.datetime(2024, 6, 1))) == {}


@patch.dict("os.environ", {})
def test_get_custom_facets_with_no_function_definition():
@patch("airflow.providers.openlineage.conf.custom_facet_functions", return_value=set())
def test_get_custom_facets_with_no_function_definition(mock_custom_facet_funcs):
sample_ti = TaskInstance(
task=EmptyOperator(task_id="test-task", dag=DAG("test-dag")),
state="running",
Expand All @@ -473,24 +471,20 @@ def test_get_custom_facets_with_no_function_definition():
assert result == {}


@conf_vars(
{
(
"openlineage",
"custom_facet_functions",
): "tests.providers.openlineage.utils.custom_facet_fixture.get_additional_test_facet"
}
@patch(
"airflow.providers.openlineage.conf.custom_facet_functions",
return_value={"tests.providers.openlineage.utils.custom_facet_fixture.get_additional_test_facet"},
)
def test_get_custom_facets_with_function_definition():
def test_get_custom_facets_with_function_definition(mock_custom_facet_funcs):
sample_ti = TaskInstance(
task=EmptyOperator(task_id="test-task", dag=DAG("test-dag")),
state="running",
)
result = get_custom_facets(sample_ti)
assert result == {
"additional_run_facet": {
"_producer": f"https://github.com/apache/airflow/tree/providers-openlineage/{OPENLINEAGE_PROVIDER_VERSION}",
"_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
"_producer": ANY,
"_schemaURL": ANY,
"name": "test-lineage-namespace",
"jobState": "running",
"uniqueName": "TEST.test-dag.test-task",
Expand All @@ -502,26 +496,25 @@ def test_get_custom_facets_with_function_definition():
}


@conf_vars(
{
(
"openlineage",
"custom_facet_functions",
): "tests.providers.openlineage.utils.custom_facet_fixture.get_additional_test_facet; ;"
"invalid_function; tests.providers.openlineage.utils.custom_facet_fixture.return_type_is_not_dict;"
" tests.providers.openlineage.utils.custom_facet_fixture.get_another_test_facet "
@patch(
"airflow.providers.openlineage.conf.custom_facet_functions",
return_value={
"invalid_function",
"tests.providers.openlineage.utils.custom_facet_fixture.get_additional_test_facet",
"tests.providers.openlineage.utils.custom_facet_fixture.return_type_is_not_dict",
"tests.providers.openlineage.utils.custom_facet_fixture.get_another_test_facet",
},
)
def test_get_custom_facets_with_multiple_function_definition():
def test_get_custom_facets_with_multiple_function_definition(mock_custom_facet_funcs):
sample_ti = TaskInstance(
task=EmptyOperator(task_id="test-task", dag=DAG("test-dag")),
state="running",
)
result = get_custom_facets(sample_ti)
assert result == {
"additional_run_facet": {
"_producer": f"https://github.com/apache/airflow/tree/providers-openlineage/{OPENLINEAGE_PROVIDER_VERSION}",
"_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
"_producer": ANY,
"_schemaURL": ANY,
"name": "test-lineage-namespace",
"jobState": "running",
"uniqueName": "TEST.test-dag.test-task",
Expand All @@ -534,21 +527,23 @@ def test_get_custom_facets_with_multiple_function_definition():
}


@conf_vars(
{
(
"openlineage",
"custom_facet_functions",
): "tests.providers.openlineage.utils.custom_facet_fixture.get_additional_test_facet;"
"tests.providers.openlineage.utils.custom_facet_fixture.get_duplicate_test_facet_key"
}
@patch(
"airflow.providers.openlineage.conf.custom_facet_functions",
return_value={
"tests.providers.openlineage.utils.custom_facet_fixture.get_additional_test_facet",
"tests.providers.openlineage.utils.custom_facet_fixture.get_duplicate_test_facet_key",
},
)
def test_get_custom_facets_with_duplicate_facet_keys():
result = get_custom_facets(SAMPLE_TI)
def test_get_custom_facets_with_duplicate_facet_keys(mock_custom_facet_funcs):
sample_ti = TaskInstance(
task=EmptyOperator(task_id="test-task", dag=DAG("test-dag")),
state="running",
)
result = get_custom_facets(sample_ti)
assert result == {
"additional_run_facet": {
"_producer": f"https://github.com/apache/airflow/tree/providers-openlineage/{OPENLINEAGE_PROVIDER_VERSION}",
"_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
"_producer": ANY,
"_schemaURL": ANY,
"name": "test-lineage-namespace",
"jobState": "running",
"uniqueName": "TEST.test-dag.test-task",
Expand All @@ -560,10 +555,11 @@ def test_get_custom_facets_with_duplicate_facet_keys():
}


@conf_vars(
{("openlineage", "custom_facet_functions"): "invalid_function"},
@patch(
"airflow.providers.openlineage.conf.custom_facet_functions",
return_value={"invalid_function"},
)
def test_get_custom_facets_with_invalid_function_definition():
def test_get_custom_facets_with_invalid_function_definition(mock_custom_facet_funcs):
sample_ti = TaskInstance(
task=EmptyOperator(task_id="test-task", dag=DAG("test-dag")),
state="running",
Expand All @@ -572,15 +568,11 @@ def test_get_custom_facets_with_invalid_function_definition():
assert result == {}


@conf_vars(
{
(
"openlineage",
"custom_facet_functions",
): "tests.providers.openlineage.utils.custom_facet_fixture.return_type_is_not_dict"
},
@patch(
"airflow.providers.openlineage.conf.custom_facet_functions",
return_value={"tests.providers.openlineage.utils.custom_facet_fixture.return_type_is_not_dict"},
)
def test_get_custom_facets_with_wrong_return_type_function():
def test_get_custom_facets_with_wrong_return_type_function(mock_custom_facet_funcs):
sample_ti = TaskInstance(
task=EmptyOperator(task_id="test-task", dag=DAG("test-dag")),
state="running",
Expand Down

0 comments on commit 1eea113

Please sign in to comment.