diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index 4d8a48e04fb68..fa007391497d1 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -2847,6 +2847,13 @@ components:
type: string
readOnly: true
description: The ID of the DAG.
+ dag_display_name:
+ type: string
+ readOnly: true
+ description: |
+ Human centric display text for the DAG.
+
+ *New in version 2.9.0*
root_dag_id:
type: string
readOnly: true
@@ -3570,6 +3577,12 @@ components:
properties:
task_id:
type: string
+ task_display_name:
+ type: string
+ description: |
+ Human centric display text for the task.
+
+ *New in version 2.9.0*
dag_id:
type: string
dag_run_id:
@@ -3925,6 +3938,9 @@ components:
task_id:
type: string
readOnly: true
+ task_display_name:
+ type: string
+ readOnly: true
owner:
type: string
readOnly: true
diff --git a/airflow/api_connexion/schemas/dag_schema.py b/airflow/api_connexion/schemas/dag_schema.py
index 0bffdcf685784..9add4f416cc00 100644
--- a/airflow/api_connexion/schemas/dag_schema.py
+++ b/airflow/api_connexion/schemas/dag_schema.py
@@ -50,6 +50,7 @@ class Meta:
model = DagModel
dag_id = auto_field(dump_only=True)
+ dag_display_name = fields.String(attribute="dag_display_name", dump_only=True)
root_dag_id = auto_field(dump_only=True)
is_paused = auto_field()
is_active = auto_field(dump_only=True)
diff --git a/airflow/api_connexion/schemas/task_instance_schema.py b/airflow/api_connexion/schemas/task_instance_schema.py
index 8f0fc0533520d..4777b8bd4c577 100644
--- a/airflow/api_connexion/schemas/task_instance_schema.py
+++ b/airflow/api_connexion/schemas/task_instance_schema.py
@@ -55,6 +55,7 @@ class Meta:
state = TaskInstanceStateField()
_try_number = auto_field(data_key="try_number")
max_tries = auto_field()
+ task_display_name = fields.String(attribute="task_display_name", dump_only=True)
hostname = auto_field()
unixname = auto_field()
pool = auto_field()
diff --git a/airflow/api_connexion/schemas/task_schema.py b/airflow/api_connexion/schemas/task_schema.py
index ac1b465bb25b0..8d8f1e33f2985 100644
--- a/airflow/api_connexion/schemas/task_schema.py
+++ b/airflow/api_connexion/schemas/task_schema.py
@@ -39,6 +39,7 @@ class TaskSchema(Schema):
class_ref = fields.Method("_get_class_reference", dump_only=True)
operator_name = fields.Method("_get_operator_name", dump_only=True)
task_id = fields.String(dump_only=True)
+ task_display_name = fields.String(attribute="task_display_name", dump_only=True)
owner = fields.String(dump_only=True)
start_date = fields.DateTime(dump_only=True)
end_date = fields.DateTime(dump_only=True)
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index 137cd3827e392..5ea8f3a6a219e 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -326,6 +326,7 @@ def _get_dagbag_dag_details(dag: DAG) -> dict:
"""Return a dagbag dag details dict."""
return {
"dag_id": dag.dag_id,
+ "dag_display_name": dag.dag_display_name,
"root_dag_id": dag.parent_dag.dag_id if dag.parent_dag else None,
"is_paused": dag.get_is_paused(),
"is_active": dag.get_is_active(),
diff --git a/airflow/example_dags/example_display_name.py b/airflow/example_dags/example_display_name.py
new file mode 100644
index 0000000000000..b6b447d632a6a
--- /dev/null
+++ b/airflow/example_dags/example_display_name.py
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pendulum
+
+from airflow.decorators import dag, task
+from airflow.operators.empty import EmptyOperator
+
+
+# [START dag_decorator_usage]
+@dag(
+ schedule=None,
+ start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+ catchup=False,
+ tags=["example"],
+ dag_display_name="Sample DAG with Display Name",
+)
+def example_display_name():
+ sample_task_1 = EmptyOperator(
+ task_id="sample_task_1",
+ task_display_name="Sample Task 1",
+ )
+
+ @task(task_display_name="Sample Task 2")
+ def sample_task_2():
+ pass
+
+ sample_task_1 >> sample_task_2()
+
+
+example_dag = example_display_name()
+# [END dag_decorator_usage]
diff --git a/airflow/example_dags/example_params_trigger_ui.py b/airflow/example_dags/example_params_trigger_ui.py
index da28a9cc9bc39..2a1e6c9b34e13 100644
--- a/airflow/example_dags/example_params_trigger_ui.py
+++ b/airflow/example_dags/example_params_trigger_ui.py
@@ -37,12 +37,13 @@
with DAG(
dag_id=Path(__file__).stem,
+ dag_display_name="Params Trigger UI",
description=__doc__.partition(".")[0],
doc_md=__doc__,
schedule=None,
start_date=datetime.datetime(2022, 3, 4),
catchup=False,
- tags=["example_ui"],
+ tags=["example", "params"],
params={
"names": Param(
["Linda", "Martha", "Thomas"],
@@ -57,7 +58,7 @@
},
) as dag:
- @task(task_id="get_names")
+ @task(task_id="get_names", task_display_name="Get names")
def get_names(**kwargs) -> list[str]:
ti: TaskInstance = kwargs["ti"]
dag_run: DagRun = ti.dag_run
@@ -66,7 +67,7 @@ def get_names(**kwargs) -> list[str]:
return []
return dag_run.conf["names"]
- @task.branch(task_id="select_languages")
+ @task.branch(task_id="select_languages", task_display_name="Select languages")
def select_languages(**kwargs) -> list[str]:
ti: TaskInstance = kwargs["ti"]
dag_run: DagRun = ti.dag_run
@@ -76,19 +77,19 @@ def select_languages(**kwargs) -> list[str]:
selected_languages.append(f"generate_{lang}_greeting")
return selected_languages
- @task(task_id="generate_english_greeting")
+ @task(task_id="generate_english_greeting", task_display_name="Generate English greeting")
def generate_english_greeting(name: str) -> str:
return f"Hello {name}!"
- @task(task_id="generate_german_greeting")
+ @task(task_id="generate_german_greeting", task_display_name="Erzeuge Deutsche Begrüßung")
def generate_german_greeting(name: str) -> str:
return f"Sehr geehrter Herr/Frau {name}."
- @task(task_id="generate_french_greeting")
+ @task(task_id="generate_french_greeting", task_display_name="Produire un message d'accueil en français")
def generate_french_greeting(name: str) -> str:
return f"Bonjour {name}!"
- @task(task_id="print_greetings", trigger_rule=TriggerRule.ALL_DONE)
+ @task(task_id="print_greetings", task_display_name="Print greetings", trigger_rule=TriggerRule.ALL_DONE)
def print_greetings(greetings1, greetings2, greetings3) -> None:
for g in greetings1 or []:
print(g)
diff --git a/airflow/example_dags/example_params_ui_tutorial.py b/airflow/example_dags/example_params_ui_tutorial.py
index f4c85c9e430c8..1f293415d15a1 100644
--- a/airflow/example_dags/example_params_ui_tutorial.py
+++ b/airflow/example_dags/example_params_ui_tutorial.py
@@ -33,12 +33,13 @@
with DAG(
dag_id=Path(__file__).stem,
+ dag_display_name="Params UI tutorial",
description=__doc__.partition(".")[0],
doc_md=__doc__,
schedule=None,
start_date=datetime.datetime(2022, 3, 4),
catchup=False,
- tags=["example_ui"],
+ tags=["example", "params", "ui"],
params={
# Let's start simple: Standard dict values are detected from type and offered as entry form fields.
# Detected types are numbers, text, boolean, lists and dicts.
@@ -237,7 +238,7 @@
},
) as dag:
- @task
+ @task(task_display_name="Show used parameters")
def show_params(**kwargs) -> None:
params: ParamsDict = kwargs["params"]
print(f"This DAG was triggered with the following parameters:\n\n{json.dumps(params, indent=4)}\n")
diff --git a/airflow/migrations/versions/0139_2_9_0_add_display_name_for_dag_and_task_.py b/airflow/migrations/versions/0139_2_9_0_add_display_name_for_dag_and_task_.py
new file mode 100644
index 0000000000000..854eee8e7f82c
--- /dev/null
+++ b/airflow/migrations/versions/0139_2_9_0_add_display_name_for_dag_and_task_.py
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add display name for dag and task instance
+
+Revision ID: ee1467d4aa35
+Revises: b4078ac230a1
+Create Date: 2024-03-24 22:33:36.824827
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+
+# revision identifiers, used by Alembic.
+revision = 'ee1467d4aa35'
+down_revision = 'b4078ac230a1'
+branch_labels = None
+depends_on = None
+airflow_version = "2.9.0"
+
+
+def upgrade():
+ """Apply add display name for dag and task instance"""
+ op.add_column("dag", sa.Column("dag_display_name", sa.String(2000), nullable=True))
+ op.add_column("task_instance", sa.Column("task_display_name", sa.String(2000), nullable=True))
+
+
+def downgrade():
+ """Unapply add display name for dag and task instance"""
+ op.drop_column("dag", "dag_display_name")
+ op.drop_column("task_instance", "task_display_name")
diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py
index f2d179f01ba35..380c6fcf00f3d 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -19,6 +19,7 @@
import datetime
import inspect
+from abc import abstractproperty
from functools import cached_property
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Collection, Iterable, Iterator, Sequence
@@ -159,6 +160,20 @@ def dag_id(self) -> str:
def node_id(self) -> str:
return self.task_id
+ @abstractproperty
+ def task_display_name(self) -> str: ...
+
+ @property
+ def label(self) -> str | None:
+ if self.task_display_name and self.task_display_name != self.task_id:
+ return self.task_display_name
+ # Prefix handling if no display is given is cloned from taskmixin for compatibility
+ tg = self.task_group
+ if tg and tg.node_id and tg.prefix_group_id:
+ # "task_group_id.task_id" -> "task_id"
+ return self.task_id[len(tg.node_id) + 1 :]
+ return self.task_id
+
@property
def is_setup(self) -> bool:
raise NotImplementedError()
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 8636dd6c2e92d..5334fc90205aa 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -267,6 +267,7 @@ def partial(
doc_json: str | None | ArgNotSet = NOTSET,
doc_yaml: str | None | ArgNotSet = NOTSET,
doc_rst: str | None | ArgNotSet = NOTSET,
+ task_display_name: str | None | ArgNotSet = NOTSET,
logger_name: str | None | ArgNotSet = NOTSET,
allow_nested_operators: bool = True,
**kwargs,
@@ -334,6 +335,7 @@ def partial(
"doc_md": doc_md,
"doc_rst": doc_rst,
"doc_yaml": doc_yaml,
+ "task_display_name": task_display_name,
"logger_name": logger_name,
"allow_nested_operators": allow_nested_operators,
}
@@ -705,6 +707,7 @@ class derived from this one results in the creation of a task object,
that is visible in Task Instance details View in the Webserver
:param doc_yaml: Add documentation (in YAML format) or notes to your Task objects
that is visible in Task Instance details View in the Webserver
+ :param task_display_name: The display name of the task which appears on the UI.
:param logger_name: Name of the logger used by the Operator to emit logs.
If set to `None` (default), the logger name will fall back to
`airflow.task.operators.{class.__module__}.{class.__name__}` (e.g. SimpleHttpOperator will have
@@ -859,6 +862,7 @@ def __init__(
doc_json: str | None = None,
doc_yaml: str | None = None,
doc_rst: str | None = None,
+ task_display_name: str | None = None,
logger_name: str | None = None,
allow_nested_operators: bool = True,
**kwargs,
@@ -1001,6 +1005,10 @@ def __init__(
self.doc_yaml = doc_yaml
self.doc_rst = doc_rst
self.doc = doc
+ # Populate the display field only if provided and different from task id
+ self._task_display_property_value = (
+ task_display_name if task_display_name and task_display_name != task_id else None
+ )
self.upstream_task_ids: set[str] = set()
self.downstream_task_ids: set[str] = set()
@@ -1188,6 +1196,10 @@ def dag(self, dag: DAG | None):
self._dag = dag
+ @property
+ def task_display_name(self) -> str:
+ return self._task_display_property_value or self.task_id
+
def has_dag(self):
"""Return True if the Operator has been assigned to a DAG."""
return self._dag is not None
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index a230c94fd7529..3e6076e062455 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -156,6 +156,13 @@
from airflow.typing_compat import Literal
from airflow.utils.task_group import TaskGroup
+ # This is a workaround because mypy doesn't work with hybrid_property
+ # TODO: remove this hack and move hybrid_property back to main import block
+ # See https://github.com/python/mypy/issues/4430
+ hybrid_property = property
+else:
+ from sqlalchemy.ext.hybrid import hybrid_property
+
log = logging.getLogger(__name__)
DEFAULT_VIEW_PRESETS = ["grid", "graph", "duration", "gantt", "landing_times"]
@@ -412,6 +419,7 @@ class DAG(LoggingMixin):
:param fail_stop: Fails currently running tasks when task in DAG fails.
**Warning**: A fail stop dag can only have tasks with the default trigger rule ("all_success").
An exception will be thrown if any task in a fail stop dag has a non default trigger rule.
+ :param dag_display_name: The display name of the DAG which appears on the UI.
"""
_comps = {
@@ -478,6 +486,7 @@ def __init__(
owner_links: dict[str, str] | None = None,
auto_register: bool = True,
fail_stop: bool = False,
+ dag_display_name: str | None = None,
):
from airflow.utils.task_group import TaskGroup
@@ -510,6 +519,8 @@ def __init__(
validate_key(dag_id)
self._dag_id = dag_id
+ self._dag_display_property_value = dag_display_name
+
if concurrency:
# TODO: Remove in Airflow 3.0
warnings.warn(
@@ -1300,6 +1311,10 @@ def access_control(self):
def access_control(self, value):
self._access_control = DAG._upgrade_outdated_dag_access_control(value)
+ @property
+ def dag_display_name(self) -> str:
+ return self._dag_display_property_value or self._dag_id
+
@property
def description(self) -> str | None:
return self._description
@@ -3133,6 +3148,7 @@ def bulk_write_to_db(
orm_dag.has_import_errors = False
orm_dag.last_parsed_time = timezone.utcnow()
orm_dag.default_view = dag.default_view
+ orm_dag._dag_display_property_value = dag._dag_display_property_value
orm_dag.description = dag.description
orm_dag.max_active_tasks = dag.max_active_tasks
orm_dag.max_active_runs = dag.max_active_runs
@@ -3589,6 +3605,8 @@ class DagModel(Base):
processor_subdir = Column(String(2000), nullable=True)
# String representing the owners
owners = Column(String(2000))
+ # Display name of the dag
+ _dag_display_property_value = Column("dag_display_name", String(2000), nullable=True)
# Description of the dag
description = Column(Text)
# Default view of the DAG inside the webserver
@@ -3783,6 +3801,10 @@ def set_is_paused(self, is_paused: bool, including_subdags: bool = True, session
)
session.commit()
+ @hybrid_property
+ def dag_display_name(self) -> str:
+ return self._dag_display_property_value or self.dag_id
+
@classmethod
@internal_api_call
@provide_session
@@ -3982,6 +4004,7 @@ def dag(
owner_links: dict[str, str] | None = None,
auto_register: bool = True,
fail_stop: bool = False,
+ dag_display_name: str | None = None,
) -> Callable[[Callable], Callable[..., DAG]]:
"""
Python dag decorator which wraps a function into an Airflow DAG.
@@ -4038,6 +4061,7 @@ def factory(*args, **kwargs):
owner_links=owner_links,
auto_register=auto_register,
fail_stop=fail_stop,
+ dag_display_name=dag_display_name,
) as dag_obj:
# Set DAG documentation from function documentation if it exists and doc_md is not set.
if f.__doc__ and not dag_obj.doc_md:
diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index 862928d7ab2da..994e041d9fa5c 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -386,6 +386,10 @@ def leaves(self) -> Sequence[AbstractOperator]:
"""Implementing DAGNode."""
return [self]
+ @property
+ def task_display_name(self) -> str:
+ return self.partial_kwargs.get("task_display_name") or self.task_id
+
@property
def owner(self) -> str: # type: ignore[override]
return self.partial_kwargs.get("owner", DEFAULT_OWNER)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 9968da5898a66..9125e6912a20b 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1307,6 +1307,7 @@ class TaskInstance(Base, LoggingMixin):
next_method = Column(String(1000))
next_kwargs = Column(MutableDict.as_mutable(ExtendedJSON))
+ _task_display_property_value = Column("task_display_name", String(2000), nullable=True)
# If adding new fields here then remember to add them to
# refresh_from_db() or they won't display in the UI correctly
@@ -1475,6 +1476,7 @@ def insert_mapping(run_id: str, task: Operator, map_index: int) -> dict[str, Any
"operator": task.task_type,
"custom_operator_name": getattr(task, "custom_operator_name", None),
"map_index": map_index,
+ "_task_display_property_value": task.task_display_name,
}
@reconstructor
@@ -1535,6 +1537,10 @@ def operator_name(self) -> str | None:
"""@property: use a more friendly display name for the operator, if set."""
return self.custom_operator_name or self.operator
+ @hybrid_property
+ def task_display_name(self) -> str:
+ return self._task_display_property_value or self.task_id
+
@staticmethod
def _command_as_list(
ti: TaskInstance | TaskInstancePydantic,
diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json
index 31e8d58585514..a2a6732763641 100644
--- a/airflow/serialization/schema.json
+++ b/airflow/serialization/schema.json
@@ -169,7 +169,8 @@
{"type": "string"}
]
},
- "orientation": { "type" : "string"},
+ "orientation": { "type" : "string"},
+ "_dag_display_property_value": { "type" : "string"},
"_description": { "type" : "string"},
"_concurrency": { "type" : "number"},
"_max_active_tasks": { "type" : "number"},
@@ -239,6 +240,7 @@
"_task_module": { "type": "string" },
"_operator_extra_links": { "$ref": "#/definitions/extra_links" },
"task_id": { "type": "string" },
+ "task_display_name": { "type": "string" },
"label": { "type": "string" },
"owner": { "type": "string" },
"start_date": { "$ref": "#/definitions/datetime" },
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index f0427f8f18190..16a5c9e481d1d 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -1395,6 +1395,7 @@ class SerializedDAG(DAG, BaseSerialization):
def __get_constructor_defaults():
param_to_attr = {
"max_active_tasks": "_max_active_tasks",
+ "dag_display_name": "_dag_display_property_value",
"description": "_description",
"default_view": "_default_view",
"access_control": "_access_control",
diff --git a/airflow/www/static/js/dag/details/Header.tsx b/airflow/www/static/js/dag/details/Header.tsx
index 06c1e2dd11a73..57e0afa1b0103 100644
--- a/airflow/www/static/js/dag/details/Header.tsx
+++ b/airflow/www/static/js/dag/details/Header.tsx
@@ -34,6 +34,7 @@ import RunTypeIcon from "src/components/RunTypeIcon";
import BreadcrumbText from "./BreadcrumbText";
const dagId = getMetaValue("dag_id");
+const dagDisplayName = getMetaValue("dag_display_name");
const Header = () => {
const {
@@ -89,7 +90,9 @@ const Header = () => {
const lastIndex = taskId ? taskId.lastIndexOf(".") : null;
const taskName =
- taskId && lastIndex ? taskId.substring(lastIndex + 1) : taskId;
+ taskInstance?.taskDisplayName && lastIndex
+ ? taskInstance?.taskDisplayName.substring(lastIndex + 1)
+ : taskId;
const isDagDetails = !runId && !taskId;
const isRunDetails = !!(runId && !taskId);
@@ -103,7 +106,7 @@ const Header = () => {
onClick={clearSelection}
_hover={isDagDetails ? { cursor: "default" } : undefined}
>
-