From 8171cb21edc5fdf2fe3daf59a8dd0fd9641f3ed0 Mon Sep 17 00:00:00 2001 From: david <14880945+ddelange@users.noreply.github.com> Date: Fri, 8 Nov 2019 14:19:20 +0100 Subject: [PATCH] [AIRFLOW-5873] KubernetesPodOperator fixes and test - `security_context` was missing from docs of `KubernetesPodOperator` - `KubernetesPodOperator` kwarg `in_cluster` erroneously defaults to False in comparison to `default_args.py`, also default `do_xcom_push` was overwritten to False in contradiction to `BaseOperator` - `KubernetesPodOperator` kwarg `resources` is erroneously passed to `base_operator`, instead should only go to `PodGenerator`. The two have different syntax. (both on `master` and `v1-10-test` branches) - `kubernetes/pod.py`: classes do not have `__slots__` so they would accept arbitrary values in `setattr` - Reduce amount of times the pod object is copied before execution --- .../operators/kubernetes_pod_operator.py | 105 ++++++++++-------- airflow/kubernetes/pod.py | 4 + airflow/kubernetes/pod_generator.py | 1 - airflow/utils/helpers.py | 2 +- scripts/ci/_utils.sh | 4 +- .../test_kubernetes_pod_operator.py | 103 ++++++++++++----- 6 files changed, 146 insertions(+), 73 deletions(-) diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index b545f073f78cb8..c1c49e1a54bffd 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -15,11 +15,15 @@ # specific language governing permissions and limitations # under the License. """Executes task in a Kubernetes POD""" +import re + from airflow.exceptions import AirflowException from airflow.kubernetes import kube_client, pod_generator, pod_launcher from airflow.kubernetes.k8s_model import append_to_pod +from airflow.kubernetes.pod import Resources from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults +from airflow.utils.helpers import validate_key from airflow.utils.state import State @@ -32,59 +36,62 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- :class:`~airflow.gcp.operators.kubernetes_engine.GKEPodOperator`, which simplifies the authorization process. - :param image: Docker image you wish to launch. Defaults to dockerhub.io, - but fully qualified URLS will point to custom repositories + :param image: Docker image you wish to launch. Defaults to hub.docker.com, + but fully qualified URLS will point to custom repositories. :type image: str - :param namespace: the namespace to run within kubernetes + :param namespace: the namespace to run within kubernetes. :type namespace: str :param cmds: entrypoint of the container. (templated) - The docker images's entrypoint is used if this is not provide. + The docker images's entrypoint is used if this is not provided. :type cmds: list[str] :param arguments: arguments of the entrypoint. (templated) The docker image's CMD is used if this is not provided. :type arguments: list[str] - :param image_pull_policy: Specify a policy to cache or always pull an image + :param image_pull_policy: Specify a policy to cache or always pull an image. :type image_pull_policy: str :param image_pull_secrets: Any image pull secrets to be given to the pod. If more than one secret is required, provide a comma separated list: secret_a,secret_b :type image_pull_secrets: str - :param ports: ports for launched pod - :type ports: list[airflow.kubernetes.models.port.Port] - :param volume_mounts: volumeMounts for launched pod - :type volume_mounts: list[airflow.kubernetes.models.volume_mount.VolumeMount] - :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes - :type volumes: list[airflow.kubernetes.models.volume.Volume] - :param labels: labels to apply to the Pod + :param ports: ports for launched pod. + :type ports: list[airflow.kubernetes.pod.Port] + :param volume_mounts: volumeMounts for launched pod. + :type volume_mounts: list[airflow.kubernetes.volume_mount.VolumeMount] + :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes. + :type volumes: list[airflow.kubernetes.volume.Volume] + :param labels: labels to apply to the Pod. :type labels: dict - :param startup_timeout_seconds: timeout in seconds to startup the pod + :param startup_timeout_seconds: timeout in seconds to startup the pod. :type startup_timeout_seconds: int - :param name: name of the task you want to run, - will be used to generate a pod id + :param name: name of the pod in which the task will run, will be used to + generate a pod id (DNS-1123 subdomain, containing only [a-z0-9.-]). :type name: str :param env_vars: Environment variables initialized in the container. (templated) :type env_vars: dict - :param secrets: Kubernetes secrets to inject in the container, + :param secrets: Kubernetes secrets to inject in the container. They can be exposed as environment vars or files in a volume. - :type secrets: list[airflow.kubernetes.models.secret.Secret] - :param in_cluster: run kubernetes client with in_cluster configuration + :type secrets: list[airflow.kubernetes.secret.Secret] + :param in_cluster: run kubernetes client with in_cluster configuration. :type in_cluster: bool :param cluster_context: context that points to kubernetes cluster. Ignored when in_cluster is True. If None, current-context is used. :type cluster_context: str - :param get_logs: get the stdout of the container as logs of the tasks + :param get_logs: get the stdout of the container as logs of the tasks. :type get_logs: bool :param annotations: non-identifying metadata you can attach to the Pod. Can be a large range of data, and can include characters that are not permitted by labels. :type annotations: dict - :param resources: A dict containing a group of resources requests and limits + :param resources: A dict containing resources requests and limits. + Possible keys are request_memory, request_cpu, limit_memory, limit_cpu, + and limit_gpu, which will be used to generate airflow.kubernetes.pod.Resources. + See also kubernetes.io/docs/concepts/configuration/manage-compute-resources-container :type resources: dict - :param affinity: A dict containing a group of affinity scheduling rules + :param affinity: A dict containing a group of affinity scheduling rules. :type affinity: dict - :param node_selectors: A dict containing a group of scheduling rules + :param node_selectors: A dict containing a group of scheduling rules. :type node_selectors: dict - :param config_file: The path to the Kubernetes config file. + :param config_file: The path to the Kubernetes config file. (templated) If not specified, default value is ``~/.kube/config`` :type config_file: str :param do_xcom_push: If True, the content of the file @@ -95,17 +102,19 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- state, or the execution is interrupted. If False (default): do nothing, If True: delete the pod :type is_delete_operator_pod: bool - :param hostnetwork: If True enable host networking on the pod + :param hostnetwork: If True enable host networking on the pod. :type hostnetwork: bool - :param tolerations: A list of kubernetes tolerations + :param tolerations: A list of kubernetes tolerations. :type tolerations: list tolerations :param configmaps: A list of configmap names objects that we - want mount as env variables + want mount as env variables. :type configmaps: list[str] :param pod_runtime_info_envs: environment variables about - pod runtime information (ip, namespace, nodeName, podName) - :type pod_runtime_info_envs: list[airflow.kubernetes.models.pod_runtime_info_env.PodRuntimeInfoEnv] - :param dnspolicy: Specify a dnspolicy for the pod + pod runtime information (ip, namespace, nodeName, podName). + :type pod_runtime_info_envs: list[airflow.kubernetes.pod_runtime_info_env.PodRuntimeInfoEnv] + :param security_context: security options the pod should run with (PodSecurityContext). + :type security_context: dict + :param dnspolicy: dnspolicy for the pod. :type dnspolicy: str :param full_pod_spec: The complete podSpec :type full_pod_spec: kubernetes.client.models.V1Pod @@ -138,15 +147,18 @@ def execute(self, context): configmaps=self.configmaps, security_context=self.security_context, dnspolicy=self.dnspolicy, - resources=self.resources, pod=self.full_pod_spec, ).gen_pod() - pod = append_to_pod(pod, self.ports) - pod = append_to_pod(pod, self.pod_runtime_info_envs) - pod = append_to_pod(pod, self.volumes) - pod = append_to_pod(pod, self.volume_mounts) - pod = append_to_pod(pod, self.secrets) + pod = append_to_pod( + pod, + self.pod_runtime_info_envs + + self.ports + + self.resources + + self.secrets + + self.volumes + + self.volume_mounts + ) self.pod = pod @@ -171,6 +183,13 @@ def execute(self, context): except AirflowException as ex: raise AirflowException('Pod Launching failed: {error}'.format(error=ex)) + def _set_resources(self, resources): + return [Resources(**resources) if resources else Resources()] + + def _set_name(self, name): + validate_key(name, max_length=63) + return re.sub(r'[^a-z0-9.-]+', '-', name.lower()) + @apply_defaults def __init__(self, # pylint: disable=too-many-arguments,too-many-locals namespace, @@ -183,7 +202,7 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals volumes=None, env_vars=None, secrets=None, - in_cluster=False, + in_cluster=True, cluster_context=None, labels=None, startup_timeout_seconds=120, @@ -193,10 +212,9 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals resources=None, affinity=None, config_file=None, - do_xcom_push=False, node_selectors=None, image_pull_secrets=None, - service_account_name="default", + service_account_name='default', is_delete_operator_pod=False, hostnetwork=False, tolerations=None, @@ -207,7 +225,9 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals full_pod_spec=None, *args, **kwargs): - super().__init__(*args, **kwargs) + if kwargs.get('xcom_push') is not None: + raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead") + super().__init__(*args, resources=None, **kwargs) self.pod = None @@ -217,7 +237,7 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals self.arguments = arguments or [] self.labels = labels or {} self.startup_timeout_seconds = startup_timeout_seconds - self.name = name + self.name = self._set_name(name) self.env_vars = env_vars or {} self.ports = ports or [] self.volume_mounts = volume_mounts or [] @@ -230,10 +250,7 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals self.node_selectors = node_selectors or {} self.annotations = annotations or {} self.affinity = affinity or {} - self.do_xcom_push = do_xcom_push - if kwargs.get('xcom_push') is not None: - raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead") - self.resources = resources + self.resources = self._set_resources(resources) self.config_file = config_file self.image_pull_secrets = image_pull_secrets self.service_account_name = service_account_name diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py index 2568e635218a92..9406a05de94c7d 100644 --- a/airflow/kubernetes/pod.py +++ b/airflow/kubernetes/pod.py @@ -40,6 +40,8 @@ class Resources(K8SModel): :param limit_gpu: Limits for GPU used :type limit_gpu: int """ + __slots__ = ('request_memory', 'request_cpu', 'limit_memory', 'limit_cpu', 'limit_gpu') + def __init__( self, request_memory=None, @@ -82,6 +84,8 @@ def attach_to_pod(self, pod: k8s.V1Pod) -> k8s.V1Pod: class Port(K8SModel): """POD port""" + __slots__ = ('name', 'container_port') + def __init__( self, name=None, diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py index 7aa4a59c1304d5..07414901eaa105 100644 --- a/airflow/kubernetes/pod_generator.py +++ b/airflow/kubernetes/pod_generator.py @@ -238,7 +238,6 @@ def from_obj(obj) -> k8s.V1Pod: requests = { 'cpu': namespaced.get('request_cpu'), 'memory': namespaced.get('request_memory') - } limits = { 'cpu': namespaced.get('limit_cpu'), diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py index 5f034bcf168984..a41ac9d6751bea 100644 --- a/airflow/utils/helpers.py +++ b/airflow/utils/helpers.py @@ -44,7 +44,7 @@ 'core', 'KILLED_TASK_CLEANUP_TIME' ) -KEY_REGEX = re.compile(r'^[\w\-\.]+$') +KEY_REGEX = re.compile(r'^[\w.-]+$') def validate_key(k, max_length=250): diff --git a/scripts/ci/_utils.sh b/scripts/ci/_utils.sh index c2e6596a34e9ef..71d3ab52d30ae8 100644 --- a/scripts/ci/_utils.sh +++ b/scripts/ci/_utils.sh @@ -863,7 +863,9 @@ function build_image_on_ci() { echo "Finding changed file names ${TRAVIS_BRANCH}...HEAD" echo - CHANGED_FILE_NAMES=$(git diff --name-only "${TRAVIS_BRANCH}...HEAD") + git config remote.origin.fetch "+refs/heads/*:refs/remotes/origin/*" + git fetch origin "${TRAVIS_BRANCH}" + CHANGED_FILE_NAMES=$(git diff --name-only "remotes/origin/${TRAVIS_BRANCH}...HEAD") echo echo "Changed file names in this commit" echo "${CHANGED_FILE_NAMES}" diff --git a/tests/integration/kubernetes/test_kubernetes_pod_operator.py b/tests/integration/kubernetes/test_kubernetes_pod_operator.py index 26670e55a6b872..3a55f6a176fd47 100644 --- a/tests/integration/kubernetes/test_kubernetes_pod_operator.py +++ b/tests/integration/kubernetes/test_kubernetes_pod_operator.py @@ -74,6 +74,11 @@ def setUp(self): 'envFrom': [], 'name': 'base', 'ports': [], + 'resources': {'limits': {'cpu': None, + 'memory': None, + 'nvidia.com/gpu': None}, + 'requests': {'cpu': None, + 'memory': None}}, 'volumeMounts': [], }], 'hostNetwork': False, @@ -100,7 +105,9 @@ def test_config_path_move(self): labels={"foo": "bar"}, name="test", task_id="task", - config_file=new_config_path + in_cluster=False, + do_xcom_push=False, + config_file=new_config_path, ) k.execute(None) actual_pod = self.api_client.sanitize_for_serialization(k.pod) @@ -120,16 +127,17 @@ def test_config_path(self, client_mock, launcher_mock): labels={"foo": "bar"}, name="test", task_id="task", - config_file=file_path, in_cluster=False, - cluster_context='default' + do_xcom_push=False, + config_file=file_path, + cluster_context='default', ) launcher_mock.return_value = (State.SUCCESS, None) k.execute(None) client_mock.assert_called_once_with( in_cluster=False, cluster_context='default', - config_file=file_path + config_file=file_path, ) @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod") @@ -146,9 +154,10 @@ def test_image_pull_secrets_correctly_set(self, mock_client, launcher_mock): labels={"foo": "bar"}, name="test", task_id="task", - image_pull_secrets=fake_pull_secrets, in_cluster=False, - cluster_context='default' + do_xcom_push=False, + image_pull_secrets=fake_pull_secrets, + cluster_context='default', ) launcher_mock.return_value = (State.SUCCESS, None) k.execute(None) @@ -170,8 +179,9 @@ def test_pod_delete_even_on_launcher_error(self, mock_client, delete_pod_mock, r name="test", task_id="task", in_cluster=False, + do_xcom_push=False, cluster_context='default', - is_delete_operator_pod=True + is_delete_operator_pod=True, ) run_pod_mock.side_effect = AirflowException('fake failure') with self.assertRaises(AirflowException): @@ -186,7 +196,9 @@ def test_working_pod(self): arguments=["echo 10"], labels={"foo": "bar"}, name="test", - task_id="task" + task_id="task", + in_cluster=False, + do_xcom_push=False, ) k.execute(None) actual_pod = self.api_client.sanitize_for_serialization(k.pod) @@ -201,7 +213,9 @@ def test_delete_operator_pod(self): labels={"foo": "bar"}, name="test", task_id="task", - is_delete_operator_pod=True + in_cluster=False, + do_xcom_push=False, + is_delete_operator_pod=True, ) k.execute(None) actual_pod = self.api_client.sanitize_for_serialization(k.pod) @@ -216,7 +230,9 @@ def test_pod_hostnetwork(self): labels={"foo": "bar"}, name="test", task_id="task", - hostnetwork=True + in_cluster=False, + do_xcom_push=False, + hostnetwork=True, ) k.execute(None) actual_pod = self.api_client.sanitize_for_serialization(k.pod) @@ -233,6 +249,8 @@ def test_pod_dnspolicy(self): labels={"foo": "bar"}, name="test", task_id="task", + in_cluster=False, + do_xcom_push=False, hostnetwork=True, dnspolicy=dns_policy ) @@ -254,6 +272,8 @@ def test_pod_node_selectors(self): labels={"foo": "bar"}, name="test", task_id="task", + in_cluster=False, + do_xcom_push=False, node_selectors=node_selectors, ) k.execute(None) @@ -263,14 +283,10 @@ def test_pod_node_selectors(self): def test_pod_resources(self): resources = { - 'limits': { - 'cpu': '250m', - 'memory': '64Mi', - }, - 'requests': { - 'cpu': '250m', - 'memory': '64Mi', - } + 'limit_cpu': 0.25, + 'limit_memory': '64Mi', + 'request_cpu': '250m', + 'request_memory': '64Mi', } k = KubernetesPodOperator( namespace='default', @@ -280,11 +296,23 @@ def test_pod_resources(self): labels={"foo": "bar"}, name="test", task_id="task", + in_cluster=False, + do_xcom_push=False, resources=resources, ) k.execute(None) actual_pod = self.api_client.sanitize_for_serialization(k.pod) - self.expected_pod['spec']['containers'][0]['resources'] = resources + self.expected_pod['spec']['containers'][0]['resources'] = { + 'requests': { + 'memory': '64Mi', + 'cpu': '250m' + }, + 'limits': { + 'memory': '64Mi', + 'cpu': 0.25, + 'nvidia.com/gpu': None + } + } self.assertEqual(self.expected_pod, actual_pod) def test_pod_affinity(self): @@ -313,6 +341,8 @@ def test_pod_affinity(self): labels={"foo": "bar"}, name="test", task_id="task", + in_cluster=False, + do_xcom_push=False, affinity=affinity, ) k.execute(None) @@ -331,7 +361,9 @@ def test_port(self): labels={"foo": "bar"}, name="test", task_id="task", - ports=[port] + in_cluster=False, + do_xcom_push=False, + ports=[port], ) k.execute(None) actual_pod = self.api_client.sanitize_for_serialization(k.pod) @@ -365,7 +397,9 @@ def test_volume_mount(self): volume_mounts=[volume_mount], volumes=[volume], name="test", - task_id="task" + task_id="task", + in_cluster=False, + do_xcom_push=False, ) k.execute(None) mock_logger.info.assert_any_call(b"retrieved from mount\n") @@ -398,6 +432,8 @@ def test_run_as_user_root(self): labels={"foo": "bar"}, name="test", task_id="task", + in_cluster=False, + do_xcom_push=False, security_context=security_context, ) k.execute(None) @@ -420,6 +456,8 @@ def test_run_as_user_non_root(self): labels={"foo": "bar"}, name="test", task_id="task", + in_cluster=False, + do_xcom_push=False, security_context=security_context, ) k.execute(None) @@ -442,6 +480,8 @@ def test_fs_group(self): labels={"foo": "bar"}, name="test", task_id="task", + in_cluster=False, + do_xcom_push=False, security_context=security_context, ) k.execute(None) @@ -459,7 +499,9 @@ def test_faulty_image(self): labels={"foo": "bar"}, name="test", task_id="task", - startup_timeout_seconds=5 + in_cluster=False, + do_xcom_push=False, + startup_timeout_seconds=5, ) with self.assertRaises(AirflowException): k.execute(None) @@ -477,8 +519,10 @@ def test_faulty_service_account(self): labels={"foo": "bar"}, name="test", task_id="task", + in_cluster=False, + do_xcom_push=False, startup_timeout_seconds=5, - service_account_name=bad_service_account_name + service_account_name=bad_service_account_name, ) with self.assertRaises(ApiException): k.execute(None) @@ -498,7 +542,9 @@ def test_pod_failure(self): arguments=bad_internal_command, labels={"foo": "bar"}, name="test", - task_id="task" + task_id="task", + in_cluster=False, + do_xcom_push=False, ) with self.assertRaises(AirflowException): k.execute(None) @@ -517,7 +563,8 @@ def test_xcom_push(self): labels={"foo": "bar"}, name="test", task_id="task", - do_xcom_push=True + in_cluster=False, + do_xcom_push=True, ) self.assertEqual(k.execute(None), json.loads(return_value)) actual_pod = self.api_client.sanitize_for_serialization(k.pod) @@ -546,7 +593,9 @@ def test_envs_from_configmaps(self, mock_client, mock_launcher): labels={"foo": "bar"}, name="test", task_id="task", - configmaps=[configmap] + in_cluster=False, + do_xcom_push=False, + configmaps=[configmap], ) # THEN mock_launcher.return_value = (State.SUCCESS, None) @@ -575,6 +624,8 @@ def test_envs_from_secrets(self, mock_client, launcher_mock): labels={"foo": "bar"}, name="test", task_id="task", + in_cluster=False, + do_xcom_push=False, ) # THEN launcher_mock.return_value = (State.SUCCESS, None)