From 793cffeb347df1722ec000994d7f7e6ae4ab0a75 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Apr 2022 14:58:23 +0100 Subject: [PATCH 1/7] Add a `AwakenableSleeper` class --- synapse/util/async_helpers.py | 57 ++++++++++++++++++++++++++++++++ tests/util/test_async_helpers.py | 38 +++++++++++++++++++++ 2 files changed, 95 insertions(+) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index e27c5d298f6a..2762de72f586 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -766,3 +766,60 @@ def handle_cancel(new_deferred: "defer.Deferred[T]") -> None: new_deferred: "defer.Deferred[T]" = defer.Deferred(handle_cancel) deferred.chainDeferred(new_deferred) return new_deferred + + +class AwakenableSleeper: + """Allows explicitly waking up deferreds related to an entity that are + currently sleeping. + """ + + def __init__(self, reactor: IReactorTime) -> None: + self._streams: Dict[str, Set[defer.Deferred[None]]] = {} + self._reactor = reactor + + def wake(self, name: str) -> None: + """Wake everything related to `name` that is currently sleeping.""" + stream_set = self._streams.pop(name, set()) + for deferred in set(stream_set): + try: + with PreserveLoggingContext(): + deferred.callback(None) + except Exception: + pass + + async def sleep(self, name: str, delay_ms: int) -> None: + """Sleep for the given number of milliseconds, or return if the given + `name` is explicitly woken up. + """ + + # Create a deferred that gets called in N seconds + sleep_deferred: "defer.Deferred[None]" = defer.Deferred() + call = self._reactor.callLater(delay_ms / 1000, sleep_deferred.callback, None) + + # Create a deferred that will get called if `wake` is called with + # the same `name`. + stream_set = self._streams.setdefault(name, set()) + notify_deferred: "defer.Deferred[None]" = defer.Deferred() + stream_set.add(notify_deferred) + + try: + # Wait for either the delay or for `wake` to be called. + await make_deferred_yieldable( + defer.DeferredList( + [sleep_deferred, notify_deferred], + fireOnOneCallback=True, + fireOnOneErrback=True, + consumeErrors=True, + ) + ) + finally: + # Clean up the state + stream_set.discard(notify_deferred) + + curr_stream_set = self._streams.get(name) + if curr_stream_set is not None and len(curr_stream_set) == 0: + self._streams.pop(name) + + # Cancel the sleep if we were woken up + if call.active(): + call.cancel() diff --git a/tests/util/test_async_helpers.py b/tests/util/test_async_helpers.py index daacc54c72bd..783bf2fa26da 100644 --- a/tests/util/test_async_helpers.py +++ b/tests/util/test_async_helpers.py @@ -28,6 +28,7 @@ make_deferred_yieldable, ) from synapse.util.async_helpers import ( + AwakenableSleeper, ObservableDeferred, concurrently_execute, delay_cancellation, @@ -35,6 +36,7 @@ timeout_deferred, ) +from tests.server import get_clock from tests.unittest import TestCase @@ -496,3 +498,39 @@ async def outer(): # logging context. blocking_d.callback(None) self.successResultOf(d) + + +class AwakenableSleeperTests(TestCase): + "Tests AwakenableSleeper" + + def test_sleep(self): + reactor, _ = get_clock() + sleeper = AwakenableSleeper(reactor) + + d = defer.ensureDeferred(sleeper.sleep("name", 1000)) + + reactor.pump([0.0]) + self.assertFalse(d.called) + + reactor.advance(0.5) + self.assertFalse(d.called) + + reactor.advance(0.6) + self.assertTrue(d.called) + + def test_explicit_wake(self): + reactor, _ = get_clock() + sleeper = AwakenableSleeper(reactor) + + d = defer.ensureDeferred(sleeper.sleep("name", 1000)) + + reactor.pump([0.0]) + self.assertFalse(d.called) + + reactor.advance(0.5) + self.assertFalse(d.called) + + sleeper.wake("name") + self.assertTrue(d.called) + + reactor.advance(0.6) From 3aba59cd8f8d4cb9a5345e5218c646a1a9d4562e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Apr 2022 15:12:08 +0100 Subject: [PATCH 2/7] Immediate retry any requests that have backed off when a server comes back online --- synapse/http/matrixfederationclient.py | 13 +++++++++++-- synapse/notifier.py | 8 +++++--- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index e68644595546..192d308123d8 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -73,7 +73,7 @@ from synapse.logging.opentracing import set_tag, start_active_span, tags from synapse.types import JsonDict from synapse.util import json_decoder -from synapse.util.async_helpers import timeout_deferred +from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -353,6 +353,13 @@ def schedule(x): self._cooperator = Cooperator(scheduler=schedule) + self._sleeper = AwakenableSleeper(self.reactor) + + def wake_destination(self, destination: str) -> None: + """Called when the remote server may have come back online.""" + + self._sleeper.wake(destination) + async def _send_request_with_optional_trailing_slash( self, request: MatrixFederationRequest, @@ -664,7 +671,9 @@ async def _send_request( delay, ) - await self.clock.sleep(delay) + # Sleep for the calculated delay, or wake up immediately + # if we get notified that the server is back up. + await self._sleeper.sleep(request.destination, delay * 1000) retries_left -= 1 else: raise diff --git a/synapse/notifier.py b/synapse/notifier.py index 16d15a1f3328..01a50b9d6226 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -228,9 +228,7 @@ def __init__(self, hs: "HomeServer"): # Called when there are new things to stream over replication self.replication_callbacks: List[Callable[[], None]] = [] - # Called when remote servers have come back online after having been - # down. - self.remote_server_up_callbacks: List[Callable[[str], None]] = [] + self._federation_client = hs.get_federation_http_client() self._third_party_rules = hs.get_third_party_event_rules() @@ -731,3 +729,7 @@ def notify_remote_server_up(self, server: str) -> None: # circular dependencies. if self.federation_sender: self.federation_sender.wake_destination(server) + + # Tell the federation client about the fact the server is back up, so + # that any in flight requests can be immediately retried. + self._federation_client.wake_destination(server) From fde708b2b3bffbfb7b9d922774b0add7d7d51ff1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Apr 2022 15:13:40 +0100 Subject: [PATCH 3/7] Newsfile --- changelog.d/12500.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12500.misc diff --git a/changelog.d/12500.misc b/changelog.d/12500.misc new file mode 100644 index 000000000000..dbe3f7f5d197 --- /dev/null +++ b/changelog.d/12500.misc @@ -0,0 +1 @@ +Immediately retry any requests that have backed off when a server comes back online. From b1deb3faca89af760fada890c1b2119e56f5cec7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 Apr 2022 17:59:16 +0100 Subject: [PATCH 4/7] Mark remote server as up when we can talk to it --- synapse/http/matrixfederationclient.py | 5 +++++ synapse/util/retryutils.py | 24 +++++++++++++++++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 192d308123d8..9a52b6682f30 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -355,6 +355,9 @@ def schedule(x): self._sleeper = AwakenableSleeper(self.reactor) + self._notifier = hs.get_notifier() + self._replication_client = hs.get_replication_command_handler() + def wake_destination(self, destination: str) -> None: """Called when the remote server may have come back online.""" @@ -481,6 +484,8 @@ async def _send_request( self._store, backoff_on_404=backoff_on_404, ignore_backoff=ignore_backoff, + notifier=self._notifier, + replication_client=self._replication_client, ) method_bytes = request.method.encode("ascii") diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index d81f2527d7be..81bfed268ee7 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -14,13 +14,17 @@ import logging import random from types import TracebackType -from typing import Any, Optional, Type +from typing import TYPE_CHECKING, Any, Optional, Type import synapse.logging.context from synapse.api.errors import CodeMessageException from synapse.storage import DataStore from synapse.util import Clock +if TYPE_CHECKING: + from synapse.notifier import Notifier + from synapse.replication.tcp.handler import ReplicationCommandHandler + logger = logging.getLogger(__name__) # the initial backoff, after the first transaction fails @@ -131,6 +135,8 @@ def __init__( retry_interval: int, backoff_on_404: bool = False, backoff_on_failure: bool = True, + notifier: Optional["Notifier"] = None, + replication_client: Optional["ReplicationCommandHandler"] = None, ): """Marks the destination as "down" if an exception is thrown in the context, except for CodeMessageException with code < 500. @@ -160,6 +166,9 @@ def __init__( self.backoff_on_404 = backoff_on_404 self.backoff_on_failure = backoff_on_failure + self.notifier = notifier + self.replication_client = replication_client + def __enter__(self) -> None: pass @@ -239,6 +248,19 @@ async def store_retry_timings() -> None: retry_last_ts, self.retry_interval, ) + + if self.notifier: + # Inform the relevant places that the remote server is back up. + self.notifier.notify_remote_server_up(self.destination) + + if self.replication_client: + # If we're on a worker we try and inform master about this. The + # replication client doesn't hook into the notifier to avoid + # infinite loops where we send a `REMOTE_SERVER_UP` command to + # master, which then echoes it back to us which in turn pokes + # the notifier. + self.replication_client.send_remote_server_up(self.destination) + except Exception: logger.exception("Failed to store destination_retry_timings") From 50bcd886e2adc5fd2329983d71554e9d8d7009b2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 Apr 2022 18:04:40 +0100 Subject: [PATCH 5/7] Fixup --- synapse/http/matrixfederationclient.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 9a52b6682f30..c2ec3caa0ea8 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -355,9 +355,6 @@ def schedule(x): self._sleeper = AwakenableSleeper(self.reactor) - self._notifier = hs.get_notifier() - self._replication_client = hs.get_replication_command_handler() - def wake_destination(self, destination: str) -> None: """Called when the remote server may have come back online.""" @@ -484,8 +481,8 @@ async def _send_request( self._store, backoff_on_404=backoff_on_404, ignore_backoff=ignore_backoff, - notifier=self._notifier, - replication_client=self._replication_client, + notifier=self.hs.get_notifier(), + replication_client=self.hs.get_replication_command_handler(), ) method_bytes = request.method.encode("ascii") From 9a0db860f523c7316d34f6cb363f8b0d7a4bf67a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 May 2022 09:11:23 +0100 Subject: [PATCH 6/7] Add tests --- tests/util/test_async_helpers.py | 42 ++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/tests/util/test_async_helpers.py b/tests/util/test_async_helpers.py index 783bf2fa26da..9d5010bf9231 100644 --- a/tests/util/test_async_helpers.py +++ b/tests/util/test_async_helpers.py @@ -534,3 +534,45 @@ def test_explicit_wake(self): self.assertTrue(d.called) reactor.advance(0.6) + + def test_multiple_sleepers_timeout(self): + reactor, _ = get_clock() + sleeper = AwakenableSleeper(reactor) + + d1 = defer.ensureDeferred(sleeper.sleep("name", 1000)) + + reactor.advance(0.6) + self.assertFalse(d1.called) + + # Add another sleeper + d2 = defer.ensureDeferred(sleeper.sleep("name", 1000)) + + # Only the first sleep should time out now. + reactor.advance(0.6) + self.assertTrue(d1.called) + self.assertFalse(d2.called) + + reactor.advance(0.6) + self.assertTrue(d2.called) + + def test_multiple_sleepers_wake(self): + reactor, _ = get_clock() + sleeper = AwakenableSleeper(reactor) + + d1 = defer.ensureDeferred(sleeper.sleep("name", 1000)) + + reactor.advance(0.5) + self.assertFalse(d1.called) + + # Add another sleeper + d2 = defer.ensureDeferred(sleeper.sleep("name", 1000)) + + # Neither should fire yet + reactor.advance(0.3) + self.assertFalse(d1.called) + self.assertFalse(d2.called) + + # Explicitly waking both up works + sleeper.wake("name") + self.assertTrue(d1.called) + self.assertTrue(d2.called) From 5bb822b146536073de6beed69f297aa9611191d1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 May 2022 09:11:35 +0100 Subject: [PATCH 7/7] Don't take a copy of the streams --- synapse/util/async_helpers.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 422ce516dfe0..7f1d41eb3c7a 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -792,7 +792,7 @@ def __init__(self, reactor: IReactorTime) -> None: def wake(self, name: str) -> None: """Wake everything related to `name` that is currently sleeping.""" stream_set = self._streams.pop(name, set()) - for deferred in set(stream_set): + for deferred in stream_set: try: with PreserveLoggingContext(): deferred.callback(None) @@ -826,11 +826,11 @@ async def sleep(self, name: str, delay_ms: int) -> None: ) finally: # Clean up the state - stream_set.discard(notify_deferred) - curr_stream_set = self._streams.get(name) - if curr_stream_set is not None and len(curr_stream_set) == 0: - self._streams.pop(name) + if curr_stream_set is not None: + curr_stream_set.discard(notify_deferred) + if len(curr_stream_set) == 0: + self._streams.pop(name) # Cancel the sleep if we were woken up if call.active():