From aca3dc2b9fad112dafbf882d4005bbebd6f0eb78 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Wed, 8 Jan 2020 14:41:22 +0000 Subject: [PATCH] [AIRFLOW-6504] Allow specifying configmap for Airflow Local Setting (#7097) --- airflow/config_templates/config.yml | 31 ++++++++++++++- airflow/config_templates/default_airflow.cfg | 30 ++++++++++++++- .../contrib/executors/kubernetes_executor.py | 5 +++ .../kubernetes/worker_configuration.py | 16 ++++++++ .../executors/test_kubernetes_executor.py | 38 +++++++++++++++++++ 5 files changed, 117 insertions(+), 3 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index d9b915136e808f..42c42bfd332641 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1618,10 +1618,37 @@ default: "default" - name: airflow_configmap description: | - The name of the Kubernetes ConfigMap Containing the Airflow Configuration (this file) + The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file) version_added: ~ type: string - example: ~ + example: "airflow-configmap" + default: "" + - name: airflow_local_settings_configmap + description: | + The name of the Kubernetes ConfigMap containing ``airflow_local_settings.py`` file. + + For example: + + ``airflow_local_settings_configmap = "airflow-configmap"`` if you have the following ConfigMap. + + ``airflow-configmap.yaml``: + + .. code-block:: yaml + + --- + apiVersion: v1 + kind: ConfigMap + metadata: + name: airflow-configmap + data: + airflow_local_settings.py: | + def pod_mutation_hook(pod): + ... + airflow.cfg: | + ... + version_added: ~ + type: string + example: "airflow-configmap" default: "" - name: dags_in_image description: | diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 12a2a7e92c359d..d44ae96f4eac9a 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -383,6 +383,9 @@ cookie_samesite = # Default setting for wrap toggle on DAG code and TI log views. default_wrap = False +# Allow the UI to be rendered in a frame +x_frame_enabled = True + # Send anonymous user activity to your analytics tool # choose from google_analytics, segment, or metarouter # analytics_tool = @@ -733,9 +736,34 @@ worker_pods_creation_batch_size = 1 # The Kubernetes namespace where airflow workers should be created. Defaults to ``default`` namespace = default -# The name of the Kubernetes ConfigMap Containing the Airflow Configuration (this file) +# The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file) +# Example: airflow_configmap = airflow-configmap airflow_configmap = +# The name of the Kubernetes ConfigMap containing ``airflow_local_settings.py`` file. +# +# For example: +# +# ``airflow_local_settings_configmap = "airflow-configmap"`` if you have the following ConfigMap. +# +# ``airflow-configmap.yaml``: +# +# .. code-block:: yaml +# +# --- +# apiVersion: v1 +# kind: ConfigMap +# metadata: +# name: airflow-configmap +# data: +# airflow_local_settings.py: | +# def pod_mutation_hook(pod): +# ... +# airflow.cfg: | +# ... +# Example: airflow_local_settings_configmap = airflow-configmap +airflow_local_settings_configmap = + # For docker image already contains DAGs, this is set to ``True``, and the worker will # search for dags in dags_folder, # otherwise use git sync or dags volume claim to mount DAGs diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index fc57c5972743af..746fcf9d4ee4c6 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -261,6 +261,11 @@ def __init__(self): # configmap self.airflow_configmap = conf.get(self.kubernetes_section, 'airflow_configmap') + # The worker pod may optionally have a valid Airflow local settings loaded via a + # configmap + self.airflow_local_settings_configmap = conf.get( + self.kubernetes_section, 'airflow_local_settings_configmap') + affinity_json = conf.get(self.kubernetes_section, 'affinity') if affinity_json: self.kube_affinity = json.loads(affinity_json) diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index 6d789f6a4819eb..acdb1eb08e0cd9 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -332,6 +332,22 @@ def _construct_volume(name, claim, host): 'readOnly': True } + if self.kube_config.airflow_local_settings_configmap: + config_volume_name = 'airflow-config' + config_path = '{}/config/airflow_local_settings.py'.format(self.worker_airflow_home) + volumes[config_volume_name] = { + 'name': config_volume_name, + 'configMap': { + 'name': self.kube_config.airflow_local_settings_configmap + } + } + volume_mounts[config_volume_name] = { + 'name': config_volume_name, + 'mountPath': config_path, + 'subPath': 'airflow_local_settings.py', + 'readOnly': True + } + return volumes, volume_mounts def generate_dag_volume_mount_path(self): diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py index 02d3dbe301b10f..eebe8e69f8cbda 100644 --- a/tests/contrib/executors/test_kubernetes_executor.py +++ b/tests/contrib/executors/test_kubernetes_executor.py @@ -688,6 +688,44 @@ def test_worker_container_dags(self): self.assertEqual(0, len(dag_volume_mount)) self.assertEqual(0, len(init_containers)) + def test_set_airflow_local_settings_configmap(self): + """ + Test that airflow_local_settings.py can be set via configmap by + checking volume & volume-mounts are set correctly. + """ + self.kube_config.airflow_home = '/usr/local/airflow' + self.kube_config.airflow_configmap = 'airflow-configmap' + self.kube_config.airflow_local_settings_configmap = 'airflow-configmap' + self.kube_config.dags_folder = '/workers/path/to/dags' + + worker_config = WorkerConfiguration(self.kube_config) + pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id", + "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'") + + airflow_config_volume = [ + volume for volume in pod.spec.volumes if volume.name == 'airflow-config' + ] + # Test that volume_name is found + self.assertEqual(1, len(airflow_config_volume)) + + # Test that config map exists + self.assertEqual("airflow-configmap", airflow_config_volume[0].config_map.name) + + # Test Volume Mount exists + local_setting_volume_mount = [ + volume_mount for volume_mount in pod.spec.containers[0].volume_mounts + if volume_mount.name == 'airflow-config' + ] + self.assertEqual(1, len(local_setting_volume_mount)) + + # Test Mounth Path is set correctly. + self.assertEqual( + '/usr/local/airflow/config/airflow_local_settings.py', + local_setting_volume_mount[0].mount_path + ) + self.assertEqual(True, local_setting_volume_mount[0].read_only) + self.assertEqual('airflow_local_settings.py', local_setting_volume_mount[0].sub_path) + def test_kubernetes_environment_variables(self): # Tests the kubernetes environment variables get copied into the worker pods input_environment = {