Skip to content
This repository has been archived by the owner on May 22, 2021. It is now read-only.

Commit

Permalink
[AIRFLOW-6504] Allow specifying configmap for Airflow Local Setting (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
kaxil authored and galuszkak committed Mar 5, 2020
1 parent 9616d97 commit d4fdec6
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 3 deletions.
31 changes: 29 additions & 2 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1546,10 +1546,37 @@
default: "default"
- name: airflow_configmap
description: |
The name of the Kubernetes ConfigMap Containing the Airflow Configuration (this file)
The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file)
version_added: ~
type: string
example: ~
example: "airflow-configmap"
default: ""
- name: airflow_local_settings_configmap
description: |
The name of the Kubernetes ConfigMap containing ``airflow_local_settings.py`` file.
For example:
``airflow_local_settings_configmap = "airflow-configmap"`` if you have the following ConfigMap.
``airflow-configmap.yaml``:
.. code-block:: yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
name: airflow-configmap
data:
airflow_local_settings.py: |
def pod_mutation_hook(pod):
...
airflow.cfg: |
...
version_added: ~
type: string
example: "airflow-configmap"
default: ""
- name: dags_in_image
description: |
Expand Down
27 changes: 26 additions & 1 deletion airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -694,9 +694,34 @@ worker_pods_creation_batch_size = 1
# The Kubernetes namespace where airflow workers should be created. Defaults to ``default``
namespace = default

# The name of the Kubernetes ConfigMap Containing the Airflow Configuration (this file)
# The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file)
# Example: airflow_configmap = airflow-configmap
airflow_configmap =

# The name of the Kubernetes ConfigMap containing ``airflow_local_settings.py`` file.
#
# For example:
#
# ``airflow_local_settings_configmap = "airflow-configmap"`` if you have the following ConfigMap.
#
# ``airflow-configmap.yaml``:
#
# .. code-block:: yaml
#
# ---
# apiVersion: v1
# kind: ConfigMap
# metadata:
# name: airflow-configmap
# data:
# airflow_local_settings.py: |
# def pod_mutation_hook(pod):
# ...
# airflow.cfg: |
# ...
# Example: airflow_local_settings_configmap = airflow-configmap
airflow_local_settings_configmap =

# For docker image already contains DAGs, this is set to ``True``, and the worker will
# search for dags in dags_folder,
# otherwise use git sync or dags volume claim to mount DAGs
Expand Down
5 changes: 5 additions & 0 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,11 @@ def __init__(self): # pylint: disable=too-many-statements
# configmap
self.airflow_configmap = conf.get(self.kubernetes_section, 'airflow_configmap')

# The worker pod may optionally have a valid Airflow local settings loaded via a
# configmap
self.airflow_local_settings_configmap = conf.get(
self.kubernetes_section, 'airflow_local_settings_configmap')

affinity_json = conf.get(self.kubernetes_section, 'affinity')
if affinity_json:
self.kube_affinity = json.loads(affinity_json)
Expand Down
21 changes: 21 additions & 0 deletions airflow/kubernetes/worker_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,17 @@ def _get_volume_mounts(self) -> List[k8s.V1VolumeMount]:
read_only=True
)

# Mount the airflow_local_settings.py file via a configmap the user has specified
if self.kube_config.airflow_local_settings_configmap:
config_volume_name = 'airflow-config'
config_path = '{}/config/airflow_local_settings.py'.format(self.worker_airflow_home)
volume_mounts[config_volume_name] = k8s.V1VolumeMount(
name=config_volume_name,
mount_path=config_path,
sub_path='airflow_local_settings.py',
read_only=True
)

return list(volume_mounts.values())

def _get_volumes(self) -> List[k8s.V1Volume]:
Expand Down Expand Up @@ -349,6 +360,16 @@ def _construct_volume(name, claim, host) -> k8s.V1Volume:
)
)

# Mount the airflow_local_settings.py file via a configmap the user has specified
if self.kube_config.airflow_local_settings_configmap:
config_volume_name = 'airflow-config'
volumes[config_volume_name] = k8s.V1Volume(
name=config_volume_name,
config_map=k8s.V1ConfigMapVolumeSource(
name=self.kube_config.airflow_local_settings_configmap
)
)

return list(volumes.values())

def generate_dag_volume_mount_path(self) -> str:
Expand Down
38 changes: 38 additions & 0 deletions tests/kubernetes/test_worker_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,44 @@ def test_worker_container_dags(self):
self.assertEqual(0, len(dag_volume_mount))
self.assertEqual(0, len(init_containers))

def test_set_airflow_local_settings_configmap(self):
"""
Test that airflow_local_settings.py can be set via configmap by
checking volume & volume-mounts are set correctly.
"""
self.kube_config.airflow_home = '/usr/local/airflow'
self.kube_config.airflow_configmap = 'airflow-configmap'
self.kube_config.airflow_local_settings_configmap = 'airflow-configmap'
self.kube_config.dags_folder = '/workers/path/to/dags'

worker_config = WorkerConfiguration(self.kube_config)
pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
"test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")

airflow_config_volume = [
volume for volume in pod.spec.volumes if volume.name == 'airflow-config'
]
# Test that volume_name is found
self.assertEqual(1, len(airflow_config_volume))

# Test that config map exists
self.assertEqual("airflow-configmap", airflow_config_volume[0].config_map.name)

# Test Volume Mount exists
local_setting_volume_mount = [
volume_mount for volume_mount in pod.spec.containers[0].volume_mounts
if volume_mount.name == 'airflow-config'
]
self.assertEqual(1, len(local_setting_volume_mount))

# Test Mounth Path is set correctly.
self.assertEqual(
'/usr/local/airflow/config/airflow_local_settings.py',
local_setting_volume_mount[0].mount_path
)
self.assertEqual(True, local_setting_volume_mount[0].read_only)
self.assertEqual('airflow_local_settings.py', local_setting_volume_mount[0].sub_path)

def test_kubernetes_environment_variables(self):
# Tests the kubernetes environment variables get copied into the worker pods
input_environment = {
Expand Down

0 comments on commit d4fdec6

Please sign in to comment.