From 59afddd2f27aa0529c602da825e1ee429fe3934b Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Mon, 15 Mar 2021 15:16:49 -0600 Subject: [PATCH] Fix KubernetesExecutor issue with deleted pending pods (#14810) This change treats a pending KubernetesExecutor worker pod deletion as a failure. This allows them to follow the configured retry rules for the task as one would expect. (cherry picked from commit a639dd364865da7367f342d5721a5f46a7188a29) --- airflow/executors/kubernetes_executor.py | 6 +- tests/executors/test_kubernetes_executor.py | 92 +++++++++++++++++++++ 2 files changed, 94 insertions(+), 4 deletions(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index fd5c6faca22781..c42531a83c2ea9 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -194,10 +194,8 @@ def process_status( """Process status response""" if status == 'Pending': if event['type'] == 'DELETED': - self.log.info('Event: Failed to start pod %s, will reschedule', pod_id) - self.watcher_queue.put( - (pod_id, namespace, State.UP_FOR_RESCHEDULE, annotations, resource_version) - ) + self.log.info('Event: Failed to start pod %s', pod_id) + self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version)) else: self.log.info('Event: %s Pending', pod_id) elif status == 'Failed': diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index dc7cbbbf7b5328..68b000660f7781 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -34,6 +34,7 @@ from airflow.executors.kubernetes_executor import ( AirflowKubernetesScheduler, KubernetesExecutor, + KubernetesJobWatcher, create_pod_id, get_base_pod_from_template, ) @@ -328,3 +329,94 @@ def test_not_adopt_unassigned_task(self, mock_kube_client): executor.adopt_launched_task(mock_kube_client, 
pod=pod, pod_ids=pod_ids) assert not mock_kube_client.patch_namespaced_pod.called assert pod_ids == {"foobar": {}}
+
+
+class TestKubernetesJobWatcher(unittest.TestCase):  # What the watcher queues per pod phase/event type.
+    def setUp(self):  # Watcher with a mocked queue/config, one Pending pod "foo", no events yet.
+        self.watcher = KubernetesJobWatcher(
+            namespace="airflow",
+            multi_namespace_mode=False,
+            watcher_queue=mock.MagicMock(),
+            resource_version="0",
+            scheduler_job_id="123",
+            kube_config=mock.MagicMock(),
+        )
+        self.kube_client = mock.MagicMock()
+        self.core_annotations = {
+            "dag_id": "dag",
+            "task_id": "task",
+            "execution_date": "dt",
+            "try_number": "1",
+        }
+        self.pod = k8s.V1Pod(
+            metadata=k8s.V1ObjectMeta(
+                name="foo",
+                annotations={"airflow-worker": "bar", **self.core_annotations},
+                namespace="airflow",
+                resource_version="456",
+            ),
+            status=k8s.V1PodStatus(phase="Pending"),
+        )
+        self.events = []  # each test appends the watch events to replay
+
+    def _run(self):  # Replay self.events via a patched watch; _run must return the pod's resource_version.
+        with mock.patch('airflow.executors.kubernetes_executor.watch') as mock_watch:
+            mock_watch.Watch.return_value.stream.return_value = self.events
+            latest_resource_version = self.watcher._run(
+                self.kube_client,
+                self.watcher.resource_version,
+                self.watcher.scheduler_job_id,
+                self.watcher.kube_config,
+            )
+            assert self.pod.metadata.resource_version == latest_resource_version
+
+    def assert_watcher_queue_called_once_with_state(self, state):  # exactly one put(), with this state
+        self.watcher.watcher_queue.put.assert_called_once_with(
+            (
+                self.pod.metadata.name,
+                self.watcher.namespace,
+                state,
+                self.core_annotations,  # not the pod's full annotations: "airflow-worker" is absent
+                self.pod.metadata.resource_version,
+            )
+        )
+
+    def test_process_status_pending(self):  # Pending + MODIFIED: nothing is queued.
+        self.events.append({"type": 'MODIFIED', "object": self.pod})
+
+        self._run()
+        self.watcher.watcher_queue.put.assert_not_called()
+
+    def test_process_status_pending_deleted(self):  # Pending + DELETED: queued as State.FAILED.
+        self.events.append({"type": 'DELETED', "object": self.pod})
+
+        self._run()
+        self.assert_watcher_queue_called_once_with_state(State.FAILED)
+
+    def test_process_status_failed(self):  # Failed phase: queued as State.FAILED.
+        self.pod.status.phase = "Failed"
+        self.events.append({"type": 'MODIFIED', "object": self.pod})
+
+        self._run()
+        self.assert_watcher_queue_called_once_with_state(State.FAILED)
+
+    def test_process_status_succeeded(self):  # Succeeded phase: queued with state None.
+        self.pod.status.phase = "Succeeded"
+        self.events.append({"type": 'MODIFIED', "object": self.pod})
+
+        self._run()
+        self.assert_watcher_queue_called_once_with_state(None)
+
+    def test_process_status_running(self):  # Running phase: nothing is queued.
+        self.pod.status.phase = "Running"
+        self.events.append({"type": 'MODIFIED', "object": self.pod})
+
+        self._run()
+        self.watcher.watcher_queue.put.assert_not_called()
+
+    def test_process_status_catchall(self):  # Unrecognised phase ("Unknown"): nothing is queued.
+        self.pod.status.phase = "Unknown"
+        self.events.append({"type": 'MODIFIED', "object": self.pod})
+
+        self._run()
+        self.watcher.watcher_queue.put.assert_not_called()