Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Close ijson coroutines ourselves instead of letting the GC close them #12875

Merged
merged 2 commits into from
May 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12875.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Explicitly close `ijson` coroutines once we are done with them, instead of leaving the garbage collector to close them.
9 changes: 7 additions & 2 deletions synapse/federation/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1363,7 +1363,7 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
def __init__(self, room_version: RoomVersion, v1_api: bool):
self._response = SendJoinResponse([], [], event_dict={})
self._room_version = room_version
self._coros = []
self._coros: List[Generator[None, bytes, None]] = []

# The V1 API has the shape of `[200, {...}]`, which we handle by
# prefixing with `item.*`.
Expand Down Expand Up @@ -1411,6 +1411,9 @@ def write(self, data: bytes) -> int:
return len(data)

def finish(self) -> SendJoinResponse:
for c in self._coros:
c.close()
Comment on lines +1414 to +1415
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This'll only close the first ijson coroutine if it raises an IncompleteJSONError.


if self._response.event_dict:
self._response.event = make_event_from_dict(
self._response.event_dict, self._room_version
Expand All @@ -1430,7 +1433,7 @@ class _StateParser(ByteParser[StateRequestResponse]):
def __init__(self, room_version: RoomVersion):
self._response = StateRequestResponse([], [])
self._room_version = room_version
self._coros = [
self._coros: List[Generator[None, bytes, None]] = [
ijson.items_coro(
_event_list_parser(room_version, self._response.state),
"pdus.item",
Expand All @@ -1449,4 +1452,6 @@ def write(self, data: bytes) -> int:
return len(data)

def finish(self) -> StateRequestResponse:
for c in self._coros:
c.close()
return self._response
11 changes: 11 additions & 0 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ async def _handle_response(
if max_response_size is None:
max_response_size = MAX_RESPONSE_SIZE

finished = False
try:
check_content_type_is(response.headers, parser.CONTENT_TYPE)

Expand All @@ -233,6 +234,7 @@ async def _handle_response(

length = await make_deferred_yieldable(d)

finished = True
value = parser.finish()
except BodyExceededMaxSize as e:
# The response was too big.
Expand Down Expand Up @@ -283,6 +285,15 @@ async def _handle_response(
e,
)
raise
finally:
if not finished:
# There was an exception and we didn't `finish()` the parse.
# Let the parser know that it can free up any resources.
try:
parser.finish()
except Exception:
# Ignore any additional exceptions.
pass

time_taken_secs = reactor.seconds() - start_ms / 1000

Expand Down