diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 972ad2774dc2c..beaf292f08a47 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1887,6 +1887,13 @@ type: string example: ~ default: "default" + - name: multi_namespace_mode + description: | + Allows users to launch pods in multiple namespaces. + Will require creating a cluster-role for the scheduler + type: boolean + example: ~ + default: "False" - name: airflow_configmap description: | The name of the Kubernetes ConfigMap containing ``airflow.cfg`` file. diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index bddb418510594..940d3651bf115 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -899,6 +899,10 @@ worker_pods_creation_batch_size = 1 # The Kubernetes namespace where airflow workers should be created. Defaults to ``default`` namespace = default +# Allows users to launch pods in multiple namespaces. +# Will require creating a cluster-role for the scheduler +multi_namespace_mode = False + # The name of the Kubernetes ConfigMap containing ``airflow.cfg`` file. # # For example: diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 3a32f55b74e9e..7f89ae3518dde 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -23,6 +23,7 @@ """ import base64 import datetime +import functools import json import multiprocessing import time @@ -188,6 +189,7 @@ def __init__(self): # pylint: disable=too-many-statements # cluster has RBAC enabled, your scheduler may need service account permissions to # create, watch, get, and delete pods in this namespace. self.kube_namespace = conf.get(self.kubernetes_section, 'namespace') + self.multi_namespace_mode = conf.get(self.kubernetes_section, 'multi_namespace_mode') # The Kubernetes Namespace in which pods will be created by the executor. Note # that if your # cluster has RBAC enabled, your workers may need service account permissions to @@ -287,11 +289,15 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin): """Watches for Kubernetes jobs""" def __init__(self, + namespace: Optional[str], + mult_namespace_mode: bool, watcher_queue: 'Queue[KubernetesWatchType]', resource_version: Optional[str], worker_uuid: Optional[str], kube_config: Configuration): super().__init__() + self.namespace = namespace + self.multi_namespace_mode = mult_namespace_mode self.worker_uuid = worker_uuid self.watcher_queue = watcher_queue self.resource_version = resource_version @@ -336,7 +342,16 @@ def _run(self, kwargs[key] = value last_resource_version: Optional[str] = None - for event in watcher.stream(kube_client.list_pod_for_all_namespaces, **kwargs): + if self.multi_namespace_mode: + list_worker_pods = functools.partial(watcher.stream, + kube_client.list_pod_for_all_namespaces, + **kwargs) + else: + list_worker_pods = functools.partial(watcher.stream, + kube_client.list_namespaced_pod, + self.namespace, + **kwargs) + for event in list_worker_pods(): task = event['object'] self.log.info( 'Event: %s had an event of type %s', @@ -430,6 +445,8 @@ def __init__(self, def _make_kube_watcher(self) -> KubernetesJobWatcher: resource_version = KubeResourceVersion.get_current_resource_version() watcher = KubernetesJobWatcher(watcher_queue=self.watcher_queue, + namespace=self.kube_config.kube_namespace, + mult_namespace_mode=self.kube_config.multi_namespace_mode, resource_version=resource_version, worker_uuid=self.worker_uuid, kube_config=self.kube_config)