From 3005d9be6727dc06890bb97a008ab2da2293d821 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 24 Nov 2022 19:36:03 +0000 Subject: [PATCH] Process received rows by invalidating caches and notifying --- synapse/replication/tcp/client.py | 14 +++++++++++++- .../storage/databases/main/events_worker.py | 13 ++++++++++++- synapse/storage/databases/main/state.py | 18 +++++++++++++++++- 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index b4dad47b45ad..658d89210d31 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -36,6 +36,7 @@ TagAccountDataStream, ToDeviceStream, TypingStream, + UnPartialStatedEventStream, UnPartialStatedRoomStream, ) from synapse.replication.tcp.streams.events import ( @@ -43,7 +44,10 @@ EventsStreamEventRow, EventsStreamRow, ) -from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStreamRow +from synapse.replication.tcp.streams.partial_state import ( + UnPartialStatedEventStreamRow, + UnPartialStatedRoomStreamRow, +) from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID from synapse.util.async_helpers import Linearizer, timeout_deferred from synapse.util.metrics import Measure @@ -247,6 +251,14 @@ async def on_rdata( self._state_storage_controller.notify_room_un_partial_stated( row.room_id ) + elif stream_name == UnPartialStatedEventStream.NAME: + for row in rows: + assert isinstance(row, UnPartialStatedEventStreamRow) + + # Wake up any tasks waiting for the event to be un-partial-stated. + self._state_storage_controller.notify_event_un_partial_stated( + row.event_id + ) await self._presence_handler.process_replication_rows( stream_name, instance_name, token, rows diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index cdeaaf141008..a5772fa347e6 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -59,8 +59,9 @@ run_as_background_process, wrap_as_background_process, ) -from synapse.replication.tcp.streams import BackfillStream +from synapse.replication.tcp.streams import BackfillStream, UnPartialStatedEventStream from synapse.replication.tcp.streams.events import EventsStream +from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, @@ -390,6 +391,16 @@ def process_replication_rows( self._stream_id_gen.advance(instance_name, token) elif stream_name == BackfillStream.NAME: self._backfill_id_gen.advance(instance_name, -token) + elif stream_name == UnPartialStatedEventStream.NAME: + for row in rows: + assert isinstance(row, UnPartialStatedEventStreamRow) + + self.is_partial_state_event.invalidate(row.event_id) + + if row.rejection_status_changed: + # If the partial-stated event became rejected or unrejected + # when it wasn't before, we need to invalidate this cache. + self._invalidate_local_get_event_cache(row.event_id) super().process_replication_rows(stream_name, instance_name, token, rows) diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index ba16f2dfdf98..9caf877b7476 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -14,7 +14,7 @@ # limitations under the License. import collections.abc import logging -from typing import TYPE_CHECKING, Collection, Dict, Iterable, Optional, Set, Tuple +from typing import TYPE_CHECKING, Any, Collection, Dict, Iterable, Optional, Set, Tuple import attr @@ -24,6 +24,8 @@ from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.logging.opentracing import trace +from synapse.replication.tcp.streams import UnPartialStatedEventStream +from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -82,6 +84,20 @@ def __init__( super().__init__(database, db_conn, hs) self._instance_name: str = hs.get_instance_name() + def process_replication_rows( + self, + stream_name: str, + instance_name: str, + token: int, + rows: Iterable[Any], + ) -> None: + if stream_name == UnPartialStatedEventStream.NAME: + for row in rows: + assert isinstance(row, UnPartialStatedEventStreamRow) + self._get_state_group_for_event.invalidate((row.event_id,)) + + super().process_replication_rows(stream_name, instance_name, token, rows) + async def get_room_version(self, room_id: str) -> RoomVersion: """Get the room_version of a given room Raises: