Skip to content

Commit

Permalink
Duplicate facet key check
Browse files Browse the repository at this point in the history
  • Loading branch information
Anandhi committed May 16, 2024
1 parent ec33047 commit 03ed172
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 7 deletions.
27 changes: 20 additions & 7 deletions airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,20 @@ def get_custom_facets(task_instance: TaskInstance | None = None) -> dict[str, An
# Append custom run facets by executing the custom_facet_functions.
for custom_facet_func in conf.custom_facet_functions():
func: type[function] | None = try_import_from_string(custom_facet_func)
if not func:
log.warning("Unable to import `%s`, will ignore it.", custom_facet_func)
facet = func(task_instance) if func else None
if facet and isinstance(facet, dict):
custom_facets.update(facet)
duplicate_facet_keys = [facet_key for facet_key in facet.keys() if facet_key in custom_facets]
if duplicate_facet_keys:
log.warning(
"Got duplicate facets key(s), `%s` from `%s`, will ignore it.",
", ".join(duplicate_facet_keys),
custom_facet_func,
)
else:
log.info(f"Appending custom facets from {custom_facet_func}.")
custom_facets.update(facet)
return custom_facets


Expand Down Expand Up @@ -194,9 +205,9 @@ class TaskInstanceInfo(InfoJsonEncodable):

includes = ["duration", "try_number", "pool"]
casts = {
"map_index": lambda ti: ti.map_index
if hasattr(ti, "map_index") and getattr(ti, "map_index") != -1
else None
"map_index": lambda ti: (
ti.map_index if hasattr(ti, "map_index") and getattr(ti, "map_index") != -1 else None
)
}


Expand Down Expand Up @@ -235,9 +246,11 @@ class TaskInfo(InfoJsonEncodable):
]
casts = {
"operator_class": lambda task: task.task_type,
"task_group": lambda task: TaskGroupInfo(task.task_group)
if hasattr(task, "task_group") and getattr(task.task_group, "_group_id", None)
else None,
"task_group": lambda task: (
TaskGroupInfo(task.task_group)
if hasattr(task, "task_group") and getattr(task.task_group, "_group_id", None)
else None
),
}


Expand Down
17 changes: 17 additions & 0 deletions tests/providers/openlineage/utils/custom_facet_fixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,23 @@ def get_additional_test_facet(task_instance: TaskInstance):
}


def get_duplicate_test_facet_key(task_instance: TaskInstance):
job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
return {
"additional_run_facet": attrs.asdict(
MyCustomRunFacet(
name="test-lineage-namespace",
jobState=task_instance.state,
uniqueName=job_unique_name,
displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
dagId=task_instance.dag_id,
taskId=task_instance.task_id,
cluster="TEST",
)
)
}


def get_another_test_facet(task_instance: TaskInstance):
return {"another_run_facet": {"name": "another-lineage-namespace"}}

Expand Down
26 changes: 26 additions & 0 deletions tests/providers/openlineage/utils/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,32 @@ def test_get_custom_facets_with_multiple_function_definition():
}


@conf_vars(
{
(
"openlineage",
"custom_facet_functions",
): "tests.providers.openlineage.utils.custom_facet_fixture.get_additional_test_facet;"
"tests.providers.openlineage.utils.custom_facet_fixture.get_duplicate_test_facet_key"
}
)
def test_get_custom_facets_with_duplicate_facet_keys():
result = get_custom_facets(SAMPLE_TI)
assert result == {
"additional_run_facet": {
"_producer": f"https://github.com/apache/airflow/tree/providers-openlineage/{OPENLINEAGE_PROVIDER_VERSION}",
"_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
"name": "test-lineage-namespace",
"jobState": "running",
"uniqueName": "TEST.test-dag.test-task",
"displayName": "test-dag.test-task",
"dagId": "test-dag",
"taskId": "test-task",
"cluster": "TEST",
}
}


@conf_vars(
{("openlineage", "custom_facet_functions"): "invalid_function"},
)
Expand Down

0 comments on commit 03ed172

Please sign in to comment.