This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Encode JSON responses on a thread in C #10844

Closed
wants to merge 12 commits
1 change: 1 addition & 0 deletions changelog.d/10844.feature
@@ -0,0 +1 @@
Speed up responding with large JSON objects to requests.
76 changes: 59 additions & 17 deletions synapse/http/server.py
@@ -21,7 +21,6 @@
import urllib
from http import HTTPStatus
from inspect import isawaitable
from io import BytesIO
from typing import (
    Any,
    Awaitable,
@@ -37,15 +36,15 @@
)

import jinja2
from canonicaljson import iterencode_canonical_json
from canonicaljson import encode_canonical_json
from typing_extensions import Protocol
from zope.interface import implementer

from twisted.internet import defer, interfaces
from twisted.python import failure
from twisted.web import resource
from twisted.web.server import NOT_DONE_YET, Request
from twisted.web.static import File, NoRangeStaticProducer
from twisted.web.static import File
from twisted.web.util import redirectTo

from synapse.api.errors import (
@@ -56,10 +55,11 @@
    UnrecognizedRequestError,
)
from synapse.http.site import SynapseRequest
from synapse.logging.context import preserve_fn
from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
from synapse.logging.opentracing import trace_servlet
from synapse.util import json_encoder
from synapse.util.caches import intern_dict
from synapse.util.iterutils import chunk_seq

logger = logging.getLogger(__name__)

@@ -320,7 +320,7 @@ def __init__(self, canonical_json=False, extract_context=False):

    def _send_response(
        self,
        request: Request,
        request: SynapseRequest,
        code: int,
        response_object: Any,
    ):
@@ -620,16 +620,15 @@ def stopProducing(self) -> None:
        self._request = None


def _encode_json_bytes(json_object: Any) -> Iterator[bytes]:
def _encode_json_bytes(json_object: Any) -> bytes:
    """
    Encode an object into JSON. Returns the encoded bytes.
    """
    for chunk in json_encoder.iterencode(json_object):
        yield chunk.encode("utf-8")
    return json_encoder.encode(json_object).encode("utf-8")
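
For context on why the one-shot `encode` path is preferred over `iterencode` (the note in `_async_write_json_to_request_in_thread` further down spells this out), here is a rough standalone sketch of the difference. It is not part of this PR: it uses the stdlib `json.JSONEncoder` rather than Synapse's `json_encoder`, and the payload shape and timings are invented.

```python
# Illustrative sketch only, not part of this PR. On CPython, JSONEncoder.encode()
# can use the C encoder (_json.make_encoder) when no indent is set, while the
# public iterencode() always goes through the pure-Python encoder, so the
# one-shot path is typically much faster for large objects.
import json
import timeit

encoder = json.JSONEncoder(separators=(",", ":"))

# Invented payload, loosely shaped like a large /messages response.
payload = {
    "chunk": [
        {"event_id": f"$event_{i}", "content": {"body": "x" * 100}}
        for i in range(10_000)
    ]
}

one_shot = timeit.timeit(lambda: encoder.encode(payload).encode("utf-8"), number=10)
streamed = timeit.timeit(
    lambda: b"".join(part.encode("utf-8") for part in encoder.iterencode(payload)),
    number=10,
)
print(f"encode():     {one_shot:.3f}s")
print(f"iterencode(): {streamed:.3f}s")
```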


def respond_with_json(
    request: Request,
    request: SynapseRequest,
    code: int,
    json_object: Any,
    send_cors: bool = False,
@@ -659,7 +658,7 @@ def respond_with_json(
        return None

    if canonical_json:
        encoder = iterencode_canonical_json
        encoder = encode_canonical_json
    else:
        encoder = _encode_json_bytes

@@ -670,7 +669,9 @@
    if send_cors:
        set_cors_headers(request)

    _ByteProducer(request, encoder(json_object))
    run_in_background(
        _async_write_json_to_request_in_thread, request, encoder, json_object
    )
    return NOT_DONE_YET


@@ -706,15 +707,35 @@ def respond_with_json_bytes(
    if send_cors:
        set_cors_headers(request)

    # note that this is zero-copy (the bytesio shares a copy-on-write buffer with
    # the original `bytes`).
    bytes_io = BytesIO(json_bytes)

    producer = NoRangeStaticProducer(request, bytes_io)
    producer.start()
    _write_json_bytes_to_request(request, json_bytes)
Comment on lines -709 to +710
Member: why is this changing in this PR? It seems unrelated to how we encode JSON responses?

    return NOT_DONE_YET


def _write_json_bytes_to_request(request: Request, json_bytes: bytes) -> None:
Member: is this specific to being json bytes, or could it be used for any byte sequence?

    """Writes the JSON bytes to the request using an appropriate producer.

    Note: This should be used instead of `Request.write` to correctly handle
    large response bodies.
    """

    # The problem with dumping all of the json response into the `Request`
    # object at once (via `Request.write`) is that doing so starts the timeout
    # for the next request to be received: so if it takes longer than 60s to
    # stream back the response to the client, the client never gets it.
    #
    # The correct solution is to use a Producer; then the timeout is only
    # started once all of the content is sent over the TCP connection.

    # To make sure we don't write the whole of the json at once we split it up
    # into chunks.
    chunk_size = 4096
    bytes_generator = chunk_seq(json_bytes, chunk_size)

    # We use a `_ByteProducer` here rather than `NoRangeStaticProducer` as the
    # unit tests can't cope with being given a pull producer.
    _ByteProducer(request, bytes_generator)
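
The comments above lean on the push-producer pattern, but the actual `_ByteProducer` body is collapsed out of this diff. Below is a simplified, hypothetical sketch (class name and details invented, not the real implementation) of how such a producer writes chunks until the transport applies backpressure, which is why the request's idle timeout only starts once the whole body has been written.

```python
# A simplified sketch of the push-producer pattern; NOT the actual
# `_ByteProducer` from synapse/http/server.py.
from typing import Iterator

from zope.interface import implementer

from twisted.internet import interfaces
from twisted.web.server import Request


@implementer(interfaces.IPushProducer)
class SketchByteProducer:
    """Writes chunks to a request, pausing whenever the transport pushes back."""

    def __init__(self, request: Request, chunks: Iterator[bytes]):
        self._request = request
        self._chunks = chunks
        self._paused = False
        # Register as a *push* producer: the transport calls pauseProducing()/
        # resumeProducing() for backpressure, and the request's idle timeout is
        # not restarted until the response is finished.
        request.registerProducer(self, True)
        # A push producer drives itself, so start writing immediately.
        self.resumeProducing()

    def pauseProducing(self) -> None:
        self._paused = True

    def resumeProducing(self) -> None:
        self._paused = False
        while not self._paused:
            chunk = next(self._chunks, None)
            if chunk is None:
                # Everything has been handed to the transport: clean up.
                self._request.unregisterProducer()
                self._request.finish()
                return
            self._request.write(chunk)

    def stopProducing(self) -> None:
        # The connection went away; stop feeding data.
        self._paused = True
        self._chunks = iter(())
```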


def set_cors_headers(request: Request):
    """Set the CORS headers so that javascript running in a web browser can
    use this API
@@ -809,3 +830,24 @@ def finish_request(request: Request):
        request.finish()
    except RuntimeError as e:
        logger.info("Connection disconnected before response was written: %r", e)


async def _async_write_json_to_request_in_thread(
    request: SynapseRequest,
    json_encoder: Callable[[Any], bytes],
    json_object: Any,
):
    """Encodes the given JSON object on a thread and then writes it to the
    request.

    This is done so that encoding large JSON objects doesn't block the reactor
    thread.

    Note: We don't use JsonEncoder.iterencode here as that falls back to the
    Python implementation (rather than the C backend), which is *much* more
    expensive.
Comment on lines +847 to +848
Contributor: and presumably holds the global interpreter lock, making thread pooling it useless?
Member Author: ja

    """

    json_str = await defer_to_thread(request.reactor, json_encoder, json_object)

    _write_json_bytes_to_request(request, json_str)
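
As a rough approximation of this flow using stock Twisted primitives rather than Synapse's helpers: the helper names below are invented, `json.dumps` stands in for the passed-in encoder, and Synapse's `run_in_background`/`defer_to_thread` additionally handle logcontext bookkeeping that this sketch deliberately ignores.

```python
# An approximation with stock Twisted primitives, not Synapse code.
import json

from twisted.internet import defer, threads
from twisted.web.server import Request


async def encode_and_write(request: Request, json_object: object) -> None:
    # Run the CPU-heavy encode on a reactor threadpool thread; the Deferred's
    # callback fires back on the reactor thread.
    json_bytes: bytes = await threads.deferToThread(
        lambda: json.dumps(json_object).encode("utf-8")
    )
    # Back on the reactor thread: write the response out.
    request.write(json_bytes)
    request.finish()


def respond(request: Request, json_object: object) -> None:
    # Fire-and-forget from the reactor thread; real code should add an errback
    # so failures are logged rather than dropped.
    defer.ensureDeferred(encode_and_write(request, json_object))
```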