Skip to content

Commit

Permalink
[OpenLineage#2488] Fixed format returned by `airflow.macros.lineage_p…
Browse files Browse the repository at this point in the history
…arent_id`.

- Returned format: `<namespace>/<name>/<run_id>`.

- `name` should be `<dag_id>.<task_id>`, not a UUID.

- `run_id` should be a UUID, not `<run_timestamp>.<try_number>`.

- Both `lineage_run_id` and `lineage_parent_id` should expose the same
  interface - only a `TaskInstance` object is now required as argument.

- Import `_DAG_NAMESPACE` instead of inferring it again.

Airflow reference: apache/airflow#37877

Signed-off-by: Fabio Manganiello <[email protected]>
  • Loading branch information
blacklight committed Apr 4, 2024
1 parent 468ca67 commit 7f68f72
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 24 deletions.
2 changes: 1 addition & 1 deletion integration/airflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ t1 = DataProcPySparkOperator(
job_name=job_name,
dataproc_pyspark_properties={
'spark.driver.extraJavaOptions':
f"-javaagent:{jar}={os.environ.get('OPENLINEAGE_URL')}/api/v1/namespaces/{os.getenv('OPENLINEAGE_NAMESPACE', 'default')}/jobs/{job_name}/runs/{{{{macros.OpenLineagePlugin.lineage_run_id(task, task_instance)}}}}?api_key={os.environ.get('OPENLINEAGE_API_KEY')}"
f"-javaagent:{jar}={os.environ.get('OPENLINEAGE_URL')}/api/v1/namespaces/{os.getenv('OPENLINEAGE_NAMESPACE', 'default')}/jobs/{job_name}/runs/{{{{macros.OpenLineagePlugin.lineage_run_id(task_instance)}}}}?api_key={os.environ.get('OPENLINEAGE_API_KEY')}"
dag=dag)
```

Expand Down
29 changes: 16 additions & 13 deletions integration/airflow/openlineage/airflow/macros.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
# Copyright 2018-2024 contributors to the OpenLineage project
# SPDX-License-Identifier: Apache-2.0

import os
import typing

from openlineage.airflow.adapter import OpenLineageAdapter
from openlineage.airflow.adapter import _DAG_NAMESPACE, OpenLineageAdapter

if typing.TYPE_CHECKING:
from airflow.models import BaseOperator, TaskInstance
from airflow.models import TaskInstance

_JOB_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", "default")


def lineage_run_id(task: "BaseOperator", task_instance: "TaskInstance"):
def lineage_run_id(task_instance: "TaskInstance"):
"""
Macro function which returns the generated run id for a given task. This
can be used to forward the run id from a task to a child run so the job
Expand All @@ -21,17 +18,20 @@ def lineage_run_id(task: "BaseOperator", task_instance: "TaskInstance"):
PythonOperator(
task_id='render_template',
python_callable=my_task_function,
op_args=['{{ lineage_run_id(task, task_instance) }}'], # lineage_run_id macro invoked
op_args=['{{ lineage_run_id(task_instance) }}'], # lineage_run_id macro invoked
provide_context=False,
dag=dag
)
"""
return OpenLineageAdapter.build_task_instance_run_id(
task_instance.dag_id, task.task_id, task_instance.execution_date, task_instance.try_number
task_instance.dag_id,
task_instance.task.task_id,
task_instance.execution_date,
task_instance.try_number,
)


def lineage_parent_id(run_id: str, task: "BaseOperator", task_instance: "TaskInstance"):
def lineage_parent_id(task_instance: "TaskInstance"):
"""
Macro function which returns the generated job and run id for a given task. This
can be used to forward the ids from a task to a child run so the job
Expand All @@ -41,12 +41,15 @@ def lineage_parent_id(run_id: str, task: "BaseOperator", task_instance: "TaskIns
PythonOperator(
task_id='render_template',
python_callable=my_task_function,
op_args=['{{ lineage_parent_id(run_id, task, task_instance) }}'], # macro invoked
op_args=['{{ lineage_parent_id(task_instance) }}'], # macro invoked
provide_context=False,
dag=dag
)
"""
job_name = OpenLineageAdapter.build_task_instance_run_id(
task_instance.dag_id, task.task_id, task_instance.execution_date, task_instance.try_number
return "/".join(
[
_DAG_NAMESPACE,
f"{task_instance.dag_id}.{task_instance.task.task_id}",
lineage_run_id(task_instance=task_instance),
]
)
return f"{_JOB_NAMESPACE}/{job_name}/{run_id}"
18 changes: 15 additions & 3 deletions integration/airflow/openlineage/airflow/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import subprocess
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Type
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
from uuid import uuid4

import attr
from openlineage.airflow.facets import (
Expand Down Expand Up @@ -383,8 +382,21 @@ def get_unknown_source_attribute_run_facet(task: "BaseOperator", name: Optional[
}


def new_lineage_run_id(dag_run_id: str, task_id: str) -> str:
return str(uuid4())
def get_unknown_source_attribute_run_facet(task: "BaseOperator", name: Optional[str] = None):
if not name:
name = get_operator_class(task).__name__
return {
"unknownSourceAttribute": attr.asdict(
UnknownOperatorAttributeRunFacet(
unknownItems=[
UnknownOperatorInstance(
name=name,
properties=TaskInfo(task),
)
]
)
)
}


def get_dagrun_start_end(dagrun: "DagRun", dag: "DAG"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from airflow.utils.dates import days_ago
from airflow.version import version as AIRFLOW_VERSION

PLUGIN_MACRO = "{{ macros.OpenLineagePlugin.lineage_parent_id(run_id, task, task_instance) }}"
PLUGIN_MACRO = "{{ macros.OpenLineagePlugin.lineage_parent_id(task_instance) }}"


PROJECT_DIR = "/opt/data/dbt/testproject"
Expand Down
17 changes: 11 additions & 6 deletions integration/airflow/tests/test_macros.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,38 @@
def test_lineage_run_id():
task_instance = mock.MagicMock(
dag_id="dag_id",
task=mock.MagicMock(task_id="task_id"),
execution_date="execution_date",
try_number=1,
)
task = mock.MagicMock(task_id="task_id")
actual = lineage_run_id(task=task, task_instance=task_instance)

actual = lineage_run_id(task_instance)
expected = str(
uuid.uuid3(
uuid.NAMESPACE_URL,
f"{_DAG_NAMESPACE}.dag_id.task_id.execution_date.1",
)
)

assert actual == expected


def test_lineage_parent_id():
task_instance = mock.MagicMock(
dag_id="dag_id",
task=mock.MagicMock(task_id="task_id"),
execution_date="execution_date",
try_number=1,
)
task = mock.MagicMock(task_id="task_id")
actual = lineage_parent_id(run_id="run_id", task_instance=task_instance, task=task)
job_name = str(

actual = lineage_parent_id(task_instance)
job_name = f"{task_instance.dag_id}.{task_instance.task.task_id}"
run_id = str(
uuid.uuid3(
uuid.NAMESPACE_URL,
f"{_DAG_NAMESPACE}.dag_id.task_id.execution_date.1",
)
)
expected = f"{_DAG_NAMESPACE}/{job_name}/run_id"

expected = f"{_DAG_NAMESPACE}/{job_name}/{run_id}"
assert actual == expected

0 comments on commit 7f68f72

Please sign in to comment.