Skip to content

Commit

Permalink
[AIRFLOW-6778] Add a configurable DAGs volume mount path for Kubernet…
Browse files Browse the repository at this point in the history
…es (#8147)

(cherry picked from commit 75896c3)
  • Loading branch information
brandonwillard authored and potiuk committed Jun 29, 2020
1 parent dbae79e commit a0ed8ec
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 0 deletions.
7 changes: 7 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1827,6 +1827,13 @@
type: string
example: ~
default: ""
- name: dags_volume_mount_point
description: |
For either git sync or volume mounted DAGs, the worker will mount the volume in this path
version_added: ~
type: string
example: ~
default: ""
- name: dags_volume_claim
description: |
For DAGs mounted via a volume claim (mutually exclusive with git-sync and host path)
Expand Down
3 changes: 3 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,9 @@ dags_in_image = False
# For either git sync or volume mounted DAGs, the worker will look in this subpath for DAGs
dags_volume_subpath =

# For either git sync or volume mounted DAGs, the worker will mount the volume in this path
dags_volume_mount_point =

# For DAGs mounted via a volume claim (mutually exclusive with git-sync and host path)
dags_volume_claim =

Expand Down
2 changes: 2 additions & 0 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ def __init__(self):
# DAGs directly
self.dags_volume_claim = conf.get(self.kubernetes_section, 'dags_volume_claim')

self.dags_volume_mount_point = conf.get(self.kubernetes_section, 'dags_volume_mount_point')

# This prop may optionally be set for PV Claims and is used to write logs
self.logs_volume_claim = conf.get(self.kubernetes_section, 'logs_volume_claim')

Expand Down
5 changes: 5 additions & 0 deletions airflow/kubernetes/worker_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,11 @@ def _construct_volume(name, claim, host):
return list(volumes.values())

def generate_dag_volume_mount_path(self):
"""Generate path for DAG volume"""

if self.kube_config.dags_volume_mount_point:
return self.kube_config.dags_volume_mount_point

if self.kube_config.dags_volume_claim or self.kube_config.dags_volume_host:
return self.worker_airflow_dags

Expand Down
6 changes: 6 additions & 0 deletions tests/kubernetes/test_worker_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def setUp(self):
self.kube_config.airflow_dags = 'dags'
self.kube_config.airflow_logs = 'logs'
self.kube_config.dags_volume_subpath = None
self.kube_config.dags_volume_mount_point = None
self.kube_config.logs_volume_subpath = None
self.kube_config.dags_in_image = False
self.kube_config.dags_folder = None
Expand Down Expand Up @@ -145,6 +146,11 @@ def test_worker_generate_dag_volume_mount_path(self):
dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
self.assertEqual(dag_volume_mount_path, self.kube_config.dags_folder)

self.kube_config.dags_volume_mount_point = '/root/airflow/package'
dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
self.assertEqual(dag_volume_mount_path, '/root/airflow/package')
self.kube_config.dags_volume_mount_point = ''

self.kube_config.dags_volume_claim = ''
self.kube_config.dags_volume_host = '/host/airflow/dags'
dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
Expand Down

0 comments on commit a0ed8ec

Please sign in to comment.