Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix spurious LocalProtocolError errors when processing pipelined requests #2243

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 47 additions & 5 deletions tests/protocols/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,20 @@ def set_protocol(self, protocol):
pass


class MockTimerHandle:
def __init__(self, loop_later_list, delay, callback, args):
self.loop_later_list = loop_later_list
self.delay = delay
self.callback = callback
self.args = args
self.cancelled = False

def cancel(self):
if not self.cancelled:
self.cancelled = True
self.loop_later_list.remove(self)


class MockLoop:
def __init__(self):
self._tasks = []
Expand All @@ -186,18 +200,20 @@ def create_task(self, coroutine):
return MockTask()

def call_later(self, delay, callback, *args):
self._later.insert(0, (delay, callback, args))
handle = MockTimerHandle(self._later, delay, callback, args)
self._later.insert(0, handle)
return handle

async def run_one(self):
return await self._tasks.pop()

def run_later(self, with_delay):
later = []
for delay, callback, args in self._later:
if with_delay >= delay:
callback(*args)
for timer_handle in self._later:
if with_delay >= timer_handle.delay:
timer_handle.callback(*timer_handle.args)
else:
later.append((delay, callback, args))
later.append(timer_handle)
self._later = later


Expand Down Expand Up @@ -315,6 +331,32 @@ async def test_keepalive_timeout(http_protocol_cls: HTTPProtocol):
assert protocol.transport.is_closing()


@pytest.mark.anyio
async def test_keepalive_timeout_with_pipelined_requests(
http_protocol_cls: HTTPProtocol,
):
app = Response("Hello, world", media_type="text/plain")

protocol = get_connected_protocol(app, http_protocol_cls)
protocol.data_received(SIMPLE_GET_REQUEST)
protocol.data_received(SIMPLE_GET_REQUEST)

# After processing the first request, the keep-alive task should be
# disabled because the second request is not responded yet.
await protocol.loop.run_one()
assert b"HTTP/1.1 200 OK" in protocol.transport.buffer
assert b"Hello, world" in protocol.transport.buffer
assert protocol.timeout_keep_alive_task is None

# Process the second request and ensure that the keep-alive task
# has been enabled again as the connection is now idle.
protocol.transport.clear_buffer()
await protocol.loop.run_one()
assert b"HTTP/1.1 200 OK" in protocol.transport.buffer
assert b"Hello, world" in protocol.transport.buffer
assert protocol.timeout_keep_alive_task is not None


@pytest.mark.anyio
async def test_close(http_protocol_cls: HTTPProtocol):
app = Response(b"", status_code=204, headers={"connection": "close"})
Expand Down
8 changes: 8 additions & 0 deletions uvicorn/protocols/http/h11_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,14 @@ def handle_events(self) -> None:
else:
app = self.app

# When starting to process a request, disable the keep-alive
# timeout. Normally we disable this when receiving data from
# client and set back when finishing processing its request.
# However, for pipelined requests processing finishes after
# already receiving the next request and thus the timer may
# be set here, which we don't want.
self._unset_keepalive_if_required()

self.cycle = RequestResponseCycle(
scope=self.scope,
conn=self.conn,
Expand Down
12 changes: 6 additions & 6 deletions uvicorn/protocols/http/httptools_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,22 +326,22 @@ def on_response_complete(self) -> None:
if self.transport.is_closing():
return

# Set a short Keep-Alive timeout.
self._unset_keepalive_if_required()

self.timeout_keep_alive_task = self.loop.call_later(
self.timeout_keep_alive, self.timeout_keep_alive_handler
)
Comment on lines -332 to -334
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not failing on httptools implementation, is it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this change, the unit test that I added would be failing – it finds self.timeout_keep_alive_task set after handling the first request, before handling the second, pipelined one.

But let me see on a real application with the reproduction script that I posted in #1637.

Copy link
Contributor Author

@marcinsulikowski marcinsulikowski Feb 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is the test script for the httptools implementation:

import asyncio

import uvicorn
from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def root():
    await asyncio.sleep(3)
    return {"msg": "Hello World"}


async def main():
    # Start uvicorn in a background task
    config = uvicorn.Config(app, port=8000, timeout_keep_alive=1, http="httptools")
    server = uvicorn.Server(config)
    uvicorn_task = asyncio.create_task(server.serve())

    # After it starts, try making two HTTP requests.
    await asyncio.sleep(1)
    print("Sending requests")
    reader, writer = await asyncio.open_connection("localhost", 8000)
    writer.write(b"GET / HTTP/1.1\r\nHost: localhost\r\nConnection: keep-alive\r\n\r\n")
    writer.write(b"GET / HTTP/1.1\r\nHost: localhost\r\nConnection: keep-alive\r\n\r\n")
    await writer.drain()
    while data := await reader.read(1000):
        print(data.decode("utf-8"))
    print("Server closed the connection")
    server.should_exit = True
    await uvicorn_task


if __name__ == "__main__":
    asyncio.run(main())

Without my changes, the client receives a response for its first request and then the connection is gracefully closed despite the second request still being in progress:

INFO:     Started server process [260529]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
Sending requests
INFO:     127.0.0.1:59558 - "GET / HTTP/1.1" 200 OK
HTTP/1.1 200 OK
date: Fri, 09 Feb 2024 22:48:32 GMT
server: uvicorn
content-length: 21
content-type: application/json

{"msg":"Hello World"}                # One second pause after this line is printed
Server closed the connection
INFO:     Shutting down
INFO:     Waiting for background tasks to complete. (CTRL+C to force quit)
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [260529]

The httptools implementation doesn't have the same state checks as h11 so it doesn't notice that it is closing a connection which is still needed and doesn't print the ugly stack traces, but this is still not the correct behavior.

With my changes from this PR, the client gets two responses, as expected, and the connection is closed a second after getting the second one, also as expected:

INFO:     Started server process [261908]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
Sending requests
INFO:     127.0.0.1:54282 - "GET / HTTP/1.1" 200 OK
HTTP/1.1 200 OK
date: Fri, 09 Feb 2024 22:51:44 GMT
server: uvicorn
content-length: 21
content-type: application/json

{"msg":"Hello World"}
INFO:     127.0.0.1:54282 - "GET / HTTP/1.1" 200 OK
HTTP/1.1 200 OK
date: Fri, 09 Feb 2024 22:51:44 GMT
server: uvicorn
content-length: 21
content-type: application/json

{"msg":"Hello World"}
Server closed the connection
INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [261908]


# Unpause data reads if needed.
self.flow.resume_reading()

# Unblock any pipelined events.
# Unblock any pipelined events. If there are none, arm the
# Keep-Alive timeout instead.
if self.pipeline:
cycle, app = self.pipeline.pop()
task = self.loop.create_task(cycle.run_asgi(app))
task.add_done_callback(self.tasks.discard)
self.tasks.add(task)
else:
self.timeout_keep_alive_task = self.loop.call_later(
self.timeout_keep_alive, self.timeout_keep_alive_handler
)

def shutdown(self) -> None:
"""
Expand Down