Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websockets v2 update #3096

Merged
merged 10 commits into from
Sep 19, 2023
48 changes: 47 additions & 1 deletion docs/providers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ asynchronous context manager, can be found in the `websockets connection`_ docs.
...
... unsubscribed = False
... while not unsubscribed:
... async for response in w3.listen_to_websocket():
... async for response in w3.ws.listen_to_websocket():
... print(f"{response}\n")
... # handle responses here
...
Expand Down Expand Up @@ -320,6 +320,52 @@ and reconnect automatically if the connection is lost. A similar example, using
>>> asyncio.run(ws_v2_subscription_iterator_example())


_PersistentConnectionWeb3 via AsyncWeb3.persistent_websocket()
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When an ``AsyncWeb3`` class is connected to a persistent websocket connection, via the
``persistent_websocket()`` method, it becomes an instance of the
``_PersistentConnectionWeb3`` class. This class has a few additional methods and
attributes that are not available on the ``AsyncWeb3`` class.

.. py:attribute:: _PersistentConnectionWeb3.ws

Listening to websocket responses, and sending raw requests, can be done using the
``ws`` attribute of the ``_PersistentConnectionWeb3`` class. The ``ws`` attribute
houses a public API, a :class:`~web3.providers.websocket.WebsocketConnection` class,
for sending and receiving websocket messages.

.. py:class:: web3.providers.websocket.WebsocketConnection()

This class handles interactions with a websocket connection. It is available
via the ``ws`` attribute of the ``_PersistentConnectionWeb3`` class. The
``WebsocketConnection`` class has the following methods:

.. py:method:: listen_to_websocket()

This method is available for listening to websocket responses indefinitely.
It is an asynchronous generator that yields responses from the websocket
connection. The responses from this method are formatted by web3.py
formatters and run through the middlewares before being yielded.
An example of its use can be seen above in the `Usage`_ section.

.. py:method:: recv()

The ``recv()`` method can be used to receive the next message from the
websocket. The response from this method is formatted by web3.py formatters
and run through the middlewares before being returned. This is useful for
receiving singled responses for one-to-many requests such receiving the
next ``eth_subscribe`` subscription response.

.. py:method:: send(method: RPCEndpoint, params: Sequence[Any])

This method is available strictly for sending raw requests to the websocket,
if desired. It is not recommended to use this method directly, as the
responses will not be formatted by web3.py formatters or run through the
middlewares. Instead, use the methods available on the respective web3
module. For example, use ``w3.eth.get_block("latest")`` instead of
``w3.ws.send("eth_getBlockByNumber", ["latest", True])``.


AutoProvider
~~~~~~~~~~~~
Expand Down
1 change: 1 addition & 0 deletions newsfragments/3096.breaking.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Breaking change to the API for interacting with a persistent websocket connection via ``AsyncWeb3`` and ``WebsocketProviderV2``. This change internalizes the ``provider.ws`` property and opts for a ``w3.ws`` API achieved via a new ``WebsocketConnection`` class. With these changes, ``eth_subscription`` messages now return the subscription id as the ``subscription`` param and the formatted message as the ``result`` param.
1 change: 1 addition & 0 deletions newsfragments/3096.docs.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Update ``WebsocketProviderV2`` documentation to reflect the new public websocket API via the ``WebsocketConnection`` class.
1 change: 1 addition & 0 deletions newsfragments/3096.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Sync responses for ``WebsocketProviderV2`` open connections with requests via matching RPC ``id`` values.
81 changes: 53 additions & 28 deletions web3/_utils/method_formatters.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
from eth_typing import (
HexStr,
)
from eth_utils import (
is_hexstr,
)
from eth_utils.curried import (
apply_formatter_at_index,
apply_formatter_if,
Expand Down Expand Up @@ -201,6 +204,7 @@ def type_aware_apply_formatters_to_dict_keys_and_values(
"to": apply_formatter_if(is_address, to_checksum_address),
"hash": to_hexbytes(32),
"v": apply_formatter_if(is_not_null, to_integer_if_hex),
"yParity": apply_formatter_if(is_not_null, to_integer_if_hex),
"standardV": apply_formatter_if(is_not_null, to_integer_if_hex),
"type": apply_formatter_if(is_not_null, to_integer_if_hex),
"chainId": apply_formatter_if(is_not_null, to_integer_if_hex),
Expand Down Expand Up @@ -612,40 +616,61 @@ def apply_list_to_array_formatter(formatter: Any) -> Callable[..., Any]:

# -- eth_subscribe -- #
def subscription_formatter(value: Any) -> Union[HexBytes, HexStr, Dict[str, Any]]:
if is_string(value):
if len(value.replace("0x", "")) == 64:
# transaction hash, from `newPendingTransactions` subscription w/o full_txs
return HexBytes(value)

if is_hexstr(value):
# subscription id from the original subscription request
return HexStr(value)

response_key_set = set(value.keys())

# handle dict subscription responses
if either_set_is_a_subset(response_key_set, set(BLOCK_FORMATTERS.keys())):
# block format, newHeads
return block_formatter(value)
elif isinstance(value, dict):
# subscription messages

elif either_set_is_a_subset(response_key_set, set(LOG_ENTRY_FORMATTERS.keys())):
# logs
return log_entry_formatter(value)
result = value.get("result")
result_formatter = None

elif either_set_is_a_subset(
response_key_set, set(TRANSACTION_RESULT_FORMATTERS.keys())
):
# transaction subscription type (newPendingTransactions), full transactions
return transaction_result_formatter(value)

elif any(_ in response_key_set for _ in {"syncing", "status"}):
# geth syncing response
return type_aware_apply_formatters_to_dict(GETH_SYNCING_SUBSCRIPTION_FORMATTERS)

elif either_set_is_a_subset(response_key_set, set(SYNCING_FORMATTERS.keys())):
# syncing response object
return syncing_formatter
if isinstance(result, str) and len(result.replace("0x", "")) == 64:
# transaction hash, from `newPendingTransactions` subscription w/o full_txs
result_formatter = HexBytes

elif isinstance(result, (dict, AttributeDict)):
result_key_set = set(result.keys())

# handle dict subscription responses
if either_set_is_a_subset(
result_key_set,
set(BLOCK_FORMATTERS.keys()),
percentage=90,
):
# block format, newHeads
result_formatter = block_formatter

elif either_set_is_a_subset(
result_key_set, set(LOG_ENTRY_FORMATTERS.keys()), percentage=90
):
# logs
result_formatter = log_entry_formatter

elif either_set_is_a_subset(
result_key_set, set(TRANSACTION_RESULT_FORMATTERS.keys()), percentage=90
):
# newPendingTransactions, full transactions
result_formatter = transaction_result_formatter

elif any(_ in result_key_set for _ in {"syncing", "status"}):
# geth syncing response
result_formatter = type_aware_apply_formatters_to_dict(
GETH_SYNCING_SUBSCRIPTION_FORMATTERS
)

elif either_set_is_a_subset(
result_key_set,
set(SYNCING_FORMATTERS.keys()),
percentage=90,
):
# syncing response object
result_formatter = syncing_formatter

if result_formatter is not None:
value["result"] = result_formatter(result)

# fallback to returning the value as-is
return value


Expand Down
23 changes: 17 additions & 6 deletions web3/_utils/utility_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,26 @@ def none_in_dict(
return not any_in_dict(values, d)


def either_set_is_a_subset(set1: Set[Any], set2: Set[Any]) -> bool:
def either_set_is_a_subset(
set1: Set[Any],
set2: Set[Any],
percentage: int = 100,
) -> bool:
"""
Returns a bool based on whether two sets might have some differences but are mostly
the same. This can be useful when comparing formatters to an actual response for
formatting.

:param set1: A set of values
:param set2: A second set of values
:return: True if the intersection of the two sets is equal to the first set;
False if the intersection of the two sets is NOT equal to the first set
:param set1: A set of values.
:param set2: A second set of values.
:param percentage: The percentage of either set that must be present in the
other set; defaults to 100.
:return: True if one set's intersection with the other set is greater
than or equal to the given percentage of the other set.
"""
return set1.intersection(set2) == set1 or set2.intersection(set1) == set2
threshold = percentage / 100

return (
len(set1.intersection(set2)) >= len(set1) * threshold
or len(set2.intersection(set1)) >= len(set2) * threshold
)
15 changes: 7 additions & 8 deletions web3/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@
build_strict_registry,
map_abi_data,
)
from web3._utils.compat import (
Self,
)
from web3._utils.empty import (
empty,
)
Expand Down Expand Up @@ -127,6 +130,7 @@
from web3.providers.websocket import (
WebsocketProvider,
)
from web3.providers.websocket.websocket_connection import WebsocketConnection
from web3.testing import (
Testing,
)
Expand All @@ -142,9 +146,6 @@
if TYPE_CHECKING:
from web3.pm import PM # noqa: F401
from web3._utils.empty import Empty # noqa: F401
from web3.manager import ( # noqa: F401
_AsyncPersistentRecvStream,
)


def get_async_default_modules() -> Dict[str, Union[Type[Module], Sequence[Any]]]:
Expand Down Expand Up @@ -538,9 +539,10 @@ def __init__(
"Provider must inherit from PersistentConnectionProvider class."
)
AsyncWeb3.__init__(self, provider, middlewares, modules, external_modules, ens)
self.ws = WebsocketConnection(self)

# async for w3 in w3.persistent_websocket(provider)
async def __aiter__(self) -> AsyncIterator["_PersistentConnectionWeb3"]:
async def __aiter__(self) -> AsyncIterator[Self]:
while True:
try:
yield self
Expand All @@ -549,7 +551,7 @@ async def __aiter__(self) -> AsyncIterator["_PersistentConnectionWeb3"]:
continue

# async with w3.persistent_websocket(provider) as w3
async def __aenter__(self) -> "_PersistentConnectionWeb3":
async def __aenter__(self) -> Self:
await self.provider.connect()
return self

Expand All @@ -560,6 +562,3 @@ async def __aexit__(
exc_tb: TracebackType,
) -> None:
await self.provider.disconnect()

def listen_to_websocket(self) -> "_AsyncPersistentRecvStream":
return self.manager.persistent_recv_stream()
Loading