Skip to content

Commit

Permalink
Add termination_message_policy parameter to KubernetesPodOperator (
Browse files Browse the repository at this point in the history
…apache#32885)

* KubernetesPodOperator: add termination_message_policy option

Allow setting termination_message_policy in the Pod container

---------

Co-authored-by: eladkal <[email protected]>
Co-authored-by: Hussein Awala <[email protected]>
  • Loading branch information
3 people authored Jul 29, 2023
1 parent 4a68e83 commit bcc7856
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 0 deletions.
5 changes: 5 additions & 0 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ class KubernetesPodOperator(BaseOperator):
state, or the execution is interrupted. If True (default), delete the
pod; if False, leave the pod.
Deprecated - use `on_finish_action` instead.
:param termination_message_policy: The termination message policy of the base container.
Default value is "File"
"""

# This field can be overloaded at the instance level via base_container_name
Expand Down Expand Up @@ -317,6 +319,7 @@ def __init__(
log_pod_spec_on_failure: bool = True,
on_finish_action: str = "delete_pod",
is_delete_operator_pod: None | bool = None,
termination_message_policy: str = "File",
**kwargs,
) -> None:
# TODO: remove in provider 6.0.0 release. This is a mitigate step to advise users to switch to the
Expand Down Expand Up @@ -415,6 +418,7 @@ def __init__(
else:
self.on_finish_action = OnFinishAction(on_finish_action)
self.is_delete_operator_pod = self.on_finish_action == OnFinishAction.DELETE_POD
self.termination_message_policy = termination_message_policy

self._config_dict: dict | None = None # TODO: remove it when removing convert_config_file_to_dict

Expand Down Expand Up @@ -838,6 +842,7 @@ def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod:
env=self.env_vars,
env_from=self.env_from,
security_context=self.container_security_context,
termination_message_policy=self.termination_message_policy,
)
],
image_pull_secrets=self.image_pull_secrets,
Expand Down
2 changes: 2 additions & 0 deletions kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def setup_tests(self, test_label):
"envFrom": [],
"name": "base",
"ports": [],
"terminationMessagePolicy": "File",
"volumeMounts": [],
}
],
Expand Down Expand Up @@ -957,6 +958,7 @@ def test_pod_template_file(
"name": "base",
"ports": [],
"resources": {"limits": {"memory": "200Mi"}, "requests": {"memory": "100Mi"}},
"terminationMessagePolicy": "File",
"volumeMounts": [{"mountPath": "/airflow/xcom", "name": "xcom"}],
},
{
Expand Down
17 changes: 17 additions & 0 deletions tests/providers/cncf/kubernetes/operators/test_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,23 @@ def test_image_pull_policy_correctly_set(self):
pod = k.build_pod_request_obj(create_context(k))
assert pod.spec.containers[0].image_pull_policy == "Always"

def test_termination_message_policy_correctly_set(self):
k = KubernetesPodOperator(
task_id="task",
termination_message_policy="FallbackToLogsOnError",
)

pod = k.build_pod_request_obj(create_context(k))
assert pod.spec.containers[0].termination_message_policy == "FallbackToLogsOnError"

def test_termination_message_policy_default_value_correctly_set(self):
k = KubernetesPodOperator(
task_id="task",
)

pod = k.build_pod_request_obj(create_context(k))
assert pod.spec.containers[0].termination_message_policy == "File"

@pytest.mark.parametrize(
"task_kwargs, should_be_deleted",
[
Expand Down

0 comments on commit bcc7856

Please sign in to comment.