Skip to content

Commit

Permalink
[AIRFLOW-5282] Add default timeout on kubeclient & catch HTTPError (a…
Browse files Browse the repository at this point in the history
…pache#5880)

Catch urllib3 httperror in kube executor

(cherry picked from commit 4636a30)
  • Loading branch information
rolanddb authored and ashb committed Aug 29, 2019
1 parent 1469e50 commit 534d75b
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
4 changes: 3 additions & 1 deletion airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,9 @@ tolerations =
# List of supported params in **kwargs are similar for all core_v1_apis, hence a single config variable for all apis
# See:
# https://raw.githubusercontent.com/kubernetes-client/python/master/kubernetes/client/apis/core_v1_api.py
kube_client_request_args =
# Note that if no _request_timeout is specified, the kubernetes client will wait indefinitely for kubernetes
# api responses, which will cause the scheduler to hang. The timeout is specified as [connect timeout, read timeout]
kube_client_request_args = {{"_request_timeout" : [60,60] }}

# Worker pods security context options
# See:
Expand Down
10 changes: 8 additions & 2 deletions airflow/contrib/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import kubernetes
from kubernetes import watch, client
from kubernetes.client.rest import ApiException
from urllib3.exceptions import HTTPError

from airflow.configuration import conf
from airflow.contrib.kubernetes.pod_launcher import PodLauncher
from airflow.contrib.kubernetes.kube_client import get_kube_client
Expand Down Expand Up @@ -339,7 +341,7 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config):
if resource_version:
kwargs['resource_version'] = resource_version
if kube_config.kube_client_request_args:
for key, value in kube_config.kube_client_request_args.iteritems():
for key, value in kube_config.kube_client_request_args.items():
kwargs[key] = value

last_resource_version = None
Expand Down Expand Up @@ -686,7 +688,7 @@ def clear_not_launched_queued_tasks(self, session=None):
)
kwargs = dict(label_selector=dict_string)
if self.kube_config.kube_client_request_args:
for key, value in self.kube_config.kube_client_request_args.iteritems():
for key, value in self.kube_config.kube_client_request_args.items():
kwargs[key] = value
pod_list = self.kube_client.list_namespaced_pod(
self.kube_config.kube_namespace, **kwargs)
Expand Down Expand Up @@ -801,6 +803,10 @@ def sync(self):
self.log.warning('ApiException when attempting to run task, re-queueing. '
'Message: %s' % json.loads(e.body)['message'])
self.task_queue.put(task)
except HTTPError as e:
self.log.warning('HTTPError when attempting to run task, re-queueing. '
'Exception: %s', str(e))
self.task_queue.put(task)
finally:
self.task_queue.task_done()
except Empty:
Expand Down

0 comments on commit 534d75b

Please sign in to comment.