Skip to content

Commit

Permalink
Fix broken Kubernetes PodRuntimeInfoEnv (#10478)
Browse files Browse the repository at this point in the history
closes apache/airflow#10456

GitOrigin-RevId: 47c6657ce012f6db147fdcce3ca5e77f46a9e491
  • Loading branch information
kaxil authored and Cloud Composer Team committed Mar 9, 2022
1 parent 3e8dd06 commit 5fa7629
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
2 changes: 1 addition & 1 deletion airflow/kubernetes/pod_runtime_info_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def to_k8s_client_obj(self) -> k8s.V1EnvVar:
name=self.name,
value_from=k8s.V1EnvVarSource(
field_ref=k8s.V1ObjectFieldSelector(
self.field_path
field_path=self.field_path
)
)
)
Expand Down
38 changes: 37 additions & 1 deletion kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from airflow.kubernetes.pod import Port
from airflow.kubernetes.pod_generator import PodDefaults
from airflow.kubernetes.pod_launcher import PodLauncher
from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv
from airflow.kubernetes.secret import Secret
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount
Expand Down Expand Up @@ -540,7 +541,7 @@ def test_faulty_service_account(self):

def test_pod_failure(self):
"""
Tests that the task fails when a pod reports a failure
Tests that the task fails when a pod reports a failure
"""
bad_internal_command = ["foobar 10 "]
k = KubernetesPodOperator(
Expand Down Expand Up @@ -651,6 +652,41 @@ def test_envs_from_secrets(self, mock_client, monitor_mock, start_mock):
))]
)

def test_env_vars(self):
# WHEN
k = KubernetesPodOperator(
namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
env_vars={"ENV1": "val1", "ENV2": "val2", },
pod_runtime_info_envs=[PodRuntimeInfoEnv("ENV3", "status.podIP")],
labels={"foo": "bar"},
name="test",
task_id="task" + self.get_current_task_name(),
in_cluster=False,
do_xcom_push=False,
)

context = create_context(k)
k.execute(context)

# THEN
actual_pod = self.api_client.sanitize_for_serialization(k.pod)
self.expected_pod['spec']['containers'][0]['env'] = [
{'name': 'ENV1', 'value': 'val1'},
{'name': 'ENV2', 'value': 'val2'},
{
'name': 'ENV3',
'valueFrom': {
'fieldRef': {
'fieldPath': 'status.podIP'
}
}
}
]
self.assertEqual(self.expected_pod, actual_pod)

def test_init_container(self):
# GIVEN
volume_mounts = [k8s.V1VolumeMount(
Expand Down

0 comments on commit 5fa7629

Please sign in to comment.