Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Combine LruCache.invalidate and invalidate_many (#9973)
Browse files Browse the repository at this point in the history
* Make `invalidate` and `invalidate_many` do the same thing

... so that we can send either over the invalidation replication stream, and also
because the distinction between them always confused me a bit.

* Kill off `invalidate_many`

* changelog
  • Loading branch information
richvdh committed May 27, 2021
1 parent f42e4c4 commit 224f2f9
Show file tree
Hide file tree
Showing 12 changed files with 52 additions and 52 deletions.
1 change: 1 addition & 0 deletions changelog.d/9973.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make `LruCache.invalidate` support tree invalidation, and remove `invalidate_many`.
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def _invalidate_caches_for_devices(self, token, rows):
if row.entity.startswith("@"):
self._device_list_stream_cache.entity_has_changed(row.entity, token)
self.get_cached_devices_for_user.invalidate((row.entity,))
self._get_cached_user_device.invalidate_many((row.entity,))
self._get_cached_user_device.invalidate((row.entity,))
self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,))

else:
Expand Down
6 changes: 3 additions & 3 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def _invalidate_caches_for_event(

self.get_latest_event_ids_in_room.invalidate((room_id,))

self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))
self.get_unread_event_push_actions_by_room_for_user.invalidate((room_id,))

if not backfilled:
self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
Expand All @@ -184,8 +184,8 @@ def _invalidate_caches_for_event(
self.get_invited_rooms_for_local_user.invalidate((state_key,))

if relates_to:
self.get_relations_for_event.invalidate_many((relates_to,))
self.get_aggregation_groups_for_event.invalidate_many((relates_to,))
self.get_relations_for_event.invalidate((relates_to,))
self.get_aggregation_groups_for_event.invalidate((relates_to,))
self.get_applicable_edit.invalidate((relates_to,))

async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]):
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1282,7 +1282,7 @@ def _update_remote_device_list_cache_txn(
)

txn.call_after(self.get_cached_devices_for_user.invalidate, (user_id,))
txn.call_after(self._get_cached_user_device.invalidate_many, (user_id,))
txn.call_after(self._get_cached_user_device.invalidate, (user_id,))
txn.call_after(
self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
)
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ def _remove_old_push_actions_before_txn(
not be deleted.
"""
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
self.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id, user_id),
)

Expand Down
8 changes: 4 additions & 4 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1748,9 +1748,9 @@ def _handle_event_relations(self, txn, event):
},
)

txn.call_after(self.store.get_relations_for_event.invalidate_many, (parent_id,))
txn.call_after(self.store.get_relations_for_event.invalidate, (parent_id,))
txn.call_after(
self.store.get_aggregation_groups_for_event.invalidate_many, (parent_id,)
self.store.get_aggregation_groups_for_event.invalidate, (parent_id,)
)

if rel_type == RelationTypes.REPLACE:
Expand Down Expand Up @@ -1903,7 +1903,7 @@ def _set_push_actions_for_event_and_users_txn(

for user_id in user_ids:
txn.call_after(
self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id, user_id),
)

Expand All @@ -1917,7 +1917,7 @@ def _set_push_actions_for_event_and_users_txn(
def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
# Sad that we have to blow away the cache for the whole room here
txn.call_after(
self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id,),
)
txn.execute(
Expand Down
6 changes: 2 additions & 4 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ def _invalidate_get_users_with_receipts_in_room(

def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
self.get_receipts_for_user.invalidate((user_id, receipt_type))
self._get_linearized_receipts_for_room.invalidate_many((room_id,))
self._get_linearized_receipts_for_room.invalidate((room_id,))
self.get_last_receipt_event_id_for_user.invalidate(
(user_id, room_id, receipt_type)
)
Expand Down Expand Up @@ -659,9 +659,7 @@ def insert_graph_receipt_txn(
)
txn.call_after(self.get_receipts_for_user.invalidate, (user_id, receipt_type))
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(
self._get_linearized_receipts_for_room.invalidate_many, (room_id,)
)
txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))

self.db_pool.simple_delete_txn(
txn,
Expand Down
42 changes: 16 additions & 26 deletions synapse/util/caches/deferred_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,7 @@

import enum
import threading
from typing import (
Callable,
Generic,
Iterable,
MutableMapping,
Optional,
TypeVar,
Union,
cast,
)
from typing import Callable, Generic, Iterable, MutableMapping, Optional, TypeVar, Union

from prometheus_client import Gauge

Expand Down Expand Up @@ -91,7 +82,7 @@ def __init__(
# _pending_deferred_cache maps from the key value to a `CacheEntry` object.
self._pending_deferred_cache = (
cache_type()
) # type: MutableMapping[KT, CacheEntry]
) # type: Union[TreeCache, MutableMapping[KT, CacheEntry]]

def metrics_cb():
cache_pending_metric.labels(name).set(len(self._pending_deferred_cache))
Expand Down Expand Up @@ -287,8 +278,17 @@ def prefill(
self.cache.set(key, value, callbacks=callbacks)

def invalidate(self, key):
"""Delete a key, or tree of entries
If the cache is backed by a regular dict, then "key" must be of
the right type for this cache
If the cache is backed by a TreeCache, then "key" must be a tuple, but
may be of lower cardinality than the TreeCache - in which case the whole
subtree is deleted.
"""
self.check_thread()
self.cache.pop(key, None)
self.cache.del_multi(key)

# if we have a pending lookup for this key, remove it from the
# _pending_deferred_cache, which will (a) stop it being returned
Expand All @@ -299,20 +299,10 @@ def invalidate(self, key):
# run the invalidation callbacks now, rather than waiting for the
# deferred to resolve.
if entry:
entry.invalidate()

def invalidate_many(self, key: KT):
self.check_thread()
if not isinstance(key, tuple):
raise TypeError("The cache key must be a tuple not %r" % (type(key),))
key = cast(KT, key)
self.cache.del_multi(key)

# if we have a pending lookup for this key, remove it from the
# _pending_deferred_cache, as above
entry_dict = self._pending_deferred_cache.pop(key, None)
if entry_dict is not None:
for entry in iterate_tree_cache_entry(entry_dict):
# _pending_deferred_cache.pop should either return a CacheEntry, or, in the
# case of a TreeCache, a dict of keys to cache entries. Either way calling
# iterate_tree_cache_entry on it will do the right thing.
for entry in iterate_tree_cache_entry(entry):
entry.invalidate()

def invalidate_all(self):
Expand Down
8 changes: 6 additions & 2 deletions synapse/util/caches/descriptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
class _CachedFunction(Generic[F]):
invalidate = None # type: Any
invalidate_all = None # type: Any
invalidate_many = None # type: Any
prefill = None # type: Any
cache = None # type: Any
num_args = None # type: Any
Expand Down Expand Up @@ -262,6 +261,11 @@ def __init__(
):
super().__init__(orig, num_args=num_args, cache_context=cache_context)

if tree and self.num_args < 2:
raise RuntimeError(
"tree=True is nonsensical for cached functions with a single parameter"
)

self.max_entries = max_entries
self.tree = tree
self.iterable = iterable
Expand Down Expand Up @@ -302,11 +306,11 @@ def _wrapped(*args, **kwargs):
wrapped = cast(_CachedFunction, _wrapped)

if self.num_args == 1:
assert not self.tree
wrapped.invalidate = lambda key: cache.invalidate(key[0])
wrapped.prefill = lambda key, val: cache.prefill(key[0], val)
else:
wrapped.invalidate = cache.invalidate
wrapped.invalidate_many = cache.invalidate_many
wrapped.prefill = cache.prefill

wrapped.invalidate_all = cache.invalidate_all
Expand Down
18 changes: 11 additions & 7 deletions synapse/util/caches/lrucache.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ class LruCache(Generic[KT, VT]):
"""
Least-recently-used cache, supporting prometheus metrics and invalidation callbacks.
Supports del_multi only if cache_type=TreeCache
If cache_type=TreeCache, all keys must be tuples.
"""

Expand Down Expand Up @@ -393,10 +392,16 @@ def cache_pop(key: KT, default: Optional[T] = None):

@synchronized
def cache_del_multi(key: KT) -> None:
"""Delete an entry, or tree of entries
If the LruCache is backed by a regular dict, then "key" must be of
the right type for this cache
If the LruCache is backed by a TreeCache, then "key" must be a tuple, but
may be of lower cardinality than the TreeCache - in which case the whole
subtree is deleted.
"""
This will only work if constructed with cache_type=TreeCache
"""
popped = cache.pop(key)
popped = cache.pop(key, None)
if popped is None:
return
# for each deleted node, we now need to remove it from the linked list
Expand Down Expand Up @@ -430,11 +435,10 @@ def cache_contains(key: KT) -> bool:
self.set = cache_set
self.setdefault = cache_set_default
self.pop = cache_pop
self.del_multi = cache_del_multi
# `invalidate` is exposed for consistency with DeferredCache, so that it can be
# invalidated by the cache invalidation replication stream.
self.invalidate = cache_pop
if cache_type is TreeCache:
self.del_multi = cache_del_multi
self.invalidate = cache_del_multi
self.len = synchronized(cache_len)
self.contains = cache_contains
self.clear = cache_clear
Expand Down
3 changes: 3 additions & 0 deletions synapse/util/caches/treecache.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ def pop(self, key, default=None):
value. If the key is partial, the TreeCacheNode corresponding to the part
of the tree that was removed.
"""
if not isinstance(key, tuple):
raise TypeError("The cache key must be a tuple not %r" % (type(key),))

# a list of the nodes we have touched on the way down the tree
nodes = []

Expand Down
6 changes: 3 additions & 3 deletions tests/util/caches/test_descriptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,17 +622,17 @@ def func2(self, key, cache_context):
self.assertEquals(callcount2[0], 1)

a.func2.invalidate(("foo",))
self.assertEquals(a.func2.cache.cache.pop.call_count, 1)
self.assertEquals(a.func2.cache.cache.del_multi.call_count, 1)

yield a.func2("foo")
a.func2.invalidate(("foo",))
self.assertEquals(a.func2.cache.cache.pop.call_count, 2)
self.assertEquals(a.func2.cache.cache.del_multi.call_count, 2)

self.assertEquals(callcount[0], 1)
self.assertEquals(callcount2[0], 2)

a.func.invalidate(("foo",))
self.assertEquals(a.func2.cache.cache.pop.call_count, 3)
self.assertEquals(a.func2.cache.cache.del_multi.call_count, 3)
yield a.func("foo")

self.assertEquals(callcount[0], 2)
Expand Down

0 comments on commit 224f2f9

Please sign in to comment.