diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http_client.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http_client.py index 0924ef28021f..cccbc4b8c01c 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http_client.py @@ -8,6 +8,7 @@ from pathlib import Path from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union +import orjson import requests import requests_cache from airbyte_cdk.models import ( @@ -45,7 +46,6 @@ from airbyte_cdk.utils.constants import ENV_REQUEST_CACHE_PATH from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message from airbyte_cdk.utils.traced_exception import AirbyteTracedException -from orjson import orjson from requests.auth import AuthBase BODY_REQUEST_METHODS = ("GET", "POST", "PUT", "PATCH") @@ -249,6 +249,8 @@ def _send( self._request_attempt_count[request] = 1 else: self._request_attempt_count[request] += 1 + if hasattr(self._session, "auth") and isinstance(self._session.auth, AuthBase): + self._session.auth(request) self._logger.debug( "Making outbound API request", extra={"headers": request.headers, "url": request.url, "request_body": request.body} @@ -285,6 +287,20 @@ def _send( lambda: formatter(response), # type: ignore # log_formatter is always cast to a callable ) + self._handle_error_resolution( + response=response, exc=exc, request=request, error_resolution=error_resolution, exit_on_rate_limit=exit_on_rate_limit + ) + + return response # type: ignore # will either return a valid response of type requests.Response or raise an exception + + def _handle_error_resolution( + self, + response: Optional[requests.Response], + exc: Optional[requests.RequestException], + request: requests.PreparedRequest, + error_resolution: ErrorResolution, + exit_on_rate_limit: Optional[bool] = False, + ) -> None: # Emit stream status RUNNING with the reason RATE_LIMITED to log that the rate limit has been reached if error_resolution.response_action == ResponseAction.RATE_LIMITED: # TODO: Update to handle with message repository when concurrent message repository is ready @@ -362,8 +378,6 @@ def _send( self._logger.error(response.text) raise e - return response # type: ignore # will either return a valid response of type requests.Response or raise an exception - @property def name(self) -> str: return self._name