Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add DeferredCache.get_immediate method #8568

Merged
merged 4 commits into from
Oct 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8568.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `get_immediate` method to `DeferredCache`.
2 changes: 1 addition & 1 deletion synapse/push/bulk_push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,6 @@ class _Invalidation(namedtuple("_Invalidation", ("cache", "room_id"))):
# dedupe when we add callbacks to lru cache nodes, otherwise the number
# of callbacks would grow.
def __call__(self):
rules = self.cache.get(self.room_id, None, update_metrics=False)
rules = self.cache.get_immediate(self.room_id, None, update_metrics=False)
if rules:
rules.invalidate_all()
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ async def add_pusher(
lock=False,
)

user_has_pusher = self.get_if_user_has_pusher.cache.get(
user_has_pusher = self.get_if_user_has_pusher.cache.get_immediate(
(user_id,), None, update_metrics=False
)

Expand Down
11 changes: 1 addition & 10 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache

Expand Down Expand Up @@ -413,18 +412,10 @@ def _invalidate_get_users_with_receipts_in_room(
if receipt_type != "m.read":
return

# Returns either an ObservableDeferred or the raw result
res = self.get_users_with_read_receipts_in_room.cache.get(
res = self.get_users_with_read_receipts_in_room.cache.get_immediate(
room_id, None, update_metrics=False
)

# first handle the ObservableDeferred case
if isinstance(res, ObservableDeferred):
if res.has_called():
res = res.get_result()
else:
res = None

if res and user_id in res:
# We'd only be adding to the set, so no point invalidating if the
# user is already there
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ async def _get_joined_users_from_context(
# If we do then we can reuse that result and simply update it with
# any membership changes in `delta_ids`
if context.prev_group and context.delta_ids:
prev_res = self._get_joined_users_from_context.cache.get(
prev_res = self._get_joined_users_from_context.cache.get_immediate(
(room_id, context.prev_group), None
)
if prev_res and isinstance(prev_res, dict):
Expand Down
35 changes: 25 additions & 10 deletions synapse/util/caches/deferred_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,16 @@

import enum
import threading
from typing import Callable, Generic, Iterable, MutableMapping, Optional, TypeVar, cast
from typing import (
Callable,
Generic,
Iterable,
MutableMapping,
Optional,
TypeVar,
Union,
cast,
)

from prometheus_client import Gauge

Expand All @@ -33,7 +42,7 @@
["name"],
)


T = TypeVar("T")
KT = TypeVar("KT")
VT = TypeVar("VT")

Expand Down Expand Up @@ -119,21 +128,21 @@ def check_thread(self):
def get(
self,
key: KT,
default=_Sentinel.sentinel,
callback: Optional[Callable[[], None]] = None,
update_metrics: bool = True,
):
) -> Union[ObservableDeferred, VT]:
"""Looks the key up in the caches.

Args:
key(tuple)
default: What is returned if key is not in the caches. If not
specified then function throws KeyError instead
callback(fn): Gets called when the entry in the cache is invalidated
update_metrics (bool): whether to update the cache hit rate metrics

Returns:
Either an ObservableDeferred or the result itself

Raises:
KeyError if the key is not found in the cache
"""
callbacks = [callback] if callback else []
val = self._pending_deferred_cache.get(key, _Sentinel.sentinel)
Expand All @@ -145,13 +154,19 @@ def get(
m.inc_hits()
return val.deferred

val = self.cache.get(
key, default, callbacks=callbacks, update_metrics=update_metrics
val2 = self.cache.get(
key, _Sentinel.sentinel, callbacks=callbacks, update_metrics=update_metrics
)
if val is _Sentinel.sentinel:
if val2 is _Sentinel.sentinel:
raise KeyError()
else:
return val
return val2

def get_immediate(
self, key: KT, default: T, update_metrics: bool = True
) -> Union[VT, T]:
"""If we have a *completed* cached value, return it."""
return self.cache.get(key, default, update_metrics=update_metrics)

def set(
self,
Expand Down
27 changes: 23 additions & 4 deletions tests/util/caches/test_deferred_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ def test_hit(self):

self.assertEquals(cache.get("foo"), 123)

def test_get_immediate(self):
cache = DeferredCache("test")
d1 = defer.Deferred()
cache.set("key1", d1)

# get_immediate should return default
v = cache.get_immediate("key1", 1)
self.assertEqual(v, 1)

# now complete the set
d1.callback(2)

# get_immediate should return result
v = cache.get_immediate("key1", 1)
self.assertEqual(v, 2)

def test_invalidate(self):
cache = DeferredCache("test")
cache.prefill(("foo",), 123)
Expand Down Expand Up @@ -80,17 +96,20 @@ def record_callback(idx):
# now do the invalidation
cache.invalidate_all()

# lookup should return none
self.assertIsNone(cache.get("key1", None))
self.assertIsNone(cache.get("key2", None))
# lookup should fail
with self.assertRaises(KeyError):
cache.get("key1")
with self.assertRaises(KeyError):
cache.get("key2")

# both callbacks should have been callbacked
self.assertTrue(callback_record[0], "Invalidation callback for key1 not called")
self.assertTrue(callback_record[1], "Invalidation callback for key2 not called")

# letting the other lookup complete should do nothing
d1.callback("result1")
self.assertIsNone(cache.get("key1", None))
with self.assertRaises(KeyError):
cache.get("key1", None)

def test_eviction(self):
cache = DeferredCache(
Expand Down