Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Increase perf of handling concurrent use of StreamIDGenerators. (#9190)
Browse files Browse the repository at this point in the history
We have seen a failure mode here where, if there are many in-flight
unfinished IDs, marking an ID as finished takes a lot of CPU (as
calling deque.remove iterates over the whole deque).
  • Loading branch information
erikjohnston authored Jan 21, 2021
1 parent 939ef65 commit 12ec55b
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 8 deletions.
1 change: 1 addition & 0 deletions changelog.d/9190.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve performance of concurrent use of `StreamIDGenerators`.
21 changes: 13 additions & 8 deletions synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
import heapq
import logging
import threading
from collections import deque
from collections import OrderedDict
from contextlib import contextmanager
from typing import Dict, List, Optional, Set, Tuple, Union

import attr
from typing_extensions import Deque

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.database import DatabasePool, LoggingTransaction
Expand Down Expand Up @@ -101,7 +100,13 @@ def __init__(self, db_conn, table, column, extra_tables=[], step=1):
self._current = (max if step > 0 else min)(
self._current, _load_current_id(db_conn, table, column, step)
)
self._unfinished_ids = deque() # type: Deque[int]

# We use this as an ordered set, as we want to efficiently append items,
# remove items and get the first item. Since we insert IDs in order, the
# insertion ordering will ensure it's in the correct order.
#
# The keys and values are the same, but we never look at the values.
self._unfinished_ids = OrderedDict() # type: OrderedDict[int, int]

def get_next(self):
"""
Expand All @@ -113,15 +118,15 @@ def get_next(self):
self._current += self._step
next_id = self._current

self._unfinished_ids.append(next_id)
self._unfinished_ids[next_id] = next_id

@contextmanager
def manager():
try:
yield next_id
finally:
with self._lock:
self._unfinished_ids.remove(next_id)
self._unfinished_ids.pop(next_id)

return _AsyncCtxManagerWrapper(manager())

Expand All @@ -140,7 +145,7 @@ def get_next_mult(self, n):
self._current += n * self._step

for next_id in next_ids:
self._unfinished_ids.append(next_id)
self._unfinished_ids[next_id] = next_id

@contextmanager
def manager():
Expand All @@ -149,7 +154,7 @@ def manager():
finally:
with self._lock:
for next_id in next_ids:
self._unfinished_ids.remove(next_id)
self._unfinished_ids.pop(next_id)

return _AsyncCtxManagerWrapper(manager())

Expand All @@ -162,7 +167,7 @@ def get_current_token(self) -> int:
"""
with self._lock:
if self._unfinished_ids:
return self._unfinished_ids[0] - self._step
return next(iter(self._unfinished_ids)) - self._step

return self._current

Expand Down

0 comments on commit 12ec55b

Please sign in to comment.