Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Track unconverted device list outbound pokes using a position instead (
Browse files Browse the repository at this point in the history
…#14516)

When a local device list change is added to
`device_lists_changes_in_room`, the `converted_to_destinations` flag is
set to `FALSE` and the `_handle_new_device_update_async` background
process is started. This background process looks for unconverted rows
in `device_lists_changes_in_room`, copies them to
`device_lists_outbound_pokes` and updates the flag.

To update the `converted_to_destinations` flag, the database performs a
`DELETE` and `INSERT` internally, which fragments the table. To avoid
this, track unconverted rows using a `(stream ID, room ID)` position
instead of the flag.

From now on, the `converted_to_destinations` column indicates rows that
need converting to outbound pokes, but does not indicate whether the
conversion has already taken place.

Closes #14037.

Signed-off-by: Sean Quah <[email protected]>
  • Loading branch information
squahtx authored Nov 22, 2022
1 parent 7eb7460 commit 9cae44f
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 49 deletions.
1 change: 1 addition & 0 deletions changelog.d/14516.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor conversion of device list changes in room to outbound pokes to track unconverted rows using a `(stream ID, room ID)` position instead of updating the `converted_to_destinations` flag on every row.
30 changes: 27 additions & 3 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,13 +682,33 @@ async def _handle_new_device_update_async(self) -> None:
hosts_already_sent_to: Set[str] = set()

try:
stream_id, room_id = await self.store.get_device_change_last_converted_pos()

while True:
self._handle_new_device_update_new_data = False
rows = await self.store.get_uncoverted_outbound_room_pokes()
max_stream_id = self.store.get_device_stream_token()
rows = await self.store.get_uncoverted_outbound_room_pokes(
stream_id, room_id
)
if not rows:
# If the DB returned nothing then there is nothing left to
# do, *unless* a new device list update happened during the
# DB query.

# Advance `(stream_id, room_id)`.
# `max_stream_id` comes from *before* the query for unconverted
# rows, which means that any unconverted rows must have a larger
# stream ID.
if max_stream_id > stream_id:
stream_id, room_id = max_stream_id, ""
await self.store.set_device_change_last_converted_pos(
stream_id, room_id
)
else:
assert max_stream_id == stream_id
# Avoid moving `room_id` backwards.
pass

if self._handle_new_device_update_new_data:
continue
else:
Expand Down Expand Up @@ -718,7 +738,6 @@ async def _handle_new_device_update_async(self) -> None:
user_id=user_id,
device_id=device_id,
room_id=room_id,
stream_id=stream_id,
hosts=hosts,
context=opentracing_context,
)
Expand Down Expand Up @@ -752,6 +771,12 @@ async def _handle_new_device_update_async(self) -> None:
hosts_already_sent_to.update(hosts)
current_stream_id = stream_id

# Advance `(stream_id, room_id)`.
_, _, room_id, stream_id, _ = rows[-1]
await self.store.set_device_change_last_converted_pos(
stream_id, room_id
)

finally:
self._handle_new_device_update_is_processing = False

Expand Down Expand Up @@ -834,7 +859,6 @@ async def handle_room_un_partial_stated(self, room_id: str) -> None:
user_id=user_id,
device_id=device_id,
room_id=room_id,
stream_id=None,
hosts=potentially_changed_hosts,
context=None,
)
Expand Down
13 changes: 7 additions & 6 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2075,13 +2075,14 @@ def simple_select_one_txn(
retcols: Collection[str],
allow_none: bool = False,
) -> Optional[Dict[str, Any]]:
select_sql = "SELECT %s FROM %s WHERE %s" % (
", ".join(retcols),
table,
" AND ".join("%s = ?" % (k,) for k in keyvalues),
)
select_sql = "SELECT %s FROM %s" % (", ".join(retcols), table)

if keyvalues:
select_sql += " WHERE %s" % (" AND ".join("%s = ?" % k for k in keyvalues),)
txn.execute(select_sql, list(keyvalues.values()))
else:
txn.execute(select_sql)

txn.execute(select_sql, list(keyvalues.values()))
row = txn.fetchone()

if not row:
Expand Down
107 changes: 69 additions & 38 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -2008,27 +2008,48 @@ def _add_device_outbound_room_poke_txn(
)

async def get_uncoverted_outbound_room_pokes(
self, limit: int = 10
self, start_stream_id: int, start_room_id: str, limit: int = 10
) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
"""Get device list changes by room that have not yet been handled and
written to `device_lists_outbound_pokes`.
Args:
start_stream_id: Together with `start_room_id`, indicates the position after
which to return device list changes.
start_room_id: Together with `start_stream_id`, indicates the position after
which to return device list changes.
limit: The maximum number of device list changes to return.
Returns:
A list of user ID, device ID, room ID, stream ID and optional opentracing context.
A list of user ID, device ID, room ID, stream ID and optional opentracing
context, in order of ascending (stream ID, room ID).
"""

sql = """
SELECT user_id, device_id, room_id, stream_id, opentracing_context
FROM device_lists_changes_in_room
WHERE NOT converted_to_destinations
ORDER BY stream_id
WHERE
(stream_id, room_id) > (?, ?) AND
stream_id <= ? AND
NOT converted_to_destinations
ORDER BY stream_id ASC, room_id ASC
LIMIT ?
"""

def get_uncoverted_outbound_room_pokes_txn(
txn: LoggingTransaction,
) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
txn.execute(sql, (limit,))
txn.execute(
sql,
(
start_stream_id,
start_room_id,
# Avoid returning rows if there may be uncommitted device list
# changes with smaller stream IDs.
self._device_list_id_gen.get_current_token(),
limit,
),
)

return [
(
Expand All @@ -2050,49 +2071,25 @@ async def add_device_list_outbound_pokes(
user_id: str,
device_id: str,
room_id: str,
stream_id: Optional[int],
hosts: Collection[str],
context: Optional[Dict[str, str]],
) -> None:
"""Queue the device update to be sent to the given set of hosts,
calculated from the room ID.
Marks the associated row in `device_lists_changes_in_room` as handled,
if `stream_id` is provided.
"""
if not hosts:
return

def add_device_list_outbound_pokes_txn(
txn: LoggingTransaction, stream_ids: List[int]
) -> None:
if hosts:
self._add_device_outbound_poke_to_stream_txn(
txn,
user_id=user_id,
device_id=device_id,
hosts=hosts,
stream_ids=stream_ids,
context=context,
)

if stream_id:
self.db_pool.simple_update_txn(
txn,
table="device_lists_changes_in_room",
keyvalues={
"user_id": user_id,
"device_id": device_id,
"stream_id": stream_id,
"room_id": room_id,
},
updatevalues={"converted_to_destinations": True},
)

if not hosts:
# If there are no hosts then we don't try and generate stream IDs.
return await self.db_pool.runInteraction(
"add_device_list_outbound_pokes",
add_device_list_outbound_pokes_txn,
[],
self._add_device_outbound_poke_to_stream_txn(
txn,
user_id=user_id,
device_id=device_id,
hosts=hosts,
stream_ids=stream_ids,
context=context,
)

async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids:
Expand Down Expand Up @@ -2156,3 +2153,37 @@ def get_pending_remote_device_list_updates_for_room_txn(
"get_pending_remote_device_list_updates_for_room",
get_pending_remote_device_list_updates_for_room_txn,
)

async def get_device_change_last_converted_pos(self) -> Tuple[int, str]:
"""
Get the position of the last row in `device_list_changes_in_room` that has been
converted to `device_lists_outbound_pokes`.
Rows with a strictly greater position where `converted_to_destinations` is
`FALSE` have not been converted.
"""

row = await self.db_pool.simple_select_one(
table="device_lists_changes_converted_stream_position",
keyvalues={},
retcols=["stream_id", "room_id"],
desc="get_device_change_last_converted_pos",
)
return row["stream_id"], row["room_id"]

async def set_device_change_last_converted_pos(
self,
stream_id: int,
room_id: str,
) -> None:
"""
Set the position of the last row in `device_list_changes_in_room` that has been
converted to `device_lists_outbound_pokes`.
"""

await self.db_pool.simple_update_one(
table="device_lists_changes_converted_stream_position",
keyvalues={},
updatevalues={"stream_id": stream_id, "room_id": room_id},
desc="set_device_change_last_converted_pos",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Prior to this schema delta, we tracked the set of unconverted rows in
-- `device_lists_changes_in_room` using the `converted_to_destinations` flag. When rows
-- were converted to `device_lists_outbound_pokes`, the `converted_to_destinations` flag
-- would be set.
--
-- After this schema delta, the `converted_to_destinations` is still populated like
-- before, but the set of unconverted rows is determined by the `stream_id` in the new
-- `device_lists_changes_converted_stream_position` table.
--
-- If rolled back, Synapse will re-send all device list changes that happened since the
-- schema delta.

CREATE TABLE IF NOT EXISTS device_lists_changes_converted_stream_position(
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
-- The (stream id, room id) of the last row in `device_lists_changes_in_room` that
-- has been converted to `device_lists_outbound_pokes`. Rows with a strictly larger
-- (stream id, room id) where `converted_to_destinations` is `FALSE` have not been
-- converted.
stream_id BIGINT NOT NULL,
-- `room_id` may be an empty string, which compares less than all valid room IDs.
room_id TEXT NOT NULL,
CHECK (Lock='X')
);

INSERT INTO device_lists_changes_converted_stream_position (stream_id, room_id) VALUES (
(
SELECT COALESCE(
-- The last converted stream id is the smallest unconverted stream id minus
-- one.
MIN(stream_id) - 1,
-- If there is no unconverted stream id, the last converted stream id is the
-- largest stream id.
-- Otherwise, pick 1, since stream ids start at 2.
(SELECT COALESCE(MAX(stream_id), 1) FROM device_lists_changes_in_room)
) FROM device_lists_changes_in_room WHERE NOT converted_to_destinations
),
''
);
3 changes: 1 addition & 2 deletions tests/storage/test_devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def add_device_change(self, user_id, device_ids, host):
"""

for device_id in device_ids:
stream_id = self.get_success(
self.get_success(
self.store.add_device_change_to_streams(
user_id, [device_id], ["!some:room"]
)
Expand All @@ -39,7 +39,6 @@ def add_device_change(self, user_id, device_ids, host):
user_id=user_id,
device_id=device_id,
room_id="!some:room",
stream_id=stream_id,
hosts=[host],
context={},
)
Expand Down

0 comments on commit 9cae44f

Please sign in to comment.