Skip to content

Commit

Permalink
Retry pod launching on 409 ApiExceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
SamWheating committed Apr 6, 2021
1 parent 18e2c1d commit fb5f077
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
13 changes: 13 additions & 0 deletions airflow/providers/cncf/kubernetes/utils/pod_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@
from airflow.utils.state import State


def should_retry_start_pod(exception: Exception):
    """
    Tell whether a failed pod launch looks transient and is worth retrying.

    Only an ``ApiException`` whose ``status`` equals 409 is considered
    retryable; any other exception yields ``False``.

    :param exception: the exception raised by the launch attempt
    :return: ``True`` if the launch should be attempted again
    """
    return isinstance(exception, ApiException) and exception.status == 409


class PodStatus:
"""Status of the PODs"""

Expand Down Expand Up @@ -98,6 +105,12 @@ def delete_pod(self, pod: V1Pod):
if e.status != 404:
raise

@tenacity.retry(
stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_random_exponential(),
reraise=True,
retry=tenacity.retry_if_exception(should_retry_start_pod),
)
def start_pod(self, pod: V1Pod, startup_timeout: int = 120):
"""
Launches the pod synchronously and waits for completion.
Expand Down
29 changes: 29 additions & 0 deletions tests/providers/cncf/kubernetes/utils/test_pod_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from unittest import mock

import pytest
from kubernetes.client.rest import ApiException
from requests.exceptions import BaseHTTPError

from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -206,3 +207,31 @@ def test_parse_log_line(self):

with pytest.raises(Exception):
self.pod_launcher.parse_log_line('2020-10-08T14:16:17.793417674ZInvalidmessage\n')

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.run_pod_async")
def test_start_pod_retries_on_409_error(self, mock_run_pod_async):
    """A single 409 from ``run_pod_async`` is retried and the next attempt goes through."""
    conflict = ApiException(status=409)
    launched = mock.MagicMock()
    # First call raises the 409, second call returns a mock response.
    mock_run_pod_async.side_effect = [conflict, launched]

    self.pod_launcher.start_pod(mock.sentinel)

    # One failed attempt plus the retry that returned normally.
    assert mock_run_pod_async.call_count == 2

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.run_pod_async")
def test_start_pod_fails_on_other_exception(self, mock_run_pod_async):
    """An ``ApiException`` with a non-409 status propagates without being retried."""
    mock_run_pod_async.side_effect = [ApiException(status=504)]
    with pytest.raises(ApiException):
        self.pod_launcher.start_pod(mock.sentinel)

    # Pin the attempt count explicitly so a regression that starts retrying
    # non-409 errors is reported as a wrong call count rather than only
    # surfacing indirectly through the exhausted side_effect list.
    assert mock_run_pod_async.call_count == 1

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.run_pod_async")
def test_start_pod_retries_three_times(self, mock_run_pod_async):
    """Repeated 409 errors stop being retried after the third attempt and the error is raised."""
    # Queue one more failure than the expected number of attempts, so an
    # unexpected fourth call would still raise a 409 instead of exhausting the mock.
    mock_run_pod_async.side_effect = [ApiException(status=409) for _ in range(4)]

    with pytest.raises(ApiException):
        self.pod_launcher.start_pod(mock.sentinel)

    assert mock_run_pod_async.call_count == 3

0 comments on commit fb5f077

Please sign in to comment.