Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Optimize the use of stellar_sdk.client.AiohttpClient. #317

Merged
merged 5 commits into from
May 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions stellar_sdk/call_builder/base_call_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ def stream(

:return: If it is called synchronous, it will return ``Generator``, If
it is called asynchronously, it will return ``AsyncGenerator``.

:raise: :exc:`StreamClientError <stellar_sdk.exceptions.StreamClientError>` - Failed to fetch stream resource.
"""
if self.__async:
return self.__stream_async()
Expand Down
32 changes: 22 additions & 10 deletions stellar_sdk/client/aiohttp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
import json
import logging
from typing import Optional, AsyncGenerator, Any, Dict

import aiohttp
from aiohttp_sse_client.client import EventSource

from .base_async_client import BaseAsyncClient
from .response import Response
from ..__version__ import __version__
from ..exceptions import StreamClientError

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -41,7 +41,7 @@ def __init__(
request_timeout: float = DEFAULT_REQUEST_TIMEOUT,
backoff_factor: Optional[float] = DEFAULT_BACKOFF_FACTOR,
user_agent: Optional[str] = None,
**kwargs
**kwargs,
) -> None:
self.backoff_factor: Optional[float] = backoff_factor
self.request_timeout: float = request_timeout
Expand All @@ -65,7 +65,7 @@ def __init__(
headers=self.headers.copy(),
connector=connector,
timeout=aiohttp.ClientTimeout(total=request_timeout),
**kwargs
**kwargs,
)

self._session: aiohttp.ClientSession = session
Expand All @@ -75,7 +75,7 @@ async def get(self, url: str, params: Dict[str, str] = None) -> Response:
"""Perform HTTP GET request.

:param url: the request url
:param params: the requested params
:param params: the request params
:return: the response from server
:raise: :exc:`ConnectionError <stellar_sdk.exceptions.ConnectionError>`
"""
Expand Down Expand Up @@ -112,11 +112,20 @@ async def post(self, url: str, data: Dict[str, str] = None) -> Response:
async def stream(
self, url: str, params: Dict[str, str] = None
) -> AsyncGenerator[Dict[str, Any], None]:
"""Init the sse session """
"""Perform Stream request.

:param url: the request url
:param params: the request params
:return: the stream response from server
:raise: :exc:`StreamClientError <stellar_sdk.exceptions.StreamClientError>` - Failed to fetch stream resource.
"""

# Init the sse session
if self._sse_session is None:
# No timeout, no special connector
# No special connector
# Other headers such as "Accept: text/event-stream" are added by thr SSEClient
self._sse_session = aiohttp.ClientSession()
timeout = aiohttp.ClientTimeout(total=60 * 5)
self._sse_session = aiohttp.ClientSession(timeout=timeout)

query_params = {**params} if params else dict()

Expand Down Expand Up @@ -158,10 +167,13 @@ async def stream(
except json.JSONDecodeError:
# Content was not json-decodable
pass
except aiohttp.ClientConnectionError:
# Retry if the connection dropped after we got the initial response
except aiohttp.ClientError as e:
raise StreamClientError(
query_params["cursor"], "Failed to get stream message."
) from e
except asyncio.exceptions.TimeoutError:
logger.warning(
"We have encountered an error and we will try to reconnect, cursor = {}".format(
"We have encountered an timeout error and we will try to reconnect, cursor = {}".format(
query_params.get("cursor")
)
)
Expand Down
13 changes: 13 additions & 0 deletions stellar_sdk/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"BadResponseError",
"UnknownRequestError",
"NotPageableError",
"StreamClientError"
]


Expand Down Expand Up @@ -192,6 +193,18 @@ class NotPageableError(BaseRequestError):
"""


class StreamClientError(BaseRequestError):
"""Failed to fetch stream resource.

:param current_cursor: The cursor of the last message obtained can be used for reconnect.
:param message: error message
"""

def __init__(self, current_cursor: str, message: str) -> None:
super().__init__(message)
self.current_cursor = current_cursor


def raise_request_exception(response: Response) -> None:
status_code = response.status_code
if status_code == 200:
Expand Down