Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Process received rows by invalidating caches and notifying
Browse files Browse the repository at this point in the history
  • Loading branch information
reivilibre committed Dec 6, 2022
1 parent b62e941 commit 3005d9b
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 3 deletions.
14 changes: 13 additions & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,18 @@
TagAccountDataStream,
ToDeviceStream,
TypingStream,
UnPartialStatedEventStream,
UnPartialStatedRoomStream,
)
from synapse.replication.tcp.streams.events import (
EventsStream,
EventsStreamEventRow,
EventsStreamRow,
)
from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStreamRow
from synapse.replication.tcp.streams.partial_state import (
UnPartialStatedEventStreamRow,
UnPartialStatedRoomStreamRow,
)
from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID
from synapse.util.async_helpers import Linearizer, timeout_deferred
from synapse.util.metrics import Measure
Expand Down Expand Up @@ -247,6 +251,14 @@ async def on_rdata(
self._state_storage_controller.notify_room_un_partial_stated(
row.room_id
)
elif stream_name == UnPartialStatedEventStream.NAME:
for row in rows:
assert isinstance(row, UnPartialStatedEventStreamRow)

# Wake up any tasks waiting for the event to be un-partial-stated.
self._state_storage_controller.notify_event_un_partial_stated(
row.event_id
)

await self._presence_handler.process_replication_rows(
stream_name, instance_name, token, rows
Expand Down
13 changes: 12 additions & 1 deletion synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@
run_as_background_process,
wrap_as_background_process,
)
from synapse.replication.tcp.streams import BackfillStream
from synapse.replication.tcp.streams import BackfillStream, UnPartialStatedEventStream
from synapse.replication.tcp.streams.events import EventsStream
from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
Expand Down Expand Up @@ -390,6 +391,16 @@ def process_replication_rows(
self._stream_id_gen.advance(instance_name, token)
elif stream_name == BackfillStream.NAME:
self._backfill_id_gen.advance(instance_name, -token)
elif stream_name == UnPartialStatedEventStream.NAME:
for row in rows:
assert isinstance(row, UnPartialStatedEventStreamRow)

self.is_partial_state_event.invalidate(row.event_id)

if row.rejection_status_changed:
# If the partial-stated event became rejected or unrejected
# when it wasn't before, we need to invalidate this cache.
self._invalidate_local_get_event_cache(row.event_id)

super().process_replication_rows(stream_name, instance_name, token, rows)

Expand Down
18 changes: 17 additions & 1 deletion synapse/storage/databases/main/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
import collections.abc
import logging
from typing import TYPE_CHECKING, Collection, Dict, Iterable, Optional, Set, Tuple
from typing import TYPE_CHECKING, Any, Collection, Dict, Iterable, Optional, Set, Tuple

import attr

Expand All @@ -24,6 +24,8 @@
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging.opentracing import trace
from synapse.replication.tcp.streams import UnPartialStatedEventStream
from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
Expand Down Expand Up @@ -82,6 +84,20 @@ def __init__(
super().__init__(database, db_conn, hs)
self._instance_name: str = hs.get_instance_name()

def process_replication_rows(
self,
stream_name: str,
instance_name: str,
token: int,
rows: Iterable[Any],
) -> None:
if stream_name == UnPartialStatedEventStream.NAME:
for row in rows:
assert isinstance(row, UnPartialStatedEventStreamRow)
self._get_state_group_for_event.invalidate((row.event_id,))

super().process_replication_rows(stream_name, instance_name, token, rows)

async def get_room_version(self, room_id: str) -> RoomVersion:
"""Get the room_version of a given room
Raises:
Expand Down

0 comments on commit 3005d9b

Please sign in to comment.