From d1f93960d82103d6ce9b15beaa75d189935230c2 Mon Sep 17 00:00:00 2001
From: Jarek Potiuk
Date: Sun, 26 Jan 2020 00:25:39 +0100
Subject: [PATCH] [AIRFLOW-6641] Better diagnostics for kubernetes flaky tests
 (#7261)

Kubernetes tests are sometimes flaky - they do not reach desired state and
we do not know why. We need to make it less flaky and have enough
diagnostics to understand what's going on.

(cherry picked from commit 4e7bb5f2553d4dee68b2bdcbebd944462acee957)
(cherry picked from commit eb3974b3010f9ae168a0cba05fbd0bc137c405a9)
---
 .../kubernetes/test_kubernetes_executor.py    | 39 +++++++++++++++----
 1 file changed, 32 insertions(+), 7 deletions(-)

diff --git a/tests/runtime/kubernetes/test_kubernetes_executor.py b/tests/runtime/kubernetes/test_kubernetes_executor.py
index d93bbc74266461..0c41b1e08ab0f3 100644
--- a/tests/runtime/kubernetes/test_kubernetes_executor.py
+++ b/tests/runtime/kubernetes/test_kubernetes_executor.py
@@ -20,7 +20,9 @@
 import re
 import time
 import unittest
+from shutil import rmtree
 from subprocess import check_call, check_output
+from tempfile import mkdtemp

 import pytest
 import requests
@@ -28,6 +30,8 @@
 from requests.adapters import HTTPAdapter
 from urllib3.util.retry import Retry

+from airflow.configuration import run_command
+
 KUBERNETES_HOST = (os.environ.get('CLUSTER_NAME') or "docker") + "-worker:30809"


@@ -68,7 +72,7 @@ def monitor_task(self, host, execution_date, dag_id, task_id, expected_final_sta
         tries = 0
         state = ''
         max_tries = max(int(timeout / 5), 1)
-        # Wait 100 seconds for the operator to complete
+        # Wait some time for the operator to complete
         while tries < max_tries:
             time.sleep(5)

@@ -92,10 +96,31 @@ def monitor_task(self, host, execution_date, dag_id, task_id, expected_final_sta
                 tries += 1
             except requests.exceptions.ConnectionError as e:
                 check_call(["echo", "api call failed. trying again. error {}".format(e)])
-
+        if state != expected_final_state:
+            print("The expected state is wrong {} != {} (expected)!".format(state, expected_final_state))
+            self.dump_kubernetes_logs()
         self.assertEqual(state, expected_final_state)
-        # Maybe check if we can retrieve the logs, but then we need to extend the API

+    def dump_kubernetes_logs(self):
+        if os.environ.get('ENABLE_KIND_CLUSTER') == 'true':
+            self.dump_kind_logs()
+
+    def dump_kind_logs(self):
+        tempdir_path = mkdtemp()
+        try:
+            run_command(["kind", "export", "logs", tempdir_path])
+            for dirpath, _, filenames in os.walk(tempdir_path):
+                for file_name in filenames:
+                    file_path = os.path.join(dirpath, file_name)
+                    print("###############################################################")
+                    print(file_path)
+                    print("###############################################################")
+                    with open(file_path, 'rU') as file:
+                        text = file.read()
+                        text_array = text.split()
+                        print(text_array)
+        finally:
+            rmtree(tempdir_path)

     def ensure_dag_expected_state(self, host, execution_date, dag_id,
                                   expected_final_state,
@@ -103,7 +128,7 @@ def ensure_dag_expected_state(self, host, execution_date, dag_id,
         tries = 0
         state = ''
         max_tries = max(int(timeout / 5), 1)
-        # Wait 100 seconds for the operator to complete
+        # Wait some time for the operator to complete
         while tries < max_tries:
             time.sleep(5)

@@ -181,7 +206,7 @@ def test_integration_run_dag(self):
         execution_date = result_json['items'][0]['execution_date']
         print("Found the job with execution date {}".format(execution_date))

-        # Wait 100 seconds for the operator to complete
+        # Wait some time for the operator to complete
         self.monitor_task(host=host,
                           execution_date=execution_date,
                           dag_id=dag_id,
@@ -208,12 +233,12 @@ def test_integration_run_dag_with_scheduler_failure(self):

         time.sleep(10)  # give time for pod to restart

-        # Wait 100 seconds for the operator to complete
+        # Wait some time for the operator to complete
         self.monitor_task(host=host,
                           execution_date=execution_date,
                           dag_id=dag_id,
                           task_id='start_task',
-                          expected_final_state='success', timeout=120)
+                          expected_final_state='success', timeout=200)

         self.ensure_dag_expected_state(host=host,
                                        execution_date=execution_date,