Skip to content

Commit

Permalink
Makes multi-namespace mode optional (#9570)
Browse files Browse the repository at this point in the history
Running the airflow k8sexecutor with multiple namespace abilities
requires creating a ClusterRole which can break existing deployments

Co-authored-by: Daniel Imberman <[email protected]>
(cherry picked from commit 2e3c878)
  • Loading branch information
dimberman committed Aug 11, 2020
1 parent 14ca77d commit 71c6ace
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 5 deletions.
7 changes: 7 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1812,6 +1812,13 @@
type: string
example: ~
default: "default"
- name: multi_namespace_mode
description: |
Allows users to launch pods in multiple namespaces.
Will require creating a cluster-role for the scheduler
type: boolean
example: ~
default: "False"
- name: airflow_configmap
description: |
The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file)
Expand Down
4 changes: 4 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,10 @@ worker_pods_creation_batch_size = 1
# The Kubernetes namespace where airflow workers should be created. Defaults to ``default``
namespace = default

# Allows users to launch pods in multiple namespaces.
# Will require creating a cluster-role for the scheduler
multi_namespace_mode = False

# The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file)
# Example: airflow_configmap = airflow-configmap
airflow_configmap =
Expand Down
32 changes: 27 additions & 5 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
:ref:`executor:KubernetesExecutor`
"""
import base64
import functools
import json
import multiprocessing
import time
Expand Down Expand Up @@ -162,6 +163,7 @@ def __init__(self):
# cluster has RBAC enabled, your scheduler may need service account permissions to
# create, watch, get, and delete pods in this namespace.
self.kube_namespace = conf.get(self.kubernetes_section, 'namespace')
self.multi_namespace_mode = conf.get(self.kubernetes_section, 'multi_namespace_mode')
# The Kubernetes Namespace in which pods will be created by the executor. Note
# that if your
# cluster has RBAC enabled, your workers may need service account permissions to
Expand Down Expand Up @@ -254,9 +256,17 @@ def _validate(self):

class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
"""Watches for Kubernetes jobs"""
def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube_config):

def __init__(self,
namespace,
mult_namespace_mode,
watcher_queue,
resource_version,
worker_uuid,
kube_config):
multiprocessing.Process.__init__(self)
self.namespace = namespace
self.multi_namespace_mode = mult_namespace_mode
self.worker_uuid = worker_uuid
self.watcher_queue = watcher_queue
self.resource_version = resource_version
Expand Down Expand Up @@ -295,8 +305,16 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config):
kwargs[key] = value

last_resource_version = None
for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace,
**kwargs):
if self.multi_namespace_mode:
list_worker_pods = functools.partial(watcher.stream,
kube_client.list_pod_for_all_namespaces,
**kwargs)
else:
list_worker_pods = functools.partial(watcher.stream,
kube_client.list_namespaced_pod,
self.namespace,
**kwargs)
for event in list_worker_pods():
task = event['object']
self.log.info(
'Event: %s had an event of type %s',
Expand Down Expand Up @@ -377,8 +395,12 @@ def __init__(self, kube_config, task_queue, result_queue, kube_client, worker_uu

def _make_kube_watcher(self):
resource_version = KubeResourceVersion.get_current_resource_version()
watcher = KubernetesJobWatcher(self.namespace, self.watcher_queue,
resource_version, self.worker_uuid, self.kube_config)
watcher = KubernetesJobWatcher(watcher_queue=self.watcher_queue,
namespace=self.kube_config.kube_namespace,
mult_namespace_mode=self.kube_config.multi_namespace_mode,
resource_version=resource_version,
worker_uuid=self.worker_uuid,
kube_config=self.kube_config)
watcher.start()
return watcher

Expand Down

0 comments on commit 71c6ace

Please sign in to comment.