[AIRFLOW-6641] Better diagnostics for kubernetes flaky tests (#7261)
Kubernetes tests are sometimes flaky: they do not reach the desired state and we do not
know why. We need to make them less flaky and collect enough diagnostics to understand
what's going on.

(cherry picked from commit 4e7bb5f)
potiuk authored and kaxil committed Jan 26, 2020
1 parent 023977f commit 31846f8
Showing 1 changed file with 32 additions and 7 deletions.
39 changes: 32 additions & 7 deletions tests/runtime/kubernetes/test_kubernetes_executor.py
@@ -20,14 +20,18 @@
import re
import time
import unittest
from shutil import rmtree
from subprocess import check_call, check_output
from tempfile import mkdtemp

import pytest
import requests
import requests.exceptions
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from airflow.configuration import run_command

KUBERNETES_HOST = (os.environ.get('CLUSTER_NAME') or "docker") + "-worker:30809"


@@ -68,7 +72,7 @@ def monitor_task(self, host, execution_date, dag_id, task_id, expected_final_sta
        tries = 0
        state = ''
        max_tries = max(int(timeout / 5), 1)
        # Wait 100 seconds for the operator to complete
        # Wait some time for the operator to complete
        while tries < max_tries:
            time.sleep(5)

@@ -92,18 +96,39 @@ def monitor_task(self, host, execution_date, dag_id, task_id, expected_final_sta
                tries += 1
            except requests.exceptions.ConnectionError as e:
                check_call(["echo", "api call failed. trying again. error {}".format(e)])

        if state != expected_final_state:
            print("The expected state is wrong {} != {} (expected)!".format(state, expected_final_state))
            self.dump_kubernetes_logs()
        self.assertEqual(state, expected_final_state)

    # Maybe check if we can retrieve the logs, but then we need to extend the API
    def dump_kubernetes_logs(self):
        if os.environ.get('ENABLE_KIND_CLUSTER') == 'true':
            self.dump_kind_logs()

    def dump_kind_logs(self):
        tempdir_path = mkdtemp()
        try:
            run_command(["kind", "export", "logs", tempdir_path])
            for dirpath, _, filenames in os.walk(tempdir_path):
                for file_name in filenames:
                    file_path = os.path.join(dirpath, file_name)
                    print("###############################################################")
                    print(file_path)
                    print("###############################################################")
                    with open(file_path, 'rU') as file:
                        text = file.read()
                        text_array = text.split()
                        print(text_array)
        finally:
            rmtree(tempdir_path)

    def ensure_dag_expected_state(self, host, execution_date, dag_id,
                                  expected_final_state,
                                  timeout):
        tries = 0
        state = ''
        max_tries = max(int(timeout / 5), 1)
        # Wait 100 seconds for the operator to complete
        # Wait some time for the operator to complete
        while tries < max_tries:
            time.sleep(5)
@@ -181,7 +206,7 @@ def test_integration_run_dag(self):
        execution_date = result_json['items'][0]['execution_date']
        print("Found the job with execution date {}".format(execution_date))

        # Wait 100 seconds for the operator to complete
        # Wait some time for the operator to complete
        self.monitor_task(host=host,
                          execution_date=execution_date,
                          dag_id=dag_id,
@@ -208,12 +233,12 @@ def test_integration_run_dag_with_scheduler_failure(self):

        time.sleep(10)  # give time for pod to restart

        # Wait 100 seconds for the operator to complete
        # Wait some time for the operator to complete
        self.monitor_task(host=host,
                          execution_date=execution_date,
                          dag_id=dag_id,
                          task_id='start_task',
                          expected_final_state='success', timeout=120)
                          expected_final_state='success', timeout=200)

        self.ensure_dag_expected_state(host=host,
                                       execution_date=execution_date,
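Beyond the log dump, the import hunk also brings in HTTPAdapter and Retry; the code that uses them falls outside the displayed hunks. As a rough sketch only (the helper name and retry values below are assumptions, not part of this commit), mounting retries on the requests session used for the experimental API calls typically looks like this:

# Hypothetical sketch, not taken from the commit: how HTTPAdapter and Retry
# are commonly wired into a requests.Session to smooth over transient API errors.
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def _session_with_retries(total=3, backoff_factor=1):
    """Return a requests session that retries transient failures."""
    session = requests.Session()
    retry = Retry(total=total,                      # retry up to `total` times
                  backoff_factor=backoff_factor,    # exponential backoff between attempts
                  status_forcelist=[404, 500, 502, 503, 504])  # retry these HTTP statuses
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session

Separately, raising the timeout passed to monitor_task from 120 to 200 seconds means max(int(timeout / 5), 1) now allows up to 40 polls at 5-second intervals instead of 24, giving a slow cluster more time to reach the expected state before the test is declared failed.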
