Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix more PodMutationHook issues for backwards compatibility #10084

Merged
merged 33 commits into from
Aug 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
6a0e28d
Fix more PodMutationHook issues for backwards compatibility
kaxil Aug 1, 2020
7db97ad
fixup! Fix more PodMutationHook issues for backwards compatibility
kaxil Aug 1, 2020
1071847
fixup! fixup! Fix more PodMutationHook issues for backwards compatibi…
kaxil Aug 2, 2020
09bfca0
fixup! fixup! fixup! fixup! Fix more PodMutationHook issues for backw…
kaxil Aug 2, 2020
91aee72
fixup! fixup! fixup! fixup! fixup! Fix more PodMutationHook issues fo…
kaxil Aug 2, 2020
6a41a6f
Add SecurityContext backwards compatibility fixes
Aug 3, 2020
f262756
More SecurityContext testing
dimberman Aug 3, 2020
c9c288e
Fix issue where resources default to zero
dimberman Aug 4, 2020
6e54525
Fixes issue where labels and annotations are not propagated
dimberman Aug 4, 2020
888d5bd
fix webserver by adding original KubernetesExecutor path
dimberman Aug 4, 2020
9e8b114
add license
dimberman Aug 4, 2020
ee87f8c
get static tests to pass
dimberman Aug 4, 2020
12f9521
Fixes issue preventing airflow.cfg configMap from mounting
dimberman Aug 5, 2020
6fa6553
Fix volume and env var secrets
dimberman Aug 5, 2020
b52ab6e
add secret key
dimberman Aug 5, 2020
18e45e5
fixup! Fix volume and env var secrets
kaxil Aug 5, 2020
bc54c82
Kubernetes tests now passing
dimberman Aug 5, 2020
267be53
make results more consistent
dimberman Aug 5, 2020
3e56021
python2.7 tests passing
dimberman Aug 5, 2020
a882ccc
raise two errors for failures so users have all info
dimberman Aug 6, 2020
e9189bf
nit
dimberman Aug 6, 2020
4ae2cd8
Fix issue with when client_obj is dict
kaxil Aug 6, 2020
a06e642
fixup! nit
kaxil Aug 6, 2020
4e58047
fixup! fixup! nit
kaxil Aug 6, 2020
c0bad52
fixup! fixup! fixup! nit
kaxil Aug 6, 2020
ebcae45
v1volume instead of dict
dimberman Aug 6, 2020
64de305
Fix issue where resources were not propagated
dimberman Aug 6, 2020
8bbd4a8
add other k8s features
dimberman Aug 6, 2020
933fbd4
Fixes issue with volume conversion
dimberman Aug 7, 2020
37d97f3
fixes case where init container is the volume_mount
dimberman Aug 7, 2020
c2ef17d
fix for resources
dimberman Aug 7, 2020
7563eba
fix image_pull_secret test
dimberman Aug 7, 2020
a47568b
Removed relative imports
kaxil Aug 7, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions airflow/contrib/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.executors import kubernetes_executor # noqa
137 changes: 131 additions & 6 deletions airflow/contrib/kubernetes/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,18 @@
import warnings

# pylint: disable=unused-import
from airflow.kubernetes.pod import Port, Resources # noqa
from typing import List, Union

from kubernetes.client import models as k8s

from airflow.kubernetes.pod import Port, Resources # noqa
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount
from airflow.kubernetes.secret import Secret

from kubernetes.client.api_client import ApiClient

api_client = ApiClient()

warnings.warn(
"This module is deprecated. Please use `airflow.kubernetes.pod`.",
Expand Down Expand Up @@ -120,7 +131,7 @@ def __init__(
self.affinity = affinity or {}
self.hostnetwork = hostnetwork or False
self.tolerations = tolerations or []
self.security_context = security_context
self.security_context = security_context or {}
self.configmaps = configmaps or []
self.pod_runtime_info_envs = pod_runtime_info_envs or []
self.dnspolicy = dnspolicy
Expand Down Expand Up @@ -154,24 +165,26 @@ def to_v1_kubernetes_pod(self):
dns_policy=self.dnspolicy,
host_network=self.hostnetwork,
tolerations=self.tolerations,
affinity=self.affinity,
security_context=self.security_context,
)

pod = k8s.V1Pod(
spec=spec,
metadata=meta,
)
for port in self.ports:
for port in _extract_ports(self.ports):
pod = port.attach_to_pod(pod)
for volume in self.volumes:
volumes, _ = _extract_volumes_and_secrets(self.volumes, self.volume_mounts)
for volume in volumes:
pod = volume.attach_to_pod(pod)
for volume_mount in self.volume_mounts:
for volume_mount in _extract_volume_mounts(self.volume_mounts):
pod = volume_mount.attach_to_pod(pod)
for secret in self.secrets:
pod = secret.attach_to_pod(pod)
for runtime_info in self.pod_runtime_info_envs:
pod = runtime_info.attach_to_pod(pod)
pod = self.resources.attach_to_pod(pod)
pod = _extract_resources(self.resources).attach_to_pod(pod)
return pod

def as_dict(self):
Expand All @@ -182,3 +195,115 @@ def as_dict(self):
res['volumes'] = [volume.as_dict() for volume in res['volumes']]

return res


def _extract_env_vars_and_secrets(env_vars):
result = {}
env_vars = env_vars or [] # type: List[Union[k8s.V1EnvVar, dict]]
secrets = []
for env_var in env_vars:
if isinstance(env_var, k8s.V1EnvVar):
secret = _extract_env_secret(env_var)
if secret:
secrets.append(secret)
continue
env_var = api_client.sanitize_for_serialization(env_var)
result[env_var.get("name")] = env_var.get("value")
return result, secrets


def _extract_env_secret(env_var):
if env_var.value_from and env_var.value_from.secret_key_ref:
secret = env_var.value_from.secret_key_ref # type: k8s.V1SecretKeySelector
name = secret.name
key = secret.key
return Secret("env", deploy_target=env_var.name, secret=name, key=key)
return None


def _extract_ports(ports):
result = []
ports = ports or [] # type: List[Union[k8s.V1ContainerPort, dict]]
for port in ports:
if isinstance(port, k8s.V1ContainerPort):
port = api_client.sanitize_for_serialization(port)
port = Port(name=port.get("name"), container_port=port.get("containerPort"))
elif not isinstance(port, Port):
port = Port(name=port.get("name"), container_port=port.get("containerPort"))
result.append(port)
return result


def _extract_resources(resources):
if isinstance(resources, k8s.V1ResourceRequirements):
requests = resources.requests
limits = resources.limits
return Resources(
request_memory=requests.get('memory', None),
request_cpu=requests.get('cpu', None),
request_ephemeral_storage=requests.get('ephemeral-storage', None),
limit_memory=limits.get('memory', None),
limit_cpu=limits.get('cpu', None),
limit_ephemeral_storage=limits.get('ephemeral-storage', None),
limit_gpu=limits.get('nvidia.com/gpu')
)
elif isinstance(resources, Resources):
return resources


def _extract_security_context(security_context):
if isinstance(security_context, k8s.V1PodSecurityContext):
security_context = api_client.sanitize_for_serialization(security_context)
return security_context


def _extract_volume_mounts(volume_mounts):
result = []
volume_mounts = volume_mounts or [] # type: List[Union[k8s.V1VolumeMount, dict]]
for volume_mount in volume_mounts:
if isinstance(volume_mount, k8s.V1VolumeMount):
volume_mount = api_client.sanitize_for_serialization(volume_mount)
volume_mount = VolumeMount(
name=volume_mount.get("name"),
mount_path=volume_mount.get("mountPath"),
sub_path=volume_mount.get("subPath"),
read_only=volume_mount.get("readOnly")
)
elif not isinstance(volume_mount, VolumeMount):
volume_mount = VolumeMount(
name=volume_mount.get("name"),
mount_path=volume_mount.get("mountPath"),
sub_path=volume_mount.get("subPath"),
read_only=volume_mount.get("readOnly")
)

result.append(volume_mount)
return result


def _extract_volumes_and_secrets(volumes, volume_mounts):
result = []
volumes = volumes or [] # type: List[Union[k8s.V1Volume, dict]]
secrets = []
volume_mount_dict = {
volume_mount.name: volume_mount
for volume_mount in _extract_volume_mounts(volume_mounts)
}
for volume in volumes:
if isinstance(volume, k8s.V1Volume):
secret = _extract_volume_secret(volume, volume_mount_dict.get(volume.name, None))
if secret:
secrets.append(secret)
continue
volume = api_client.sanitize_for_serialization(volume)
volume = Volume(name=volume.get("name"), configs=volume)
if not isinstance(volume, Volume):
volume = Volume(name=volume.get("name"), configs=volume)
result.append(volume)
return result, secrets


def _extract_volume_secret(volume, volume_mount):
if not volume.secret:
return None
return Secret("volume", volume_mount.mount_path, volume.name, volume.secret.secret_name)
6 changes: 6 additions & 0 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,12 @@ def run_next(self, next_job):
kube_executor_config=kube_executor_config,
worker_config=self.worker_configuration_pod
)

sanitized_pod = self.launcher._client.api_client.sanitize_for_serialization(pod)
json_pod = json.dumps(sanitized_pod, indent=2)

self.log.debug('Pod Creation Request before mutation: \n%s', json_pod)

# Reconcile the pod generated by the Operator and the Pod
# generated by the .cfg file
self.log.debug("Kubernetes running for command %s", command)
Expand Down
31 changes: 19 additions & 12 deletions airflow/kubernetes/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import copy

import kubernetes.client.models as k8s
from kubernetes.client import models as k8s

from airflow.kubernetes.k8s_model import K8SModel

Expand Down Expand Up @@ -87,18 +87,25 @@ def has_requests(self):
self.request_ephemeral_storage is not None

def to_k8s_client_obj(self):
return k8s.V1ResourceRequirements(
limits={
'cpu': self.limit_cpu,
'memory': self.limit_memory,
'nvidia.com/gpu': self.limit_gpu,
'ephemeral-storage': self.limit_ephemeral_storage
},
requests={
'cpu': self.request_cpu,
'memory': self.request_memory,
'ephemeral-storage': self.request_ephemeral_storage}
limits_raw = {
'cpu': self.limit_cpu,
'memory': self.limit_memory,
'nvidia.com/gpu': self.limit_gpu,
'ephemeral-storage': self.limit_ephemeral_storage
}
requests_raw = {
'cpu': self.request_cpu,
'memory': self.request_memory,
'ephemeral-storage': self.request_ephemeral_storage
}

limits = {k: v for k, v in limits_raw.items() if v}
requests = {k: v for k, v in requests_raw.items() if v}
resource_req = k8s.V1ResourceRequirements(
limits=limits,
requests=requests
)
return resource_req

def attach_to_pod(self, pod):
cp_pod = copy.deepcopy(pod)
Expand Down
76 changes: 71 additions & 5 deletions airflow/kubernetes/pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import kubernetes.client.models as k8s
import yaml
from kubernetes.client.api_client import ApiClient
from airflow.contrib.kubernetes.pod import _extract_volume_mounts

from airflow.exceptions import AirflowConfigException
from airflow.version import version as airflow_version
Expand Down Expand Up @@ -249,7 +250,7 @@ def __init__(
self.container.image_pull_policy = image_pull_policy
self.container.ports = ports or []
self.container.resources = resources
self.container.volume_mounts = volume_mounts or []
self.container.volume_mounts = [v.to_k8s_client_obj() for v in _extract_volume_mounts(volume_mounts)]

# Pod Spec
self.spec = k8s.V1PodSpec(containers=[])
Expand Down Expand Up @@ -370,6 +371,11 @@ def extract(cpu, memory, ephemeral_storage, limit_gpu=None):
requests=requests,
limits=limits
)
elif isinstance(resources, dict):
resources = k8s.V1ResourceRequirements(
requests=resources['requests'],
limits=resources['limits']
)

annotations = namespaced.get('annotations', {})
gcp_service_account_key = namespaced.get('gcp_service_account_key', None)
Expand Down Expand Up @@ -402,12 +408,35 @@ def reconcile_pods(base_pod, client_pod):

client_pod_cp = copy.deepcopy(client_pod)
client_pod_cp.spec = PodGenerator.reconcile_specs(base_pod.spec, client_pod_cp.spec)

client_pod_cp.metadata = merge_objects(base_pod.metadata, client_pod_cp.metadata)
client_pod_cp.metadata = PodGenerator.reconcile_metadata(base_pod.metadata, client_pod_cp.metadata)
client_pod_cp = merge_objects(base_pod, client_pod_cp)

return client_pod_cp

@staticmethod
def reconcile_metadata(base_meta, client_meta):
"""
:param base_meta: has the base attributes which are overwritten if they exist
in the client_meta and remain if they do not exist in the client_meta
:type base_meta: k8s.V1ObjectMeta
:param client_meta: the spec that the client wants to create.
:type client_meta: k8s.V1ObjectMeta
:return: the merged specs
"""
if base_meta and not client_meta:
return base_meta
if not base_meta and client_meta:
return client_meta
elif client_meta and base_meta:
client_meta.labels = merge_objects(base_meta.labels, client_meta.labels)
client_meta.annotations = merge_objects(base_meta.annotations, client_meta.annotations)
extend_object_field(base_meta, client_meta, 'managed_fields')
extend_object_field(base_meta, client_meta, 'finalizers')
extend_object_field(base_meta, client_meta, 'owner_references')
return merge_objects(base_meta, client_meta)

return None

@staticmethod
def reconcile_specs(base_spec,
client_spec):
Expand Down Expand Up @@ -580,10 +609,17 @@ def merge_objects(base_obj, client_obj):

client_obj_cp = copy.deepcopy(client_obj)

if isinstance(base_obj, dict) and isinstance(client_obj_cp, dict):
client_obj_cp.update(base_obj)
return client_obj_cp

for base_key in base_obj.to_dict().keys():
base_val = getattr(base_obj, base_key, None)
if not getattr(client_obj, base_key, None) and base_val:
setattr(client_obj_cp, base_key, base_val)
if not isinstance(client_obj_cp, dict):
setattr(client_obj_cp, base_key, base_val)
else:
client_obj_cp[base_key] = base_val
return client_obj_cp


Expand All @@ -610,6 +646,36 @@ def extend_object_field(base_obj, client_obj, field_name):
setattr(client_obj_cp, field_name, base_obj_field)
return client_obj_cp

appended_fields = base_obj_field + client_obj_field
base_obj_set = _get_dict_from_list(base_obj_field)
client_obj_set = _get_dict_from_list(client_obj_field)

appended_fields = _merge_list_of_objects(base_obj_set, client_obj_set)

setattr(client_obj_cp, field_name, appended_fields)
return client_obj_cp


def _merge_list_of_objects(base_obj_set, client_obj_set):
for k, v in base_obj_set.items():
if k not in client_obj_set:
client_obj_set[k] = v
else:
client_obj_set[k] = merge_objects(v, client_obj_set[k])
appended_field_keys = sorted(client_obj_set.keys())
appended_fields = [client_obj_set[k] for k in appended_field_keys]
return appended_fields


def _get_dict_from_list(base_list):
"""
:type base_list: list(Optional[dict, *to_dict])
"""
result = {}
for obj in base_list:
if isinstance(obj, dict):
result[obj['name']] = obj
elif hasattr(obj, "to_dict"):
result[obj.name] = obj
else:
raise AirflowConfigException("Trying to merge invalid object {}".format(obj))
return result
Loading