From 4f8e08dfa9846de0f5d599bd4661ed0c91bf556c Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Sat, 22 Aug 2020 16:47:47 +0100 Subject: [PATCH] Fix broken Kubernetes PodRuntimeInfoEnv closes https://github.com/apache/airflow/issues/10456 --- airflow/kubernetes/pod_runtime_info_env.py | 2 +- .../test_kubernetes_pod_operator.py | 38 ++++++++++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/airflow/kubernetes/pod_runtime_info_env.py b/airflow/kubernetes/pod_runtime_info_env.py index f390450ec16cc1..7477d0f4526f75 100644 --- a/airflow/kubernetes/pod_runtime_info_env.py +++ b/airflow/kubernetes/pod_runtime_info_env.py @@ -48,7 +48,7 @@ def to_k8s_client_obj(self) -> k8s.V1EnvVar: name=self.name, value_from=k8s.V1EnvVarSource( field_ref=k8s.V1ObjectFieldSelector( - self.field_path + field_path=self.field_path ) ) ) diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py index d2946cc5559273..59f881235ea880 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -34,6 +34,7 @@ from airflow.kubernetes.pod import Port from airflow.kubernetes.pod_generator import PodDefaults from airflow.kubernetes.pod_launcher import PodLauncher +from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv from airflow.kubernetes.secret import Secret from airflow.kubernetes.volume import Volume from airflow.kubernetes.volume_mount import VolumeMount @@ -540,7 +541,7 @@ def test_faulty_service_account(self): def test_pod_failure(self): """ - Tests that the task fails when a pod reports a failure + Tests that the task fails when a pod reports a failure """ bad_internal_command = ["foobar 10 "] k = KubernetesPodOperator( @@ -651,6 +652,41 @@ def test_envs_from_secrets(self, mock_client, monitor_mock, start_mock): ))] ) + def test_env_vars(self, mock_client, monitor_mock, start_mock): + # WHEN + k = KubernetesPodOperator( + namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo 10"], + env_vars={"ENV1": "val1", "ENV2": "val2", }, + pod_runtime_info_envs=[PodRuntimeInfoEnv("ENV3", "status.podIP")], + labels={"foo": "bar"}, + name="test", + task_id="task" + self.get_current_task_name(), + in_cluster=False, + do_xcom_push=False, + ) + + context = create_context(k) + k.execute(context) + + # THEN + actual_pod = self.api_client.sanitize_for_serialization(k.pod) + self.expected_pod['spec']['containers'][0]['env'] = [ + {'name': 'ENV1', 'value': 'val1'}, + {'name': 'ENV2', 'value': 'val2'}, + { + 'name': 'ENV3', + 'valueFrom': { + 'fieldRef': { + 'fieldPath': 'status.podIP' + } + } + } + ] + self.assertEqual(self.expected_pod, actual_pod) + def test_init_container(self): # GIVEN volume_mounts = [k8s.V1VolumeMount(