From 3ad4e66510444b32313edc2f61d4fdfb5290767e Mon Sep 17 00:00:00 2001 From: Dmytro Rezchykov Date: Fri, 27 Aug 2021 15:52:02 +0300 Subject: [PATCH 1/3] Python CDK: fix retry attempts in case of user defined backoff time --- airbyte-cdk/python/CHANGELOG.md | 3 ++ .../airbyte_cdk/sources/streams/http/http.py | 32 +++++++++-- airbyte-cdk/python/setup.py | 2 +- .../sources/streams/http/test_http.py | 53 +++++++++++++++++-- 4 files changed, 82 insertions(+), 8 deletions(-) diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index d296390c5fb3..e8a1e8e92088 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.1.13 +Fix defect with user defined backoff time retry attempts, number of retries logic fixed + ## 0.1.12 Add raise_on_http_errors, max_retries, retry_factor properties to be able to ignore http status errors and modify retry time in HTTP stream diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py index 2a0178875f65..a926506bd79b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py @@ -73,7 +73,7 @@ def raise_on_http_errors(self) -> bool: @property def max_retries(self) -> int: """ - Override if needed. Specifies maximum amount of retries for backoff policy. + Override if needed. Specifies maximum amount of retries for backoff policy. Return None for no limit. """ return 5 @@ -265,9 +265,33 @@ def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mappi """ Creates backoff wrappers which are responsible for retry logic """ - backoff_handler = default_backoff_handler(max_tries=self.max_retries, factor=self.retry_factor) - user_backoff_handler = user_defined_backoff_handler(max_tries=self.max_retries)(backoff_handler) - return user_backoff_handler(self._send)(request, request_kwargs) + + """ + Backoff package has max_tries parameter that means total number of + tries before giving up, so if this number is 0 no calls expected to be done. + But for this class we call it max_REtries assuming there would be at + least one attempt and some retry attempts, to comply this logic we add + 1 to expected retries attempts. + """ + max_tries = self.max_retries + """ + According to backoff max_tries docstring: + max_tries: The maximum number of attempts to make before giving + up ...The default value of None means there is no limit to + the number of tries. + This implies that is max_tries is excplicitly set to None there is no + limit to retry attempts, otherwise it is limited number of tries. But + this is not true for current version of backoff packages (1.8.0). Setting + max_tries to 0 or negative number would result in endless retry atempts. + Add this condition to avoid endless loop if it havent been set + explicitly (i.e. max_retries is not None). + """ + if max_tries is not None: + max_tries = 1 if max_tries <= 0 else max_tries + 1 + + user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries)(self._send) + backoff_handler = default_backoff_handler(max_tries=max_tries, factor=self.retry_factor) + return backoff_handler(user_backoff_handler)(request, request_kwargs) def read_records( self, diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 28cb7070ea7b..38a119df0e50 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -35,7 +35,7 @@ setup( name="airbyte-cdk", - version="0.1.12", + version="0.1.13", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py index 57eb9875bf02..5e0348b54a95 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py @@ -24,6 +24,7 @@ import json +from http import HTTPStatus from typing import Any, Iterable, Mapping, Optional from unittest.mock import ANY @@ -66,12 +67,13 @@ def test_request_kwargs_used(mocker, requests_mock): stream = StubBasicReadHttpStream() request_kwargs = {"cert": None, "proxies": "google.com"} mocker.patch.object(stream, "request_kwargs", return_value=request_kwargs) - mocker.patch.object(stream._session, "send", wraps=stream._session.send) + send_mock = mocker.patch.object(stream._session, "send", wraps=stream._session.send) requests_mock.register_uri("GET", stream.url_base) list(stream.read_records(sync_mode=SyncMode.full_refresh)) stream._session.send.assert_any_call(ANY, **request_kwargs) + assert send_mock.call_count == 1 def test_stub_basic_read_http_stream_read_records(mocker): @@ -149,14 +151,58 @@ def test_stub_custom_backoff_http_stream(mocker): req = requests.Response() req.status_code = 429 - mocker.patch.object(requests.Session, "send", return_value=req) + send_mock = mocker.patch.object(requests.Session, "send", return_value=req) with pytest.raises(UserDefinedBackoffException): list(stream.read_records(SyncMode.full_refresh)) + assert send_mock.call_count == stream.max_retries + 1 # TODO(davin): Figure out how to assert calls. +@pytest.mark.parametrize("retries", [-20, -1, 0, 1, 2, 10]) +def test_stub_custom_backoff_http_stream_retries(mocker, retries): + mocker.patch("time.sleep", lambda x: None) + + class StubCustomBackoffHttpStreamRetires(StubCustomBackoffHttpStream): + @property + def max_retries(self): + return retries + + stream = StubCustomBackoffHttpStreamRetires() + req = requests.Response() + req.status_code = HTTPStatus.TOO_MANY_REQUESTS + send_mock = mocker.patch.object(requests.Session, "send", return_value=req) + + with pytest.raises(UserDefinedBackoffException): + list(stream.read_records(SyncMode.full_refresh)) + if retries <= 0: + assert send_mock.call_count == 1 + else: + assert send_mock.call_count == stream.max_retries + 1 + + +def test_stub_custom_backoff_http_stream_endless_retries(mocker): + mocker.patch("time.sleep", lambda x: None) + + class StubCustomBackoffHttpStreamRetires(StubCustomBackoffHttpStream): + @property + def max_retries(self): + return None + + infinite_number = 20 + + stream = StubCustomBackoffHttpStreamRetires() + req = requests.Response() + req.status_code = HTTPStatus.TOO_MANY_REQUESTS + send_mock = mocker.patch.object(requests.Session, "send", side_effect=[req] * infinite_number) + + # Expecting mock object to raise a RuntimeError when the end of side_effect list parameter reached. + with pytest.raises(RuntimeError): + list(stream.read_records(SyncMode.full_refresh)) + assert send_mock.call_count == infinite_number + 1 + + @pytest.mark.parametrize("http_code", [400, 401, 403]) def test_4xx_error_codes_http_stream(mocker, http_code): stream = StubCustomBackoffHttpStream() @@ -190,9 +236,10 @@ def test_raise_on_http_errors_off_5xx(mocker, status_code): req = requests.Response() req.status_code = status_code - mocker.patch.object(requests.Session, "send", return_value=req) + send_mock = mocker.patch.object(requests.Session, "send", return_value=req) with pytest.raises(DefaultBackoffException): list(stream.read_records(SyncMode.full_refresh)) + assert send_mock.call_count == stream.max_retries + 1 @pytest.mark.parametrize("status_code", [400, 401, 402, 403, 416]) From 47c8c3a22cb516a19c7487cb560a2e8f43fc8511 Mon Sep 17 00:00:00 2001 From: Dmytro Rezchykov Date: Fri, 27 Aug 2021 18:57:40 +0300 Subject: [PATCH 2/3] Fix review comments --- airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py index a926506bd79b..7d9711b7d08d 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py @@ -71,7 +71,7 @@ def raise_on_http_errors(self) -> bool: return True @property - def max_retries(self) -> int: + def max_retries(self) -> Union[int, None]: """ Override if needed. Specifies maximum amount of retries for backoff policy. Return None for no limit. """ From 1ce67fe3f986fec88ce6c5377931d9c4a1379e48 Mon Sep 17 00:00:00 2001 From: Dmytro Rezchykov Date: Mon, 30 Aug 2021 10:11:31 +0300 Subject: [PATCH 3/3] fix review comments --- .../python/airbyte_cdk/sources/streams/http/http.py | 6 +++--- .../python/unit_tests/sources/streams/http/test_http.py | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py index 7d9711b7d08d..a078266e1f47 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py @@ -279,15 +279,15 @@ def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mappi max_tries: The maximum number of attempts to make before giving up ...The default value of None means there is no limit to the number of tries. - This implies that is max_tries is excplicitly set to None there is no + This implies that if max_tries is excplicitly set to None there is no limit to retry attempts, otherwise it is limited number of tries. But this is not true for current version of backoff packages (1.8.0). Setting max_tries to 0 or negative number would result in endless retry atempts. - Add this condition to avoid endless loop if it havent been set + Add this condition to avoid an endless loop if it hasnt been set explicitly (i.e. max_retries is not None). """ if max_tries is not None: - max_tries = 1 if max_tries <= 0 else max_tries + 1 + max_tries = max(0, max_tries) + 1 user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries)(self._send) backoff_handler = default_backoff_handler(max_tries=max_tries, factor=self.retry_factor) diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py index 5e0348b54a95..84a53835243d 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py @@ -164,12 +164,12 @@ def test_stub_custom_backoff_http_stream(mocker): def test_stub_custom_backoff_http_stream_retries(mocker, retries): mocker.patch("time.sleep", lambda x: None) - class StubCustomBackoffHttpStreamRetires(StubCustomBackoffHttpStream): + class StubCustomBackoffHttpStreamRetries(StubCustomBackoffHttpStream): @property def max_retries(self): return retries - stream = StubCustomBackoffHttpStreamRetires() + stream = StubCustomBackoffHttpStreamRetries() req = requests.Response() req.status_code = HTTPStatus.TOO_MANY_REQUESTS send_mock = mocker.patch.object(requests.Session, "send", return_value=req) @@ -185,14 +185,14 @@ def max_retries(self): def test_stub_custom_backoff_http_stream_endless_retries(mocker): mocker.patch("time.sleep", lambda x: None) - class StubCustomBackoffHttpStreamRetires(StubCustomBackoffHttpStream): + class StubCustomBackoffHttpStreamRetries(StubCustomBackoffHttpStream): @property def max_retries(self): return None infinite_number = 20 - stream = StubCustomBackoffHttpStreamRetires() + stream = StubCustomBackoffHttpStreamRetries() req = requests.Response() req.status_code = HTTPStatus.TOO_MANY_REQUESTS send_mock = mocker.patch.object(requests.Session, "send", side_effect=[req] * infinite_number)