Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update metrics names to allow multiple executors to report metrics #40778

Merged
merged 8 commits into from
Jul 23, 2024
72 changes: 52 additions & 20 deletions airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from airflow.cli.cli_config import DefaultHelpParser
from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.executors.executor_loader import ExecutorLoader
from airflow.models import Log
from airflow.stats import Stats
from airflow.traces import NO_TRACE_ID
Expand Down Expand Up @@ -238,44 +239,75 @@ def heartbeat(self) -> None:
num_running_tasks = len(self.running)
num_queued_tasks = len(self.queued_tasks)

self.log.debug("%s running task instances", num_running_tasks)
self.log.debug("%s in queue", num_queued_tasks)
if open_slots == 0:
self.log.info("Executor parallelism limit reached. 0 open slots.")
else:
self.log.debug("%s open slots", open_slots)
self._emit_metrics(open_slots, num_running_tasks, num_queued_tasks)
self.trigger_tasks(open_slots)

# Calling child class sync method
self.log.debug("Calling the %s sync method", self.__class__)
self.sync()

def _emit_metrics(self, open_slots, num_running_tasks, num_queued_tasks):
"""
Emit metrics relevant to the Executor.

In the case of multiple executors being configured, the metric names include the name of the
executor to differentiate them from metrics from other executors.

If only one executor is configured, the metric names will not be changed.
"""
name = self.__class__.__name__
multiple_executors_configured = len(ExecutorLoader.get_executor_names()) > 1
if multiple_executors_configured:
metric_suffix = name

open_slots_metric_name = (
f"executor.open_slots.{metric_suffix}" if multiple_executors_configured else "executor.open_slots"
)
queued_tasks_metric_name = (
f"executor.queued_tasks.{metric_suffix}"
if multiple_executors_configured
else "executor.queued_tasks"
)
running_tasks_metric_name = (
f"executor.running_tasks.{metric_suffix}"
if multiple_executors_configured
else "executor.running_tasks"
)

span = Trace.get_current_span()
if span.is_recording():
span.add_event(
name="executor",
attributes={
"executor.open_slots": open_slots,
"executor.queued_tasks": num_queued_tasks,
"executor.running_tasks": num_running_tasks,
open_slots_metric_name: open_slots,
queued_tasks_metric_name: num_queued_tasks,
running_tasks_metric_name: num_running_tasks,
},
)

self.log.debug("%s running task instances for executor %s", num_running_tasks, name)
self.log.debug("%s in queue for executor %s", num_queued_tasks, name)
if open_slots == 0:
self.log.info("Executor parallelism limit reached. 0 open slots.")
else:
self.log.debug("%s open slots for executor %s", open_slots, name)

Stats.gauge(
"executor.open_slots", value=open_slots, tags={"status": "open", "name": self.__class__.__name__}
open_slots_metric_name,
value=open_slots,
tags={"status": "open", "name": name},
)
Stats.gauge(
"executor.queued_tasks",
queued_tasks_metric_name,
value=num_queued_tasks,
tags={"status": "queued", "name": self.__class__.__name__},
tags={"status": "queued", "name": name},
)
Stats.gauge(
"executor.running_tasks",
running_tasks_metric_name,
value=num_running_tasks,
tags={"status": "running", "name": self.__class__.__name__},
tags={"status": "running", "name": name},
)

self.trigger_tasks(open_slots)

# Calling child class sync method
self.log.debug("Calling the %s sync method", self.__class__)
self.sync()

def order_queued_tasks_by_priority(self) -> list[tuple[TaskInstanceKey, QueuedTaskInstanceType]]:
"""
Orders the queued tasks by priority.
Expand Down
15 changes: 15 additions & 0 deletions chart/files/statsd-mappings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,18 @@ mappings:
name: "airflow_pool_starving_tasks"
labels:
pool: "$1"

- match: airflow.executor.open_slots.*
name: "airflow_executor_open_slots"
labels:
executor: "$1"

- match: airflow.executor.queued_tasks.*
name: "airflow_executor_queued_tasks"
labels:
executor: "$1"

- match: airflow.executor.running_tasks.*
name: "airflow_executor_running_tasks"
labels:
executor: "$1"
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,11 @@ Name Description
``scheduler.tasks.executable`` Number of tasks that are ready for execution (set to queued)
with respect to pool limits, DAG concurrency, executor state,
and priority.
``executor.open_slots.<executor_class_name>`` Number of open slots on a specific executor. Only emitted when multiple executors are configured.
``executor.open_slots`` Number of open slots on executor
``executor.queued_tasks.<executor_class_name>`` Number of queued tasks on a specific executor. Only emitted when multiple executors are configured.
``executor.queued_tasks`` Number of queued tasks on executor
``executor.running_tasks.<executor_class_name>`` Number of running tasks on a specific executor. Only emitted when multiple executors are configured.
``executor.running_tasks`` Number of running tasks on executor
``pool.open_slots.<pool_name>`` Number of open slots in the pool
``pool.open_slots`` Number of open slots in the pool. Metric with pool_name tagging.
Expand Down
5 changes: 3 additions & 2 deletions helm_tests/other/test_statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,9 @@ def test_statsd_configmap_by_default(self):
mappings_yml = jmespath.search('data."mappings.yml"', docs[0])
mappings_yml_obj = yaml.safe_load(mappings_yml)

assert "airflow_dagrun_dependency_check" == mappings_yml_obj["mappings"][0]["name"]
assert "airflow_pool_starving_tasks" == mappings_yml_obj["mappings"][-1]["name"]
names = [mapping["name"] for mapping in mappings_yml_obj["mappings"]]
assert "airflow_dagrun_dependency_check" in names
assert "airflow_pool_starving_tasks" in names

def test_statsd_configmap_when_exist_extra_mappings(self):
extra_mapping = {
Expand Down
48 changes: 47 additions & 1 deletion tests/executors/test_base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from airflow.cli.cli_config import DefaultHelpParser, GroupCommand
from airflow.cli.cli_parser import AirflowHelpFormatter
from airflow.executors.base_executor import BaseExecutor, RunningRetryAttemptType
from airflow.executors.local_executor import LocalExecutor
from airflow.executors.sequential_executor import SequentialExecutor
from airflow.models.baseoperator import BaseOperator
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.utils import timezone
Expand Down Expand Up @@ -120,7 +122,7 @@ def test_fail_and_success():
@mock.patch("airflow.executors.base_executor.BaseExecutor.sync")
@mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks")
@mock.patch("airflow.executors.base_executor.Stats.gauge")
def test_gauge_executor_metrics(mock_stats_gauge, mock_trigger_tasks, mock_sync):
def test_gauge_executor_metrics_single_executor(mock_stats_gauge, mock_trigger_tasks, mock_sync):
executor = BaseExecutor()
executor.heartbeat()
calls = [
Expand All @@ -133,6 +135,50 @@ def test_gauge_executor_metrics(mock_stats_gauge, mock_trigger_tasks, mock_sync)
mock_stats_gauge.assert_has_calls(calls)


@pytest.mark.parametrize(
"executor_class, executor_name",
[(LocalExecutor, "LocalExecutor"), (SequentialExecutor, "SequentialExecutor")],
)
@mock.patch("airflow.executors.local_executor.LocalExecutor.sync")
@mock.patch("airflow.executors.sequential_executor.SequentialExecutor.sync")
@mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks")
@mock.patch("airflow.executors.base_executor.Stats.gauge")
@mock.patch("airflow.executors.executor_loader.ExecutorLoader.get_executor_names")
def test_gauge_executor_metrics_with_multiple_executors(
mock_get_executor_names,
mock_stats_gauge,
mock_trigger_tasks,
mock_sequential_sync,
mock_local_sync,
executor_class,
executor_name,
):
# The names of the executors aren't relevant for this test, so long as a list of length > 1
# is returned. This forces the executor to use the multiple executors gauge logic.
mock_get_executor_names.return_value = ["Exec1", "Exec2"]
syedahsn marked this conversation as resolved.
Show resolved Hide resolved
executor = executor_class()
executor.heartbeat()

calls = [
mock.call(
f"executor.open_slots.{executor_name}",
value=mock.ANY,
tags={"status": "open", "name": executor_name},
),
mock.call(
f"executor.queued_tasks.{executor_name}",
value=mock.ANY,
tags={"status": "queued", "name": executor_name},
),
mock.call(
f"executor.running_tasks.{executor_name}",
value=mock.ANY,
tags={"status": "running", "name": executor_name},
),
]
mock_stats_gauge.assert_has_calls(calls)


@mock.patch("airflow.executors.base_executor.BaseExecutor.sync")
@mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks")
@mock.patch("airflow.executors.base_executor.Stats.gauge")
Expand Down