From 534d75b2a4c37abdc51fe7dd3a9d573d07d0bf12 Mon Sep 17 00:00:00 2001 From: Roland de Boo Date: Fri, 23 Aug 2019 12:30:51 +0200 Subject: [PATCH] [AIRFLOW-5282] Add default timeout on kubeclient & catch HTTPError (#5880) Catch urllib3 httperror in kube executor (cherry picked from commit 4636a306d060186eac10e9b88581b35600b981f6) --- airflow/config_templates/default_airflow.cfg | 4 +++- airflow/contrib/executors/kubernetes_executor.py | 10 ++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index fa9c52eb03191..1bfa8321b7a25 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -795,7 +795,9 @@ tolerations = # List of supported params in **kwargs are similar for all core_v1_apis, hence a single config variable for all apis # See: # https://raw.githubusercontent.com/kubernetes-client/python/master/kubernetes/client/apis/core_v1_api.py -kube_client_request_args = +# Note that if no _request_timeout is specified, the kubernetes client will wait indefinitely for kubernetes +# api responses, which will cause the scheduler to hang. The timeout is specified as [connect timeout, read timeout] +kube_client_request_args = {{"_request_timeout" : [60,60] }} # Worker pods security context options # See: diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index 1b9bc033f4034..2adb526337a44 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -27,6 +27,8 @@ import kubernetes from kubernetes import watch, client from kubernetes.client.rest import ApiException +from urllib3.exceptions import HTTPError + from airflow.configuration import conf from airflow.contrib.kubernetes.pod_launcher import PodLauncher from airflow.contrib.kubernetes.kube_client import get_kube_client @@ -339,7 +341,7 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config): if resource_version: kwargs['resource_version'] = resource_version if kube_config.kube_client_request_args: - for key, value in kube_config.kube_client_request_args.iteritems(): + for key, value in kube_config.kube_client_request_args.items(): kwargs[key] = value last_resource_version = None @@ -686,7 +688,7 @@ def clear_not_launched_queued_tasks(self, session=None): ) kwargs = dict(label_selector=dict_string) if self.kube_config.kube_client_request_args: - for key, value in self.kube_config.kube_client_request_args.iteritems(): + for key, value in self.kube_config.kube_client_request_args.items(): kwargs[key] = value pod_list = self.kube_client.list_namespaced_pod( self.kube_config.kube_namespace, **kwargs) @@ -801,6 +803,10 @@ def sync(self): self.log.warning('ApiException when attempting to run task, re-queueing. ' 'Message: %s' % json.loads(e.body)['message']) self.task_queue.put(task) + except HTTPError as e: + self.log.warning('HTTPError when attempting to run task, re-queueing. ' + 'Exception: %s', str(e)) + self.task_queue.put(task) finally: self.task_queue.task_done() except Empty: