Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only patch single label when adopting pod #28776

Merged
merged 3 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -778,22 +778,24 @@ def adopt_launched_task(
assert self.scheduler_job_id

self.log.info("attempting to adopt pod %s", pod.metadata.name)
pod.metadata.labels["airflow-worker"] = pod_generator.make_safe_label_value(self.scheduler_job_id)
pod_id = annotations_to_key(pod.metadata.annotations)
if pod_id not in pod_ids:
self.log.error("attempting to adopt taskinstance which was not specified by database: %s", pod_id)
return

new_worker_id_label = pod_generator.make_safe_label_value(self.scheduler_job_id)
try:
kube_client.patch_namespaced_pod(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
body=PodGenerator.serialize_pod(pod),
body={"metadata": {"labels": {"airflow-worker": new_worker_id_label}}},
)
pod_ids.pop(pod_id)
self.running.add(pod_id)
except ApiException as e:
self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
return

pod_ids.pop(pod_id)
jedcunningham marked this conversation as resolved.
Show resolved Hide resolved
self.running.add(pod_id)

def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
"""
Expand All @@ -812,12 +814,11 @@ def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
pod_list = self._list_pods(query_kwargs)
for pod in pod_list:
self.log.info("Attempting to adopt pod %s", pod.metadata.name)
pod.metadata.labels["airflow-worker"] = new_worker_id_label
try:
kube_client.patch_namespaced_pod(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
body=PodGenerator.serialize_pod(pod),
body={"metadata": {"labels": {"airflow-worker": new_worker_id_label}}},
)
except ApiException as e:
self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
Expand Down
44 changes: 33 additions & 11 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,20 +718,42 @@ def test_adopt_launched_task(self, mock_kube_client):
pod_ids = {ti_key: {}}

executor.adopt_launched_task(mock_kube_client, pod=pod, pod_ids=pod_ids)
assert mock_kube_client.patch_namespaced_pod.call_args[1] == {
"body": {
"metadata": {
"labels": {"airflow-worker": "modified"},
"annotations": annotations,
"name": "foo",
}
},
"name": "foo",
"namespace": None,
}
mock_kube_client.patch_namespaced_pod.assert_called_once_with(
body={"metadata": {"labels": {"airflow-worker": "modified"}}},
name="foo",
namespace=None,
)
assert pod_ids == {}
assert executor.running == {ti_key}

@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
def test_adopt_completed_pods(self, mock_kube_client):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could add docstring saying what this is testing

Copy link
Member

@potiuk potiuk Jan 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am rather opting for very long and very descriptive test method names:

test_adopted_pod_only_update_label_and_nothing_else

(I am not joking).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is we'd end up with test_adopt_completed_pods_only_those_from_other_schedulers_and_only_patch_label. The one for adopt_launched_task would be even worse. Let me come up with something.

executor = self.kubernetes_executor
executor.scheduler_job_id = "modified"
executor.kube_client = mock_kube_client
executor.kube_config.kube_namespace = "somens"
pod = k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
name="foo",
labels={"airflow-worker": "bar"},
annotations={"some_annotation": "hello"},
namespace="somens",
)
)
mock_kube_client.list_namespaced_pod.return_value.items = [pod]

executor._adopt_completed_pods(mock_kube_client)
mock_kube_client.list_namespaced_pod.assert_called_once_with(
namespace="somens",
field_selector="status.phase=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker!=modified",
)
mock_kube_client.patch_namespaced_pod.assert_called_once_with(
body={"metadata": {"labels": {"airflow-worker": "modified"}}},
name="foo",
namespace="somens",
)

@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
def test_not_adopt_unassigned_task(self, mock_kube_client):
"""
Expand Down