Websockets v2 update #3096

Merged
merged 10 commits into from
Sep 19, 2023

fselmo
Collaborator

@fselmo fselmo commented Sep 13, 2023

What was wrong?

The main reason WebsocketProviderV2 is in beta is that we have to asynchronously process the response to the original request that was made. This isn't straightforward with the way websockets work, because the socket hands back whatever message is next in line when we call recv(). If a user has an eth_subscribe subscription open, say "newPendingTransactions", which starts returning many transactions almost instantly after subscribing, one (or many) of these subscription messages may arrive before the response to a later request that the user is waiting on. Consider the following:

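import asyncio

from web3 import AsyncWeb3
from web3.providers import WebsocketProviderV2

# INFURA_PROJECT_ID and ETHEREUM_WETH_CONTRACT are assumed to be defined elsewhere.

async def wsV2_example():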
    async with AsyncWeb3.persistent_websocket(
        WebsocketProviderV2(f"wss://mainnet.infura.io/ws/v3/{INFURA_PROJECT_ID}")
    ) as w3:
        # -- get pending transactions subscription -- #
        
        pending_full_txs = True
        pending_tx_sub_id = await w3.eth.subscribe("newPendingTransactions", pending_full_txs)
        print(f"pending_tx_sub_id: {pending_tx_sub_id}\n")

        # -- logs subscription -- #

        logs_sub_id = await w3.eth.subscribe("logs", {"address": ETHEREUM_WETH_CONTRACT})
        # we will likely get one of the "newPendingTransactions" subscriptions returned here 
        # before we get the response to our second `eth_subscribe` request with the `logs_sub_id` value.
        print(f"logs_sub_id: {logs_sub_id}\n")

        while True:
            async for response in w3.listen_to_websocket():
                # when we actually listen to the websocket for our subscriptions, we will eventually get the response
                # we needed to the second subscription request with our `logs_sub_id` value
                print(response)

asyncio.run(wsV2_example())

How was it fixed?

Thanks to JSON-RPC 2.0 specs, responses are required to return the id of the request that asked for the response. If we keep the request id and pass it to the method we use to retrieve and process a response, we can wait until this particular response is received and return that. It should take no longer than a call_timeout. While we look for the response, we need to store every response that came in before it in a transient cache that will then be emptied on each __anext__() call before we pull anything from the socket again.
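The matching step described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `FakeSocket`, `wait_for_response`, and the `call_timeout` default are hypothetical stand-ins, and the only assumption about the wire format is the JSON-RPC 2.0 rule that a response carries the `id` of its request.

```python
import asyncio
import json
from collections import deque


class FakeSocket:
    """Hypothetical stand-in for a websocket connection (not a web3.py class)."""

    def __init__(self, raw_messages):
        self._raw_messages = deque(raw_messages)

    async def recv(self):
        # Hand back whatever message is next, like a real socket would.
        while not self._raw_messages:
            await asyncio.sleep(0.01)  # nothing to read yet
        return self._raw_messages.popleft()


async def wait_for_response(socket, request_id, cache, call_timeout=5.0):
    """Read until the response with ``request_id`` arrives; cache the rest."""

    async def _read_until_match():
        while True:
            message = json.loads(await socket.recv())
            if message.get("id") == request_id:
                return message
            # Not ours (e.g. a subscription notification): keep it for later.
            cache.append(message)

    return await asyncio.wait_for(_read_until_match(), timeout=call_timeout)


def demo():
    notification = {
        "jsonrpc": "2.0",
        "method": "eth_subscription",
        "params": {"subscription": "0xabc", "result": "0x01"},
    }
    response = {"jsonrpc": "2.0", "id": 2, "result": "0xdef"}
    socket = FakeSocket([json.dumps(notification), json.dumps(response)])
    cache = deque()
    matched = asyncio.run(wait_for_response(socket, 2, cache))
    return matched, list(cache)
```

In this sketch the notification that arrives first ends up in `cache` instead of being returned to the caller, and `asyncio.wait_for` raises a timeout error if no matching `id` shows up within `call_timeout` seconds.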

  • Created a RequestProcessor class that is strapped to PersistentConnectionProvider classes to process two types of caches now (both transient, in the sense that they only store information until it's needed and that time window shouldn't be too long):
    • The RequestInformation cache that already existed. This cache stores a request's information so that, when we process the response asynchronously, we still have the request's context and the middleware methods to pipe the response through.
    • A cache that stores raw responses from the open websocket connection that may come in before the response for the original request comes in. This cache gets emptied in a FIFO manner before we listen to any new messages from the websocket so that we don't lose any responses that were received within that window.
  • Refactored matching responses to request ids so that it exists within the make_request() method of the WebsocketProviderV2 class. This allows us to actually return a response that matches the request id within the same method, keeping types happy throughout the code base.
  • Internalized the provider.ws websocket connection (as provider._ws) and added a public API class (WebsocketConnection) on the _PersistentConnectionWeb3 class for interacting with the websocket, with more predictable behavior, formatting support, etc.
  • Other refactoring along the way.
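As a rough picture of the two transient caches listed above, here is a minimal sketch. It is not the `RequestProcessor` from this PR: the class name `TransientCaches`, its method names, the `RequestInfo` fields, and the `max_size` eviction rule are all assumptions made for illustration.

```python
from collections import OrderedDict
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class RequestInfo:
    """Hypothetical record of what we need to process a response later."""

    method: str
    params: Any


class TransientCaches:
    """Illustrative holder for request info and early raw responses."""

    def __init__(self, max_size: int = 100) -> None:
        self._max_size = max_size
        self._request_info: "OrderedDict[Any, RequestInfo]" = OrderedDict()
        self._raw_responses: "OrderedDict[int, dict]" = OrderedDict()
        self._counter = 0

    def remember_request(self, request_id: Any, info: RequestInfo) -> None:
        self._request_info[request_id] = info
        self._evict_oldest(self._request_info)

    def take_request(self, request_id: Any) -> Optional[RequestInfo]:
        # Entries are removed once used, which keeps the cache transient.
        return self._request_info.pop(request_id, None)

    def stash_raw_response(self, response: dict) -> None:
        self._counter += 1
        self._raw_responses[self._counter] = response
        self._evict_oldest(self._raw_responses)

    def next_raw_response(self) -> Optional[dict]:
        # FIFO: the oldest cached response is handed back first.
        if not self._raw_responses:
            return None
        _, response = self._raw_responses.popitem(last=False)
        return response

    def _evict_oldest(self, cache: OrderedDict) -> None:
        while len(cache) > self._max_size:
            cache.popitem(last=False)
```

`OrderedDict.popitem(last=False)` gives first-in, first-out behavior, so responses that arrived early are processed in arrival order.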

Todo:

  • Add entry to the release notes
  • Make provider.ws internal (provider._ws) and have any "listen" to ws method that is public look in the raw response cache first and process those responses in FIFO before actually calling ws.recv() again. This will be important to document since a user might want to directly call recv() on the websocket connection without knowing there may be cached responses ahead of any new responses.
  • Missing a cute animal picture below 😱
  • Manual testing (for now) of the new request processor transient caches and the logic therein
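The "look in the raw response cache first" behavior from the Todo list could look something like the sketch below. The names `CacheFirstListener`, `FakeSocket`, and `take` are hypothetical and only illustrate the ordering; they are not part of the public API added in this PR.

```python
import asyncio
import json
from collections import deque


class FakeSocket:
    """Hypothetical stand-in for the underlying websocket connection."""

    def __init__(self, raw_messages):
        self._raw_messages = deque(raw_messages)

    async def recv(self):
        return self._raw_messages.popleft()


class CacheFirstListener:
    """Async iterator that drains cached responses before reading the socket."""

    def __init__(self, socket, cached_responses):
        self._socket = socket
        self._cached = cached_responses

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._cached:
            # Serve responses that arrived earlier, oldest first.
            return self._cached.popleft()
        return json.loads(await self._socket.recv())


async def take(listener, count):
    """Collect ``count`` messages from the listener (count must be >= 1)."""
    received = []
    async for message in listener:
        received.append(message)
        if len(received) == count:
            break
    return received
```

A caller who bypassed such a listener and called `recv()` directly would skip anything still sitting in the cache, which is why the Todo item calls for documenting this.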

Cute Animal Picture

20230914_125619

fselmo added a commit to fselmo/web3.py that referenced this pull request Sep 14, 2023
@fselmo fselmo marked this pull request as ready for review September 14, 2023 18:55
fselmo added a commit to fselmo/web3.py that referenced this pull request Sep 14, 2023
- Create a request processor class for websocket connections with two caches. One cache for storing request information to process a response with the correct formatters and middleware asynchronously (already existed but moved into this new class). The other cache was added to make sure that a request receives the answer it is asking for by matching the request `id` with the response `id`. Any responses received in the meantime are cached in an OrderedDict / SimpleCache FIFO queue and processed in order before the next response.

- Some major refactoring and organization would still be ideal. This was a rough dump of ideas to get it working well.

- Add `yParity` to TRANSACTION_RESULT_FORMATTERS as that led to some issues with formatting. Perhaps `either_set_is_a_subset` may become too restrictive in the future.
- Add a percentage flag for how much of either set should be present in the other set.
- Add `yParity` field to `TRANSACTION_RESULT_FORMATTERS` because this is what caused the whole of transaction formatting to fail since it existed in one set and not the other.
- Look for a 90% match between a subscription response and its formatters, rather than requiring either to be a full subset of the other.
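To illustrate the percentage-based comparison described in these commit messages, here is a small sketch. The function name `sets_mostly_overlap` and the sample field names are assumptions for illustration; this is not the helper changed in the commit.

```python
def sets_mostly_overlap(set1, set2, percentage=100):
    """Return True if at least ``percentage`` percent of either set is in the other."""
    common = len(set1 & set2)
    # Integer arithmetic avoids floating point rounding at the threshold.
    return (
        common * 100 >= percentage * len(set1)
        or common * 100 >= percentage * len(set2)
    )


formatter_keys = {
    "blockHash", "blockNumber", "from", "gas", "gasPrice",
    "hash", "input", "nonce", "to", "value",
}
# Same fields, except one is missing and an unexpected one is present.
response_keys = (formatter_keys - {"gasPrice"}) | {"yParity"}

strict_match = sets_mostly_overlap(formatter_keys, response_keys, percentage=100)
relaxed_match = sets_mostly_overlap(formatter_keys, response_keys, percentage=90)
```

With a strict subset requirement, a single field that appears in only one set is enough to reject the match when each set also has a field the other lacks; relaxing the threshold tolerates that kind of drift.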
fselmo added a commit to fselmo/web3.py that referenced this pull request Sep 14, 2023
fselmo added a commit to fselmo/web3.py that referenced this pull request Sep 14, 2023
- Return the request ``id`` from the websocket_v2 provider so we know what response to match it to. This is a bit hacky for now because it will require a whole refactor of what the method ``make_request`` returns. This is an attempt to just get things working before said refactor.
- Move logic for matching response to the request into the provider class.
- Move ThreadPoolExecutor and cache locking into provider as well (from the _PersistentConnectionWeb3 class). This should've been the case from the start.
- We now do return the appropriate RPCResponse for the request within the ``make_request()`` method and so we don't have to work through the typing differences anymore.
- Fix async middleware that return responses to actually return the response and pass it to the next middleware so we get a response object through the pipe.
- Internalize the direct websocket connection on the provider to avoid direct usage of the websocket library, which may break expected results: `provider.ws` -> `provider._ws`
- Create a public API on the `_PersistentConnectionWeb3` class via the `WebsocketConnection` class, with a useful UX for interacting with the websocket connection and getting expected results, and attach this to the `_PersistentConnectionWeb3` instance as `w3.ws`.
fselmo added a commit to fselmo/web3.py that referenced this pull request Sep 14, 2023
@reedsa
Contributor

reedsa commented Sep 14, 2023

Nothing blocking here, I haven't yet tested though. Looks great, love the separate commits and clean messages 💯

fselmo added a commit to fselmo/web3.py that referenced this pull request Sep 15, 2023
fselmo added a commit to fselmo/web3.py that referenced this pull request Sep 15, 2023
- Return the formatted `params` object for `eth_subscription` messages. This includes the subscription `id` which should remove ambiguity from messages if subscribed to multiple subscriptions.
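A short sketch of why the subscription `id` helps when several subscriptions share one connection. It assumes each returned `params` object has a `subscription` key (the subscription id) and a `result` key, as in `eth_subscription` notifications; the function name `group_by_subscription` and the sample values are hypothetical.

```python
from collections import defaultdict


def group_by_subscription(params_objects):
    """Group notification results by the subscription id they belong to."""
    grouped = defaultdict(list)
    for params in params_objects:
        grouped[params["subscription"]].append(params["result"])
    return dict(grouped)


received = [
    {"subscription": "0xaaa", "result": "pending tx 1"},
    {"subscription": "0xbbb", "result": "log 1"},
    {"subscription": "0xaaa", "result": "pending tx 2"},
]
by_subscription = group_by_subscription(received)
```

Without the id, the interleaved results above would have to be told apart by their shape alone.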
Contributor

@pacrob pacrob left a comment

lgtm!

@fselmo fselmo merged commit d6d1084 into ethereum:main Sep 19, 2023
84 checks passed
fselmo added a commit that referenced this pull request Sep 19, 2023
@fselmo fselmo deleted the websockets-v2-update branch September 19, 2023 18:23