Skip to content

Commit

Permalink
refactor: Optimize the use of stellar_sdk.client.AiohttpClient. (#317)
Browse files Browse the repository at this point in the history
* refactor: Optimize the use of `stellar_sdk.client.AiohttpClient`.
When `stellar_sdk.client.AiohttpClient` cannot establish a Stream connection, `stellar_sdk.exceptions.StreamClientError` will be thrown.
  • Loading branch information
overcat authored May 17, 2020
1 parent c636e62 commit a846dde
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 10 deletions.
2 changes: 2 additions & 0 deletions stellar_sdk/call_builder/base_call_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ def stream(
:return: If it is called synchronous, it will return ``Generator``, If
it is called asynchronously, it will return ``AsyncGenerator``.
:raise: :exc:`StreamClientError <stellar_sdk.exceptions.StreamClientError>` - Failed to fetch stream resource.
"""
if self.__async:
return self.__stream_async()
Expand Down
32 changes: 22 additions & 10 deletions stellar_sdk/client/aiohttp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
import json
import logging
from typing import Optional, AsyncGenerator, Any, Dict

import aiohttp
from aiohttp_sse_client.client import EventSource

from . import defines
from .base_async_client import BaseAsyncClient
from .response import Response
from ..__version__ import __version__
from ..exceptions import StreamClientError

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -42,7 +42,7 @@ def __init__(
post_timeout: float = defines.DEFAULT_POST_TIMEOUT_SECONDS,
backoff_factor: Optional[float] = DEFAULT_BACKOFF_FACTOR,
user_agent: Optional[str] = None,
**kwargs
**kwargs,
) -> None:
self.backoff_factor: Optional[float] = backoff_factor
self.request_timeout: float = request_timeout
Expand All @@ -67,7 +67,7 @@ def __init__(
headers=self.headers.copy(),
connector=connector,
timeout=aiohttp.ClientTimeout(total=request_timeout),
**kwargs
**kwargs,
)

self._session: aiohttp.ClientSession = session
Expand All @@ -77,7 +77,7 @@ async def get(self, url: str, params: Dict[str, str] = None) -> Response:
"""Perform HTTP GET request.
:param url: the request url
:param params: the requested params
:param params: the request params
:return: the response from server
:raise: :exc:`ConnectionError <stellar_sdk.exceptions.ConnectionError>`
"""
Expand Down Expand Up @@ -114,11 +114,20 @@ async def post(self, url: str, data: Dict[str, str] = None) -> Response:
async def stream(
self, url: str, params: Dict[str, str] = None
) -> AsyncGenerator[Dict[str, Any], None]:
"""Init the sse session """
"""Perform Stream request.
:param url: the request url
:param params: the request params
:return: the stream response from server
:raise: :exc:`StreamClientError <stellar_sdk.exceptions.StreamClientError>` - Failed to fetch stream resource.
"""

# Init the sse session
if self._sse_session is None:
# No timeout, no special connector
# No special connector
# Other headers such as "Accept: text/event-stream" are added by thr SSEClient
self._sse_session = aiohttp.ClientSession()
timeout = aiohttp.ClientTimeout(total=60 * 5)
self._sse_session = aiohttp.ClientSession(timeout=timeout)

query_params = {**params} if params else dict()

Expand Down Expand Up @@ -160,10 +169,13 @@ async def stream(
except json.JSONDecodeError:
# Content was not json-decodable
pass
except aiohttp.ClientConnectionError:
# Retry if the connection dropped after we got the initial response
except aiohttp.ClientError as e:
raise StreamClientError(
query_params["cursor"], "Failed to get stream message."
) from e
except asyncio.exceptions.TimeoutError:
logger.warning(
"We have encountered an error and we will try to reconnect, cursor = {}".format(
"We have encountered an timeout error and we will try to reconnect, cursor = {}".format(
query_params.get("cursor")
)
)
Expand Down
13 changes: 13 additions & 0 deletions stellar_sdk/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"BadResponseError",
"UnknownRequestError",
"NotPageableError",
"StreamClientError"
]


Expand Down Expand Up @@ -192,6 +193,18 @@ class NotPageableError(BaseRequestError):
"""


class StreamClientError(BaseRequestError):
"""Failed to fetch stream resource.
:param current_cursor: The cursor of the last message obtained can be used for reconnect.
:param message: error message
"""

def __init__(self, current_cursor: str, message: str) -> None:
super().__init__(message)
self.current_cursor = current_cursor


def raise_request_exception(response: Response) -> None:
status_code = response.status_code
if status_code == 200:
Expand Down

0 comments on commit a846dde

Please sign in to comment.