Skip to content

Commit

Permalink
Fix KubernetesExecutor issue with deleted pending pods (#14810)
Browse files Browse the repository at this point in the history
This change treats the deletion of a pending KubernetesExecutor worker pod
as a failure. This allows the affected task to follow its configured retry
rules, as one would expect.

(cherry picked from commit a639dd3)
  • Loading branch information
jedcunningham authored and ashb committed Mar 19, 2021
1 parent ecd3e3c commit 59afddd
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 4 deletions.
6 changes: 2 additions & 4 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,8 @@ def process_status(
"""Process status response"""
if status == 'Pending':
if event['type'] == 'DELETED':
self.log.info('Event: Failed to start pod %s, will reschedule', pod_id)
self.watcher_queue.put(
(pod_id, namespace, State.UP_FOR_RESCHEDULE, annotations, resource_version)
)
self.log.info('Event: Failed to start pod %s', pod_id)
self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version))
else:
self.log.info('Event: %s Pending', pod_id)
elif status == 'Failed':
Expand Down
92 changes: 92 additions & 0 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from airflow.executors.kubernetes_executor import (
AirflowKubernetesScheduler,
KubernetesExecutor,
KubernetesJobWatcher,
create_pod_id,
get_base_pod_from_template,
)
Expand Down Expand Up @@ -328,3 +329,94 @@ def test_not_adopt_unassigned_task(self, mock_kube_client):
executor.adopt_launched_task(mock_kube_client, pod=pod, pod_ids=pod_ids)
assert not mock_kube_client.patch_namespaced_pod.called
assert pod_ids == {"foobar": {}}


class TestKubernetesJobWatcher(unittest.TestCase):
    """Tests for how ``KubernetesJobWatcher._run`` reacts to pod events.

    Each test feeds a single event for the fixture pod through ``_run`` (with
    the ``watch`` module patched out) and then checks what, if anything, was
    put on the watcher queue.
    """

    def setUp(self):
        """Create a watcher with mocked collaborators and a ``Pending`` pod fixture."""
        queue = mock.MagicMock()
        config = mock.MagicMock()
        self.watcher = KubernetesJobWatcher(
            namespace="airflow",
            multi_namespace_mode=False,
            watcher_queue=queue,
            resource_version="0",
            scheduler_job_id="123",
            kube_config=config,
        )
        self.kube_client = mock.MagicMock()
        self.core_annotations = {
            "dag_id": "dag",
            "task_id": "task",
            "execution_date": "dt",
            "try_number": "1",
        }

        # The pod carries one extra annotation on top of the core ones; the
        # queue assertion helper below expects only the core ones.
        pod_annotations = {"airflow-worker": "bar"}
        pod_annotations.update(self.core_annotations)
        pod_metadata = k8s.V1ObjectMeta(
            name="foo",
            annotations=pod_annotations,
            namespace="airflow",
            resource_version="456",
        )
        self.pod = k8s.V1Pod(
            metadata=pod_metadata,
            status=k8s.V1PodStatus(phase="Pending"),
        )
        self.events = []

    def _run(self):
        """Run the watcher over ``self.events`` and check the returned resource version."""
        watch_patcher = mock.patch('airflow.executors.kubernetes_executor.watch')
        with watch_patcher as patched_watch:
            # Make the patched Watch().stream(...) yield our canned events.
            stream = patched_watch.Watch.return_value.stream
            stream.return_value = self.events
            watcher = self.watcher
            latest_resource_version = watcher._run(
                self.kube_client,
                watcher.resource_version,
                watcher.scheduler_job_id,
                watcher.kube_config,
            )
            assert self.pod.metadata.resource_version == latest_resource_version

    def _add_pod_event(self, event_type, phase=None):
        """Queue an event of ``event_type`` for the fixture pod, optionally changing its phase."""
        if phase is not None:
            self.pod.status.phase = phase
        self.events.append({"type": event_type, "object": self.pod})

    def assert_watcher_queue_called_once_with_state(self, state):
        """Assert exactly one queue ``put`` happened, carrying the fixture pod and ``state``."""
        metadata = self.pod.metadata
        expected_item = (
            metadata.name,
            self.watcher.namespace,
            state,
            self.core_annotations,
            metadata.resource_version,
        )
        self.watcher.watcher_queue.put.assert_called_once_with(expected_item)

    def test_process_status_pending(self):
        """A MODIFIED event for a Pending pod puts nothing on the queue."""
        self._add_pod_event('MODIFIED')

        self._run()
        self.watcher.watcher_queue.put.assert_not_called()

    def test_process_status_pending_deleted(self):
        """A DELETED event for a Pending pod is queued as FAILED."""
        self._add_pod_event('DELETED')

        self._run()
        self.assert_watcher_queue_called_once_with_state(State.FAILED)

    def test_process_status_failed(self):
        """A pod in the Failed phase is queued as FAILED."""
        self._add_pod_event('MODIFIED', phase="Failed")

        self._run()
        self.assert_watcher_queue_called_once_with_state(State.FAILED)

    def test_process_status_succeeded(self):
        """A pod in the Succeeded phase is queued with a state of None."""
        self._add_pod_event('MODIFIED', phase="Succeeded")

        self._run()
        self.assert_watcher_queue_called_once_with_state(None)

    def test_process_status_running(self):
        """A pod in the Running phase puts nothing on the queue."""
        self._add_pod_event('MODIFIED', phase="Running")

        self._run()
        self.watcher.watcher_queue.put.assert_not_called()

    def test_process_status_catchall(self):
        """A pod in an unhandled phase ("Unknown") puts nothing on the queue."""
        self._add_pod_event('MODIFIED', phase="Unknown")

        self._run()
        self.watcher.watcher_queue.put.assert_not_called()

0 comments on commit 59afddd

Please sign in to comment.