Skip to content

Commit

Permalink
Fixes PodMutationHook for backwards compatibility (#9903)
Browse files Browse the repository at this point in the history
Co-authored-by: Daniel Imberman <[email protected]>
Co-authored-by: Kaxil Naik <[email protected]>
  • Loading branch information
3 people committed Aug 11, 2020
1 parent 05ec21a commit bcd02dd
Show file tree
Hide file tree
Showing 11 changed files with 619 additions and 15 deletions.
16 changes: 16 additions & 0 deletions airflow/kubernetes/k8s_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@


class K8SModel(ABC):

"""
These Airflow Kubernetes models are here for backwards compatibility
reasons only. Ideally clients should use the kubernetes api
Expand All @@ -39,6 +40,7 @@ class K8SModel(ABC):
can be avoided. All of these models implement the
`attach_to_pod` method so that they integrate with the kubernetes client.
"""

@abc.abstractmethod
def attach_to_pod(self, pod):
"""
Expand All @@ -47,9 +49,23 @@ def attach_to_pod(self, pod):
:return: The pod with the object attached
"""

def as_dict(self):
res = {}
if hasattr(self, "__slots__"):
for s in self.__slots__:
if hasattr(self, s):
res[s] = getattr(self, s)
if hasattr(self, "__dict__"):
res_dict = self.__dict__.copy()
res_dict.update(res)
return res_dict
return res


def append_to_pod(pod, k8s_objects):
"""
Attach Kubernetes objects to the given POD
:param pod: A pod to attach a list of Kubernetes objects to
:type pod: kubernetes.client.models.V1Pod
:param k8s_objects: a potential None list of K8SModels
Expand Down
33 changes: 21 additions & 12 deletions airflow/kubernetes/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@


class Resources(K8SModel):
__slots__ = ('request_memory', 'request_cpu', 'limit_memory', 'limit_cpu', 'limit_gpu')
__slots__ = ('request_memory',
'request_cpu',
'limit_memory',
'limit_cpu',
'limit_gpu',
'request_ephemeral_storage',
'limit_ephemeral_storage')

"""
:param request_memory: requested memory
Expand All @@ -44,15 +50,17 @@ class Resources(K8SModel):
:param limit_ephemeral_storage: Limit for ephemeral storage
:type limit_ephemeral_storage: float | str
"""

def __init__(
self,
request_memory=None,
request_cpu=None,
request_ephemeral_storage=None,
limit_memory=None,
limit_cpu=None,
limit_gpu=None,
limit_ephemeral_storage=None):
self,
request_memory=None,
request_cpu=None,
request_ephemeral_storage=None,
limit_memory=None,
limit_cpu=None,
limit_gpu=None,
limit_ephemeral_storage=None
):
self.request_memory = request_memory
self.request_cpu = request_cpu
self.request_ephemeral_storage = request_ephemeral_storage
Expand Down Expand Up @@ -104,9 +112,10 @@ class Port(K8SModel):
__slots__ = ('name', 'container_port')

def __init__(
self,
name=None,
container_port=None):
self,
name=None,
container_port=None
):
"""Creates port"""
self.name = name
self.container_port = container_port
Expand Down
26 changes: 23 additions & 3 deletions airflow/kubernetes/pod_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
from requests.exceptions import BaseHTTPError

from airflow import AirflowException
from airflow.kubernetes.pod_launcher_helper import convert_to_airflow_pod
from airflow.kubernetes.pod_generator import PodDefaults
from airflow.settings import pod_mutation_hook
from airflow import settings
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
import kubernetes.client.models as k8s # noqa
from .kube_client import get_kube_client


Expand Down Expand Up @@ -62,8 +64,12 @@ def __init__(self,
self.extract_xcom = extract_xcom

def run_pod_async(self, pod, **kwargs):
"""Runs POD asynchronously"""
pod_mutation_hook(pod)
"""Runs POD asynchronously
:param pod: Pod to run
:type pod: k8s.V1Pod
"""
pod = self._mutate_pod_backcompat(pod)

sanitized_pod = self._client.api_client.sanitize_for_serialization(pod)
json_pod = json.dumps(sanitized_pod, indent=2)
Expand All @@ -79,6 +85,20 @@ def run_pod_async(self, pod, **kwargs):
raise e
return resp

@staticmethod
def _mutate_pod_backcompat(pod):
"""Backwards compatible Pod Mutation Hook"""
try:
settings.pod_mutation_hook(pod)
# attempts to run pod_mutation_hook using k8s.V1Pod, if this
# fails we attempt to run by converting pod to Old Pod
except AttributeError:
dummy_pod = convert_to_airflow_pod(pod)
settings.pod_mutation_hook(dummy_pod)
dummy_pod = dummy_pod.to_v1_kubernetes_pod()
return dummy_pod
return pod

def delete_pod(self, pod):
"""Deletes POD"""
try:
Expand Down
96 changes: 96 additions & 0 deletions airflow/kubernetes/pod_launcher_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import List, Union

import kubernetes.client.models as k8s # noqa

from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount
from airflow.kubernetes.pod import Port
from airflow.kubernetes_deprecated.pod import Pod


def convert_to_airflow_pod(pod):
base_container = pod.spec.containers[0] # type: k8s.V1Container

dummy_pod = Pod(
image=base_container.image,
envs=_extract_env_vars(base_container.env),
volumes=_extract_volumes(pod.spec.volumes),
volume_mounts=_extract_volume_mounts(base_container.volume_mounts),
labels=pod.metadata.labels,
name=pod.metadata.name,
namespace=pod.metadata.namespace,
image_pull_policy=base_container.image_pull_policy or 'IfNotPresent',
cmds=[],
ports=_extract_ports(base_container.ports)
)
return dummy_pod


def _extract_env_vars(env_vars):
"""
:param env_vars:
:type env_vars: list
:return: result
:rtype: dict
"""
result = {}
env_vars = env_vars or [] # type: List[Union[k8s.V1EnvVar, dict]]
for env_var in env_vars:
if isinstance(env_var, k8s.V1EnvVar):
env_var.to_dict()
result[env_var.get("name")] = env_var.get("value")
return result


def _extract_volumes(volumes):
result = []
volumes = volumes or [] # type: List[Union[k8s.V1Volume, dict]]
for volume in volumes:
if isinstance(volume, k8s.V1Volume):
volume = volume.to_dict()
result.append(Volume(name=volume.get("name"), configs=volume))
return result


def _extract_volume_mounts(volume_mounts):
result = []
volume_mounts = volume_mounts or [] # type: List[Union[k8s.V1VolumeMount, dict]]
for volume_mount in volume_mounts:
if isinstance(volume_mount, k8s.V1VolumeMount):
volume_mount = volume_mount.to_dict()
result.append(
VolumeMount(
name=volume_mount.get("name"),
mount_path=volume_mount.get("mount_path"),
sub_path=volume_mount.get("sub_path"),
read_only=volume_mount.get("read_only"))
)

return result


def _extract_ports(ports):
result = []
ports = ports or [] # type: List[Union[k8s.V1ContainerPort, dict]]
for port in ports:
if isinstance(port, k8s.V1ContainerPort):
port = port.to_dict()
result.append(Port(name=port.get("name"), container_port=port.get("container_port")))
return result
1 change: 1 addition & 0 deletions airflow/kubernetes/volume_mount.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@


class VolumeMount(K8SModel):
__slots__ = ('name', 'mount_path', 'sub_path', 'read_only')
"""
Initialize a Kubernetes Volume Mount. Used to mount pod level volumes to
running container.
Expand Down
16 changes: 16 additions & 0 deletions airflow/kubernetes_deprecated/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Loading

0 comments on commit bcd02dd

Please sign in to comment.