Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python CDK: fix retry attempts in case of user defined backoff time #5707

Merged
merged 3 commits into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.13
Fix defect with user defined backoff time retry attempts, number of retries logic fixed

## 0.1.12
Add raise_on_http_errors, max_retries, retry_factor properties to be able to ignore http status errors and modify retry time in HTTP stream

Expand Down
34 changes: 29 additions & 5 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ def raise_on_http_errors(self) -> bool:
return True

@property
def max_retries(self) -> int:
def max_retries(self) -> Union[int, None]:
"""
Override if needed. Specifies maximum amount of retries for backoff policy.
Override if needed. Specifies maximum amount of retries for backoff policy. Return None for no limit.
keu marked this conversation as resolved.
Show resolved Hide resolved
"""
return 5

Expand Down Expand Up @@ -265,9 +265,33 @@ def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mappi
"""
Creates backoff wrappers which are responsible for retry logic
"""
backoff_handler = default_backoff_handler(max_tries=self.max_retries, factor=self.retry_factor)
user_backoff_handler = user_defined_backoff_handler(max_tries=self.max_retries)(backoff_handler)
return user_backoff_handler(self._send)(request, request_kwargs)

"""
Backoff package has max_tries parameter that means total number of
tries before giving up, so if this number is 0 no calls expected to be done.
But for this class we call it max_REtries assuming there would be at
least one attempt and some retry attempts, to comply this logic we add
1 to expected retries attempts.
"""
max_tries = self.max_retries
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets not use docstrings in the middle of the function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its not a docstring unless it occurs as the first statement in a module, function, class, or method definition. In this case its just a multi line comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what I mean is that having novels like this in the middle of the function makes it hard to read and understand. please move it to the top.

According to backoff max_tries docstring:
max_tries: The maximum number of attempts to make before giving
up ...The default value of None means there is no limit to
the number of tries.
This implies that is max_tries is excplicitly set to None there is no
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
This implies that is max_tries is excplicitly set to None there is no
This implies that if max_tries is excplicitly set to None there is no

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

limit to retry attempts, otherwise it is limited number of tries. But
this is not true for current version of backoff packages (1.8.0). Setting
max_tries to 0 or negative number would result in endless retry atempts.
Add this condition to avoid endless loop if it havent been set
Zirochkaa marked this conversation as resolved.
Show resolved Hide resolved
explicitly (i.e. max_retries is not None).
"""
if max_tries is not None:
max_tries = 1 if max_tries <= 0 else max_tries + 1
Zirochkaa marked this conversation as resolved.
Show resolved Hide resolved

user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries)(self._send)
backoff_handler = default_backoff_handler(max_tries=max_tries, factor=self.retry_factor)
return backoff_handler(user_backoff_handler)(request, request_kwargs)

def read_records(
self,
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

setup(
name="airbyte-cdk",
version="0.1.12",
version="0.1.13",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
53 changes: 50 additions & 3 deletions airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@


import json
from http import HTTPStatus
from typing import Any, Iterable, Mapping, Optional
from unittest.mock import ANY

Expand Down Expand Up @@ -66,12 +67,13 @@ def test_request_kwargs_used(mocker, requests_mock):
stream = StubBasicReadHttpStream()
request_kwargs = {"cert": None, "proxies": "google.com"}
mocker.patch.object(stream, "request_kwargs", return_value=request_kwargs)
mocker.patch.object(stream._session, "send", wraps=stream._session.send)
send_mock = mocker.patch.object(stream._session, "send", wraps=stream._session.send)
requests_mock.register_uri("GET", stream.url_base)

list(stream.read_records(sync_mode=SyncMode.full_refresh))

stream._session.send.assert_any_call(ANY, **request_kwargs)
assert send_mock.call_count == 1


def test_stub_basic_read_http_stream_read_records(mocker):
Expand Down Expand Up @@ -149,14 +151,58 @@ def test_stub_custom_backoff_http_stream(mocker):
req = requests.Response()
req.status_code = 429

mocker.patch.object(requests.Session, "send", return_value=req)
send_mock = mocker.patch.object(requests.Session, "send", return_value=req)

with pytest.raises(UserDefinedBackoffException):
list(stream.read_records(SyncMode.full_refresh))
assert send_mock.call_count == stream.max_retries + 1

# TODO(davin): Figure out how to assert calls.


@pytest.mark.parametrize("retries", [-20, -1, 0, 1, 2, 10])
def test_stub_custom_backoff_http_stream_retries(mocker, retries):
mocker.patch("time.sleep", lambda x: None)

class StubCustomBackoffHttpStreamRetires(StubCustomBackoffHttpStream):
Zirochkaa marked this conversation as resolved.
Show resolved Hide resolved
@property
def max_retries(self):
return retries

stream = StubCustomBackoffHttpStreamRetires()
req = requests.Response()
req.status_code = HTTPStatus.TOO_MANY_REQUESTS
send_mock = mocker.patch.object(requests.Session, "send", return_value=req)

with pytest.raises(UserDefinedBackoffException):
list(stream.read_records(SyncMode.full_refresh))
if retries <= 0:
Comment on lines +177 to +179
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
with pytest.raises(UserDefinedBackoffException):
list(stream.read_records(SyncMode.full_refresh))
if retries <= 0:
with pytest.raises(UserDefinedBackoffException):
list(stream.read_records(SyncMode.full_refresh))
if retries <= 0:

assert send_mock.call_count == 1
else:
assert send_mock.call_count == stream.max_retries + 1


def test_stub_custom_backoff_http_stream_endless_retries(mocker):
mocker.patch("time.sleep", lambda x: None)

class StubCustomBackoffHttpStreamRetires(StubCustomBackoffHttpStream):
Zirochkaa marked this conversation as resolved.
Show resolved Hide resolved
@property
def max_retries(self):
return None

infinite_number = 20

stream = StubCustomBackoffHttpStreamRetires()
req = requests.Response()
req.status_code = HTTPStatus.TOO_MANY_REQUESTS
send_mock = mocker.patch.object(requests.Session, "send", side_effect=[req] * infinite_number)

# Expecting mock object to raise a RuntimeError when the end of side_effect list parameter reached.
with pytest.raises(RuntimeError):
list(stream.read_records(SyncMode.full_refresh))
assert send_mock.call_count == infinite_number + 1
Comment on lines +202 to +203
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
list(stream.read_records(SyncMode.full_refresh))
assert send_mock.call_count == infinite_number + 1
list(stream.read_records(SyncMode.full_refresh))
assert send_mock.call_count == infinite_number + 1



@pytest.mark.parametrize("http_code", [400, 401, 403])
def test_4xx_error_codes_http_stream(mocker, http_code):
stream = StubCustomBackoffHttpStream()
Expand Down Expand Up @@ -190,9 +236,10 @@ def test_raise_on_http_errors_off_5xx(mocker, status_code):
req = requests.Response()
req.status_code = status_code

mocker.patch.object(requests.Session, "send", return_value=req)
send_mock = mocker.patch.object(requests.Session, "send", return_value=req)
with pytest.raises(DefaultBackoffException):
list(stream.read_records(SyncMode.full_refresh))
assert send_mock.call_count == stream.max_retries + 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert send_mock.call_count == stream.max_retries + 1
assert send_mock.call_count == stream.max_retries + 1



@pytest.mark.parametrize("status_code", [400, 401, 402, 403, 416])
Expand Down