Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sliding Sync: Speed up SS room sorting #17468

Merged
merged 7 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17468.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up sorting of the room list in sliding sync.
43 changes: 21 additions & 22 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1217,34 +1217,33 @@ async def sort_rooms(
# Assemble a map of room ID to the `stream_ordering` of the last activity that the
# user should see in the room (<= `to_token`)
last_activity_in_room_map: Dict[str, int] = {}
for room_id, room_for_user in sync_room_map.items():
# If they are fully-joined to the room, let's find the latest activity
# at/before the `to_token`.
if room_for_user.membership == Membership.JOIN:
last_event_result = (
await self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id, to_token.room_key
)
)

# If the room has no events at/before the `to_token`, this is probably a
# mistake in the code that generates the `sync_room_map` since that should
# only give us rooms that the user had membership in during the token range.
assert last_event_result is not None

_, event_pos = last_event_result

last_activity_in_room_map[room_id] = event_pos.stream
else:
# Otherwise, if the user has left/been invited/knocked/been banned from
# a room, they shouldn't see anything past that point.
for room_id, room_for_user in sync_room_map.items():
if room_for_user.membership != Membership.JOIN:
# If the user has left/been invited/knocked/been banned from a
# room, they shouldn't see anything past that point.
#
# FIXME: It's possible that people should see beyond this point in
# invited/knocked cases if for example the room has
# FIXME: It's possible that people should see beyond this point
# in invited/knocked cases if for example the room has
# `invite`/`world_readable` history visibility, see
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
last_activity_in_room_map[room_id] = room_for_user.event_pos.stream

# For fully-joined rooms, we find the latest activity at/before the
# `to_token`.
joined_room_positions = (
await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering(
[
room_id
for room_id, room_for_user in sync_room_map.items()
if room_for_user.membership == Membership.JOIN
],
to_token.room_key,
)
)

last_activity_in_room_map.update(joined_room_positions)

return sorted(
sync_room_map.values(),
# Sort by the last activity (stream_ordering) in the room
Expand Down
5 changes: 5 additions & 0 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,11 @@ async def get_forward_extremities_for_room_at_stream_ordering(
# We want to make the cache more effective, so we clamp to the last
# change before the given ordering.
last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id) # type: ignore[attr-defined]
if last_change is None:
# If the room isn't in the cache we know that the last change was
# somewhere before the earliest known position of the cache, so we
# can clamp to that.
last_change = self._events_stream_cache.get_earliest_known_position() # type: ignore[attr-defined]

# We don't always have a full stream_to_exterm_id table, e.g. after
# the upgrade that introduced it, so we make sure we never ask for a
Expand Down
123 changes: 122 additions & 1 deletion synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,11 @@
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import PersistedEventPosition, RoomStreamToken
from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection
from synapse.util.caches.descriptors import cached
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -1293,6 +1294,126 @@ def get_last_event_pos_in_room_before_stream_ordering_txn(
get_last_event_pos_in_room_before_stream_ordering_txn,
)

async def bulk_get_last_event_pos_in_room_before_stream_ordering(
self,
room_ids: StrCollection,
end_token: RoomStreamToken,
) -> Dict[str, int]:
"""Bulk fetch the stream position of the latest events in the given
rooms
"""

min_token = end_token.stream
max_token = end_token.get_max_stream_pos()
results: Dict[str, int] = {}

# First, we check for the rooms in the stream change cache to see if we
# can just use the latest position from it.
missing_room_ids: Set[str] = set()
for room_id in room_ids:
stream_pos = self._events_stream_cache.get_max_pos_of_last_change(room_id)
if stream_pos and stream_pos <= min_token:
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
results[room_id] = stream_pos
else:
missing_room_ids.add(room_id)

# Next, we query the stream position from the DB. At first we fetch all
# positions less than the *max* stream pos in the token, then filter
# them down. We do this as a) this is a cheaper query, and b) the vast
# majority of rooms will have a latest token from before the min stream
# pos.

def bulk_get_last_event_pos_txn(
txn: LoggingTransaction, batch_room_ids: StrCollection
) -> Dict[str, int]:
# This query fetches the latest stream position in the rooms before
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
# the given max position.
clause, args = make_in_list_sql_clause(
self.database_engine, "room_id", batch_room_ids
)
sql = f"""
SELECT room_id, (
SELECT stream_ordering FROM events AS e
LEFT JOIN rejections USING (event_id)
WHERE e.room_id = r.room_id
AND stream_ordering <= ?
AND NOT outlier
AND rejection_reason IS NULL
ORDER BY stream_ordering DESC
LIMIT 1
)
FROM rooms AS r
WHERE {clause}
"""
txn.execute(sql, [max_token] + args)
return {row[0]: row[1] for row in txn}

recheck_rooms: Set[str] = set()
for batched in batch_iter(missing_room_ids, 1000):
result = await self.db_pool.runInteraction(
"bulk_get_last_event_pos_in_room_before_stream_ordering",
bulk_get_last_event_pos_txn,
batched,
)

# Check that the stream position for the rooms are from before the
# minimum position of the token. If not then we need to fetch more
# rows.
for room_id, stream in result.items():
if stream <= min_token:
results[room_id] = stream
else:
recheck_rooms.add(room_id)

if not recheck_rooms:
return results

# For the remaining rooms we need to fetch all rows between the min and
# max stream positions in the end token, and filter out the rows that
# are after the end token.
#
# This query should be fast as the range between the min and max should
# be small.

def bulk_get_last_event_pos_recheck_txn(
txn: LoggingTransaction, batch_room_ids: StrCollection
) -> Dict[str, int]:
clause, args = make_in_list_sql_clause(
self.database_engine, "room_id", batch_room_ids
)
sql = f"""
SELECT room_id, instance_name, stream_ordering
FROM events
WHERE ? < stream_ordering AND stream_ordering <= ?
AND NOT outlier
AND rejection_reason IS NULL
AND {clause}
ORDER BY stream_ordering ASC
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
"""
txn.execute(sql, [min_token, max_token] + args)

# We take the max stream ordering that is less than the token. Since
# we ordered by stream ordering we just need to iterate through and
# take the last matching stream ordering.
txn_results: Dict[str, int] = {}
for row in txn:
room_id = row[0]
event_pos = PersistedEventPosition(row[1], row[2])
if not event_pos.persisted_after(end_token):
txn_results[room_id] = event_pos.stream

return txn_results

for batched in batch_iter(recheck_rooms, 1000):
recheck_result = await self.db_pool.runInteraction(
"bulk_get_last_event_pos_in_room_before_stream_ordering_recheck",
bulk_get_last_event_pos_recheck_txn,
batched,
)
results.update(recheck_result)

return results

async def get_current_room_stream_token_for_room_id(
self, room_id: str
) -> RoomStreamToken:
Expand Down
12 changes: 8 additions & 4 deletions synapse/util/caches/stream_change_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,15 +327,19 @@ def _evict(self) -> None:
for entity in r:
self._entity_to_key.pop(entity, None)

def get_max_pos_of_last_change(self, entity: EntityType) -> int:
def get_max_pos_of_last_change(self, entity: EntityType) -> Optional[int]:
"""Returns an upper bound of the stream id of the last change to an
entity.

Args:
entity: The entity to check.

Return:
The stream position of the latest change for the given entity or
the earliest known stream position if the entitiy is unknown.
The stream position of the latest change for the given entity, if
known
"""
return self._entity_to_key.get(entity, self._earliest_known_stream_pos)
return self._entity_to_key.get(entity)

def get_earliest_known_position(self) -> int:
"""Returns the earliest position in the cache."""
return self._earliest_known_stream_pos
4 changes: 2 additions & 2 deletions tests/util/test_stream_change_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,5 +249,5 @@ def test_max_pos(self) -> None:
self.assertEqual(cache.get_max_pos_of_last_change("[email protected]"), 3)
self.assertEqual(cache.get_max_pos_of_last_change("[email protected]"), 4)

# Unknown entities will return the stream start position.
self.assertEqual(cache.get_max_pos_of_last_change("[email protected]"), 1)
# Unknown entities will return None
self.assertEqual(cache.get_max_pos_of_last_change("[email protected]"), None)
Loading