diff --git a/tests/runtime/kubernetes/test_kubernetes_executor.py b/tests/runtime/kubernetes/test_kubernetes_executor.py index d93bbc74266461..0c41b1e08ab0f3 100644 --- a/tests/runtime/kubernetes/test_kubernetes_executor.py +++ b/tests/runtime/kubernetes/test_kubernetes_executor.py @@ -20,7 +20,9 @@ import re import time import unittest +from shutil import rmtree from subprocess import check_call, check_output +from tempfile import mkdtemp import pytest import requests @@ -28,6 +30,8 @@ from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry +from airflow.configuration import run_command + KUBERNETES_HOST = (os.environ.get('CLUSTER_NAME') or "docker") + "-worker:30809" @@ -68,7 +72,7 @@ def monitor_task(self, host, execution_date, dag_id, task_id, expected_final_sta tries = 0 state = '' max_tries = max(int(timeout / 5), 1) - # Wait 100 seconds for the operator to complete + # Wait some time for the operator to complete while tries < max_tries: time.sleep(5) @@ -92,10 +96,31 @@ def monitor_task(self, host, execution_date, dag_id, task_id, expected_final_sta tries += 1 except requests.exceptions.ConnectionError as e: check_call(["echo", "api call failed. trying again. 
def dump_kubernetes_logs(self):
    """Print the cluster logs when the tests run against a kind cluster.

    Does nothing unless the ``ENABLE_KIND_CLUSTER`` environment variable is
    exactly ``'true'``; in that case the work is delegated to
    ``dump_kind_logs``.
    """
    if os.environ.get('ENABLE_KIND_CLUSTER') == 'true':
        self.dump_kind_logs()


def dump_kind_logs(self):
    """Export the kind cluster logs to a temporary directory and print them.

    Every exported file is announced by a banner carrying its path, followed
    by the file content split on whitespace. The temporary directory is
    removed afterwards, even when exporting or printing fails.
    """
    banner = "###############################################################"
    tempdir_path = mkdtemp()
    try:
        # NOTE(review): run_command is imported from airflow.configuration and
        # its signature is not visible here - confirm it accepts an argument
        # list rather than a single command string.
        run_command(["kind", "export", "logs", tempdir_path])
        for dirpath, _, filenames in os.walk(tempdir_path):
            for file_name in filenames:
                file_path = os.path.join(dirpath, file_name)
                print(banner)
                print(file_path)
                print(banner)
                # Plain text mode: the legacy 'U' flag is rejected by
                # Python 3.11+, and universal newlines are already the
                # default when reading text.
                with open(file_path, 'r') as file:
                    text = file.read()
                text_array = text.split()
                print(text_array)
    finally:
        rmtree(tempdir_path)
dag_id=dag_id, task_id='start_task', - expected_final_state='success', timeout=120) + expected_final_state='success', timeout=200) self.ensure_dag_expected_state(host=host, execution_date=execution_date,