From cbfbf8b843f178de1e1aa1066e5ea3377a8de774 Mon Sep 17 00:00:00 2001
From: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Date: Thu, 8 Dec 2022 12:23:21 -0800
Subject: [PATCH] Make live logs reading work for "other" k8s executors (#28213)

---
 airflow/utils/log/file_task_handler.py | 21 +++++++++++++--
 tests/utils/test_log_handlers.py       | 36 ++++++++++++++++++++++++++
 2 files changed, 55 insertions(+), 2 deletions(-)

diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index e20da8cebc7dce..5a5b55d8a41a3b 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -132,6 +132,24 @@ def _render_filename(self, ti: TaskInstance, try_number: int) -> str:
     def _read_grouped_logs(self):
         return False
 
+    @staticmethod
+    def _should_check_k8s(queue):
+        """
+        Return True if the task is running through a kubernetes executor.
+
+        When logs aren't available locally, we can read them from the k8s pod logs instead.
+        """
+        executor = conf.get("core", "executor")
+        if executor == "KubernetesExecutor":
+            return True
+        elif executor == "LocalKubernetesExecutor":
+            if queue == conf.get("local_kubernetes_executor", "kubernetes_queue"):
+                return True
+        elif executor == "CeleryKubernetesExecutor":
+            if queue == conf.get("celery_kubernetes_executor", "kubernetes_queue"):
+                return True
+        return False
+
     def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | None = None):
         """
         Template method that contains custom logic of reading
@@ -163,7 +181,6 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
         location = os.path.join(self.local_base, log_relative_path)
 
         log = ""
-
         if os.path.exists(location):
             try:
                 with open(location, encoding="utf-8", errors="surrogateescape") as file:
@@ -173,7 +190,7 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log = f"*** Failed to load local log file: {location}\n"
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
-        elif conf.get("core", "executor") == "KubernetesExecutor":
+        elif self._should_check_k8s(ti.queue):
             try:
                 from airflow.kubernetes.kube_client import get_kube_client
 
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index 765b5d921aedec..ee2ff2d9cea362 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -21,6 +21,9 @@
 import logging.config
 import os
 import re
+from unittest.mock import patch
+
+import pytest
 
 from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
 from airflow.models import DAG, DagRun, TaskInstance
@@ -264,3 +267,36 @@ def test_log_retrieval_valid(self, create_task_instance):
         log_url_ti.hostname = "hostname"
         url = FileTaskHandler._get_log_retrieval_url(log_url_ti, "DYNAMIC_PATH")
         assert url == "http://hostname:8793/log/DYNAMIC_PATH"
+
+
+@pytest.mark.parametrize(
+    "config, queue, expected",
+    [
+        (dict(AIRFLOW__CORE__EXECUTOR="LocalExecutor"), None, False),
+        (dict(AIRFLOW__CORE__EXECUTOR="LocalExecutor"), "kubernetes", False),
+        (dict(AIRFLOW__CORE__EXECUTOR="KubernetesExecutor"), None, True),
+        (dict(AIRFLOW__CORE__EXECUTOR="CeleryKubernetesExecutor"), "any", False),
+        (dict(AIRFLOW__CORE__EXECUTOR="CeleryKubernetesExecutor"), "kubernetes", True),
+        (
+            dict(
+                AIRFLOW__CORE__EXECUTOR="CeleryKubernetesExecutor",
+                AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE="hithere",
+            ),
+            "hithere",
+            True,
+        ),
+        (dict(AIRFLOW__CORE__EXECUTOR="LocalKubernetesExecutor"), "any", False),
(dict(AIRFLOW__CORE__EXECUTOR="LocalKubernetesExecutor"), "kubernetes", True), + ( + dict( + AIRFLOW__CORE__EXECUTOR="LocalKubernetesExecutor", + AIRFLOW__LOCAL_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE="hithere", + ), + "hithere", + True, + ), + ], +) +def test__should_check_k8s(config, queue, expected): + with patch.dict("os.environ", **config): + assert FileTaskHandler._should_check_k8s(queue) == expected