From d2f55a440b5a8f9ce2867f8338830efe8cc04a54 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Sat, 22 Aug 2020 17:52:41 +0100 Subject: [PATCH] Fix broken Kubernetes PodRuntimeInfoEnv (#10478) closes https://github.com/apache/airflow/issues/10456 (cherry picked from commit 47c6657ce012f6db147fdcce3ca5e77f46a9e491) --- airflow/kubernetes/pod_runtime_info_env.py | 2 +- .../test_kubernetes_pod_operator.py | 38 ++++++++++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/airflow/kubernetes/pod_runtime_info_env.py b/airflow/kubernetes/pod_runtime_info_env.py index 72e215164229a..95fbe6bfe1865 100644 --- a/airflow/kubernetes/pod_runtime_info_env.py +++ b/airflow/kubernetes/pod_runtime_info_env.py @@ -47,7 +47,7 @@ def to_k8s_client_obj(self): name=self.name, value_from=k8s.V1EnvVarSource( field_ref=k8s.V1ObjectFieldSelector( - self.field_path + field_path=self.field_path ) ) ) diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py index 89572f9441e46..a5c973159e0d1 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -32,6 +32,7 @@ from airflow.kubernetes.pod import Port from airflow.kubernetes.pod_generator import PodDefaults from airflow.kubernetes.pod_launcher import PodLauncher +from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv from airflow.kubernetes.secret import Secret from airflow.kubernetes.volume import Volume from airflow.kubernetes.volume_mount import VolumeMount @@ -669,7 +670,7 @@ def test_faulty_service_account(self): def test_pod_failure(self): """ - Tests that the task fails when a pod reports a failure + Tests that the task fails when a pod reports a failure """ bad_internal_command = ["foobar 10 "] k = KubernetesPodOperator( @@ -780,6 +781,41 @@ def test_envs_from_secrets(self, mock_client, monitor_mock, start_mock): ))] ) + def test_env_vars(self): + # WHEN + k = KubernetesPodOperator( + namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo 10"], + env_vars={"ENV1": "val1", "ENV2": "val2", }, + pod_runtime_info_envs=[PodRuntimeInfoEnv("ENV3", "status.podIP")], + labels={"foo": "bar"}, + name="test", + task_id="task", + in_cluster=False, + do_xcom_push=False, + ) + + context = create_context(k) + k.execute(context) + + # THEN + actual_pod = self.api_client.sanitize_for_serialization(k.pod) + self.expected_pod['spec']['containers'][0]['env'] = [ + {'name': 'ENV1', 'value': 'val1'}, + {'name': 'ENV2', 'value': 'val2'}, + { + 'name': 'ENV3', + 'valueFrom': { + 'fieldRef': { + 'fieldPath': 'status.podIP' + } + } + } + ] + self.assertEqual(self.expected_pod, actual_pod) + def test_init_container(self): # GIVEN volume_mounts = [k8s.V1VolumeMount(