diff --git a/stellar_sdk/call_builder/base_call_builder.py b/stellar_sdk/call_builder/base_call_builder.py index 506e7670..5f5ef44b 100644 --- a/stellar_sdk/call_builder/base_call_builder.py +++ b/stellar_sdk/call_builder/base_call_builder.py @@ -90,6 +90,8 @@ def stream( :return: If it is called synchronous, it will return ``Generator``, If it is called asynchronously, it will return ``AsyncGenerator``. + + :raise: :exc:`StreamClientError ` - Failed to fetch stream resource. """ if self.__async: return self.__stream_async() diff --git a/stellar_sdk/client/aiohttp_client.py b/stellar_sdk/client/aiohttp_client.py index d14daebe..e9280da7 100644 --- a/stellar_sdk/client/aiohttp_client.py +++ b/stellar_sdk/client/aiohttp_client.py @@ -2,13 +2,13 @@ import json import logging from typing import Optional, AsyncGenerator, Any, Dict - import aiohttp from aiohttp_sse_client.client import EventSource from .base_async_client import BaseAsyncClient from .response import Response from ..__version__ import __version__ +from ..exceptions import StreamClientError logger = logging.getLogger(__name__) @@ -41,7 +41,7 @@ def __init__( request_timeout: float = DEFAULT_REQUEST_TIMEOUT, backoff_factor: Optional[float] = DEFAULT_BACKOFF_FACTOR, user_agent: Optional[str] = None, - **kwargs + **kwargs, ) -> None: self.backoff_factor: Optional[float] = backoff_factor self.request_timeout: float = request_timeout @@ -65,7 +65,7 @@ def __init__( headers=self.headers.copy(), connector=connector, timeout=aiohttp.ClientTimeout(total=request_timeout), - **kwargs + **kwargs, ) self._session: aiohttp.ClientSession = session @@ -75,7 +75,7 @@ async def get(self, url: str, params: Dict[str, str] = None) -> Response: """Perform HTTP GET request. :param url: the request url - :param params: the requested params + :param params: the request params :return: the response from server :raise: :exc:`ConnectionError ` """ @@ -112,11 +112,20 @@ async def post(self, url: str, data: Dict[str, str] = None) -> Response: async def stream( self, url: str, params: Dict[str, str] = None ) -> AsyncGenerator[Dict[str, Any], None]: - """Init the sse session """ + """Perform Stream request. + + :param url: the request url + :param params: the request params + :return: the stream response from server + :raise: :exc:`StreamClientError ` - Failed to fetch stream resource. + """ + + # Init the sse session if self._sse_session is None: - # No timeout, no special connector + # No special connector # Other headers such as "Accept: text/event-stream" are added by thr SSEClient - self._sse_session = aiohttp.ClientSession() + timeout = aiohttp.ClientTimeout(total=60 * 5) + self._sse_session = aiohttp.ClientSession(timeout=timeout) query_params = {**params} if params else dict() @@ -158,10 +167,13 @@ async def stream( except json.JSONDecodeError: # Content was not json-decodable pass - except aiohttp.ClientConnectionError: - # Retry if the connection dropped after we got the initial response + except aiohttp.ClientError as e: + raise StreamClientError( + query_params["cursor"], "Failed to get stream message." + ) from e + except asyncio.exceptions.TimeoutError: logger.warning( - "We have encountered an error and we will try to reconnect, cursor = {}".format( + "We have encountered an timeout error and we will try to reconnect, cursor = {}".format( query_params.get("cursor") ) ) diff --git a/stellar_sdk/exceptions.py b/stellar_sdk/exceptions.py index 1d6fbcee..483d72d5 100644 --- a/stellar_sdk/exceptions.py +++ b/stellar_sdk/exceptions.py @@ -29,6 +29,7 @@ "BadResponseError", "UnknownRequestError", "NotPageableError", + "StreamClientError" ] @@ -192,6 +193,18 @@ class NotPageableError(BaseRequestError): """ +class StreamClientError(BaseRequestError): + """Failed to fetch stream resource. + + :param current_cursor: The cursor of the last message obtained can be used for reconnect. + :param message: error message + """ + + def __init__(self, current_cursor: str, message: str) -> None: + super().__init__(message) + self.current_cursor = current_cursor + + def raise_request_exception(response: Response) -> None: status_code = response.status_code if status_code == 200: