Skip to content

Commit

Permalink
Makes multi-namespace mode optional (#9570)
Browse files Browse the repository at this point in the history
Running the airflow k8sexecutor with multiple namespace abilities
requires creating a ClusterRole which can break existing deployments

Co-authored-by: Daniel Imberman <[email protected]>
  • Loading branch information
dimberman and astro-sql-decorator authored Aug 10, 2020
1 parent dd17c0e commit 2e3c878
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 1 deletion.
7 changes: 7 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1887,6 +1887,13 @@
type: string
example: ~
default: "default"
- name: multi_namespace_mode
description: |
Allows users to launch pods in multiple namespaces.
Will require creating a cluster-role for the scheduler
type: boolean
example: ~
default: "False"
- name: airflow_configmap
description: |
The name of the Kubernetes ConfigMap containing ``airflow.cfg`` file.
Expand Down
4 changes: 4 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,10 @@ worker_pods_creation_batch_size = 1
# The Kubernetes namespace where airflow workers should be created. Defaults to ``default``
namespace = default

# Allows users to launch pods in multiple namespaces.
# Will require creating a cluster-role for the scheduler
multi_namespace_mode = False

# The name of the Kubernetes ConfigMap containing ``airflow.cfg`` file.
#
# For example:
Expand Down
19 changes: 18 additions & 1 deletion airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"""
import base64
import datetime
import functools
import json
import multiprocessing
import time
Expand Down Expand Up @@ -188,6 +189,7 @@ def __init__(self): # pylint: disable=too-many-statements
# cluster has RBAC enabled, your scheduler may need service account permissions to
# create, watch, get, and delete pods in this namespace.
self.kube_namespace = conf.get(self.kubernetes_section, 'namespace')
self.multi_namespace_mode = conf.get(self.kubernetes_section, 'multi_namespace_mode')
# The Kubernetes Namespace in which pods will be created by the executor. Note
# that if your
# cluster has RBAC enabled, your workers may need service account permissions to
Expand Down Expand Up @@ -287,11 +289,15 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
"""Watches for Kubernetes jobs"""

def __init__(self,
namespace: Optional[str],
mult_namespace_mode: bool,
watcher_queue: 'Queue[KubernetesWatchType]',
resource_version: Optional[str],
worker_uuid: Optional[str],
kube_config: Configuration):
super().__init__()
self.namespace = namespace
self.multi_namespace_mode = mult_namespace_mode
self.worker_uuid = worker_uuid
self.watcher_queue = watcher_queue
self.resource_version = resource_version
Expand Down Expand Up @@ -336,7 +342,16 @@ def _run(self,
kwargs[key] = value

last_resource_version: Optional[str] = None
for event in watcher.stream(kube_client.list_pod_for_all_namespaces, **kwargs):
if self.multi_namespace_mode:
list_worker_pods = functools.partial(watcher.stream,
kube_client.list_pod_for_all_namespaces,
**kwargs)
else:
list_worker_pods = functools.partial(watcher.stream,
kube_client.list_namespaced_pod,
self.namespace,
**kwargs)
for event in list_worker_pods():
task = event['object']
self.log.info(
'Event: %s had an event of type %s',
Expand Down Expand Up @@ -430,6 +445,8 @@ def __init__(self,
def _make_kube_watcher(self) -> KubernetesJobWatcher:
resource_version = KubeResourceVersion.get_current_resource_version()
watcher = KubernetesJobWatcher(watcher_queue=self.watcher_queue,
namespace=self.kube_config.kube_namespace,
mult_namespace_mode=self.kube_config.multi_namespace_mode,
resource_version=resource_version,
worker_uuid=self.worker_uuid,
kube_config=self.kube_config)
Expand Down

0 comments on commit 2e3c878

Please sign in to comment.