diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index a7fa7a688501a..9b632008f6cc0 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1827,6 +1827,13 @@ type: string example: ~ default: "" + - name: dags_volume_mount_point + description: | + For either git sync or volume mounted DAGs, the worker will mount the volume in this path + version_added: ~ + type: string + example: ~ + default: "" - name: dags_volume_claim description: | For DAGs mounted via a volume claim (mutually exclusive with git-sync and host path) diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 27fe92a388708..ca9de120c0806 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -853,6 +853,9 @@ dags_in_image = False # For either git sync or volume mounted DAGs, the worker will look in this subpath for DAGs dags_volume_subpath = +# For either git sync or volume mounted DAGs, the worker will mount the volume in this path +dags_volume_mount_point = + # For DAGs mounted via a volume claim (mutually exclusive with git-sync and host path) dags_volume_claim = diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 6ec26608e9bca..98e31547b5598 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -125,6 +125,8 @@ def __init__(self): # DAGs directly self.dags_volume_claim = conf.get(self.kubernetes_section, 'dags_volume_claim') + self.dags_volume_mount_point = conf.get(self.kubernetes_section, 'dags_volume_mount_point') + # This prop may optionally be set for PV Claims and is used to write logs self.logs_volume_claim = conf.get(self.kubernetes_section, 'logs_volume_claim') diff --git a/airflow/kubernetes/worker_configuration.py b/airflow/kubernetes/worker_configuration.py index 820763b12d9e3..9c35910816ba8 100644 --- a/airflow/kubernetes/worker_configuration.py +++ b/airflow/kubernetes/worker_configuration.py @@ -412,6 +412,11 @@ def _construct_volume(name, claim, host): return list(volumes.values()) def generate_dag_volume_mount_path(self): + """Generate path for DAG volume""" + + if self.kube_config.dags_volume_mount_point: + return self.kube_config.dags_volume_mount_point + if self.kube_config.dags_volume_claim or self.kube_config.dags_volume_host: return self.worker_airflow_dags diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py index 73b3f2099a7b0..0730595e05738 100644 --- a/tests/kubernetes/test_worker_configuration.py +++ b/tests/kubernetes/test_worker_configuration.py @@ -88,6 +88,7 @@ def setUp(self): self.kube_config.airflow_dags = 'dags' self.kube_config.airflow_logs = 'logs' self.kube_config.dags_volume_subpath = None + self.kube_config.dags_volume_mount_point = None self.kube_config.logs_volume_subpath = None self.kube_config.dags_in_image = False self.kube_config.dags_folder = None @@ -145,6 +146,11 @@ def test_worker_generate_dag_volume_mount_path(self): dag_volume_mount_path = worker_config.generate_dag_volume_mount_path() self.assertEqual(dag_volume_mount_path, self.kube_config.dags_folder) + self.kube_config.dags_volume_mount_point = '/root/airflow/package' + dag_volume_mount_path = worker_config.generate_dag_volume_mount_path() + self.assertEqual(dag_volume_mount_path, '/root/airflow/package') + self.kube_config.dags_volume_mount_point = '' + self.kube_config.dags_volume_claim = '' self.kube_config.dags_volume_host = '/host/airflow/dags' dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()