[AIRFLOW-6641] Better diagnostics for kubernetes flaky tests (#7261)
Kubernetes tests are sometimes flaky - they do not reach the desired state and we
do not know why. We need to make them less flaky and have enough diagnostics to
understand what's going on.
potiuk authored Jan 25, 2020
1 parent 8936180 commit 4e7bb5f
Showing 1 changed file with 32 additions and 7 deletions.
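
For orientation before the diff: the commit's core idea is to poll for a desired task/DAG state and, when that state is never reached, export the kind cluster's logs so flaky runs leave evidence behind. Below is a minimal, self-contained sketch of that pattern; all names in it are illustrative, not the commit's code.

```python
# Illustrative sketch of the pattern this commit adds (names are hypothetical,
# not the commit's code): poll until a desired state appears, and export the
# kind cluster's logs if it never does, so flaky runs leave diagnostics behind.
import subprocess
import tempfile
import time


def dump_kind_logs():
    """Export kind cluster logs into a temp dir and report where they are."""
    tempdir = tempfile.mkdtemp(prefix="kind-logs-")
    # 'kind export logs <dir>' collects node and pod logs from the cluster.
    subprocess.check_call(["kind", "export", "logs", tempdir])
    print("kind logs exported to {}".format(tempdir))


def wait_for_state(get_state, expected, timeout=200, poll_interval=5):
    """Poll get_state() until it returns `expected`; dump logs on failure."""
    state = None
    for _ in range(max(int(timeout / poll_interval), 1)):
        time.sleep(poll_interval)
        state = get_state()
        if state == expected:
            return state
    dump_kind_logs()
    raise AssertionError("expected {!r}, last saw {!r}".format(expected, state))
```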
tests/runtime/kubernetes/test_kubernetes_executor.py (39 changes: 32 additions, 7 deletions)
@@ -20,14 +20,18 @@
import re
import time
import unittest
+from shutil import rmtree
from subprocess import check_call, check_output
+from tempfile import mkdtemp

import pytest
import requests
import requests.exceptions
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

+from airflow.configuration import run_command

KUBERNETES_HOST = (os.environ.get('CLUSTER_NAME') or "docker") + "-worker:30809"
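
The import hunk above pulls in HTTPAdapter and Retry, whose use is collapsed in this view. For context, the usual way to combine them (a sketch under that assumption, not necessarily the commit's exact code) is to mount a retrying adapter on a requests session so transient API failures are retried automatically:

```python
# Sketch: a requests session that retries transient failures, built from the
# HTTPAdapter/Retry imports added above. Parameter values are illustrative.
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def session_with_retries(total=10, backoff_factor=0.5):
    session = requests.Session()
    retry = Retry(total=total, backoff_factor=backoff_factor,
                  status_forcelist=(500, 502, 503, 504))
    session.mount("http://", HTTPAdapter(max_retries=retry))
    session.mount("https://", HTTPAdapter(max_retries=retry))
    return session
```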


@@ -68,7 +72,7 @@ def monitor_task(self, host, execution_date, dag_id, task_id, expected_final_state
        tries = 0
        state = ''
        max_tries = max(int(timeout / 5), 1)
-        # Wait 100 seconds for the operator to complete
+        # Wait some time for the operator to complete
        while tries < max_tries:
            time.sleep(5)

@@ -92,18 +96,39 @@ def monitor_task(self, host, execution_date, dag_id, task_id, expected_final_state
                tries += 1
            except requests.exceptions.ConnectionError as e:
                check_call(["echo", "api call failed. trying again. error {}".format(e)])

        if state != expected_final_state:
            print("The expected state is wrong {} != {} (expected)!".format(state, expected_final_state))
+            self.dump_kubernetes_logs()
        self.assertEqual(state, expected_final_state)

+    # Maybe check if we can retrieve the logs, but then we need to extend the API
+    def dump_kubernetes_logs(self):
+        # Dumping is only useful when the tests run against a local kind cluster.
+        if os.environ.get('ENABLE_KIND_CLUSTER') == 'true':
+            self.dump_kind_logs()
+
+    def dump_kind_logs(self):
+        # Export all cluster logs into a temporary directory, print every
+        # exported file into the CI log, then clean up.
+        tempdir_path = mkdtemp()
+        try:
+            # run_command() expects a shell command string, not a list.
+            run_command("kind export logs {}".format(tempdir_path))
+            for dirpath, _, filenames in os.walk(tempdir_path):
+                for file_name in filenames:
+                    file_path = os.path.join(dirpath, file_name)
+                    print("###############################################################")
+                    print(file_path)
+                    print("###############################################################")
+                    with open(file_path, 'r') as file:
+                        text = file.read()
+                        text_array = text.split()
+                        print(text_array)
+        finally:
+            rmtree(tempdir_path)
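
One design note on dump_kind_logs above: text.split() prints each exported file as a flat list of whitespace-separated tokens, which loses line structure. If grep-friendly, line-oriented output is preferred, a hypothetical variant could print each line instead:

```python
# Hypothetical alternative to text.split(): preserve log line structure.
def print_file_by_line(file_path):
    with open(file_path, 'r') as file:
        for line_number, line in enumerate(file, start=1):
            print("{:5d}: {}".format(line_number, line.rstrip()))
```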

    def ensure_dag_expected_state(self, host, execution_date, dag_id,
                                  expected_final_state,
                                  timeout):
        tries = 0
        state = ''
        max_tries = max(int(timeout / 5), 1)
-        # Wait 100 seconds for the operator to complete
+        # Wait some time for the operator to complete
        while tries < max_tries:
            time.sleep(5)

@@ -181,7 +206,7 @@ def test_integration_run_dag(self):
        execution_date = result_json['items'][0]['execution_date']
        print("Found the job with execution date {}".format(execution_date))

-        # Wait 100 seconds for the operator to complete
+        # Wait some time for the operator to complete
        self.monitor_task(host=host,
                          execution_date=execution_date,
                          dag_id=dag_id,
@@ -208,12 +233,12 @@ def test_integration_run_dag_with_scheduler_failure(self):

        time.sleep(10)  # give time for pod to restart

-        # Wait 100 seconds for the operator to complete
+        # Wait some time for the operator to complete
        self.monitor_task(host=host,
                          execution_date=execution_date,
                          dag_id=dag_id,
                          task_id='start_task',
-                          expected_final_state='success', timeout=120)
+                          expected_final_state='success', timeout=200)

        self.ensure_dag_expected_state(host=host,
                                       execution_date=execution_date,
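A note on the timeout bump in the last hunk: the polling loops compute max_tries = max(int(timeout / 5), 1) and sleep 5 seconds per try, so timeout=120 allowed at most 24 polls (about 120 s) while timeout=200 allows 40 polls (about 200 s), giving the scheduler-restart scenario more headroom to converge.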
