Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

When populating the user directory, query remote servers for user profiles instead of leaking the profiles in private rooms. [rei:userdirpriv] #15091

Draft
wants to merge 11 commits into
base: develop
Choose a base branch
from
Draft
1 change: 1 addition & 0 deletions changelog.d/15091.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a long-standing bug in which the user directory would assume any remote membership state events represent a profile change.
7 changes: 7 additions & 0 deletions synapse/_scripts/synapse_port_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,13 @@ def should_send_federation(self) -> bool:
def get_replication_notifier(self) -> ReplicationNotifier:
return ReplicationNotifier()

def get_user_directory_handler(self) -> object:
    """
    Build a stand-in user directory handler.

    The returned object only offers `kick_off_remote_profile_refresh_process`,
    which does nothing, so callers that trigger a remote profile refresh can
    run without a real handler being present.
    """

    class FakeUserDirectoryHandler:
        """Placeholder handler exposing just the refresh kick-off hook."""

        def kick_off_remote_profile_refresh_process(self) -> None:
            """Deliberately a no-op."""
            return None

    return FakeUserDirectoryHandler()


class Porter:
def __init__(
Expand Down
18 changes: 18 additions & 0 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,21 @@ class RemoteServerUpCommand(_SimpleCommand):
NAME = "REMOTE_SERVER_UP"


class ReadyToRefreshStaleUserDirectoryProfilesCommand(_SimpleCommand):
    """
    Sent when a worker needs to tell the user directory worker that there are
    stale remote user profiles that require refreshing.

    Triggered once the `populate_user_directory_process_remote_users`
    background update has completed on a worker that does not itself update
    the user directory; the command carries no payload (an empty string).

    Format::

        USER_DIRECTORY_READY_TO_REFRESH_STALE_REMOTE_PROFILES ''
    """

    # Command name; matches the first token of the format shown above.
    NAME = "USER_DIRECTORY_READY_TO_REFRESH_STALE_REMOTE_PROFILES"


_COMMANDS: Tuple[Type[Command], ...] = (
ServerCommand,
RdataCommand,
Expand All @@ -435,6 +450,7 @@ class RemoteServerUpCommand(_SimpleCommand):
UserIpCommand,
RemoteServerUpCommand,
ClearUserSyncsCommand,
ReadyToRefreshStaleUserDirectoryProfilesCommand,
)

# Map of command name to command type.
Expand All @@ -448,6 +464,7 @@ class RemoteServerUpCommand(_SimpleCommand):
ErrorCommand.NAME,
PingCommand.NAME,
RemoteServerUpCommand.NAME,
ReadyToRefreshStaleUserDirectoryProfilesCommand.NAME,
)

# The commands the client is allowed to send
Expand All @@ -461,6 +478,7 @@ class RemoteServerUpCommand(_SimpleCommand):
UserIpCommand.NAME,
ErrorCommand.NAME,
RemoteServerUpCommand.NAME,
ReadyToRefreshStaleUserDirectoryProfilesCommand.NAME,
)


Expand Down
7 changes: 6 additions & 1 deletion synapse/rest/admin/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,15 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
"populate_user_directory_process_rooms",
),
(
"populate_user_directory_cleanup",
"populate_user_directory_process_remote_users",
"{}",
"populate_user_directory_process_users",
),
(
"populate_user_directory_cleanup",
"{}",
"populate_user_directory_process_remote_users",
),
]
else:
raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name")
Expand Down
189 changes: 173 additions & 16 deletions synapse/storage/databases/main/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
cast,
)

from synapse.replication.tcp.commands import (
ReadyToRefreshStaleUserDirectoryProfilesCommand,
)

try:
# Figure out if ICU support is available for searching users.
import icu
Expand Down Expand Up @@ -91,17 +95,32 @@ def __init__(
)
self.db_pool.updates.register_background_update_handler(
"populate_user_directory_process_users",
self._populate_user_directory_process_users,
self._populate_user_directory_process_local_users,
)
self.db_pool.updates.register_background_update_handler(
"populate_user_directory_process_remote_users",
self._populate_user_directory_process_remote_users,
)
self.db_pool.updates.register_background_update_handler(
"populate_user_directory_cleanup", self._populate_user_directory_cleanup
)

@staticmethod
def _delete_staging_area(txn: LoggingTransaction) -> None:
    """
    Drop every temporary staging table used by the user directory rebuild.

    Each statement uses `DROP TABLE IF EXISTS`, so calling this when some or
    all of the tables are absent is harmless.

    Args:
        txn: the transaction in which to run the `DROP TABLE` statements.
    """
    # Dropped in this order: rooms, users, remote users awaiting lookup, position.
    staging_suffixes = (
        "_rooms",
        "_users",
        "_remote_users_needing_lookup",
        "_position",
    )
    for suffix in staging_suffixes:
        txn.execute(f"DROP TABLE IF EXISTS {TEMP_TABLE}{suffix}")

async def _populate_user_directory_createtables(
self, progress: JsonDict, batch_size: int
) -> int:
# Get all the rooms that we want to process.
def _make_staging_area(txn: LoggingTransaction) -> None:
# Clear out any tables if they already exist beforehand.
UserDirectoryBackgroundUpdateStore._delete_staging_area(txn)

sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
Expand Down Expand Up @@ -142,6 +161,18 @@ def _make_staging_area(txn: LoggingTransaction) -> None:
txn, TEMP_TABLE + "_users", keys=("user_id",), values=users
)

# A table for storing a list of remote users that *may* need a remote
# lookup in order to obtain a public profile.
# The list should be compared against the user directory's cache
# to see whether any queries can be skipped because the remote user
# also appeared in a public room.
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_remote_users_needing_lookup(user_id TEXT PRIMARY KEY NOT NULL)"
)
txn.execute(sql)

new_pos = await self.get_max_stream_id_in_current_state_deltas()
await self.db_pool.runInteraction(
"populate_user_directory_temp_build", _make_staging_area
Expand All @@ -168,13 +199,9 @@ async def _populate_user_directory_cleanup(
)
await self.update_user_directory_stream_pos(position)

def _delete_staging_area(txn: LoggingTransaction) -> None:
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users")
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")

await self.db_pool.runInteraction(
"populate_user_directory_cleanup", _delete_staging_area
"populate_user_directory_cleanup",
UserDirectoryBackgroundUpdateStore._delete_staging_area,
)

await self.db_pool.updates._end_background_update(
Expand Down Expand Up @@ -262,10 +289,17 @@ def _get_next_batch(
or await self.should_include_local_user_in_dir(user_id)
}

# Determine whether the room is public
is_public = await self.is_room_world_readable_or_publicly_joinable(
room_id
)

remote_users_to_query_later = set()

# Upsert a user_directory record for each remote user we see.
for user_id, profile in users_with_profile.items():
# Local users are processed separately in
# `_populate_user_directory_users`; there we can read from
# `_populate_user_directory_process_local_users`; there we can read from
# the `profiles` table to ensure we don't leak their per-room
# profiles. It also means we write local users to this table
# exactly once, rather than once for every room they're in.
Expand All @@ -274,14 +308,29 @@ def _get_next_batch(
# TODO `users_with_profile` above reads from the `user_directory`
# table, meaning that `profile` is bespoke to this room.
# and this leaks remote users' per-room profiles to the user directory.
await self.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url
)
if is_public:
# If this is a public room, it's acceptable to add the profile
# into the user directory.
await self.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url
)
else:
# Otherwise query the user at a later time
remote_users_to_query_later.add(user_id)

# (insert the remote users needing a query in batch;
# use upsert with no values for 'INSERT OR IGNORE' semantics)
await self.db_pool.simple_upsert_many(
f"{TEMP_TABLE}_remote_users_needing_lookup",
("user_id",),
[(u,) for u in remote_users_to_query_later],
(),
(),
desc="populate_user_directory_queue_remote_needing_lookup",
)
del remote_users_to_query_later

# Now update the room sharing tables to include this room.
is_public = await self.is_room_world_readable_or_publicly_joinable(
room_id
)
if is_public:
if users_with_profile:
await self.add_users_in_public_rooms(
Expand Down Expand Up @@ -336,7 +385,7 @@ def _get_next_batch(

return processed_event_count

async def _populate_user_directory_process_users(
async def _populate_user_directory_process_local_users(
self, progress: JsonDict, batch_size: int
) -> int:
"""
Expand Down Expand Up @@ -404,6 +453,114 @@ def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]:

return len(users_to_work_on)

async def _populate_user_directory_process_remote_users(
    self, progress: JsonDict, batch_size: int
) -> int:
    """
    Sorts through the `_remote_users_needing_lookup` table and adds the
    users within to the list of stale remote profiles,
    unless we already populated a user directory entry for them (i.e. they were
    also in a public room).

    Once every row has been handled, the background update is ended and the
    remote profile refresh is kicked off: directly if this process updates the
    user directory, otherwise by sending a
    `ReadyToRefreshStaleUserDirectoryProfilesCommand` over replication.

    Args:
        progress: stored progress for this update. `last_user_id` is the last
            user ID handled so far; it is absent on the first step and `None`
            once all rows have been handled.
        batch_size: how many rows to skip past when choosing the (inclusive)
            upper bound of this step.

    Returns:
        `1` on the call that ends the update, otherwise `batch_size`.

    Raises:
        RuntimeError: if the database engine is neither Postgres nor SQLite.
    """

    def _get_next_batch_txn(
        txn: LoggingTransaction, done_up_to_user_id: str
    ) -> Optional[str]:
        """
        Given the last user ID we've processed,
        Returns
        - a user ID to process up to and including; or
        - `None` if there is no limit left (i.e. we should just process all
          remaining rows).
        """
        # Should be a B-Tree index only scan: so reasonably efficient despite the
        # OFFSET
        # If we're lucky, will also warm up the disk cache for the subsequent query
        # that actually does some work.
        txn.execute(
            f"""
            SELECT user_id
            FROM {TEMP_TABLE}_remote_users_needing_lookup
            WHERE user_id > ?
            ORDER BY user_id
            LIMIT 1 OFFSET ?
            """,
            (done_up_to_user_id, batch_size),
        )
        row = txn.fetchone()
        return row[0] if row else None

    def _add_private_only_users_to_stale_profile_refresh_queue_txn(
        txn: LoggingTransaction, from_exc: str, until_inc: Optional[str]
    ) -> None:
        """
        Queue the staged users in the range (`from_exc`, `until_inc`] that have
        no `user_directory` row. `until_inc=None` means "no upper bound".
        """
        end_condition = "AND user_id <= ?" if until_inc is not None else ""
        end_args = (until_inc,) if until_inc is not None else ()

        # SQL expression extracting everything after the first ':' of the
        # user ID, i.e. the server name; the syntax differs per engine.
        user_id_serverpart: str
        if isinstance(self.database_engine, PostgresEngine):
            user_id_serverpart = (
                "SUBSTRING(user_id FROM POSITION(':' IN user_id) + 1)"
            )
        elif isinstance(self.database_engine, Sqlite3Engine):
            user_id_serverpart = "SUBSTR(user_id, INSTR(user_id, ':') + 1)"
        else:
            raise RuntimeError("Unknown database engine!")

        # `ON CONFLICT DO NOTHING`: a user may already be queued as stale
        # (e.g. from a previous rebuild or from normal operation). Leaving
        # the existing row alone is fine; failing the whole step is not.
        txn.execute(
            f"""
            INSERT INTO user_directory_stale_remote_users
                (user_id, next_try_at_ts, retry_counter, user_server_name)
            SELECT
                user_id, 0, 0, {user_id_serverpart}
            FROM {TEMP_TABLE}_remote_users_needing_lookup AS runl
            LEFT JOIN user_directory AS ud USING (user_id)
            WHERE ud.user_id IS NULL
                AND ? < user_id {end_condition}
            ON CONFLICT DO NOTHING
            """,
            (from_exc,) + end_args,
        )

    def _do_txn(txn: LoggingTransaction) -> None:
        """
        Does a step of background update.
        """
        # "@" is the default lower bound used when no progress is stored yet.
        last_user_id = progress.get("last_user_id", "@")
        next_end_limit_inc = _get_next_batch_txn(txn, last_user_id)
        _add_private_only_users_to_stale_profile_refresh_queue_txn(
            txn, last_user_id, next_end_limit_inc
        )

        # Update the progress
        # (a stored `None` marks that all remaining rows have been handled).
        progress["last_user_id"] = next_end_limit_inc
        self.db_pool.updates._background_update_progress_txn(
            txn, "populate_user_directory_process_remote_users", progress
        )

    if progress.get("last_user_id", "@") is None:
        await self.db_pool.updates._end_background_update(
            "populate_user_directory_process_remote_users"
        )

        # Now kick off querying remote homeservers for profile information.
        if self.hs.config.worker.should_update_user_directory:
            self.hs.get_user_directory_handler().kick_off_remote_profile_refresh_process()
        else:
            command_handler = self.hs.get_replication_command_handler()
            command_handler.send_command(
                ReadyToRefreshStaleUserDirectoryProfilesCommand("")
            )

        return 1

    await self.db_pool.runInteraction(
        "populate_user_directory_process_remote_users",
        _do_txn,
    )
    return batch_size

async def should_include_local_user_in_dir(self, user: str) -> bool:
"""Certain classes of local user are omitted from the user directory.
Is this user one of them?
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Rebuild the user directory in light of the fix for leaking the per-room
-- profiles of remote users to the user directory.
--
-- The rebuild is a chain of background updates; each row's `depends_on` names
-- the update that precedes it, so the steps are linked in the order listed.

-- First cancel any existing rebuilds if already pending; we'll run from fresh.
DELETE FROM background_updates WHERE update_name IN (
'populate_user_directory_createtables',
'populate_user_directory_process_rooms',
'populate_user_directory_process_users',
'populate_user_directory_process_remote_users',
'populate_user_directory_cleanup'
);

-- Then schedule the steps.
-- (All five rows share the same `ordering` value; the sequence between them is
-- expressed through `depends_on`.)
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
-- Set up user directory staging tables.
(7402, 'populate_user_directory_createtables', '{}', NULL),
-- Run through each room and update the user directory according to who is in it.
(7402, 'populate_user_directory_process_rooms', '{}', 'populate_user_directory_createtables'),
-- Insert all users into the user directory, if search_all_users is on.
(7402, 'populate_user_directory_process_users', '{}', 'populate_user_directory_process_rooms'),
-- Insert remote users into the queue for fetching.
(7402, 'populate_user_directory_process_remote_users', '{}', 'populate_user_directory_process_users'),
-- Clean up user directory staging tables.
(7402, 'populate_user_directory_cleanup', '{}', 'populate_user_directory_process_remote_users')
-- Skip any row whose update_name is already present rather than failing.
ON CONFLICT (update_name) DO NOTHING;
7 changes: 4 additions & 3 deletions tests/handlers/test_user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -1128,9 +1128,10 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.user_dir_handler = hs.get_user_directory_handler()
self.profile_handler = hs.get_profile_handler()

# Cancel the startup call: in the steady-state case we can't rely on it anyway.
assert self.user_dir_handler._refresh_remote_profiles_call_later is not None
self.user_dir_handler._refresh_remote_profiles_call_later.cancel()
if self.user_dir_handler._refresh_remote_profiles_call_later is not None:
# Cancel the startup call: in the steady-state case we can't rely on
# it anyway.
self.user_dir_handler._refresh_remote_profiles_call_later.cancel()

def test_public_rooms_have_profiles_collected(self) -> None:
"""
Expand Down