Skip to content

Commit

Permalink
Allow users to write dag_id and task_id in their national characters,…
Browse files Browse the repository at this point in the history
… added display name for dag / task (v2) (#38446)

* Add display name in DAGs and tasks

Co-authored-by: Tzu-ping Chung <[email protected]>
Co-authored-by: Aleksandr Shelukheev <[email protected]>

* Fix frontend issues

* Fix pytests

* Add DAG display tool tips

* Implement national language char display example oon two DAGs

* Review nit from other PR

* Fix WWW pytest on HTML code

* Review feedback, make task_display_name a property, not a field in mapped operator

* Review feedback, rename internal fields

* Small nit, optimize sorting on DAG home page with display name

* Review feedback

* Fix pytests in API plus review feedback on API

* Review feedback, extend schema, update apispecs and fix breadcrumb navigation

* nit, update page titles as well

* Fix pytests for extended API model

* Ensure mapped operator also provides a display string

* Ensure display field is only serialized if different from task id

* fix pytest for API

* Review feedback from TP

* Move labels down to base and mapped operator

* Move labels up to abstract operator

* Only one decorator is needed

---------

Co-authored-by: Vincent Gao <[email protected]>
Co-authored-by: Tzu-ping Chung <[email protected]>
Co-authored-by: Aleksandr Shelukheev <[email protected]>
  • Loading branch information
4 people authored Mar 27, 2024
1 parent a033243 commit 8c44bcb
Show file tree
Hide file tree
Showing 39 changed files with 916 additions and 620 deletions.
16 changes: 16 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2847,6 +2847,13 @@ components:
type: string
readOnly: true
description: The ID of the DAG.
dag_display_name:
type: string
readOnly: true
description: |
Human centric display text for the DAG.
*New in version 2.9.0*
root_dag_id:
type: string
readOnly: true
Expand Down Expand Up @@ -3570,6 +3577,12 @@ components:
properties:
task_id:
type: string
task_display_name:
type: string
description: |
Human centric display text for the task.
*New in version 2.9.0*
dag_id:
type: string
dag_run_id:
Expand Down Expand Up @@ -3925,6 +3938,9 @@ components:
task_id:
type: string
readOnly: true
task_display_name:
type: string
readOnly: true
owner:
type: string
readOnly: true
Expand Down
1 change: 1 addition & 0 deletions airflow/api_connexion/schemas/dag_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class Meta:
model = DagModel

dag_id = auto_field(dump_only=True)
dag_display_name = fields.String(attribute="dag_display_name", dump_only=True)
root_dag_id = auto_field(dump_only=True)
is_paused = auto_field()
is_active = auto_field(dump_only=True)
Expand Down
1 change: 1 addition & 0 deletions airflow/api_connexion/schemas/task_instance_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class Meta:
state = TaskInstanceStateField()
_try_number = auto_field(data_key="try_number")
max_tries = auto_field()
task_display_name = fields.String(attribute="task_display_name", dump_only=True)
hostname = auto_field()
unixname = auto_field()
pool = auto_field()
Expand Down
1 change: 1 addition & 0 deletions airflow/api_connexion/schemas/task_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class TaskSchema(Schema):
class_ref = fields.Method("_get_class_reference", dump_only=True)
operator_name = fields.Method("_get_operator_name", dump_only=True)
task_id = fields.String(dump_only=True)
task_display_name = fields.String(attribute="task_display_name", dump_only=True)
owner = fields.String(dump_only=True)
start_date = fields.DateTime(dump_only=True)
end_date = fields.DateTime(dump_only=True)
Expand Down
1 change: 1 addition & 0 deletions airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ def _get_dagbag_dag_details(dag: DAG) -> dict:
"""Return a dagbag dag details dict."""
return {
"dag_id": dag.dag_id,
"dag_display_name": dag.dag_display_name,
"root_dag_id": dag.parent_dag.dag_id if dag.parent_dag else None,
"is_paused": dag.get_is_paused(),
"is_active": dag.get_is_active(),
Expand Down
48 changes: 48 additions & 0 deletions airflow/example_dags/example_display_name.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import pendulum

from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator


# [START dag_decorator_usage]
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
dag_display_name="Sample DAG with Display Name",
)
def example_display_name():
sample_task_1 = EmptyOperator(
task_id="sample_task_1",
task_display_name="Sample Task 1",
)

@task(task_display_name="Sample Task 2")
def sample_task_2():
pass

sample_task_1 >> sample_task_2()


example_dag = example_display_name()
# [END dag_decorator_usage]
15 changes: 8 additions & 7 deletions airflow/example_dags/example_params_trigger_ui.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@

with DAG(
dag_id=Path(__file__).stem,
dag_display_name="Params Trigger UI",
description=__doc__.partition(".")[0],
doc_md=__doc__,
schedule=None,
start_date=datetime.datetime(2022, 3, 4),
catchup=False,
tags=["example_ui"],
tags=["example", "params"],
params={
"names": Param(
["Linda", "Martha", "Thomas"],
Expand All @@ -57,7 +58,7 @@
},
) as dag:

@task(task_id="get_names")
@task(task_id="get_names", task_display_name="Get names")
def get_names(**kwargs) -> list[str]:
ti: TaskInstance = kwargs["ti"]
dag_run: DagRun = ti.dag_run
Expand All @@ -66,7 +67,7 @@ def get_names(**kwargs) -> list[str]:
return []
return dag_run.conf["names"]

@task.branch(task_id="select_languages")
@task.branch(task_id="select_languages", task_display_name="Select languages")
def select_languages(**kwargs) -> list[str]:
ti: TaskInstance = kwargs["ti"]
dag_run: DagRun = ti.dag_run
Expand All @@ -76,19 +77,19 @@ def select_languages(**kwargs) -> list[str]:
selected_languages.append(f"generate_{lang}_greeting")
return selected_languages

@task(task_id="generate_english_greeting")
@task(task_id="generate_english_greeting", task_display_name="Generate English greeting")
def generate_english_greeting(name: str) -> str:
return f"Hello {name}!"

@task(task_id="generate_german_greeting")
@task(task_id="generate_german_greeting", task_display_name="Erzeuge Deutsche Begrüßung")
def generate_german_greeting(name: str) -> str:
return f"Sehr geehrter Herr/Frau {name}."

@task(task_id="generate_french_greeting")
@task(task_id="generate_french_greeting", task_display_name="Produire un message d'accueil en français")
def generate_french_greeting(name: str) -> str:
return f"Bonjour {name}!"

@task(task_id="print_greetings", trigger_rule=TriggerRule.ALL_DONE)
@task(task_id="print_greetings", task_display_name="Print greetings", trigger_rule=TriggerRule.ALL_DONE)
def print_greetings(greetings1, greetings2, greetings3) -> None:
for g in greetings1 or []:
print(g)
Expand Down
5 changes: 3 additions & 2 deletions airflow/example_dags/example_params_ui_tutorial.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@

with DAG(
dag_id=Path(__file__).stem,
dag_display_name="Params UI tutorial",
description=__doc__.partition(".")[0],
doc_md=__doc__,
schedule=None,
start_date=datetime.datetime(2022, 3, 4),
catchup=False,
tags=["example_ui"],
tags=["example", "params", "ui"],
params={
# Let's start simple: Standard dict values are detected from type and offered as entry form fields.
# Detected types are numbers, text, boolean, lists and dicts.
Expand Down Expand Up @@ -237,7 +238,7 @@
},
) as dag:

@task
@task(task_display_name="Show used parameters")
def show_params(**kwargs) -> None:
params: ParamsDict = kwargs["params"]
print(f"This DAG was triggered with the following parameters:\n\n{json.dumps(params, indent=4)}\n")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""add display name for dag and task instance
Revision ID: ee1467d4aa35
Revises: b4078ac230a1
Create Date: 2024-03-24 22:33:36.824827
"""

import sqlalchemy as sa
from alembic import op


# revision identifiers, used by Alembic.
revision = 'ee1467d4aa35'
down_revision = 'b4078ac230a1'
branch_labels = None
depends_on = None
airflow_version = "2.9.0"


def upgrade():
"""Apply add display name for dag and task instance"""
op.add_column("dag", sa.Column("dag_display_name", sa.String(2000), nullable=True))
op.add_column("task_instance", sa.Column("task_display_name", sa.String(2000), nullable=True))


def downgrade():
"""Unapply add display name for dag and task instance"""
op.drop_column("dag", "dag_display_name")
op.drop_column("task_instance", "task_display_name")
15 changes: 15 additions & 0 deletions airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import datetime
import inspect
from abc import abstractproperty
from functools import cached_property
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Collection, Iterable, Iterator, Sequence

Expand Down Expand Up @@ -159,6 +160,20 @@ def dag_id(self) -> str:
def node_id(self) -> str:
return self.task_id

@abstractproperty
def task_display_name(self) -> str: ...

@property
def label(self) -> str | None:
if self.task_display_name and self.task_display_name != self.task_id:
return self.task_display_name
# Prefix handling if no display is given is cloned from taskmixin for compatibility
tg = self.task_group
if tg and tg.node_id and tg.prefix_group_id:
# "task_group_id.task_id" -> "task_id"
return self.task_id[len(tg.node_id) + 1 :]
return self.task_id

@property
def is_setup(self) -> bool:
raise NotImplementedError()
Expand Down
12 changes: 12 additions & 0 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ def partial(
doc_json: str | None | ArgNotSet = NOTSET,
doc_yaml: str | None | ArgNotSet = NOTSET,
doc_rst: str | None | ArgNotSet = NOTSET,
task_display_name: str | None | ArgNotSet = NOTSET,
logger_name: str | None | ArgNotSet = NOTSET,
allow_nested_operators: bool = True,
**kwargs,
Expand Down Expand Up @@ -334,6 +335,7 @@ def partial(
"doc_md": doc_md,
"doc_rst": doc_rst,
"doc_yaml": doc_yaml,
"task_display_name": task_display_name,
"logger_name": logger_name,
"allow_nested_operators": allow_nested_operators,
}
Expand Down Expand Up @@ -705,6 +707,7 @@ class derived from this one results in the creation of a task object,
that is visible in Task Instance details View in the Webserver
:param doc_yaml: Add documentation (in YAML format) or notes to your Task objects
that is visible in Task Instance details View in the Webserver
:param task_display_name: The display name of the task which appears on the UI.
:param logger_name: Name of the logger used by the Operator to emit logs.
If set to `None` (default), the logger name will fall back to
`airflow.task.operators.{class.__module__}.{class.__name__}` (e.g. SimpleHttpOperator will have
Expand Down Expand Up @@ -859,6 +862,7 @@ def __init__(
doc_json: str | None = None,
doc_yaml: str | None = None,
doc_rst: str | None = None,
task_display_name: str | None = None,
logger_name: str | None = None,
allow_nested_operators: bool = True,
**kwargs,
Expand Down Expand Up @@ -1001,6 +1005,10 @@ def __init__(
self.doc_yaml = doc_yaml
self.doc_rst = doc_rst
self.doc = doc
# Populate the display field only if provided and different from task id
self._task_display_property_value = (
task_display_name if task_display_name and task_display_name != task_id else None
)

self.upstream_task_ids: set[str] = set()
self.downstream_task_ids: set[str] = set()
Expand Down Expand Up @@ -1188,6 +1196,10 @@ def dag(self, dag: DAG | None):

self._dag = dag

@property
def task_display_name(self) -> str:
return self._task_display_property_value or self.task_id

def has_dag(self):
"""Return True if the Operator has been assigned to a DAG."""
return self._dag is not None
Expand Down
Loading

0 comments on commit 8c44bcb

Please sign in to comment.