Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[airbyte-cdk] - Update HttpClient to call authenticator on backoff retry attempts in HttpClient._send #47191

Merged
merged 4 commits into from
Oct 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pathlib import Path
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union

import orjson
import requests
import requests_cache
from airbyte_cdk.models import (
Expand Down Expand Up @@ -45,7 +46,6 @@
from airbyte_cdk.utils.constants import ENV_REQUEST_CACHE_PATH
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from orjson import orjson
from requests.auth import AuthBase

BODY_REQUEST_METHODS = ("GET", "POST", "PUT", "PATCH")
Expand Down Expand Up @@ -249,6 +249,8 @@ def _send(
self._request_attempt_count[request] = 1
else:
self._request_attempt_count[request] += 1
if hasattr(self._session, "auth") and isinstance(self._session.auth, AuthBase):
self._session.auth(request)

self._logger.debug(
"Making outbound API request", extra={"headers": request.headers, "url": request.url, "request_body": request.body}
Expand Down Expand Up @@ -285,6 +287,20 @@ def _send(
lambda: formatter(response), # type: ignore # log_formatter is always cast to a callable
)

self._handle_error_resolution(
response=response, exc=exc, request=request, error_resolution=error_resolution, exit_on_rate_limit=exit_on_rate_limit
)

return response # type: ignore # will either return a valid response of type requests.Response or raise an exception

def _handle_error_resolution(
self,
response: Optional[requests.Response],
exc: Optional[requests.RequestException],
request: requests.PreparedRequest,
error_resolution: ErrorResolution,
exit_on_rate_limit: Optional[bool] = False,
) -> None:
# Emit stream status RUNNING with the reason RATE_LIMITED to log that the rate limit has been reached
if error_resolution.response_action == ResponseAction.RATE_LIMITED:
# TODO: Update to handle with message repository when concurrent message repository is ready
Expand Down Expand Up @@ -362,8 +378,6 @@ def _send(
self._logger.error(response.text)
raise e

return response # type: ignore # will either return a valid response of type requests.Response or raise an exception

@property
def name(self) -> str:
return self._name
Expand Down
Loading