From 7f68f724068480646aa9412079097a460ba66a30 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Thu, 4 Apr 2024 23:10:52 +0200 Subject: [PATCH] [#2488] Fixed format returned by `airflow.macros.lineage_parent_id`. - Returned format: `//`. - `name` should be `.`, not a UUID. - `run_id` should be a UUID, not `.`. - Both `lineage_run_id` and `lineage_parent_id` should expose the same interface - only a `TaskInstance` object is now required as argument. - Import `_DAG_NAMESPACE` instead of inferring it again. Airflow reference: https://github.com/apache/airflow/pull/37877 Signed-off-by: Fabio Manganiello --- integration/airflow/README.md | 2 +- .../airflow/openlineage/airflow/macros.py | 29 ++++++++++--------- .../airflow/openlineage/airflow/utils.py | 18 ++++++++++-- .../integration/tests/airflow/dags/dbt_dag.py | 2 +- integration/airflow/tests/test_macros.py | 17 +++++++---- 5 files changed, 44 insertions(+), 24 deletions(-) diff --git a/integration/airflow/README.md b/integration/airflow/README.md index 5c4006c7bf..c6f307e0bf 100644 --- a/integration/airflow/README.md +++ b/integration/airflow/README.md @@ -243,7 +243,7 @@ t1 = DataProcPySparkOperator( job_name=job_name, dataproc_pyspark_properties={ 'spark.driver.extraJavaOptions': - f"-javaagent:{jar}={os.environ.get('OPENLINEAGE_URL')}/api/v1/namespaces/{os.getenv('OPENLINEAGE_NAMESPACE', 'default')}/jobs/{job_name}/runs/{{{{macros.OpenLineagePlugin.lineage_run_id(task, task_instance)}}}}?api_key={os.environ.get('OPENLINEAGE_API_KEY')}" + f"-javaagent:{jar}={os.environ.get('OPENLINEAGE_URL')}/api/v1/namespaces/{os.getenv('OPENLINEAGE_NAMESPACE', 'default')}/jobs/{job_name}/runs/{{{{macros.OpenLineagePlugin.lineage_run_id(task_instance)}}}}?api_key={os.environ.get('OPENLINEAGE_API_KEY')}" dag=dag) ``` diff --git a/integration/airflow/openlineage/airflow/macros.py b/integration/airflow/openlineage/airflow/macros.py index 5a0f75d231..9d16b8e584 100644 --- a/integration/airflow/openlineage/airflow/macros.py +++ b/integration/airflow/openlineage/airflow/macros.py @@ -1,18 +1,15 @@ # Copyright 2018-2024 contributors to the OpenLineage project # SPDX-License-Identifier: Apache-2.0 -import os import typing -from openlineage.airflow.adapter import OpenLineageAdapter +from openlineage.airflow.adapter import _DAG_NAMESPACE, OpenLineageAdapter if typing.TYPE_CHECKING: - from airflow.models import BaseOperator, TaskInstance + from airflow.models import TaskInstance -_JOB_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", "default") - -def lineage_run_id(task: "BaseOperator", task_instance: "TaskInstance"): +def lineage_run_id(task_instance: "TaskInstance"): """ Macro function which returns the generated run id for a given task. This can be used to forward the run id from a task to a child run so the job @@ -21,17 +18,20 @@ def lineage_run_id(task: "BaseOperator", task_instance: "TaskInstance"): PythonOperator( task_id='render_template', python_callable=my_task_function, - op_args=['{{ lineage_run_id(task, task_instance) }}'], # lineage_run_id macro invoked + op_args=['{{ lineage_run_id(task_instance) }}'], # lineage_run_id macro invoked provide_context=False, dag=dag ) """ return OpenLineageAdapter.build_task_instance_run_id( - task_instance.dag_id, task.task_id, task_instance.execution_date, task_instance.try_number + task_instance.dag_id, + task_instance.task.task_id, + task_instance.execution_date, + task_instance.try_number, ) -def lineage_parent_id(run_id: str, task: "BaseOperator", task_instance: "TaskInstance"): +def lineage_parent_id(task_instance: "TaskInstance"): """ Macro function which returns the generated job and run id for a given task. This can be used to forward the ids from a task to a child run so the job @@ -41,12 +41,15 @@ def lineage_parent_id(run_id: str, task: "BaseOperator", task_instance: "TaskIns PythonOperator( task_id='render_template', python_callable=my_task_function, - op_args=['{{ lineage_parent_id(run_id, task, task_instance) }}'], # macro invoked + op_args=['{{ lineage_parent_id(task_instance) }}'], # macro invoked provide_context=False, dag=dag ) """ - job_name = OpenLineageAdapter.build_task_instance_run_id( - task_instance.dag_id, task.task_id, task_instance.execution_date, task_instance.try_number + return "/".join( + [ + _DAG_NAMESPACE, + f"{task_instance.dag_id}.{task_instance.task.task_id}", + lineage_run_id(task_instance=task_instance), + ] ) - return f"{_JOB_NAMESPACE}/{job_name}/{run_id}" diff --git a/integration/airflow/openlineage/airflow/utils.py b/integration/airflow/openlineage/airflow/utils.py index 493310f1e4..af9c222a95 100644 --- a/integration/airflow/openlineage/airflow/utils.py +++ b/integration/airflow/openlineage/airflow/utils.py @@ -9,7 +9,6 @@ import subprocess from typing import TYPE_CHECKING, Any, Dict, List, Optional, Type from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse -from uuid import uuid4 import attr from openlineage.airflow.facets import ( @@ -383,8 +382,21 @@ def get_unknown_source_attribute_run_facet(task: "BaseOperator", name: Optional[ } -def new_lineage_run_id(dag_run_id: str, task_id: str) -> str: - return str(uuid4()) +def get_unknown_source_attribute_run_facet(task: "BaseOperator", name: Optional[str] = None): + if not name: + name = get_operator_class(task).__name__ + return { + "unknownSourceAttribute": attr.asdict( + UnknownOperatorAttributeRunFacet( + unknownItems=[ + UnknownOperatorInstance( + name=name, + properties=TaskInfo(task), + ) + ] + ) + ) + } def get_dagrun_start_end(dagrun: "DagRun", dag: "DAG"): diff --git a/integration/airflow/tests/integration/tests/airflow/dags/dbt_dag.py b/integration/airflow/tests/integration/tests/airflow/dags/dbt_dag.py index 57056ba562..3903e9d4df 100644 --- a/integration/airflow/tests/integration/tests/airflow/dags/dbt_dag.py +++ b/integration/airflow/tests/integration/tests/airflow/dags/dbt_dag.py @@ -8,7 +8,7 @@ from airflow.utils.dates import days_ago from airflow.version import version as AIRFLOW_VERSION -PLUGIN_MACRO = "{{ macros.OpenLineagePlugin.lineage_parent_id(run_id, task, task_instance) }}" +PLUGIN_MACRO = "{{ macros.OpenLineagePlugin.lineage_parent_id(task_instance) }}" PROJECT_DIR = "/opt/data/dbt/testproject" diff --git a/integration/airflow/tests/test_macros.py b/integration/airflow/tests/test_macros.py index 5e9beda192..c31e730736 100644 --- a/integration/airflow/tests/test_macros.py +++ b/integration/airflow/tests/test_macros.py @@ -10,33 +10,38 @@ def test_lineage_run_id(): task_instance = mock.MagicMock( dag_id="dag_id", + task=mock.MagicMock(task_id="task_id"), execution_date="execution_date", try_number=1, ) - task = mock.MagicMock(task_id="task_id") - actual = lineage_run_id(task=task, task_instance=task_instance) + + actual = lineage_run_id(task_instance) expected = str( uuid.uuid3( uuid.NAMESPACE_URL, f"{_DAG_NAMESPACE}.dag_id.task_id.execution_date.1", ) ) + assert actual == expected def test_lineage_parent_id(): task_instance = mock.MagicMock( dag_id="dag_id", + task=mock.MagicMock(task_id="task_id"), execution_date="execution_date", try_number=1, ) - task = mock.MagicMock(task_id="task_id") - actual = lineage_parent_id(run_id="run_id", task_instance=task_instance, task=task) - job_name = str( + + actual = lineage_parent_id(task_instance) + job_name = f"{task_instance.dag_id}.{task_instance.task.task_id}" + run_id = str( uuid.uuid3( uuid.NAMESPACE_URL, f"{_DAG_NAMESPACE}.dag_id.task_id.execution_date.1", ) ) - expected = f"{_DAG_NAMESPACE}/{job_name}/run_id" + + expected = f"{_DAG_NAMESPACE}/{job_name}/{run_id}" assert actual == expected