From ac566f45f6369a9bd7f787ebddaf4526f00aac13 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 17 Feb 2023 11:05:58 +0000 Subject: [PATCH 01/11] Rename method to make obvious it only applies to local users --- synapse/storage/databases/main/user_directory.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 97f09b73dde6..f2f618c8fd6f 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -91,7 +91,7 @@ def __init__( ) self.db_pool.updates.register_background_update_handler( "populate_user_directory_process_users", - self._populate_user_directory_process_users, + self._populate_user_directory_process_local_users, ) self.db_pool.updates.register_background_update_handler( "populate_user_directory_cleanup", self._populate_user_directory_cleanup @@ -265,7 +265,7 @@ def _get_next_batch( # Upsert a user_directory record for each remote user we see. for user_id, profile in users_with_profile.items(): # Local users are processed separately in - # `_populate_user_directory_users`; there we can read from + # `_populate_user_directory_local_users`; there we can read from # the `profiles` table to ensure we don't leak their per-room # profiles. It also means we write local users to this table # exactly once, rather than once for every room they're in. @@ -336,7 +336,7 @@ def _get_next_batch( return processed_event_count - async def _populate_user_directory_process_users( + async def _populate_user_directory_process_local_users( self, progress: JsonDict, batch_size: int ) -> int: """ From 04d091fbcb466ffb1b3462a7fb8754952044aa24 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 17 Feb 2023 11:11:29 +0000 Subject: [PATCH 02/11] Add another temporary table to the user directory background update for storing remote users needing lookup --- synapse/storage/databases/main/user_directory.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index f2f618c8fd6f..45127deecdf1 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -142,6 +142,18 @@ def _make_staging_area(txn: LoggingTransaction) -> None: txn, TEMP_TABLE + "_users", keys=("user_id",), values=users ) + # A table for storing a list of remote users that *may* need a remote + # lookup in order to obtain a public profile. + # The list should be compared against the user directory's cache + # to see whether any queries can be skipped because the remote user + # also appeared in a public room. + sql = ( + "CREATE TABLE IF NOT EXISTS " + + TEMP_TABLE + + "_remote_users_needing_lookup(user_id TEXT PRIMARY KEY NOT NULL)" + ) + txn.execute(sql) + new_pos = await self.get_max_stream_id_in_current_state_deltas() await self.db_pool.runInteraction( "populate_user_directory_temp_build", _make_staging_area @@ -171,6 +183,9 @@ async def _populate_user_directory_cleanup( def _delete_staging_area(txn: LoggingTransaction) -> None: txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms") txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users") + txn.execute( + "DROP TABLE IF EXISTS " + TEMP_TABLE + "_remote_users_needing_lookup" + ) txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position") await self.db_pool.runInteraction( From 461cdb631fcb08ecc27e26e9740fdd14e4292521 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 17 Feb 2023 11:57:05 +0000 Subject: [PATCH 03/11] Don't add private remote users straight to the user directory Instead, queue them up to be added to the stale profile queue. --- .../storage/databases/main/user_directory.py | 34 +++++++++++++++---- 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 45127deecdf1..376daefdcf22 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -277,6 +277,13 @@ def _get_next_batch( or await self.should_include_local_user_in_dir(user_id) } + # Determine whether the room is public + is_public = await self.is_room_world_readable_or_publicly_joinable( + room_id + ) + + remote_users_to_query_later = set() + # Upsert a user_directory record for each remote user we see. for user_id, profile in users_with_profile.items(): # Local users are processed separately in @@ -289,14 +296,29 @@ def _get_next_batch( # TODO `users_with_profile` above reads from the `user_directory` # table, meaning that `profile` is bespoke to this room. # and this leaks remote users' per-room profiles to the user directory. - await self.update_profile_in_user_dir( - user_id, profile.display_name, profile.avatar_url - ) + if is_public: + # If this is a public room, it's acceptable to add the profile + # into the user directory. + await self.update_profile_in_user_dir( + user_id, profile.display_name, profile.avatar_url + ) + else: + # Otherwise query the user at a later time + remote_users_to_query_later.add(user_id) + + # (insert the remote users needing a query in batch; + # use upsert with no values for 'INSERT OR IGNORE' semantics) + await self.db_pool.simple_upsert_many( + f"{TEMP_TABLE}_remote_users_needing_lookup", + ("user_id",), + [(u,) for u in remote_users_to_query_later], + (), + (), + desc="populate_user_directory_queue_remote_needing_lookup", + ) + del remote_users_to_query_later # Now update the room sharing tables to include this room. - is_public = await self.is_room_world_readable_or_publicly_joinable( - room_id - ) if is_public: if users_with_profile: await self.add_users_in_public_rooms( From d448469ea7ddb11eb0f856738a3ee6d9e69004bd Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 17 Feb 2023 11:57:41 +0000 Subject: [PATCH 04/11] Add a background update stage to sort the remote users into the stale profile queue as appropriate --- .../storage/databases/main/user_directory.py | 102 ++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 376daefdcf22..1beb01cd7793 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -93,6 +93,10 @@ def __init__( "populate_user_directory_process_users", self._populate_user_directory_process_local_users, ) + self.db_pool.updates.register_background_update_handler( + "populate_user_directory_process_remote_users", + self._populate_user_directory_process_remote_users, + ) self.db_pool.updates.register_background_update_handler( "populate_user_directory_cleanup", self._populate_user_directory_cleanup ) @@ -441,6 +445,104 @@ def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]: return len(users_to_work_on) + async def _populate_user_directory_process_remote_users( + self, progress: JsonDict, batch_size: int + ) -> int: + """ + Sorts through the `_remote_users_needing_lookup` table and adds the + users within to the list of stale remote profiles, + unless we already populated a user directory entry for them (i.e. they were + also in a public room). + """ + + def _get_next_batch_txn( + txn: LoggingTransaction, done_up_to_user_id: str + ) -> Optional[str]: + """ + Given the last user ID we've processed, + Returns + - a user ID to process up to and including; or + - `None` if there is no limit left (i.e. we should just process all + remaining rows). + """ + # Should be a B-Tree index only scan: so reasonably efficient despite the + # OFFSET + # If we're lucky, will also warm up the disk cache for the subsequent query + # that actually does some work. + txn.execute( + f""" + SELECT user_id + FROM {TEMP_TABLE}_remote_users_needing_lookup + WHERE user_id > ? + ORDER BY user_id + LIMIT 1 OFFSET ? + """, + (done_up_to_user_id, batch_size), + ) + row = txn.fetchone() + if row: + return row[0] + else: + return None + + def _add_private_only_users_to_stale_profile_refresh_queue_txn( + txn: LoggingTransaction, from_exc: str, until_inc: Optional[str] + ) -> None: + end_condition = "AND user_id <= ?" if until_inc is not None else "" + end_args = (until_inc,) if until_inc is not None else () + + user_id_serverpart: str + if isinstance(self.database_engine, PostgresEngine): + user_id_serverpart = ( + "SUBSTRING(user_id FROM POSITION(':' IN user_id) + 1)" + ) + elif isinstance(self.database_engine, Sqlite3Engine): + user_id_serverpart = "SUBSTR(user_id, INSTR(user_id, ':') + 1)" + else: + raise RuntimeError("Unknown database engine!") + + txn.execute( + f""" + INSERT INTO user_directory_stale_remote_users + (user_id, next_try_at_ts, retry_counter, user_server_name) + SELECT + user_id, 0, 0, {user_id_serverpart} + FROM {TEMP_TABLE}_remote_users_needing_lookup AS runl + LEFT JOIN user_directory AS ud USING (user_id) + WHERE ud.user_id IS NULL + AND ? < user_id {end_condition} + """, + (from_exc,) + end_args, + ) + + def _do_txn(txn: LoggingTransaction) -> None: + """ + Does a step of background update. + """ + last_user_id = progress.get("last_user_id", "@") + next_end_limit_inc = _get_next_batch_txn(txn, last_user_id) + _add_private_only_users_to_stale_profile_refresh_queue_txn( + txn, last_user_id, next_end_limit_inc + ) + + # Update the progress + progress["last_user_id"] = next_end_limit_inc + self.db_pool.updates._background_update_progress_txn( + txn, "populate_user_directory_process_remote_users", progress + ) + + if progress.get("last_user_id", "@") is None: + await self.db_pool.updates._end_background_update( + "populate_user_directory_process_remote_users" + ) + return 1 + + await self.db_pool.runInteraction( + "populate_user_directory_process_remote_users", + _do_txn, + ) + return batch_size + async def should_include_local_user_in_dir(self, user: str) -> bool: """Certain classes of local user are omitted from the user directory. Is this user one of them? From 1552fa44dbb8ab43f0c13a06b512625851d9f919 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 17 Feb 2023 17:41:19 +0000 Subject: [PATCH 05/11] (ugly?) Kick off the fetching of remote profiles once ready --- synapse/replication/tcp/commands.py | 18 ++++++++++++++++++ .../storage/databases/main/user_directory.py | 14 ++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 32f52e54d8c7..df229fc0a374 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -422,6 +422,21 @@ class RemoteServerUpCommand(_SimpleCommand): NAME = "REMOTE_SERVER_UP" +class ReadyToRefreshStaleUserDirectoryProfilesCommand(_SimpleCommand): + """ + Sent when a worker needs to tell the user directory worker that there are + stale remote user profiles that require refreshing. + + Triggered when the user directory background update has been completed. + + Format:: + + USER_DIRECTORY_READY_TO_REFRESH_STALE_REMOTE_PROFILES '' + """ + + NAME = "USER_DIRECTORY_READY_TO_REFRESH_STALE_REMOTE_PROFILES" + + _COMMANDS: Tuple[Type[Command], ...] = ( ServerCommand, RdataCommand, @@ -435,6 +450,7 @@ class RemoteServerUpCommand(_SimpleCommand): UserIpCommand, RemoteServerUpCommand, ClearUserSyncsCommand, + ReadyToRefreshStaleUserDirectoryProfilesCommand, ) # Map of command name to command type. @@ -448,6 +464,7 @@ class RemoteServerUpCommand(_SimpleCommand): ErrorCommand.NAME, PingCommand.NAME, RemoteServerUpCommand.NAME, + ReadyToRefreshStaleUserDirectoryProfilesCommand.NAME, ) # The commands the client is allowed to send @@ -461,6 +478,7 @@ class RemoteServerUpCommand(_SimpleCommand): UserIpCommand.NAME, ErrorCommand.NAME, RemoteServerUpCommand.NAME, + ReadyToRefreshStaleUserDirectoryProfilesCommand.NAME, ) diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 1beb01cd7793..b94b45a5b089 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -27,6 +27,10 @@ cast, ) +from synapse.replication.tcp.commands import ( + ReadyToRefreshStaleUserDirectoryProfilesCommand, +) + try: # Figure out if ICU support is available for searching users. import icu @@ -535,6 +539,16 @@ def _do_txn(txn: LoggingTransaction) -> None: await self.db_pool.updates._end_background_update( "populate_user_directory_process_remote_users" ) + + # Now kick off querying remote homeservers for profile information. + if self.hs.config.worker.should_update_user_directory: + self.hs.get_user_directory_handler().kick_off_remote_profile_refresh_process() + else: + command_handler = self.hs.get_replication_command_handler() + command_handler.send_command( + ReadyToRefreshStaleUserDirectoryProfilesCommand("") + ) + return 1 await self.db_pool.runInteraction( From 48a637a6ff2602b0c340a0203e40ab85bc569a0f Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 17 Feb 2023 17:56:37 +0000 Subject: [PATCH 06/11] When we start populating the user directory, clear out the old tables first if they're there --- .../storage/databases/main/user_directory.py | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index b94b45a5b089..276d4fcec9f4 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -105,11 +105,22 @@ def __init__( "populate_user_directory_cleanup", self._populate_user_directory_cleanup ) + @staticmethod + def _delete_staging_area(txn: LoggingTransaction) -> None: + txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms") + txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users") + txn.execute( + "DROP TABLE IF EXISTS " + TEMP_TABLE + "_remote_users_needing_lookup" + ) + txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position") + async def _populate_user_directory_createtables( self, progress: JsonDict, batch_size: int ) -> int: - # Get all the rooms that we want to process. def _make_staging_area(txn: LoggingTransaction) -> None: + # Clear out any tables if they already exist beforehand. + UserDirectoryBackgroundUpdateStore._delete_staging_area(txn) + sql = ( "CREATE TABLE IF NOT EXISTS " + TEMP_TABLE @@ -188,16 +199,9 @@ async def _populate_user_directory_cleanup( ) await self.update_user_directory_stream_pos(position) - def _delete_staging_area(txn: LoggingTransaction) -> None: - txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms") - txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users") - txn.execute( - "DROP TABLE IF EXISTS " + TEMP_TABLE + "_remote_users_needing_lookup" - ) - txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position") - await self.db_pool.runInteraction( - "populate_user_directory_cleanup", _delete_staging_area + "populate_user_directory_cleanup", + UserDirectoryBackgroundUpdateStore._delete_staging_area, ) await self.db_pool.updates._end_background_update( From d49f230249e6052afe23be358b22e59b5dace157 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 17 Feb 2023 17:56:50 +0000 Subject: [PATCH 07/11] When rebuilding user dir, schedule the new task --- synapse/rest/admin/background_updates.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/synapse/rest/admin/background_updates.py b/synapse/rest/admin/background_updates.py index 93a78db81186..a36bb1f8f13a 100644 --- a/synapse/rest/admin/background_updates.py +++ b/synapse/rest/admin/background_updates.py @@ -138,10 +138,15 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: "populate_user_directory_process_rooms", ), ( - "populate_user_directory_cleanup", + "populate_user_directory_process_remote_users", "{}", "populate_user_directory_process_users", ), + ( + "populate_user_directory_cleanup", + "{}", + "populate_user_directory_process_remote_users", + ), ] else: raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name") From 0db4dc813213839e509d4d7cef5d0becdf831d2b Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 17 Feb 2023 17:56:55 +0000 Subject: [PATCH 08/11] Schedule a user directory rebuild --- .../delta/74/02_user_directory_rebuild.sql | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 synapse/storage/schema/main/delta/74/02_user_directory_rebuild.sql diff --git a/synapse/storage/schema/main/delta/74/02_user_directory_rebuild.sql b/synapse/storage/schema/main/delta/74/02_user_directory_rebuild.sql new file mode 100644 index 000000000000..8c6da8219b48 --- /dev/null +++ b/synapse/storage/schema/main/delta/74/02_user_directory_rebuild.sql @@ -0,0 +1,40 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Rebuild the user directory in light of the fix for leaking the per-room +-- profiles of remote users to the user directory. + +-- First cancel any existing rebuilds if already pending; we'll run from fresh. +DELETE FROM background_updates WHERE update_name IN ( + 'populate_user_directory_createtables', + 'populate_user_directory_process_rooms', + 'populate_user_directory_process_users', + 'populate_user_directory_process_remote_users', + 'populate_user_directory_cleanup' +); + +-- Then schedule the steps. +INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES + -- Set up user directory staging tables. + (7402, 'populate_user_directory_createtables', '{}', NULL), + -- Run through each room and update the user directory according to who is in it. + (7402, 'populate_user_directory_process_rooms', '{}', 'populate_user_directory_createtables'), + -- Insert all users into the user directory, if search_all_users is on. + (7402, 'populate_user_directory_process_users', '{}', 'populate_user_directory_process_rooms'), + -- Insert remote users into the queue for fetching. + (7402, 'populate_user_directory_process_remote_users', '{}', 'populate_user_directory_process_users'), + -- Clean up user directory staging tables. + (7402, 'populate_user_directory_cleanup', '{}', 'populate_user_directory_process_remote_users') +ON CONFLICT (update_name) DO NOTHING; From fc6c5ae213e52061aa3acf89a5cb8205a721bafa Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 17 Feb 2023 18:01:28 +0000 Subject: [PATCH 09/11] Newsfile Signed-off-by: Olivier Wilkinson (reivilibre) --- changelog.d/15091.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15091.bugfix diff --git a/changelog.d/15091.bugfix b/changelog.d/15091.bugfix new file mode 100644 index 000000000000..12f979e9d041 --- /dev/null +++ b/changelog.d/15091.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug in which the user directory would assume any remote membership state events represent a profile change. \ No newline at end of file From d4df71d857059cd51250cc7a222fb08889b60230 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 28 Feb 2023 16:28:09 +0000 Subject: [PATCH 10/11] (We now kick off refreshing remote profiles from the background, so don't always have something to cancel.) --- tests/handlers/test_user_directory.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py index da4d24082648..872c95ffb0f1 100644 --- a/tests/handlers/test_user_directory.py +++ b/tests/handlers/test_user_directory.py @@ -1128,9 +1128,10 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.user_dir_handler = hs.get_user_directory_handler() self.profile_handler = hs.get_profile_handler() - # Cancel the startup call: in the steady-state case we can't rely on it anyway. - assert self.user_dir_handler._refresh_remote_profiles_call_later is not None - self.user_dir_handler._refresh_remote_profiles_call_later.cancel() + if self.user_dir_handler._refresh_remote_profiles_call_later is not None: + # Cancel the startup call: in the steady-state case we can't rely on + # it anyway. + self.user_dir_handler._refresh_remote_profiles_call_later.cancel() def test_public_rooms_have_profiles_collected(self) -> None: """ From 4fdd64799e2b08a2fcb387ab2595c88e383055a4 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 16 Mar 2023 12:11:14 +0000 Subject: [PATCH 11/11] Fix the portdb script --- synapse/_scripts/synapse_port_db.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 2c9cbf8b275b..29a5ef4311da 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -283,6 +283,13 @@ def should_send_federation(self) -> bool: def get_replication_notifier(self) -> ReplicationNotifier: return ReplicationNotifier() + def get_user_directory_handler(self) -> object: + class FakeUserDirectoryHandler: + def kick_off_remote_profile_refresh_process(self) -> None: + pass + + return FakeUserDirectoryHandler() + class Porter: def __init__(