Skip to content

Commit

Permalink
Add rooms.bump_stamp to Sliding Sync /sync for easier client-side…
Browse files Browse the repository at this point in the history
… sorting (#17395)

`bump_stamp` corresponds to the `stream_ordering` of the latest `DEFAULT_BUMP_EVENT_TYPES` in the room. This helps clients sort more readily without them needing to pull in a bunch of the timeline to determine the last activity. `bump_event_types` is a thing because for example, we don't want display name changes to mark the room as unread and bump it to the top. For encrypted rooms, we just have to consider any activity as a bump because we can't see the content and the client has to figure it out for themselves.

Outside of Synapse, `bump_stamp` is just a free-form counter so other implementations could use `received_ts`or `origin_server_ts` (see the [*Security considerations* section in MSC3575 about the potential pitfalls of using `origin_server_ts`](https://github.com/matrix-org/matrix-spec-proposals/blob/kegan/sync-v3/proposals/3575-sync.md#security-considerations)). It doesn't have any guarantee about always going up. In the Synapse case, it could go down if an event was redacted/removed (or purged in cases of retention policies).

In the future, we could add `bump_event_types` as [MSC3575](matrix-org/matrix-spec-proposals#3575) mentions if people need to customize the event types.

---

In the Sliding Sync proxy, a similar [`timestamp` field was added](matrix-org/sliding-sync#247) for the same purpose but the name is not obvious what it pertains to or what it's for.

The `timestamp` field was also added to Ruma in ruma/ruma#1622
  • Loading branch information
MadLittleMods authored Jul 8, 2024
1 parent 62134dc commit 3fef535
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 34 deletions.
1 change: 1 addition & 0 deletions changelog.d/17395.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `rooms.bump_stamp` for easier client-side sorting in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
4 changes: 4 additions & 0 deletions synapse/api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,13 @@ class EventTypes:
SpaceParent: Final = "m.space.parent"

Reaction: Final = "m.reaction"
Sticker: Final = "m.sticker"
LiveLocationShareStart: Final = "m.beacon_info"

CallInvite: Final = "m.call.invite"

PollStart: Final = "m.poll.start"


class ToDeviceEventTypes:
RoomKeyRequest: Final = "m.room_key_request"
Expand Down
75 changes: 56 additions & 19 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@
logger = logging.getLogger(__name__)


# The event types that clients should consider as new activity.
DEFAULT_BUMP_EVENT_TYPES = {
EventTypes.Message,
EventTypes.Encrypted,
EventTypes.Sticker,
EventTypes.CallInvite,
EventTypes.PollStart,
EventTypes.LiveLocationShareStart,
}


def filter_membership_for_sync(
*, membership: str, user_id: str, sender: Optional[str]
) -> bool:
Expand Down Expand Up @@ -285,6 +296,7 @@ class _RoomMembershipForUser:
range
"""

room_id: str
event_id: Optional[str]
event_pos: PersistedEventPosition
membership: str
Expand Down Expand Up @@ -469,7 +481,9 @@ async def current_sync_for_user(
#
# Both sides of range are inclusive so we `+ 1`
max_num_rooms = range[1] - range[0] + 1
for room_id, _ in sorted_room_info[range[0] :]:
for room_membership in sorted_room_info[range[0] :]:
room_id = room_membership.room_id

if len(room_ids_in_list) >= max_num_rooms:
break

Expand Down Expand Up @@ -519,7 +533,7 @@ async def current_sync_for_user(
user=sync_config.user,
room_id=room_id,
room_sync_config=room_sync_config,
rooms_membership_for_user_at_to_token=sync_room_map[room_id],
room_membership_for_user_at_to_token=sync_room_map[room_id],
from_token=from_token,
to_token=to_token,
)
Expand Down Expand Up @@ -591,6 +605,7 @@ async def get_sync_room_ids_for_user(
# (below) because they are potentially from the current snapshot time
# instead from the time of the `to_token`.
room_for_user.room_id: _RoomMembershipForUser(
room_id=room_for_user.room_id,
event_id=room_for_user.event_id,
event_pos=room_for_user.event_pos,
membership=room_for_user.membership,
Expand Down Expand Up @@ -691,6 +706,7 @@ async def get_sync_room_ids_for_user(
is not None
):
sync_room_id_set[room_id] = _RoomMembershipForUser(
room_id=room_id,
event_id=first_membership_change_after_to_token.prev_event_id,
event_pos=first_membership_change_after_to_token.prev_event_pos,
membership=first_membership_change_after_to_token.prev_membership,
Expand Down Expand Up @@ -785,6 +801,7 @@ async def get_sync_room_ids_for_user(
# is their own leave event
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
filtered_sync_room_id_set[room_id] = _RoomMembershipForUser(
room_id=room_id,
event_id=last_membership_change_in_from_to_range.event_id,
event_pos=last_membership_change_in_from_to_range.event_pos,
membership=last_membership_change_in_from_to_range.membership,
Expand Down Expand Up @@ -969,7 +986,7 @@ async def sort_rooms(
self,
sync_room_map: Dict[str, _RoomMembershipForUser],
to_token: StreamToken,
) -> List[Tuple[str, _RoomMembershipForUser]]:
) -> List[_RoomMembershipForUser]:
"""
Sort by `stream_ordering` of the last event that the user should see in the
room. `stream_ordering` is unique so we get a stable sort.
Expand Down Expand Up @@ -1007,12 +1024,17 @@ async def sort_rooms(
else:
# Otherwise, if the user has left/been invited/knocked/been banned from
# a room, they shouldn't see anything past that point.
#
# FIXME: It's possible that people should see beyond this point in
# invited/knocked cases if for example the room has
# `invite`/`world_readable` history visibility, see
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
last_activity_in_room_map[room_id] = room_for_user.event_pos.stream

return sorted(
sync_room_map.items(),
sync_room_map.values(),
# Sort by the last activity (stream_ordering) in the room
key=lambda room_info: last_activity_in_room_map[room_info[0]],
key=lambda room_info: last_activity_in_room_map[room_info.room_id],
# We want descending order
reverse=True,
)
Expand All @@ -1022,7 +1044,7 @@ async def get_room_sync_data(
user: UserID,
room_id: str,
room_sync_config: RoomSyncConfig,
rooms_membership_for_user_at_to_token: _RoomMembershipForUser,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
from_token: Optional[StreamToken],
to_token: StreamToken,
) -> SlidingSyncResult.RoomResult:
Expand All @@ -1036,7 +1058,7 @@ async def get_room_sync_data(
room_id: The room ID to fetch data for
room_sync_config: Config for what data we should fetch for a room in the
sync response.
rooms_membership_for_user_at_to_token: Membership information for the user
room_membership_for_user_at_to_token: Membership information for the user
in the room at the time of `to_token`.
from_token: The point in the stream to sync from.
to_token: The point in the stream to sync up to.
Expand All @@ -1056,7 +1078,7 @@ async def get_room_sync_data(
if (
room_sync_config.timeline_limit > 0
# No timeline for invite/knock rooms (just `stripped_state`)
and rooms_membership_for_user_at_to_token.membership
and room_membership_for_user_at_to_token.membership
not in (Membership.INVITE, Membership.KNOCK)
):
limited = False
Expand All @@ -1069,12 +1091,12 @@ async def get_room_sync_data(
# We're going to paginate backwards from the `to_token`
from_bound = to_token.room_key
# People shouldn't see past their leave/ban event
if rooms_membership_for_user_at_to_token.membership in (
if room_membership_for_user_at_to_token.membership in (
Membership.LEAVE,
Membership.BAN,
):
from_bound = (
rooms_membership_for_user_at_to_token.event_pos.to_room_stream_token()
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
)

# Determine whether we should limit the timeline to the token range.
Expand All @@ -1089,7 +1111,7 @@ async def get_room_sync_data(
to_bound = (
from_token.room_key
if from_token is not None
and not rooms_membership_for_user_at_to_token.newly_joined
and not room_membership_for_user_at_to_token.newly_joined
else None
)

Expand Down Expand Up @@ -1126,7 +1148,7 @@ async def get_room_sync_data(
self.storage_controllers,
user.to_string(),
timeline_events,
is_peeking=rooms_membership_for_user_at_to_token.membership
is_peeking=room_membership_for_user_at_to_token.membership
!= Membership.JOIN,
filter_send_to_client=True,
)
Expand Down Expand Up @@ -1181,16 +1203,16 @@ async def get_room_sync_data(
# Figure out any stripped state events for invite/knocks. This allows the
# potential joiner to identify the room.
stripped_state: List[JsonDict] = []
if rooms_membership_for_user_at_to_token.membership in (
if room_membership_for_user_at_to_token.membership in (
Membership.INVITE,
Membership.KNOCK,
):
# This should never happen. If someone is invited/knocked on room, then
# there should be an event for it.
assert rooms_membership_for_user_at_to_token.event_id is not None
assert room_membership_for_user_at_to_token.event_id is not None

invite_or_knock_event = await self.store.get_event(
rooms_membership_for_user_at_to_token.event_id
room_membership_for_user_at_to_token.event_id
)

stripped_state = []
Expand All @@ -1206,7 +1228,7 @@ async def get_room_sync_data(
stripped_state.append(strip_event(invite_or_knock_event))

# TODO: Handle state resets. For example, if we see
# `rooms_membership_for_user_at_to_token.membership = Membership.LEAVE` but
# `room_membership_for_user_at_to_token.membership = Membership.LEAVE` but
# `required_state` doesn't include it, we should indicate to the client that a
# state reset happened. Perhaps we should indicate this by setting `initial:
# True` and empty `required_state`.
Expand All @@ -1226,7 +1248,7 @@ async def get_room_sync_data(
# `invite`/`knock` rooms only have `stripped_state`. See
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
room_state: Optional[StateMap[EventBase]] = None
if rooms_membership_for_user_at_to_token.membership not in (
if room_membership_for_user_at_to_token.membership not in (
Membership.INVITE,
Membership.KNOCK,
):
Expand Down Expand Up @@ -1303,15 +1325,15 @@ async def get_room_sync_data(
# initial sync
if initial:
# People shouldn't see past their leave/ban event
if rooms_membership_for_user_at_to_token.membership in (
if room_membership_for_user_at_to_token.membership in (
Membership.LEAVE,
Membership.BAN,
):
room_state = await self.storage_controllers.state.get_state_at(
room_id,
stream_position=to_token.copy_and_replace(
StreamKeyType.ROOM,
rooms_membership_for_user_at_to_token.event_pos.to_room_stream_token(),
room_membership_for_user_at_to_token.event_pos.to_room_stream_token(),
),
state_filter=state_filter,
# Partially-stated rooms should have all state events except for
Expand Down Expand Up @@ -1341,6 +1363,20 @@ async def get_room_sync_data(
# we can return updates instead of the full required state.
raise NotImplementedError()

# Figure out the last bump event in the room
last_bump_event_result = (
await self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
)
)

# By default, just choose the membership event position
bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
# But if we found a bump event, use that instead
if last_bump_event_result is not None:
_, bump_event_pos = last_bump_event_result
bump_stamp = bump_event_pos.stream

return SlidingSyncResult.RoomResult(
# TODO: Dummy value
name=None,
Expand All @@ -1358,6 +1394,7 @@ async def get_room_sync_data(
prev_batch=prev_batch_token,
limited=limited,
num_live=num_live,
bump_stamp=bump_stamp,
# TODO: Dummy values
joined_count=0,
invited_count=0,
Expand Down
1 change: 1 addition & 0 deletions synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,7 @@ async def encode_rooms(
serialized_rooms: Dict[str, JsonDict] = {}
for room_id, room_result in rooms.items():
serialized_rooms[room_id] = {
"bump_stamp": room_result.bump_stamp,
"joined_count": room_result.joined_count,
"invited_count": room_result.invited_count,
"notification_count": room_result.notification_count,
Expand Down
35 changes: 24 additions & 11 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,7 @@ async def get_last_event_pos_in_room_before_stream_ordering(
self,
room_id: str,
end_token: RoomStreamToken,
event_types: Optional[Collection[str]] = None,
) -> Optional[Tuple[str, PersistedEventPosition]]:
"""
Returns the ID and event position of the last event in a room at or before a
Expand All @@ -1186,6 +1187,7 @@ async def get_last_event_pos_in_room_before_stream_ordering(
Args:
room_id
end_token: The token used to stream from
event_types: Optional allowlist of event types to filter by
Returns:
The ID of the most recent event and it's position, or None if there are no
Expand All @@ -1207,9 +1209,17 @@ def get_last_event_pos_in_room_before_stream_ordering_txn(
min_stream = end_token.stream
max_stream = end_token.get_max_stream_pos()

# We use `union all` because we don't need any of the deduplication logic
# (`union` is really a union + distinct). `UNION ALL` does preserve the
# ordering of the operand queries but there is no actual gurantee that it
event_type_clause = ""
event_type_args: List[str] = []
if event_types is not None and len(event_types) > 0:
event_type_clause, event_type_args = make_in_list_sql_clause(
txn.database_engine, "type", event_types
)
event_type_clause = f"AND {event_type_clause}"

# We use `UNION ALL` because we don't need any of the deduplication logic
# (`UNION` is really a `UNION` + `DISTINCT`). `UNION ALL` does preserve the
# ordering of the operand queries but there is no actual guarantee that it
# has this behavior in all scenarios so we need the extra `ORDER BY` at the
# bottom.
sql = """
Expand All @@ -1218,6 +1228,7 @@ def get_last_event_pos_in_room_before_stream_ordering_txn(
FROM events
LEFT JOIN rejections USING (event_id)
WHERE room_id = ?
%s
AND ? < stream_ordering AND stream_ordering <= ?
AND NOT outlier
AND rejections.event_id IS NULL
Expand All @@ -1229,23 +1240,25 @@ def get_last_event_pos_in_room_before_stream_ordering_txn(
FROM events
LEFT JOIN rejections USING (event_id)
WHERE room_id = ?
%s
AND stream_ordering <= ?
AND NOT outlier
AND rejections.event_id IS NULL
ORDER BY stream_ordering DESC
LIMIT 1
) AS b
ORDER BY stream_ordering DESC
"""
""" % (
event_type_clause,
event_type_clause,
)
txn.execute(
sql,
(
room_id,
min_stream,
max_stream,
room_id,
min_stream,
),
[room_id]
+ event_type_args
+ [min_stream, max_stream, room_id]
+ event_type_args
+ [min_stream],
)

for instance_name, stream_ordering, topological_ordering, event_id in txn:
Expand Down
8 changes: 8 additions & 0 deletions synapse/types/handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,13 @@ class RoomResult:
events because if a room not in the sliding window bumps into the window because
of an @mention it will have `initial: true` yet contain a single live event
(with potentially other old events in the timeline).
bump_stamp: The `stream_ordering` of the last event according to the
`bump_event_types`. This helps clients sort more readily without them
needing to pull in a bunch of the timeline to determine the last activity.
`bump_event_types` is a thing because for example, we don't want display
name changes to mark the room as unread and bump it to the top. For
encrypted rooms, we just have to consider any activity as a bump because we
can't see the content and the client has to figure it out for themselves.
joined_count: The number of users with membership of join, including the client's
own user ID. (same as sync `v2 m.joined_member_count`)
invited_count: The number of users with membership of invite. (same as sync v2
Expand Down Expand Up @@ -211,6 +218,7 @@ class RoomResult:
limited: Optional[bool]
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
num_live: Optional[int]
bump_stamp: int
joined_count: int
invited_count: int
notification_count: int
Expand Down
Loading

0 comments on commit 3fef535

Please sign in to comment.