Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Increase perf of handling concurrent use of StreamIDGenerators. #9190

Merged
merged 4 commits into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9190.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve performance of concurrent use of `StreamIDGenerators`.
21 changes: 13 additions & 8 deletions synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
import heapq
import logging
import threading
from collections import deque
from collections import OrderedDict
from contextlib import contextmanager
from typing import Dict, List, Optional, Set, Tuple, Union

import attr
from typing_extensions import Deque

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.database import DatabasePool, LoggingTransaction
Expand Down Expand Up @@ -101,7 +100,13 @@ def __init__(self, db_conn, table, column, extra_tables=[], step=1):
self._current = (max if step > 0 else min)(
self._current, _load_current_id(db_conn, table, column, step)
)
self._unfinished_ids = deque() # type: Deque[int]

# We use this as an ordered set, as we want to efficiently append items,
# remove items and get the first item. Since we insert IDs in order, the
# insertion order ensures the entries are in the correct order.
#
# The key and values are the same, but we never look at the values.
self._unfinished_ids = OrderedDict() # type: OrderedDict[int, int]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`OrderedDict` is ordered by insertion order rather than by key, IIRC. Is that what you intend here? (Possibly it amounts to the same thing?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, they amount to the same thing (and it's the same situation as with the deque). Will expand the comment.


def get_next(self):
"""
Expand All @@ -113,15 +118,15 @@ def get_next(self):
self._current += self._step
next_id = self._current

self._unfinished_ids.append(next_id)
self._unfinished_ids[next_id] = next_id

@contextmanager
def manager():
try:
yield next_id
finally:
with self._lock:
self._unfinished_ids.remove(next_id)
self._unfinished_ids.pop(next_id)

return _AsyncCtxManagerWrapper(manager())

Expand All @@ -140,7 +145,7 @@ def get_next_mult(self, n):
self._current += n * self._step

for next_id in next_ids:
self._unfinished_ids.append(next_id)
self._unfinished_ids[next_id] = next_id

@contextmanager
def manager():
Expand All @@ -149,7 +154,7 @@ def manager():
finally:
with self._lock:
for next_id in next_ids:
self._unfinished_ids.remove(next_id)
self._unfinished_ids.pop(next_id)

return _AsyncCtxManagerWrapper(manager())

Expand All @@ -162,7 +167,7 @@ def get_current_token(self) -> int:
"""
with self._lock:
if self._unfinished_ids:
return self._unfinished_ids[0] - self._step
return next(iter(self._unfinished_ids)) - self._step

return self._current

Expand Down