Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add CPU metrics for _fetch_event_list #3497

Merged
merged 2 commits into from
Jul 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3497.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add CPU metrics for _fetch_event_list
51 changes: 32 additions & 19 deletions synapse/storage/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,25 +222,39 @@ def _do_fetch(self, conn):
"""Takes a database connection and waits for requests for events from
the _event_fetch_list queue.
"""
event_list = []
i = 0
while True:
try:
with self._event_fetch_lock:
event_list = self._event_fetch_list
self._event_fetch_list = []

if not event_list:
single_threaded = self.database_engine.single_threaded
if single_threaded or i > EVENT_QUEUE_ITERATIONS:
self._event_fetch_ongoing -= 1
return
else:
self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
i += 1
continue
i = 0
with self._event_fetch_lock:
event_list = self._event_fetch_list
self._event_fetch_list = []

if not event_list:
single_threaded = self.database_engine.single_threaded
if single_threaded or i > EVENT_QUEUE_ITERATIONS:
self._event_fetch_ongoing -= 1
return
else:
self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
i += 1
continue
i = 0

self._fetch_event_list(conn, event_list)

def _fetch_event_list(self, conn, event_list):
"""Handle a load of requests from the _event_fetch_list queue

Args:
conn (twisted.enterprise.adbapi.Connection): database connection

event_list (list[Tuple[list[str], Deferred]]):
The fetch requests. Each entry consists of a list of event
ids to be fetched, and a deferred to be completed once the
events have been fetched.

"""
with Measure(self._clock, "_fetch_event_list"):
try:
event_id_lists = zip(*event_list)[0]
event_ids = [
item for sublist in event_id_lists for item in sublist
Expand Down Expand Up @@ -280,9 +294,8 @@ def fire(evs):
with PreserveLoggingContext():
d.errback(e)

if event_list:
with PreserveLoggingContext():
self.hs.get_reactor().callFromThread(fire, event_list)
with PreserveLoggingContext():
self.hs.get_reactor().callFromThread(fire, event_list)

@defer.inlineCallbacks
def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
Expand Down