Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix database performance of read/write worker locks
Browse files Browse the repository at this point in the history
We were seeing serialization errors when taking out multiple read locks.

The transactions were retried, so isn't causing any failures.

Introduced in #15782.
  • Loading branch information
erikjohnston committed Aug 3, 2023
1 parent 9c462f1 commit 3369ff1
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 55 deletions.
87 changes: 35 additions & 52 deletions synapse/storage/databases/main/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.engines import PostgresEngine
from synapse.util import Clock
from synapse.util.stringutils import random_string

Expand Down Expand Up @@ -96,6 +95,10 @@ def __init__(

self._acquiring_locks: Set[Tuple[str, str]] = set()

self._clock.looping_call(
self._reap_stale_read_write_locks, _LOCK_TIMEOUT_MS / 10.0
)

@wrap_as_background_process("LockStore._on_shutdown")
async def _on_shutdown(self) -> None:
"""Called when the server is shutting down"""
Expand Down Expand Up @@ -216,6 +219,7 @@ async def try_acquire_read_write_lock(
lock_name,
lock_key,
write,
db_autocommit=True,
)
except self.database_engine.module.IntegrityError:
return None
Expand All @@ -233,61 +237,22 @@ def _try_acquire_read_write_lock_txn(
# `worker_read_write_locks` and seeing if that fails any
# constraints. If it doesn't then we have acquired the lock,
# otherwise we haven't.
#
# Before that though we clear the table of any stale locks.

now = self._clock.time_msec()
token = random_string(6)

delete_sql = """
DELETE FROM worker_read_write_locks
WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?;
"""

insert_sql = """
INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts)
VALUES (?, ?, ?, ?, ?, ?)
"""

if isinstance(self.database_engine, PostgresEngine):
# For Postgres we can send these queries at the same time.
txn.execute(
delete_sql + ";" + insert_sql,
(
# DELETE args
now - _LOCK_TIMEOUT_MS,
lock_name,
lock_key,
# UPSERT args
lock_name,
lock_key,
write,
self._instance_name,
token,
now,
),
)
else:
# For SQLite these need to be two queries.
txn.execute(
delete_sql,
(
now - _LOCK_TIMEOUT_MS,
lock_name,
lock_key,
),
)
txn.execute(
insert_sql,
(
lock_name,
lock_key,
write,
self._instance_name,
token,
now,
),
)
self.db_pool.simple_insert_txn(
txn,
table="worker_read_write_locks",
values={
"lock_name": lock_name,
"lock_key": lock_key,
"write_lock": write,
"instance_name": self._instance_name,
"token": token,
"last_renewed_ts": now,
},
)

lock = Lock(
self._reactor,
Expand Down Expand Up @@ -351,6 +316,24 @@ def _try_acquire_multi_read_write_lock_txn(

return locks

@wrap_as_background_process("_reap_stale_read_write_locks")
async def _reap_stale_read_write_locks(self) -> None:
delete_sql = """
DELETE FROM worker_read_write_locks
WHERE last_renewed_ts < ?
"""

def reap_stale_read_write_locks_txn(txn: LoggingTransaction) -> None:
txn.execute(delete_sql, (self._clock.time_msec() - _LOCK_TIMEOUT_MS,))
if txn.rowcount:
logger.info("Reaped %d stale locks", txn.rowcount)

await self.db_pool.runInteraction(
"_reap_stale_read_write_locks",
reap_stale_read_write_locks_txn,
db_autocommit=True,
)


class Lock:
"""An async context manager that manages an acquired lock, ensuring it is
Expand Down
7 changes: 4 additions & 3 deletions tests/storage/databases/main/test_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.


from twisted.internet import defer, reactor
from twisted.internet.base import ReactorBase
from twisted.internet.defer import Deferred
from twisted.test.proto_helpers import MemoryReactor

from synapse.server import HomeServer
from synapse.storage.databases.main.lock import _LOCK_TIMEOUT_MS
from synapse.storage.databases.main.lock import _LOCK_TIMEOUT_MS, _RENEWAL_INTERVAL_MS
from synapse.util import Clock

from tests import unittest
Expand Down Expand Up @@ -380,8 +381,8 @@ def test_maintain_lock(self) -> None:
self.get_success(lock.__aenter__())

# Wait for ages with the lock, we should not be able to get the lock.
self.reactor.advance(5 * _LOCK_TIMEOUT_MS / 1000)
self.pump()
for _ in range(0, 10):
self.reactor.advance((_RENEWAL_INTERVAL_MS / 1000))

lock2 = self.get_success(
self.store.try_acquire_read_write_lock("name", "key", write=True)
Expand Down

0 comments on commit 3369ff1

Please sign in to comment.