Skip to content

Commit

Permalink
Fix issue with mounting volumes from secrets (#10366)
Browse files Browse the repository at this point in the history
* Fix issue with mounting volumes from secrets

Previously, we were using the "name" variable to merge or extend
items, however this was problematic in volumes generated by secrets
as the generated names would not collide leading to duplicate values

(cherry picked from commit 635b04e9331f46cfb8b1810485681e8dc4581f38)

* fix tests

* Simplified workflow. Secrets will no longer be converted to secret objects

Co-authored-by: Daniel Imberman <[email protected]>
  • Loading branch information
kaxil and dimberman authored Aug 18, 2020
1 parent 8746f52 commit c44fddf
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 51 deletions.
25 changes: 3 additions & 22 deletions airflow/contrib/kubernetes/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def to_v1_kubernetes_pod(self):
)
for port in _extract_ports(self.ports):
pod = port.attach_to_pod(pod)
volumes, _ = _extract_volumes_and_secrets(self.volumes, self.volume_mounts)
volumes = _extract_volumes(self.volumes)
for volume in volumes:
pod = volume.attach_to_pod(pod)
for volume_mount in _extract_volume_mounts(self.volume_mounts):
Expand Down Expand Up @@ -279,37 +279,18 @@ def _extract_volume_mounts(volume_mounts):
sub_path=volume_mount.get("subPath"),
read_only=volume_mount.get("readOnly")
)

result.append(volume_mount)
return result


def _extract_volumes_and_secrets(volumes, volume_mounts):
def _extract_volumes(volumes):
result = []
volumes = volumes or [] # type: List[Union[k8s.V1Volume, dict]]
secrets = []
volume_mount_dict = {
volume_mount.name: volume_mount
for volume_mount in _extract_volume_mounts(volume_mounts)
}
for volume in volumes:
if isinstance(volume, k8s.V1Volume):
secret = _extract_volume_secret(volume, volume_mount_dict.get(volume.name, None))
if secret:
secrets.append(secret)
continue
volume = api_client.sanitize_for_serialization(volume)
volume = Volume(name=volume.get("name"), configs=volume)
if not isinstance(volume, Volume):
volume = Volume(name=volume.get("name"), configs=volume)
result.append(volume)
return result, secrets


def _extract_volume_secret(volume, volume_mount):
if not volume.secret:
return None
if volume_mount:
return Secret("volume", volume_mount.mount_path, volume.name, volume.secret.secret_name)
else:
return Secret("volume", None, volume.name, volume.secret.secret_name)
return result
18 changes: 11 additions & 7 deletions airflow/kubernetes/pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,11 @@ def reconcile_containers(base_containers,

client_container = client_containers[0]
base_container = base_containers[0]
client_container = extend_object_field(base_container, client_container, 'volume_mounts')
client_container = extend_object_field(
base_container,
client_container,
'volume_mounts',
'mount_path')
client_container = extend_object_field(base_container, client_container, 'env')
client_container = extend_object_field(base_container, client_container, 'env_from')
client_container = extend_object_field(base_container, client_container, 'ports')
Expand Down Expand Up @@ -623,7 +627,7 @@ def merge_objects(base_obj, client_obj):
return client_obj_cp


def extend_object_field(base_obj, client_obj, field_name):
def extend_object_field(base_obj, client_obj, field_name, field_to_merge="name"):
"""
:param base_obj: an object which has a property `field_name` that is a list
:param client_obj: an object which has a property `field_name` that is a list.
Expand All @@ -646,8 +650,8 @@ def extend_object_field(base_obj, client_obj, field_name):
setattr(client_obj_cp, field_name, base_obj_field)
return client_obj_cp

base_obj_set = _get_dict_from_list(base_obj_field)
client_obj_set = _get_dict_from_list(client_obj_field)
base_obj_set = _get_dict_from_list(base_obj_field, field_to_merge)
client_obj_set = _get_dict_from_list(client_obj_field, field_to_merge)

appended_fields = _merge_list_of_objects(base_obj_set, client_obj_set)

Expand All @@ -666,16 +670,16 @@ def _merge_list_of_objects(base_obj_set, client_obj_set):
return appended_fields


def _get_dict_from_list(base_list):
def _get_dict_from_list(base_list, field_to_merge="name"):
"""
:type base_list: list(Optional[dict, *to_dict])
"""
result = {}
for obj in base_list:
if isinstance(obj, dict):
result[obj['name']] = obj
result[obj[field_to_merge]] = obj
elif hasattr(obj, "to_dict"):
result[obj.name] = obj
result[getattr(obj, field_to_merge)] = obj
else:
raise AirflowConfigException("Trying to merge invalid object {}".format(obj))
return result
5 changes: 2 additions & 3 deletions airflow/kubernetes/pod_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from airflow import AirflowException
from airflow import settings
from airflow.contrib.kubernetes.pod import (
Pod, _extract_env_vars_and_secrets, _extract_volumes_and_secrets, _extract_volume_mounts,
Pod, _extract_env_vars_and_secrets, _extract_volumes, _extract_volume_mounts,
_extract_ports, _extract_security_context
)
from airflow.kubernetes.kube_client import get_kube_client
Expand Down Expand Up @@ -300,8 +300,7 @@ def _convert_to_airflow_pod(pod):
"""
base_container = pod.spec.containers[0] # type: k8s.V1Container
env_vars, secrets = _extract_env_vars_and_secrets(base_container.env)
volumes, vol_secrets = _extract_volumes_and_secrets(pod.spec.volumes, base_container.volume_mounts)
secrets.extend(vol_secrets)
volumes = _extract_volumes(pod.spec.volumes)
api_client = ApiClient()
init_containers = pod.spec.init_containers
if pod.spec.init_containers is not None:
Expand Down
1 change: 0 additions & 1 deletion airflow/kubernetes/worker_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,6 @@ def as_pod(self):
"""Creates POD."""
if self.kube_config.pod_template_file:
return PodGenerator(pod_template_file=self.kube_config.pod_template_file).gen_pod()

pod = PodGenerator(
image=self.kube_config.kube_image,
image_pull_policy=self.kube_config.kube_image_pull_policy or 'IfNotPresent',
Expand Down
34 changes: 34 additions & 0 deletions kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,40 @@ def test_delete_operator_pod(self):
self.assertEqual(self.expected_pod['spec'], actual_pod['spec'])
self.assertEqual(self.expected_pod['metadata']['labels'], actual_pod['metadata']['labels'])

def test_pod_with_volume_secret(self):
k = KubernetesPodOperator(
namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
in_cluster=False,
labels={"foo": "bar"},
arguments=["echo 10"],
secrets=[Secret(
deploy_type="volume",
deploy_target="/var/location",
secret="my-secret",
key="content.json",
)],
name="airflow-test-pod",
task_id="task",
get_logs=True,
is_delete_operator_pod=True,
)

context = self.create_context(k)
k.execute(context)
actual_pod = self.api_client.sanitize_for_serialization(k.pod)
self.expected_pod['spec']['containers'][0]['volumeMounts'] = [
{'mountPath': '/var/location',
'name': mock.ANY,
'readOnly': True}]
self.expected_pod['spec']['volumes'] = [
{'name': mock.ANY,
'secret': {'secretName': 'my-secret'}}
]
self.assertEqual(self.expected_pod['spec'], actual_pod['spec'])
self.assertEqual(self.expected_pod['metadata']['labels'], actual_pod['metadata']['labels'])

def test_pod_hostnetwork(self):
k = KubernetesPodOperator(
namespace='default',
Expand Down
25 changes: 25 additions & 0 deletions scripts/ci/kubernetes/secrets.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
---
apiVersion: v1
kind: Secret
metadata:
name: my-secret
type: Opaque
data:
username: YWlyZmxvdw==
password: YWlyZmxvdw==
1 change: 1 addition & 0 deletions scripts/ci/libraries/_kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ function deploy_test_kubernetes_resources() {
echo "Deploying Custom kubernetes resources"
echo
verbose_kubectl apply -f "scripts/ci/kubernetes/volumes.yaml" --namespace default
verbose_kubectl apply -f "scripts/ci/kubernetes/secrets.yaml" --namespace default
}


Expand Down
3 changes: 0 additions & 3 deletions tests/kubernetes/test_pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,9 +606,6 @@ def test_reconcile_pods(self, mock_uuid):
'name': 'port',
}],
'volumeMounts': [{
'mountPath': '/foo/',
'name': 'example-kubernetes-test-volume1'
}, {
'mountPath': '/foo/',
'name': 'example-kubernetes-test-volume2'
}]
Expand Down
17 changes: 10 additions & 7 deletions tests/kubernetes/test_pod_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,12 @@ def test_convert_to_airflow_pod(self):
name="airflow-secret",
secret=k8s.V1SecretVolumeSource(
secret_name="secret-name",

)
),
k8s.V1Volume(
name="init-secret",
secret=k8s.V1SecretVolumeSource(
secret_name="secret-name",
secret_name="init-secret",
)
)
]
Expand Down Expand Up @@ -283,22 +282,26 @@ def test_convert_to_airflow_pod(self):
),
VolumeMount(
name="airflow-secret",
read_only=True,
mount_path="/opt/mount",
sub_path=None,
read_only=True
)],
secrets=[Secret("env", "AIRFLOW_SECRET", "ai", "secret_key"),
Secret('volume', '/opt/mount', 'airflow-secret', "secret-name"),
Secret('volume', None, 'init-secret', 'secret-name')],
secrets=[Secret("env", "AIRFLOW_SECRET", "ai", "secret_key")],
security_context={'fsGroup': 0, 'runAsUser': 0},
volumes=[Volume(name="myvolume", configs={'name': 'myvolume'}),
Volume(name="airflow-config", configs={'configMap': {'data': 'airflow-data'},
'name': 'airflow-config'})]
'name': 'airflow-config'}),
Volume(name='airflow-secret', configs={'name': 'airflow-secret',
'secret': {'secretName': 'secret-name'}}),
Volume(name='init-secret', configs={'name': 'init-secret', 'secret':
{'secretName': 'init-secret'}})],
)
expected_dict = expected.as_dict()
result_dict = result_pod.as_dict()
print(result_pod.volume_mounts)
parsed_configs = self.pull_out_volumes(result_dict)
result_dict['volumes'] = parsed_configs
self.assertEqual(result_dict['secrets'], expected_dict['secrets'])
self.assertDictEqual(expected_dict, result_dict)

@staticmethod
Expand Down
9 changes: 9 additions & 0 deletions tests/kubernetes/test_worker_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ def test_make_pod_run_as_user_0(self):
self.kube_config.worker_run_as_user = 0
self.kube_config.dags_volume_claim = None
self.kube_config.dags_volume_host = None
self.kube_config.base_log_folder = '/logs'
self.kube_config.dags_in_image = None
self.kube_config.worker_fs_group = None
self.kube_config.git_dags_folder_mount_point = 'dags'
Expand All @@ -369,6 +370,7 @@ def test_make_pod_run_as_user_0(self):
def test_make_pod_assert_labels(self):
# Tests the pod created has all the expected labels set
self.kube_config.dags_folder = 'dags'
self.kube_config.base_log_folder = '/logs'

worker_config = WorkerConfiguration(self.kube_config)
pod = PodGenerator.construct_pod(
Expand Down Expand Up @@ -402,6 +404,7 @@ def test_make_pod_git_sync_ssh_without_known_hosts(self):
self.kube_config.dags_volume_host = None
self.kube_config.dags_in_image = None
self.kube_config.worker_fs_group = None
self.kube_config.base_log_folder = '/logs'
self.kube_config.git_dags_folder_mount_point = 'dags'
self.kube_config.git_sync_dest = 'repo'
self.kube_config.git_subpath = 'path'
Expand Down Expand Up @@ -431,6 +434,7 @@ def test_make_pod_git_sync_credentials_secret(self):
self.kube_config.dags_volume_host = None
self.kube_config.dags_in_image = None
self.kube_config.worker_fs_group = None
self.kube_config.base_log_folder = '/logs'
self.kube_config.git_dags_folder_mount_point = 'dags'
self.kube_config.git_sync_dest = 'repo'
self.kube_config.git_subpath = 'path'
Expand Down Expand Up @@ -469,6 +473,7 @@ def test_make_pod_git_sync_rev(self):
self.kube_config.dags_volume_host = None
self.kube_config.dags_in_image = None
self.kube_config.worker_fs_group = None
self.kube_config.base_log_folder = '/logs'
self.kube_config.git_dags_folder_mount_point = 'dags'
self.kube_config.git_sync_dest = 'repo'
self.kube_config.git_subpath = 'path'
Expand Down Expand Up @@ -515,6 +520,7 @@ def test_make_pod_with_empty_executor_config(self):
self.kube_config.kube_tolerations = self.tolerations_config
self.kube_config.kube_annotations = self.worker_annotations_config
self.kube_config.dags_folder = 'dags'
self.kube_config.base_log_folder = '/logs'
worker_config = WorkerConfiguration(self.kube_config)
pod = worker_config.as_pod()

Expand All @@ -533,6 +539,7 @@ def test_make_pod_with_empty_executor_config(self):

def test_make_pod_with_executor_config(self):
self.kube_config.dags_folder = 'dags'
self.kube_config.base_log_folder = '/logs'
worker_config = WorkerConfiguration(self.kube_config)
config_pod = PodGenerator(
image='',
Expand Down Expand Up @@ -722,6 +729,7 @@ def test_set_airflow_configmap_different_for_local_setting(self):
configmap than airflow_configmap (airflow.cfg)
"""
self.kube_config.airflow_home = '/usr/local/airflow'
self.kube_config.base_log_folder = '/logs'
self.kube_config.airflow_configmap = 'airflow-configmap'
self.kube_config.airflow_local_settings_configmap = 'airflow-ls-configmap'
self.kube_config.dags_folder = '/workers/path/to/dags'
Expand Down Expand Up @@ -792,6 +800,7 @@ def test_make_pod_with_image_pull_secrets(self):
self.kube_config.dags_volume_host = None
self.kube_config.dags_in_image = None
self.kube_config.git_dags_folder_mount_point = 'dags'
self.kube_config.base_log_folder = '/logs'
self.kube_config.git_sync_dest = 'repo'
self.kube_config.git_subpath = 'path'
self.kube_config.image_pull_secrets = 'image_pull_secret1,image_pull_secret2'
Expand Down
16 changes: 8 additions & 8 deletions tests/test_local_settings/test_local_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,14 +338,14 @@ def test_pod_mutation_to_k8s_pod(self):
'resources': {'limits': {'nvidia.com/gpu': '200G'},
'requests': {'cpu': '200Mi',
'memory': '2G'}},
'volumeMounts': [{'mountPath': '/opt/airflow/secrets/',
'name': 'airflow-secrets-mount',
'readOnly': True},
{'mountPath': '/mnt',
'name': 'foo',
'readOnly': True,
'subPath': '/'}
]}],
'volumeMounts': [
{'mountPath': '/mnt',
'name': 'foo',
'readOnly': True,
'subPath': '/'},
{'mountPath': '/opt/airflow/secrets/',
'name': 'airflow-secrets-mount',
'readOnly': True}]}],
'hostNetwork': False,
'imagePullSecrets': [],
'initContainers': [{'name': 'init-container',
Expand Down

0 comments on commit c44fddf

Please sign in to comment.