From 686716bdd7f3c34f2dcc182431e0a371ee1c7964 Mon Sep 17 00:00:00 2001 From: "Brandon T. Willard" <971601+brandonwillard@users.noreply.github.com> Date: Tue, 7 Apr 2020 15:35:09 -0500 Subject: [PATCH] [AIRFLOW-6778] Add a configurable DAGs volume mount path for Kubernetes (#8147) (cherry picked from commit 75896c30cf37002585e3b17efa002da279090f76) --- airflow/config_templates/config.yml | 7 +++++++ airflow/config_templates/default_airflow.cfg | 3 +++ airflow/executors/kubernetes_executor.py | 2 ++ airflow/kubernetes/worker_configuration.py | 5 +++++ tests/kubernetes/test_worker_configuration.py | 6 ++++++ 5 files changed, 23 insertions(+) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index a7fa7a688501a7..9b632008f6cc05 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1827,6 +1827,13 @@ type: string example: ~ default: "" + - name: dags_volume_mount_point + description: | + For either git sync or volume mounted DAGs, the worker will mount the volume in this path + version_added: ~ + type: string + example: ~ + default: "" - name: dags_volume_claim description: | For DAGs mounted via a volume claim (mutually exclusive with git-sync and host path) diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 27fe92a3887080..ca9de120c0806b 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -853,6 +853,9 @@ dags_in_image = False # For either git sync or volume mounted DAGs, the worker will look in this subpath for DAGs dags_volume_subpath = +# For either git sync or volume mounted DAGs, the worker will mount the volume in this path +dags_volume_mount_point = + # For DAGs mounted via a volume claim (mutually exclusive with git-sync and host path) dags_volume_claim = diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 6ec26608e9bca9..98e31547b55982 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -125,6 +125,8 @@ def __init__(self): # DAGs directly self.dags_volume_claim = conf.get(self.kubernetes_section, 'dags_volume_claim') + self.dags_volume_mount_point = conf.get(self.kubernetes_section, 'dags_volume_mount_point') + # This prop may optionally be set for PV Claims and is used to write logs self.logs_volume_claim = conf.get(self.kubernetes_section, 'logs_volume_claim') diff --git a/airflow/kubernetes/worker_configuration.py b/airflow/kubernetes/worker_configuration.py index 820763b12d9e3a..9c35910816ba81 100644 --- a/airflow/kubernetes/worker_configuration.py +++ b/airflow/kubernetes/worker_configuration.py @@ -412,6 +412,11 @@ def _construct_volume(name, claim, host): return list(volumes.values()) def generate_dag_volume_mount_path(self): + """Generate path for DAG volume""" + + if self.kube_config.dags_volume_mount_point: + return self.kube_config.dags_volume_mount_point + if self.kube_config.dags_volume_claim or self.kube_config.dags_volume_host: return self.worker_airflow_dags diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py index 73b3f2099a7b0c..0730595e05738c 100644 --- a/tests/kubernetes/test_worker_configuration.py +++ b/tests/kubernetes/test_worker_configuration.py @@ -88,6 +88,7 @@ def setUp(self): self.kube_config.airflow_dags = 'dags' self.kube_config.airflow_logs = 'logs' self.kube_config.dags_volume_subpath = None + self.kube_config.dags_volume_mount_point = None self.kube_config.logs_volume_subpath = None self.kube_config.dags_in_image = False self.kube_config.dags_folder = None @@ -145,6 +146,11 @@ def test_worker_generate_dag_volume_mount_path(self): dag_volume_mount_path = worker_config.generate_dag_volume_mount_path() self.assertEqual(dag_volume_mount_path, self.kube_config.dags_folder) + self.kube_config.dags_volume_mount_point = '/root/airflow/package' + dag_volume_mount_path = worker_config.generate_dag_volume_mount_path() + self.assertEqual(dag_volume_mount_path, '/root/airflow/package') + self.kube_config.dags_volume_mount_point = '' + self.kube_config.dags_volume_claim = '' self.kube_config.dags_volume_host = '/host/airflow/dags' dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()