diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 098bb93215e3a9..173b6da8b5cad2 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -30,6 +30,7 @@ from airflow.cli.cli_config import DefaultHelpParser from airflow.configuration import conf from airflow.exceptions import RemovedInAirflow3Warning +from airflow.executors.executor_loader import ExecutorLoader from airflow.models import Log from airflow.stats import Stats from airflow.traces import NO_TRACE_ID @@ -238,44 +239,75 @@ def heartbeat(self) -> None: num_running_tasks = len(self.running) num_queued_tasks = len(self.queued_tasks) - self.log.debug("%s running task instances", num_running_tasks) - self.log.debug("%s in queue", num_queued_tasks) - if open_slots == 0: - self.log.info("Executor parallelism limit reached. 0 open slots.") - else: - self.log.debug("%s open slots", open_slots) + self._emit_metrics(open_slots, num_running_tasks, num_queued_tasks) + self.trigger_tasks(open_slots) + + # Calling child class sync method + self.log.debug("Calling the %s sync method", self.__class__) + self.sync() + + def _emit_metrics(self, open_slots, num_running_tasks, num_queued_tasks): + """ + Emit metrics relevant to the Executor. + + In the case of multiple executors being configured, the metric names include the name of + executor to differentiate them from metrics from other executors. + + If only one executor is configured, the metric names will not be changed. + """ + name = self.__class__.__name__ + multiple_executors_configured = len(ExecutorLoader.get_executor_names()) > 1 + if multiple_executors_configured: + metric_suffix = name + + open_slots_metric_name = ( + f"executor.open_slots.{metric_suffix}" if multiple_executors_configured else "executor.open_slots" + ) + queued_tasks_metric_name = ( + f"executor.queued_tasks.{metric_suffix}" + if multiple_executors_configured + else "executor.queued_tasks" + ) + running_tasks_metric_name = ( + f"executor.running_tasks.{metric_suffix}" + if multiple_executors_configured + else "executor.running_tasks" + ) span = Trace.get_current_span() if span.is_recording(): span.add_event( name="executor", attributes={ - "executor.open_slots": open_slots, - "executor.queued_tasks": num_queued_tasks, - "executor.running_tasks": num_running_tasks, + open_slots_metric_name: open_slots, + queued_tasks_metric_name: num_queued_tasks, + running_tasks_metric_name: num_running_tasks, }, ) + self.log.debug("%s running task instances for executor %s", num_running_tasks, name) + self.log.debug("%s in queue for executor %s", num_queued_tasks, name) + if open_slots == 0: + self.log.info("Executor parallelism limit reached. 0 open slots.") + else: + self.log.debug("%s open slots for executor %s", open_slots, name) + Stats.gauge( - "executor.open_slots", value=open_slots, tags={"status": "open", "name": self.__class__.__name__} + open_slots_metric_name, + value=open_slots, + tags={"status": "open", "name": name}, ) Stats.gauge( - "executor.queued_tasks", + queued_tasks_metric_name, value=num_queued_tasks, - tags={"status": "queued", "name": self.__class__.__name__}, + tags={"status": "queued", "name": name}, ) Stats.gauge( - "executor.running_tasks", + running_tasks_metric_name, value=num_running_tasks, - tags={"status": "running", "name": self.__class__.__name__}, + tags={"status": "running", "name": name}, ) - self.trigger_tasks(open_slots) - - # Calling child class sync method - self.log.debug("Calling the %s sync method", self.__class__) - self.sync() - def order_queued_tasks_by_priority(self) -> list[tuple[TaskInstanceKey, QueuedTaskInstanceType]]: """ Orders the queued tasks by priority. diff --git a/chart/files/statsd-mappings.yml b/chart/files/statsd-mappings.yml index 14d95df7319c7f..86d773fd20b7f2 100644 --- a/chart/files/statsd-mappings.yml +++ b/chart/files/statsd-mappings.yml @@ -91,3 +91,18 @@ mappings: name: "airflow_pool_starving_tasks" labels: pool: "$1" + + - match: airflow.executor.open_slots.* + name: "airflow_executor_open_slots" + labels: + executor: "$1" + + - match: airflow.executor.queued_tasks.* + name: "airflow_executor_queued_tasks" + labels: + executor: "$1" + + - match: airflow.executor.running_tasks.* + name: "airflow_executor_running_tasks" + labels: + executor: "$1" diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst index 49ba7cf422fbda..82597712a8abc9 100644 --- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst +++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst @@ -228,8 +228,11 @@ Name Description ``scheduler.tasks.executable`` Number of tasks that are ready for execution (set to queued) with respect to pool limits, DAG concurrency, executor state, and priority. +``executor.open_slots.`` Number of open slots on a specific executor. Only emitted when multiple executors are configured. ``executor.open_slots`` Number of open slots on executor +``executor.queued_tasks.`` Number of queued tasks on on a specific executor. Only emitted when multiple executors are configured. ``executor.queued_tasks`` Number of queued tasks on executor +``executor.running_tasks.`` Number of running tasks on on a specific executor. Only emitted when multiple executors are configured. ``executor.running_tasks`` Number of running tasks on executor ``pool.open_slots.`` Number of open slots in the pool ``pool.open_slots`` Number of open slots in the pool. Metric with pool_name tagging. diff --git a/helm_tests/other/test_statsd.py b/helm_tests/other/test_statsd.py index 8f65a0f6525bfe..a2f4b870bd9669 100644 --- a/helm_tests/other/test_statsd.py +++ b/helm_tests/other/test_statsd.py @@ -243,8 +243,9 @@ def test_statsd_configmap_by_default(self): mappings_yml = jmespath.search('data."mappings.yml"', docs[0]) mappings_yml_obj = yaml.safe_load(mappings_yml) - assert "airflow_dagrun_dependency_check" == mappings_yml_obj["mappings"][0]["name"] - assert "airflow_pool_starving_tasks" == mappings_yml_obj["mappings"][-1]["name"] + names = [mapping["name"] for mapping in mappings_yml_obj["mappings"]] + assert "airflow_dagrun_dependency_check" in names + assert "airflow_pool_starving_tasks" in names def test_statsd_configmap_when_exist_extra_mappings(self): extra_mapping = { diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py index e6dc057ca576c3..7c60cd42bc2962 100644 --- a/tests/executors/test_base_executor.py +++ b/tests/executors/test_base_executor.py @@ -30,6 +30,8 @@ from airflow.cli.cli_config import DefaultHelpParser, GroupCommand from airflow.cli.cli_parser import AirflowHelpFormatter from airflow.executors.base_executor import BaseExecutor, RunningRetryAttemptType +from airflow.executors.local_executor import LocalExecutor +from airflow.executors.sequential_executor import SequentialExecutor from airflow.models.baseoperator import BaseOperator from airflow.models.taskinstance import TaskInstance, TaskInstanceKey from airflow.utils import timezone @@ -120,7 +122,7 @@ def test_fail_and_success(): @mock.patch("airflow.executors.base_executor.BaseExecutor.sync") @mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks") @mock.patch("airflow.executors.base_executor.Stats.gauge") -def test_gauge_executor_metrics(mock_stats_gauge, mock_trigger_tasks, mock_sync): +def test_gauge_executor_metrics_single_executor(mock_stats_gauge, mock_trigger_tasks, mock_sync): executor = BaseExecutor() executor.heartbeat() calls = [ @@ -133,6 +135,50 @@ def test_gauge_executor_metrics(mock_stats_gauge, mock_trigger_tasks, mock_sync) mock_stats_gauge.assert_has_calls(calls) +@pytest.mark.parametrize( + "executor_class, executor_name", + [(LocalExecutor, "LocalExecutor"), (SequentialExecutor, "SequentialExecutor")], +) +@mock.patch("airflow.executors.local_executor.LocalExecutor.sync") +@mock.patch("airflow.executors.sequential_executor.SequentialExecutor.sync") +@mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks") +@mock.patch("airflow.executors.base_executor.Stats.gauge") +@mock.patch("airflow.executors.executor_loader.ExecutorLoader.get_executor_names") +def test_gauge_executor_metrics_with_multiple_executors( + mock_get_executor_names, + mock_stats_gauge, + mock_trigger_tasks, + mock_sequential_sync, + mock_local_sync, + executor_class, + executor_name, +): + # The names of the executors aren't relevant for this test, so long as a list of length > 1 + # is returned. This forces the executor to use the multiple executors gauge logic. + mock_get_executor_names.return_value = ["Exec1", "Exec2"] + executor = executor_class() + executor.heartbeat() + + calls = [ + mock.call( + f"executor.open_slots.{executor_name}", + value=mock.ANY, + tags={"status": "open", "name": executor_name}, + ), + mock.call( + f"executor.queued_tasks.{executor_name}", + value=mock.ANY, + tags={"status": "queued", "name": executor_name}, + ), + mock.call( + f"executor.running_tasks.{executor_name}", + value=mock.ANY, + tags={"status": "running", "name": executor_name}, + ), + ] + mock_stats_gauge.assert_has_calls(calls) + + @mock.patch("airflow.executors.base_executor.BaseExecutor.sync") @mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks") @mock.patch("airflow.executors.base_executor.Stats.gauge")