fix(WebSocket): handle OSError upon send() + fix `max_receive_queue == 0` (#2324)

* feat(WebSocket): handle `OSError` upon `send()`

* fix(WebSocket): fix the `max_receive_queue == 0` case (WiP)

* chore: do not build rapidjson on PyPy

* test(WebSocket): add tests for the max_receive_queue==0 case

* docs(ws): revise "Lost Connections" in the light of ASGI WS spec 2.4

* docs: market this as bugfix instead

* test(WS): add a zero receive queue test with real servers

* docs(WS): polish newsfragment

* docs(WS): tone down inline comment
vytas7 authored Sep 17, 2024
1 parent 87e0454 commit b29fd55
Showing 10 changed files with 227 additions and 38 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -34,6 +34,7 @@ pip-log.txt
.ecosystem
.tox
.pytest_cache
downloaded_files/
geckodriver.log
htmlcov
nosetests.xml
2 changes: 1 addition & 1 deletion README.rst
@@ -225,7 +225,7 @@ For your convenience, wheels containing pre-compiled binaries are available
from PyPI for the majority of common platforms. Even if a binary build for your
platform of choice is not available, ``pip`` will pick a pure-Python wheel.
You can also cythonize Falcon for your environment; see our
-`Installation docs <https://falcon.readthedocs.io/en/stable/user/install.html>`__.
+`Installation docs <https://falcon.readthedocs.io/en/stable/user/install.html>`__
for more information on this and other advanced options.

Dependencies
16 changes: 16 additions & 0 deletions docs/_newsfragments/2292.bugfix.rst
@@ -0,0 +1,16 @@
Falcon will now raise an instance of
:class:`~falcon.errors.WebSocketDisconnected` from the :class:`OSError` that
the ASGI server signals in the case of a disconnected client (as per
the `ASGI HTTP & WebSocket protocol
<https://asgi.readthedocs.io/en/latest/specs/www.html#id2>`__ version ``2.4``).
It is worth noting though that Falcon's
:ref:`built-in receive buffer <ws_lost_connection>` normally detects the
``websocket.disconnect`` event itself prior to the potentially failing attempt to
``send()``.

Disabling this built-in receive buffer (by setting
:attr:`~falcon.asgi.WebSocketOptions.max_receive_queue` to ``0``) was also
found to interfere with receiving ASGI WebSocket messages in an unexpected
way. The issue has been fixed so that setting this option to ``0`` now properly
bypasses the buffer altogether, and extensive test coverage has been added for
validating this scenario.
48 changes: 26 additions & 22 deletions docs/api/websocket.rst
@@ -112,32 +112,36 @@ Lost Connections
----------------

When the app attempts to receive a message from the client, the ASGI server
emits a `disconnect` event if the connection has been lost for any reason. Falcon
surfaces this event by raising an instance of :class:`~.WebSocketDisconnected`
to the caller.

On the other hand, the ASGI spec requires the ASGI server to silently consume
messages sent by the app after the connection has been lost (i.e., it should
not be considered an error). Therefore, an endpoint that primarily streams
outbound events to the client might continue consuming resources unnecessarily
for some time after the connection is lost.
emits a ``disconnect`` event if the connection has been lost for any
reason. Falcon surfaces this event by raising an instance of
:class:`~.WebSocketDisconnected` to the caller.

On the other hand, the ASGI spec previously required the ASGI server to
silently consume messages sent by the app after the connection has been lost
(i.e., it should not be considered an error). Therefore, an endpoint that
primarily streams outbound events to the client could continue consuming
resources unnecessarily for some time after the connection is lost.
This aspect has been rectified in the ASGI HTTP spec version ``2.4``,
and calling ``send()`` on a closed connection should now raise an
error. Unfortunately, not all ASGI servers have adopted this new behavior
uniformly yet.

As a workaround, Falcon implements a small incoming message queue that is used
to detect a lost connection and then raise an instance of
:class:`~.WebSocketDisconnected` to the caller the next time it attempts to send
a message.

This workaround is only necessary when the app itself does not consume messages
from the client often enough to quickly detect when the connection is lost.
Otherwise, Falcon's receive queue can be disabled for a slight performance boost
by setting :attr:`~falcon.asgi.WebSocketOptions.max_receive_queue` to ``0`` via
:class:`~.WebSocketDisconnected` to the caller the next time it attempts to
send a message.
If your ASGI server of choice adheres to the spec version ``2.4``, this receive
queue can be safely disabled for a slight performance boost by setting
:attr:`~falcon.asgi.WebSocketOptions.max_receive_queue` to ``0`` via
:attr:`~falcon.asgi.App.ws_options`.

Note also that some ASGI server implementations do not strictly follow the ASGI
spec in this regard, and in fact will raise an error when the app attempts to
send a message after the client disconnects. If testing reveals this to be the
case for your ASGI server of choice, Falcon's own receive queue can be safely
disabled.
(We may revise this setting, and disable the queue by default in the future if
our testing indicates that all major ASGI servers have caught up with the
spec.)

Furthermore, even on non-compliant or older ASGI servers, this workaround is
only necessary when the app itself does not consume messages from the client
often enough to quickly detect when the connection is lost.
Otherwise, Falcon's receive queue can also be disabled as described above.

.. _ws_error_handling:

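The "small incoming message queue" described in the revised docs can be illustrated with a stdlib-only sketch. This is a simplified stand-in and not Falcon's actual `_BufferedReceiver`: the class name `SketchBufferedReceiver`, the `fake_receive` callable, and the choice to stop pumping once the queue is full are all assumptions made for illustration. It shows how a background task can read ahead from the ASGI `receive()` callable to notice a `websocket.disconnect` event, and why no task is needed at all when the queue size is `0`.

```python
import asyncio
from collections import deque
from typing import Any, Awaitable, Callable, Deque, Dict, Optional

Event = Dict[str, Any]
Receive = Callable[[], Awaitable[Event]]


class SketchBufferedReceiver:
    """Simplified stand-in for a buffered receiver (not Falcon's code)."""

    def __init__(self, asgi_receive: Receive, max_queue: int) -> None:
        self._asgi_receive = asgi_receive
        self._max_queue = max_queue
        self._messages: Deque[Event] = deque()
        self._pump_task: Optional[asyncio.Task] = None
        self.client_disconnected = False
        self.client_disconnected_code: Optional[int] = None

    def start(self) -> None:
        # Nothing to pump when buffering is disabled (max_queue == 0).
        if self._pump_task is None and self._max_queue > 0:
            self._pump_task = asyncio.create_task(self._pump())

    async def _pump(self) -> None:
        # Read ahead so a disconnect is noticed even if the app only sends.
        # Assumption: this sketch simply stops once the queue is full.
        while len(self._messages) < self._max_queue:
            event = await self._asgi_receive()
            if event['type'] == 'websocket.disconnect':
                self.client_disconnected = True
                self.client_disconnected_code = event.get('code', 1000)
                return
            self._messages.append(event)


async def demo() -> SketchBufferedReceiver:
    # Hypothetical event stream: one text message, then a disconnect.
    events = deque(
        [
            {'type': 'websocket.receive', 'text': 'hello'},
            {'type': 'websocket.disconnect', 'code': 1001},
        ]
    )

    async def fake_receive() -> Event:
        return events.popleft()

    receiver = SketchBufferedReceiver(fake_receive, max_queue=4)
    receiver.start()
    await asyncio.sleep(0.01)  # give the pump task a chance to run
    return receiver


receiver = asyncio.run(demo())
print(receiver.client_disconnected, receiver.client_disconnected_code)
```

With the flag set by the pump, a subsequent send can fail fast instead of handing the message to a server that might silently drop it.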
4 changes: 4 additions & 0 deletions e2e-tests/server/app.py
@@ -15,6 +15,10 @@
 def create_app() -> falcon.asgi.App:
     app = falcon.asgi.App()

+    # NOTE(vytas): E2E tests run Uvicorn, and the latest versions support ASGI
+    # HTTP/WS spec ver 2.4, so buffering on our side should not be needed.
+    app.ws_options.max_receive_queue = 0
+
     hub = Hub()
     app.add_route('/ping', Pong())
     app.add_route('/sse', Events(hub))
48 changes: 45 additions & 3 deletions falcon/asgi/ws.py
@@ -4,6 +4,7 @@
 import collections
 from enum import auto
 from enum import Enum
+import re
 from typing import Any, Deque, Dict, Iterable, Mapping, Optional, Tuple, Union

 from falcon import errors
@@ -28,6 +29,9 @@ class _WebSocketState(Enum):
     CLOSED = auto()


+_CLIENT_DISCONNECTED_CAUSE = re.compile(r'received (\d\d\d\d)')
+
+
 class WebSocket:
     """Represents a single WebSocket connection with a client."""

@@ -47,6 +51,8 @@ class WebSocket:
         'subprotocols',
     )

+    _asgi_receive: AsgiReceive
+    _asgi_send: AsgiSend
     _state: _WebSocketState
     _close_code: Optional[int]
     subprotocols: Tuple[str, ...]
@@ -81,7 +87,12 @@ def __init__(
         # event via one of their receive() calls, and there is no
         # need for the added overhead.
         self._buffered_receiver = _BufferedReceiver(receive, max_receive_queue)
-        self._asgi_receive = self._buffered_receiver.receive
+        if max_receive_queue > 0:
+            self._asgi_receive = self._buffered_receiver.receive
+        else:
+            # NOTE(vytas): Pass through the receive callable bypassing the
+            # buffered receiver in the case max_receive_queue is set to 0.
+            self._asgi_receive = receive
         self._asgi_send = send

         mh_text = media_handlers[WebSocketPayloadType.TEXT]
@@ -468,6 +479,8 @@ async def _send(self, msg: AsgiSendMsg) -> None:
         if self._buffered_receiver.client_disconnected:
             self._state = _WebSocketState.CLOSED
             self._close_code = self._buffered_receiver.client_disconnected_code
+
+        if self._state == _WebSocketState.CLOSED:
             raise errors.WebSocketDisconnected(self._close_code)

         try:
@@ -483,7 +496,16 @@ async def _send(self, msg: AsgiSendMsg) -> None:

             translated_ex = self._translate_webserver_error(ex)
             if translated_ex:
-                raise translated_ex
+                # NOTE(vytas): Mark WebSocket as closed if we catch an error
+                # upon sending. This is useful when not using the buffered
+                # receiver, and not receiving anything at the given moment.
+                self._state = _WebSocketState.CLOSED
+                if isinstance(translated_ex, errors.WebSocketDisconnected):
+                    self._close_code = translated_ex.code
+
+                # NOTE(vytas): Use the raise from form in order to preserve
+                # the traceback.
+                raise translated_ex from ex

             # NOTE(kgriffs): Re-raise other errors directly so that we don't
             # obscure the traceback.
@@ -529,6 +551,25 @@ def _translate_webserver_error(self, ex: Exception) -> Optional[Exception]:
                 'WebSocket subprotocol must be from the list sent by the client'
             )

+        # NOTE(vytas): Per ASGI HTTP & WebSocket spec v2.4:
+        # If send() is called on a closed connection the server should raise
+        # a server-specific subclass of IOError.
+        # NOTE(vytas): Uvicorn 0.30.6 seems to conform to the spec only when
+        # using the wsproto stack, it then raises an instance of
+        # uvicorn.protocols.utils.ClientDisconnected.
+        if isinstance(ex, OSError):
+            close_code = None
+
+            # NOTE(vytas): If using the "websockets" backend, Uvicorn raises
+            # an instance of OSError from a websockets exception like this:
+            # "received 1001 (going away); then sent 1001 (going away)"
+            if ex.__cause__:
+                match = _CLIENT_DISCONNECTED_CAUSE.match(str(ex.__cause__))
+                if match:
+                    close_code = int(match.group(1))
+
+            return errors.WebSocketDisconnected(close_code)
+
         return None


@@ -679,7 +720,8 @@ def __init__(self, asgi_receive: AsgiReceive, max_queue: int) -> None:
         self.client_disconnected_code = None

     def start(self) -> None:
-        if self._pump_task is None:
+        # NOTE(vytas): Do not start anything if buffering is disabled.
+        if self._pump_task is None and self._max_queue > 0:
             self._pump_task = asyncio.create_task(self._pump())

     async def stop(self) -> None:
8 changes: 4 additions & 4 deletions falcon/testing/helpers.py
@@ -785,10 +785,10 @@ async def _collect(self, event: Dict[str, Any]):
         else:
             assert self.closed

-            # NOTE(kgriffs): According to the ASGI spec, we are
-            # supposed to just silently eat events once the
-            # socket is disconnected.
-            pass
+            # NOTE(vytas): Tweaked in Falcon 4.0: we now simulate ASGI
+            # WebSocket protocol 2.4+, raising an instance of OSError upon
+            # send if the client has already disconnected.
+            raise falcon_errors.WebSocketDisconnected(self._close_code)

         # NOTE(kgriffs): Give whatever is waiting on the handshake or a
         # collected data/text event a chance to progress.
24 changes: 24 additions & 0 deletions tests/asgi/_asgi_test_app.py
@@ -264,6 +264,29 @@ async def on_post(self, req, resp):
resp.status = falcon.HTTP_403


+class WSOptions:
+    _SUPPORTED_KEYS = frozenset(
+        {'default_close_reasons', 'error_close_code', 'max_receive_queue'}
+    )
+
+    def __init__(self, ws_options):
+        self._ws_options = ws_options
+
+    async def on_get(self, req, resp):
+        resp.media = {
+            key: getattr(self._ws_options, key) for key in self._SUPPORTED_KEYS
+        }
+
+    async def on_patch(self, req, resp):
+        update = await req.get_media()
+        for key, value in update.items():
+            if key not in self._SUPPORTED_KEYS:
+                raise falcon.HTTPInvalidParam('unsupported option', key)
+            setattr(self._ws_options, key, value)
+
+        resp.status = falcon.HTTP_NO_CONTENT
+
+
 def create_app():
     app = falcon.asgi.App()
     bucket = Bucket()
@@ -276,6 +299,7 @@ def create_app():
     app.add_route('/forms', Multipart())
     app.add_route('/jars', TestJar())
     app.add_route('/feeds/{feed_id}', Feed())
+    app.add_route('/wsoptions', WSOptions(app.ws_options))

     app.add_middleware(lifespan_handler)

15 changes: 14 additions & 1 deletion tests/asgi/test_asgi_servers.py
@@ -208,7 +208,20 @@ async def emitter():
 class TestWebSocket:
     @pytest.mark.parametrize('explicit_close', [True, False])
     @pytest.mark.parametrize('close_code', [None, 4321])
-    async def test_hello(self, explicit_close, close_code, server_url_events_ws):
+    @pytest.mark.parametrize('max_receive_queue', [0, 4, 17])
+    async def test_hello(
+        self,
+        explicit_close,
+        close_code,
+        max_receive_queue,
+        server_base_url,
+        server_url_events_ws,
+    ):
+        resp = requests.patch(
+            server_base_url + 'wsoptions', json={'max_receive_queue': max_receive_queue}
+        )
+        resp.raise_for_status()
+
         echo_expected = 'Check 1 - \U0001f600'

         extra_headers = {'X-Command': 'recv'}