From eeb2f9e546060ca9f2ef7260220b51d85d9b0d92 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 11:51:01 +0100 Subject: [PATCH 01/34] Add user_directory to database --- synapse/handlers/user_directory.py | 218 ++++++++++++++++++++ synapse/notifier.py | 6 +- synapse/server.py | 5 + synapse/storage/__init__.py | 2 + synapse/storage/schema/delta/42/user_dir.py | 69 +++++++ synapse/storage/user_directory.py | 145 +++++++++++++ 6 files changed, 444 insertions(+), 1 deletion(-) create mode 100644 synapse/handlers/user_directory.py create mode 100644 synapse/storage/schema/delta/42/user_dir.py create mode 100644 synapse/storage/user_directory.py diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py new file mode 100644 index 000000000000..43e917c1a0b3 --- /dev/null +++ b/synapse/handlers/user_directory.py @@ -0,0 +1,218 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, JoinRules, Membership +from synapse.storage.roommember import ProfileInfo +from synapse.util.metrics import Measure + + +logger = logging.getLogger(__name__) + + +class UserDirectoyHandler(object): + def __init__(self, hs): + self.store = hs.get_datastore() + self.state = hs.get_state_handler() + self.server_name = hs.hostname + self.clock = hs.get_clock() + + self.initially_handled_users = set() + + self.pos = None + + self._is_processing = False + + @defer.inlineCallbacks + def notify_new_event(self): + if self._is_processing: + return + + self._is_processing = True + try: + yield self._unsafe_process() + finally: + self._is_processing = False + + @defer.inlineCallbacks + def _unsafe_process(self): + if self.pos is None: + self.pos = yield self.store.get_user_directory_stream_pos() + + if self.pos is None: + yield self._do_initial_spam() + self.pos = yield self.store.get_user_directory_stream_pos() + + while True: + with Measure(self.clock, "user_dir_delta"): + deltas = yield self.store.get_current_state_deltas(self.pos) + if not deltas: + return + + yield self._handle_deltas(deltas) + + max_stream_id = deltas[-1]["stream_id"] + yield self.store.update_user_directory_stream_pos(max_stream_id) + + @defer.inlineCallbacks + def _handle_room(self, room_id): + # TODO: Check we're still joined to room + + is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) + if not is_public: + return + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + unhandled_users = set(users_with_profile) - self.initially_handled_users + + yield self.store.add_profiles_to_user_dir( + room_id, { + user_id: users_with_profile[user_id] for user_id in unhandled_users + } + ) + + self.initially_handled_users |= unhandled_users + + @defer.inlineCallbacks + def _do_initial_spam(self): + yield self.store.delete_all_from_user_dir() + + room_ids = yield self.store.get_all_rooms() + + for room_id in room_ids: + yield self._handle_room(room_id) + + self.initially_handled_users = None + + yield self.store.update_user_directory_stream_pos(-1) + + @defer.inlineCallbacks + def _handle_new_user(self, room_id, user_id, profile): + row = yield self.store.get_user_in_directory(user_id) + if row: + return + + yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + + def _handle_remove_user(self, room_id, user_id): + row = yield self.store.get_user_in_directory(user_id) + if not row or row["room_id"] != room_id: + return + + # TODO: Make this faster? + rooms = yield self.store.get_rooms_for_user(user_id) + for room_id in rooms: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + + if is_public: + return + + yield self.store.remove_from_user_dir(user_id) + + @defer.inlineCallbacks + def _handle_deltas(self, deltas): + for delta in deltas: + typ = delta["type"] + state_key = delta["state_key"] + room_id = delta["room_id"] + event_id = delta["event_id"] + prev_event_id = delta["prev_event_id"] + + if typ == EventTypes.RoomHistoryVisibility: + change = yield self._get_key_change( + prev_event_id, event_id, + key_name="history_visibility", + public_value="world_readable", + ) + if change is None: + continue + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + for user_id, profile in users_with_profile.iteritems(): + if change: + yield self._handle_new_user(room_id, user_id, profile) + else: + yield self._handle_remove_user(room_id, user_id) + elif typ == EventTypes.JoinRules: + change = yield self._get_key_change( + prev_event_id, event_id, + key_name="join_rules", + public_value=JoinRules.PUBLIC, + ) + if change is None: + continue + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + for user_id, profile in users_with_profile.iteritems(): + if change: + yield self._handle_new_user(room_id, user_id, profile) + else: + yield self._handle_remove_user(room_id, user_id) + elif typ == EventTypes.Member: + change = yield self._get_key_change( + prev_event_id, event_id, + key_name="membership", + public_value=Membership.JOIN, + ) + + if change is None: + continue + + if change: + event = yield self.store.get_event(event_id) + profile = ProfileInfo( + avatar_url=event.content.get("avatar_url"), + display_name=event.content.get("displayname"), + ) + + yield self._handle_new_user(room_id, state_key, profile) + else: + yield self._handle_remove_user(room_id, state_key) + + @defer.inlineCallbacks + def _get_key_change(self, prev_event_id, event_id, key_name, public_value): + prev_event = None + event = None + if prev_event_id: + prev_event = yield self.store.get_event(prev_event_id, allow_none=True) + + if event_id: + event = yield self.store.get_event(event_id, allow_none=True) + + if not event and not prev_event: + defer.returnValue(None) + + prev_hist_vis = None + hist_vis = None + + if prev_event: + prev_hist_vis = prev_event.content.get(key_name, None) + + if event: + hist_vis = event.content.get(key_name, None) + + logger.info("prev: %r, new: %r", prev_hist_vis, hist_vis) + + if hist_vis == public_value and prev_hist_vis != public_value: + defer.returnValue(True) + elif hist_vis != public_value and prev_hist_vis == public_value: + defer.returnValue(False) + else: + defer.returnValue(None) diff --git a/synapse/notifier.py b/synapse/notifier.py index 48566187ab1c..6b1709d700d4 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -167,6 +167,7 @@ def __init__(self, hs): self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() + self.user_directory_handler = hs.get_user_directory_handler() if hs.should_send_federation(): self.federation_sender = hs.get_federation_sender() @@ -251,7 +252,10 @@ def _on_new_room_event(self, event, room_stream_id, extra_users=[]): """Notify any user streams that are interested in this room event""" # poke any interested application service. preserve_fn(self.appservice_handler.notify_interested_services)( - room_stream_id) + room_stream_id + ) + + preserve_fn(self.user_directory_handler.notify_new_event)() if self.federation_sender: preserve_fn(self.federation_sender.notify_new_events)( diff --git a/synapse/server.py b/synapse/server.py index e400e278c6d1..a38e5179e0e3 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -49,6 +49,7 @@ from synapse.handlers.initial_sync import InitialSyncHandler from synapse.handlers.receipts import ReceiptsHandler from synapse.handlers.read_marker import ReadMarkerHandler +from synapse.handlers.user_directory import UserDirectoyHandler from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.notifier import Notifier @@ -137,6 +138,7 @@ def build_DEPENDENCY(self) 'tcp_replication', 'read_marker_handler', 'action_generator', + 'user_directory_handler', ] def __init__(self, hostname, **kwargs): @@ -304,6 +306,9 @@ def build_tcp_replication(self): def build_action_generator(self): return ActionGenerator(self) + def build_user_directory_handler(self): + return UserDirectoyHandler(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index d604e7668f10..11655bf60feb 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -49,6 +49,7 @@ from .account_data import AccountDataStore from .openid import OpenIdStore from .client_ips import ClientIpStore +from .user_directory import UserDirectoryStore from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator from .engines import PostgresEngine @@ -86,6 +87,7 @@ class DataStore(RoomMemberStore, RoomStore, ClientIpStore, DeviceStore, DeviceInboxStore, + UserDirectoryStore, ): def __init__(self, db_conn, hs): diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py new file mode 100644 index 000000000000..38538960a48f --- /dev/null +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -0,0 +1,69 @@ +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from synapse.storage.prepare_database import get_statements +from synapse.storage.engines import PostgresEngine, Sqlite3Engine + +logger = logging.getLogger(__name__) + + +BOTH_TABLES = """ +CREATE TABLE user_directory_stream_pos ( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + stream_id BIGINT, + CHECK (Lock='X') +); + +INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); +""" + + +POSTGRES_TABLE = """ +CREATE TABLE user_directory ( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + display_name TEXT, + avatar_url TEXT, + vector tsvector +); + +CREATE INDEX user_directory_fts_idx ON user_directory USING gin(vector); +CREATE INDEX user_directory_user_idx ON user_directory(user_id); +""" + + +SQLITE_TABLE = """ +CREATE VIRTUAL TABLE user_directory + USING fts4 ( user_id, room_id, display_name, avatar_url, value ); +""" + + +def run_create(cur, database_engine, *args, **kwargs): + for statement in get_statements(BOTH_TABLES.splitlines()): + cur.execute(statement) + + if isinstance(database_engine, PostgresEngine): + for statement in get_statements(POSTGRES_TABLE.splitlines()): + cur.execute(statement) + elif isinstance(database_engine, Sqlite3Engine): + for statement in get_statements(SQLITE_TABLE.splitlines()): + cur.execute(statement) + else: + raise Exception("Unrecognized database engine") + + +def run_upgrade(*args, **kwargs): + pass diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py new file mode 100644 index 000000000000..6c7c8c4beec3 --- /dev/null +++ b/synapse/storage/user_directory.py @@ -0,0 +1,145 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from synapse.api.constants import EventTypes, JoinRules +from synapse.storage.engines import PostgresEngine, Sqlite3Engine + + +class UserDirectoryStore(SQLBaseStore): + + @cachedInlineCallbacks(cache_context=True) + def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context): + current_state_ids = yield self.get_current_state_ids( + room_id, on_invalidate=cache_context.invalidate + ) + + join_rules_id = current_state_ids.get((EventTypes.JoinRules, "")) + if join_rules_id: + join_rule_ev = yield self.get_event(join_rules_id, allow_none=True) + if join_rule_ev: + if join_rule_ev.content.get("join_rules") == JoinRules.PUBLIC: + defer.returnValue(True) + + hist_vis_id = current_state_ids.get((EventTypes.RoomHistoryVisibility, "")) + if hist_vis_id: + hist_vis_ev = yield self.get_event(hist_vis_id, allow_none=True) + if hist_vis_ev: + if hist_vis_ev.content.get("history_visibility") == "world_readable": + defer.returnValue(True) + + defer.returnValue(False) + + def add_profiles_to_user_dir(self, room_id, users_with_profile): + if isinstance(self.database_engine, PostgresEngine): + sql = """ + INSERT INTO user_directory + (user_id, room_id, display_name, avatar_url, vector) + VALUES (?,?,?,?,to_tsvector('english', ?)) + """ + elif isinstance(self.database_engine, Sqlite3Engine): + sql = """ + INSERT INTO user_directory + (user_id, room_id, display_name, avatar_url, value) + VALUES (?,?,?,?,?) + """ + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + def _add_profiles_to_user_dir_txn(txn): + txn.executemany(sql, ( + ( + user_id, room_id, p.display_name, p.avatar_url, + "%s %s" % (user_id, p.display_name,) if p.display_name else user_id + ) + for user_id, p in users_with_profile.iteritems() + )) + for user_id in users_with_profile: + txn.call_after( + self.get_user_in_directory.invalidate, (user_id,) + ) + + return self.runInteraction( + "add_profiles_to_user_dir", _add_profiles_to_user_dir_txn + ) + + @defer.inlineCallbacks + def remove_from_user_dir(self, user_id): + yield self._simple_delete( + table="user_directory", + keyvalues={"user_id": user_id}, + desc="remove_from_user_dir", + ) + self.get_user_in_directory.invalidate((user_id,)) + + def get_all_rooms(self): + return self._simple_select_onecol( + table="current_state_events", + keyvalues={}, + retcol="DISTINCT room_id", + desc="get_all_rooms", + ) + + def delete_all_from_user_dir(self): + def _delete_all_from_user_dir_txn(txn): + txn.execute("DELETE FROM user_directory") + txn.call_after(self.get_user_in_directory.invalidate_all) + return self.runInteraction( + "delete_all_from_user_dir", _delete_all_from_user_dir_txn + ) + + @cached() + def get_user_in_directory(self, user_id): + return self._simple_select_one( + table="user_directory", + keyvalues={"user_id": user_id}, + retcols=("room_id", "display_name", "avatar_url",), + allow_none=True, + desc="get_user_in_directory", + ) + + def get_user_directory_stream_pos(self): + return self._simple_select_one_onecol( + table="user_directory_stream_pos", + keyvalues={}, + retcol="stream_id", + desc="get_user_directory_stream_pos", + ) + + def update_user_directory_stream_pos(self, stream_id): + return self._simple_update_one( + table="user_directory_stream_pos", + keyvalues={}, + updatevalues={"stream_id": stream_id}, + desc="update_user_directory_stream_pos", + ) + + def get_current_state_deltas(self, prev_stream_id): + # TODO: Add stream change cache + # TODO: Add limit + sql = """ + SELECT stream_id, room_id, type, state_key, event_id, prev_event_id + FROM current_state_delta_stream + WHERE stream_id > ? + ORDER BY stream_id ASC + """ + + return self._execute( + "get_current_state_deltas", self.cursor_to_dict, sql, prev_stream_id + ) From 42137efde7aeb350e203fe19b5a661c2b27f208f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 11:55:13 +0100 Subject: [PATCH 02/34] Don't go round in circles --- synapse/handlers/user_directory.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 43e917c1a0b3..4b1b7df745b6 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -66,8 +66,8 @@ def _unsafe_process(self): yield self._handle_deltas(deltas) - max_stream_id = deltas[-1]["stream_id"] - yield self.store.update_user_directory_stream_pos(max_stream_id) + self.pos = deltas[-1]["stream_id"] + yield self.store.update_user_directory_stream_pos(self.pos) @defer.inlineCallbacks def _handle_room(self, room_id): @@ -208,8 +208,6 @@ def _get_key_change(self, prev_event_id, event_id, key_name, public_value): if event: hist_vis = event.content.get(key_name, None) - logger.info("prev: %r, new: %r", prev_hist_vis, hist_vis) - if hist_vis == public_value and prev_hist_vis != public_value: defer.returnValue(True) elif hist_vis != public_value and prev_hist_vis == public_value: From 3e123b84977f84f4c60ebafadda9b381baa9d00f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 11:56:27 +0100 Subject: [PATCH 03/34] Start later --- synapse/handlers/user_directory.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 4b1b7df745b6..87c467f096bb 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -90,6 +90,9 @@ def _handle_room(self, room_id): @defer.inlineCallbacks def _do_initial_spam(self): + # TODO: pull from current delta stream_id + new_pos = self.store.get_room_max_stream_ordering() + yield self.store.delete_all_from_user_dir() room_ids = yield self.store.get_all_rooms() @@ -99,7 +102,7 @@ def _do_initial_spam(self): self.initially_handled_users = None - yield self.store.update_user_directory_stream_pos(-1) + yield self.store.update_user_directory_stream_pos(new_pos) @defer.inlineCallbacks def _handle_new_user(self, room_id, user_id, profile): From 168524543f70f7c3dc113b21fc704af65d832bf8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 11:59:36 +0100 Subject: [PATCH 04/34] Add call later --- synapse/handlers/user_directory.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 87c467f096bb..e9488ce554dd 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -38,6 +38,8 @@ def __init__(self, hs): self._is_processing = False + self.clock.call_later(0, self.notify_new_event) + @defer.inlineCallbacks def notify_new_event(self): if self._is_processing: From b5db4ed5f68ee81557393e94436d768b955b1aa0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 13:30:12 +0100 Subject: [PATCH 05/34] Update room column when room becomes unpublic --- synapse/handlers/user_directory.py | 23 +++++++++++++++++++++-- synapse/storage/user_directory.py | 10 ++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index e9488ce554dd..0cf403f59964 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -121,12 +121,13 @@ def _handle_remove_user(self, room_id, user_id): # TODO: Make this faster? rooms = yield self.store.get_rooms_for_user(user_id) - for room_id in rooms: + for j_room_id in rooms: is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - room_id + j_room_id ) if is_public: + yield self.store.update_user_in_user_dir(user_id, j_room_id) return yield self.store.remove_from_user_dir(user_id) @@ -149,6 +150,15 @@ def _handle_deltas(self, deltas): if change is None: continue + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + + if change and is_public: + continue + elif not change and not is_public: + continue + users_with_profile = yield self.state.get_current_user_in_room(room_id) for user_id, profile in users_with_profile.iteritems(): if change: @@ -164,6 +174,15 @@ def _handle_deltas(self, deltas): if change is None: continue + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + + if change and is_public: + continue + elif not change and not is_public: + continue + users_with_profile = yield self.state.get_current_user_in_room(room_id) for user_id, profile in users_with_profile.iteritems(): if change: diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 6c7c8c4beec3..d72b93b58509 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -79,6 +79,16 @@ def _add_profiles_to_user_dir_txn(txn): "add_profiles_to_user_dir", _add_profiles_to_user_dir_txn ) + @defer.inlineCallbacks + def update_user_in_user_dir(self, user_id, room_id): + yield self._simple_update_one( + table="user_directory", + keyvalues={"user_id": user_id}, + updatevalues={"room_id": room_id}, + desc="update_user_in_user_dir", + ) + self.get_user_in_directory.invalidate((user_id,)) + @defer.inlineCallbacks def remove_from_user_dir(self, user_id): yield self._simple_delete( From 3b5f22ca40303392e45c8407952ecf3ee15785f6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 14:00:01 +0100 Subject: [PATCH 06/34] Add search --- synapse/handlers/user_directory.py | 3 +++ synapse/storage/user_directory.py | 35 ++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 0cf403f59964..4a9565df93bc 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -40,6 +40,9 @@ def __init__(self, hs): self.clock.call_later(0, self.notify_new_event) + def search_users(self, search_term, limit): + return self.store.search_user_dir(search_term, limit) + @defer.inlineCallbacks def notify_new_event(self): if self._is_processing: diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index d72b93b58509..650c49982d25 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -153,3 +153,38 @@ def get_current_state_deltas(self, prev_stream_id): return self._execute( "get_current_state_deltas", self.cursor_to_dict, sql, prev_stream_id ) + + @defer.inlineCallbacks + def search_user_dir(self, search_term, limit): + if isinstance(self.database_engine, PostgresEngine): + sql = """ + SELECT user_id, display_name, avatar_url + FROM user_directory + WHERE vector @@ to_tsquery('english', ?) + ORDER BY ts_rank_cd(vector, to_tsquery('english', ?)) DESC + LIMIT ? + """ + args = (search_term, search_term, limit + 1,) + elif isinstance(self.database_engine, Sqlite3Engine): + sql = """ + SELECT user_id, display_name, avatar_url + FROM user_directory + WHERE value MATCH ? + ORDER BY rank(matchinfo(user_directory)) DESC + LIMIT ? + """ + args = (search_term, limit + 1) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + results = yield self._execute( + "search_user_dir", self.cursor_to_dict, sql, *args + ) + + limited = len(results) > limit + + defer.returnValue({ + "limited": limited, + "results": results, + }) From 45a5df59147d9c5c4f2cdacaf92179e5935cd68a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 14:11:55 +0100 Subject: [PATCH 07/34] Add REST API --- synapse/rest/__init__.py | 2 + .../rest/client/v2_alpha/user_directory.py | 59 +++++++++++++++++++ 2 files changed, 61 insertions(+) create mode 100644 synapse/rest/client/v2_alpha/user_directory.py diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index aa8d874f962d..3d809d181bf8 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -51,6 +51,7 @@ devices, thirdparty, sendtodevice, + user_directory, ) from synapse.http.server import JsonResource @@ -100,3 +101,4 @@ def register_servlets(client_resource, hs): devices.register_servlets(hs, client_resource) thirdparty.register_servlets(hs, client_resource) sendtodevice.register_servlets(hs, client_resource) + user_directory.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/user_directory.py b/synapse/rest/client/v2_alpha/user_directory.py new file mode 100644 index 000000000000..f1bae0b34da6 --- /dev/null +++ b/synapse/rest/client/v2_alpha/user_directory.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.api.errors import SynapseError +from synapse.http.servlet import RestServlet, parse_json_object_from_request +from ._base import client_v2_patterns + +logger = logging.getLogger(__name__) + + +class UserDirectorySearchRestServlet(RestServlet): + PATTERNS = client_v2_patterns("/user_directory/search$") + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + super(UserDirectorySearchRestServlet, self).__init__() + self.hs = hs + self.auth = hs.get_auth() + self.user_directory_handler = hs.get_user_directory_handler() + + @defer.inlineCallbacks + def on_GET(self, request): + yield self.auth.get_user_by_req(request, allow_guest=False) + body = parse_json_object_from_request(request) + + limit = body.get("limit", 10) + limit = min(limit, 50) + + try: + search_term = body["search_term"] + except: + raise SynapseError(400, "`search_term` is required field") + + results = yield self.user_directory_handler.search_users(search_term, limit) + + defer.returnValue((200, results)) + + +def register_servlets(hs, http_server): + UserDirectorySearchRestServlet(hs).register(http_server) From 535c99f157a76b7d2a27393ce62268d0cef4abef Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 14:15:45 +0100 Subject: [PATCH 08/34] Use POST --- synapse/rest/client/v2_alpha/user_directory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/user_directory.py b/synapse/rest/client/v2_alpha/user_directory.py index f1bae0b34da6..fe9120719538 100644 --- a/synapse/rest/client/v2_alpha/user_directory.py +++ b/synapse/rest/client/v2_alpha/user_directory.py @@ -38,7 +38,7 @@ def __init__(self, hs): self.user_directory_handler = hs.get_user_directory_handler() @defer.inlineCallbacks - def on_GET(self, request): + def on_POST(self, request): yield self.auth.get_user_by_req(request, allow_guest=False) body = parse_json_object_from_request(request) From 293ef296559fa5bb721592bfa9605f7282df0f6e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 14:29:32 +0100 Subject: [PATCH 09/34] Weight differently --- synapse/storage/user_directory.py | 34 ++++++++++++++++++++++--------- synapse/types.py | 7 +++++++ 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 650c49982d25..ebcc8b963313 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -19,6 +19,7 @@ from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.api.constants import EventTypes, JoinRules from synapse.storage.engines import PostgresEngine, Sqlite3Engine +from synapse.types import get_domain_from_id, get_localpart_from_id class UserDirectoryStore(SQLBaseStore): @@ -50,26 +51,39 @@ def add_profiles_to_user_dir(self, room_id, users_with_profile): sql = """ INSERT INTO user_directory (user_id, room_id, display_name, avatar_url, vector) - VALUES (?,?,?,?,to_tsvector('english', ?)) + VALUES (?,?,?,?, + setweight(to_tsvector('english', ?), 'A') + || to_tsvector('english', ?) + || to_tsvector('english', COALESCE(?, '')) + ) """ + args = ( + ( + user_id, room_id, p.display_name, p.avatar_url, + get_localpart_from_id(user_id), get_domain_from_id(user_id), + p.display_name, + ) + for user_id, p in users_with_profile.iteritems() + ) elif isinstance(self.database_engine, Sqlite3Engine): sql = """ INSERT INTO user_directory (user_id, room_id, display_name, avatar_url, value) VALUES (?,?,?,?,?) """ - else: - # This should be unreachable. - raise Exception("Unrecognized database engine") - - def _add_profiles_to_user_dir_txn(txn): - txn.executemany(sql, ( + args = ( ( user_id, room_id, p.display_name, p.avatar_url, "%s %s" % (user_id, p.display_name,) if p.display_name else user_id ) for user_id, p in users_with_profile.iteritems() - )) + ) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + def _add_profiles_to_user_dir_txn(txn): + txn.executemany(sql, args) for user_id in users_with_profile: txn.call_after( self.get_user_in_directory.invalidate, (user_id,) @@ -160,8 +174,8 @@ def search_user_dir(self, search_term, limit): sql = """ SELECT user_id, display_name, avatar_url FROM user_directory - WHERE vector @@ to_tsquery('english', ?) - ORDER BY ts_rank_cd(vector, to_tsquery('english', ?)) DESC + WHERE vector @@ plainto_tsquery('english', ?) + ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC LIMIT ? """ args = (search_term, search_term, limit + 1,) diff --git a/synapse/types.py b/synapse/types.py index 445bdcb4d7eb..111948540d37 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -62,6 +62,13 @@ def get_domain_from_id(string): return string[idx + 1:] +def get_localpart_from_id(string): + idx = string.find(":") + if idx == -1: + raise SynapseError(400, "Invalid ID: %r" % (string,)) + return string[1:idx] + + class DomainSpecificString( namedtuple("DomainSpecificString", ("localpart", "domain")) ): From 63fda37e20015f0fe56aed86f907035d42fdc2ca Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:00:29 +0100 Subject: [PATCH 10/34] Add comments --- synapse/handlers/user_directory.py | 161 +++++++++++++----- .../rest/client/v2_alpha/user_directory.py | 16 ++ synapse/storage/schema/delta/42/user_dir.py | 2 +- synapse/storage/user_directory.py | 39 ++++- 4 files changed, 173 insertions(+), 45 deletions(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 4a9565df93bc..88b79e332575 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -26,25 +26,54 @@ class UserDirectoyHandler(object): + """Handles querying of and keeping updated the user_directory. + + N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY + """ + def __init__(self, hs): self.store = hs.get_datastore() self.state = hs.get_state_handler() self.server_name = hs.hostname self.clock = hs.get_clock() + # When start up for the first time we need to populate the user_directory. + # This is a set of user_id's we've inserted already self.initially_handled_users = set() + # The current position in the current_state_delta stream self.pos = None + # Guard to ensure we only process deltas one at a time self._is_processing = False + # We kick this off so that we don't have to wait for a change before + # we start populating the user directory self.clock.call_later(0, self.notify_new_event) def search_users(self, search_term, limit): + """Searches for users in directory + + Returns: + dict of the form:: + + { + "limited": , # whether there were more results or not + "results": [ # Ordered by best match first + { + "user_id": , + "display_name": , + "avatar_url": + } + ] + } + """ return self.store.search_user_dir(search_term, limit) @defer.inlineCallbacks def notify_new_event(self): + """Called when there may be more deltas to process + """ if self._is_processing: return @@ -56,13 +85,16 @@ def notify_new_event(self): @defer.inlineCallbacks def _unsafe_process(self): + # If self.pos is None then means we haven't fetched it from DB if self.pos is None: self.pos = yield self.store.get_user_directory_stream_pos() + # If still None then we need to do the initial fill of directory if self.pos is None: yield self._do_initial_spam() self.pos = yield self.store.get_user_directory_stream_pos() + # Loop round handling deltas until we're up to date while True: with Measure(self.clock, "user_dir_delta"): deltas = yield self.store.get_current_state_deltas(self.pos) @@ -74,69 +106,53 @@ def _unsafe_process(self): self.pos = deltas[-1]["stream_id"] yield self.store.update_user_directory_stream_pos(self.pos) - @defer.inlineCallbacks - def _handle_room(self, room_id): - # TODO: Check we're still joined to room - - is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) - if not is_public: - return - - users_with_profile = yield self.state.get_current_user_in_room(room_id) - unhandled_users = set(users_with_profile) - self.initially_handled_users - - yield self.store.add_profiles_to_user_dir( - room_id, { - user_id: users_with_profile[user_id] for user_id in unhandled_users - } - ) - - self.initially_handled_users |= unhandled_users - @defer.inlineCallbacks def _do_initial_spam(self): + """Populates the user_directory from the current state of the DB, used + when synapse first starts with user_directory support + """ + # TODO: pull from current delta stream_id new_pos = self.store.get_room_max_stream_ordering() + # Delete any existing entries just in case there are any yield self.store.delete_all_from_user_dir() + # We process by going through each existing room at a time. room_ids = yield self.store.get_all_rooms() for room_id in room_ids: - yield self._handle_room(room_id) + yield self._handle_intial_room(room_id) self.initially_handled_users = None yield self.store.update_user_directory_stream_pos(new_pos) @defer.inlineCallbacks - def _handle_new_user(self, room_id, user_id, profile): - row = yield self.store.get_user_in_directory(user_id) - if row: - return - - yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + def _handle_intial_room(self, room_id): + """Called when we initially fill out user_directory one room at a time + """ + # TODO: Check we're still joined to room - def _handle_remove_user(self, room_id, user_id): - row = yield self.store.get_user_in_directory(user_id) - if not row or row["room_id"] != room_id: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) + if not is_public: return - # TODO: Make this faster? - rooms = yield self.store.get_rooms_for_user(user_id) - for j_room_id in rooms: - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - j_room_id - ) + users_with_profile = yield self.state.get_current_user_in_room(room_id) + unhandled_users = set(users_with_profile) - self.initially_handled_users - if is_public: - yield self.store.update_user_in_user_dir(user_id, j_room_id) - return + yield self.store.add_profiles_to_user_dir( + room_id, { + user_id: users_with_profile[user_id] for user_id in unhandled_users + } + ) - yield self.store.remove_from_user_dir(user_id) + self.initially_handled_users |= unhandled_users @defer.inlineCallbacks def _handle_deltas(self, deltas): + """Called with the state deltas to process + """ for delta in deltas: typ = delta["type"] state_key = delta["state_key"] @@ -144,22 +160,33 @@ def _handle_deltas(self, deltas): event_id = delta["event_id"] prev_event_id = delta["prev_event_id"] + # For join rule and visibility changes we need to check if the room + # may have become public or not and add/remove the users in said room if typ == EventTypes.RoomHistoryVisibility: change = yield self._get_key_change( prev_event_id, event_id, key_name="history_visibility", public_value="world_readable", ) + + # If change is None, no change. True => become world readable, + # False => was world readable if change is None: continue + # There's been a change to or from being world readable. + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( room_id ) - if change and is_public: + if change and not is_public: + # If we became world readable but room isn't currently public then + # we ignore the change continue - elif not change and not is_public: + elif not change and is_public: + # If we stopped being world readable but are still public, + # ignore the change continue users_with_profile = yield self.state.get_current_user_in_room(room_id) @@ -213,8 +240,60 @@ def _handle_deltas(self, deltas): else: yield self._handle_remove_user(room_id, state_key) + @defer.inlineCallbacks + def _handle_new_user(self, room_id, user_id, profile): + """Called when we might need to add user to directory + + Args: + room_id (str): room_id that user joined or started being public that + user_id (str) + """ + row = yield self.store.get_user_in_directory(user_id) + if row: + return + + yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + + def _handle_remove_user(self, room_id, user_id): + """Called when we might need to remove user to directory + + Args: + room_id (str): room_id that user left or stopped being public that + user_id (str) + """ + row = yield self.store.get_user_in_directory(user_id) + if not row or row["room_id"] != room_id: + # Either the user wasn't in directory or we're still in a room that + # is public (i.e. the room_id in the database) + return + + # TODO: Make this faster? + rooms = yield self.store.get_rooms_for_user(user_id) + for j_room_id in rooms: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + j_room_id + ) + + if is_public: + yield self.store.update_user_in_user_dir(user_id, j_room_id) + return + + yield self.store.remove_from_user_dir(user_id) + @defer.inlineCallbacks def _get_key_change(self, prev_event_id, event_id, key_name, public_value): + """Given two events check if the `key_name` field in content changed + from not matching `public_value` to doing so. + + For example, check if `history_visibility` (`key_name`) changed from + `shared` to `world_readable` (`public_value`). + + Returns: + None if the field in the events either both match `public_value` o + neither do, i.e. there has been no change. + True if it didnt match `public_value` but now does + Falsse if it did match `public_value` but now doesn't + """ prev_event = None event = None if prev_event_id: diff --git a/synapse/rest/client/v2_alpha/user_directory.py b/synapse/rest/client/v2_alpha/user_directory.py index fe9120719538..17d3dffc8f53 100644 --- a/synapse/rest/client/v2_alpha/user_directory.py +++ b/synapse/rest/client/v2_alpha/user_directory.py @@ -39,6 +39,22 @@ def __init__(self, hs): @defer.inlineCallbacks def on_POST(self, request): + """Searches for users in directory + + Returns: + dict of the form:: + + { + "limited": , # whether there were more results or not + "results": [ # Ordered by best match first + { + "user_id": , + "display_name": , + "avatar_url": + } + ] + } + """ yield self.auth.get_user_by_req(request, allow_guest=False) body = parse_json_object_from_request(request) diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index 38538960a48f..57b89ba552e7 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -34,7 +34,7 @@ POSTGRES_TABLE = """ CREATE TABLE user_directory ( user_id TEXT NOT NULL, - room_id TEXT NOT NULL, + room_id TEXT NOT NULL, -- A room_id that we know is public display_name TEXT, avatar_url TEXT, vector tsvector diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index ebcc8b963313..83812bf092b7 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -26,6 +26,8 @@ class UserDirectoryStore(SQLBaseStore): @cachedInlineCallbacks(cache_context=True) def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context): + """Check if the room is either world_readable or publically joinable + """ current_state_ids = yield self.get_current_state_ids( room_id, on_invalidate=cache_context.invalidate ) @@ -47,14 +49,24 @@ def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context): defer.returnValue(False) def add_profiles_to_user_dir(self, room_id, users_with_profile): + """Add profiles to the user directory + + Args: + room_id (str): A room_id that all users are in that is world_readable + or publically joinable + users_with_profile (dict): Users to add to directory in the form of + mapping of user_id -> ProfileInfo + """ if isinstance(self.database_engine, PostgresEngine): + # We weight the loclpart most highly, then display name and finally + # server name sql = """ INSERT INTO user_directory (user_id, room_id, display_name, avatar_url, vector) VALUES (?,?,?,?, setweight(to_tsvector('english', ?), 'A') - || to_tsvector('english', ?) - || to_tsvector('english', COALESCE(?, '')) + || setweight(to_tsvector('english', ?), 'D') + || setweight(to_tsvector('english', COALESCE(?, '')), 'B') ) """ args = ( @@ -113,6 +125,8 @@ def remove_from_user_dir(self, user_id): self.get_user_in_directory.invalidate((user_id,)) def get_all_rooms(self): + """Get all room_ids we've ever known about + """ return self._simple_select_onecol( table="current_state_events", keyvalues={}, @@ -121,6 +135,8 @@ def get_all_rooms(self): ) def delete_all_from_user_dir(self): + """Delete the entire user directory + """ def _delete_all_from_user_dir_txn(txn): txn.execute("DELETE FROM user_directory") txn.call_after(self.get_user_in_directory.invalidate_all) @@ -170,12 +186,29 @@ def get_current_state_deltas(self, prev_stream_id): @defer.inlineCallbacks def search_user_dir(self, search_term, limit): + """Searches for users in directory + + Returns: + dict of the form:: + + { + "limited": , # whether there were more results or not + "results": [ # Ordered by best match first + { + "user_id": , + "display_name": , + "avatar_url": + } + ] + } + """ + if isinstance(self.database_engine, PostgresEngine): sql = """ SELECT user_id, display_name, avatar_url FROM user_directory WHERE vector @@ plainto_tsquery('english', ?) - ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC + ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC LIMIT ? """ args = (search_term, search_term, limit + 1,) From 350622a107c356da630eba09b63ed4b6de94b198 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:11:36 +0100 Subject: [PATCH 11/34] Handle the server leaving a public room --- synapse/handlers/user_directory.py | 23 ++++++++++++++++++--- synapse/state.py | 11 ++++++++++ synapse/storage/schema/delta/42/user_dir.py | 4 ++++ synapse/storage/user_directory.py | 11 ++++++++++ 4 files changed, 46 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 88b79e332575..4e491a43e693 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -132,7 +132,9 @@ def _do_initial_spam(self): def _handle_intial_room(self, room_id): """Called when we initially fill out user_directory one room at a time """ - # TODO: Check we're still joined to room + is_in_room = yield self.store.get_is_host_in_room(room_id, self.server_name) + if not is_in_room: + return is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) if not is_public: @@ -229,7 +231,22 @@ def _handle_deltas(self, deltas): if change is None: continue - if change: + if not change: + # Need to check if the server left the room entirely, if so + # we might need to remove all the users in that room + is_in_room = yield self.store.get_is_host_in_room( + room_id, self.server_name, + ) + if not is_in_room: + # Fetch all the users that we marked as being in user + # directory due to being in the room and then check if + # need to remove those users or not + user_ids = yield self.store.get_users_in_dir_due_to_room(room_id) + for user_id in user_ids: + yield self._handle_remove_user(room_id, user_id) + return + + if change: # The user joined event = yield self.store.get_event(event_id) profile = ProfileInfo( avatar_url=event.content.get("avatar_url"), @@ -237,7 +254,7 @@ def _handle_deltas(self, deltas): ) yield self._handle_new_user(room_id, state_key, profile) - else: + else: # The user left yield self._handle_remove_user(room_id, state_key) @defer.inlineCallbacks diff --git a/synapse/state.py b/synapse/state.py index 02fee47f397a..dffa79e4c933 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -186,6 +186,17 @@ def get_current_hosts_in_room(self, room_id, latest_event_ids=None): ) defer.returnValue(joined_hosts) + @defer.inlineCallbacks + def get_is_host_in_room(self, room_id, host, latest_event_ids=None): + if not latest_event_ids: + latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + logger.debug("calling resolve_state_groups from get_is_host_in_room") + entry = yield self.resolve_state_groups(room_id, latest_event_ids) + is_host_joined = yield self.store.is_host_joined( + room_id, host, entry.state_id, entry.state + ) + defer.returnValue(is_host_joined) + @defer.inlineCallbacks def compute_event_context(self, event, old_state=None): """Build an EventContext structure for the event. diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index 57b89ba552e7..95a7a79fd328 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -41,6 +41,7 @@ ); CREATE INDEX user_directory_fts_idx ON user_directory USING gin(vector); +CREATE INDEX user_directory_room_idx ON user_directory(room_id); CREATE INDEX user_directory_user_idx ON user_directory(user_id); """ @@ -48,6 +49,9 @@ SQLITE_TABLE = """ CREATE VIRTUAL TABLE user_directory USING fts4 ( user_id, room_id, display_name, avatar_url, value ); + +CREATE INDEX user_directory_room_idx ON user_directory(room_id); +CREATE INDEX user_directory_user_idx ON user_directory(user_id); """ diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 83812bf092b7..0df979cb010a 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -124,6 +124,17 @@ def remove_from_user_dir(self, user_id): ) self.get_user_in_directory.invalidate((user_id,)) + def get_users_in_dir_due_to_room(self, room_id): + """Get all user_ids that are in the room directory becuase they're + in the given room_id + """ + return self._simple_select_onecol( + table="user_directory", + keyvalues={"room_id": room_id}, + retcol="user_id", + desc="get_users_in_dir_due_to_room", + ) + def get_all_rooms(self): """Get all room_ids we've ever known about """ From dc51af3d031030fdf553f8478f7930596f2694f7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:13:49 +0100 Subject: [PATCH 12/34] Pull max id from correct table --- synapse/handlers/user_directory.py | 6 ++---- synapse/storage/user_directory.py | 8 ++++++++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 4e491a43e693..8331f6422edb 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -111,9 +111,7 @@ def _do_initial_spam(self): """Populates the user_directory from the current state of the DB, used when synapse first starts with user_directory support """ - - # TODO: pull from current delta stream_id - new_pos = self.store.get_room_max_stream_ordering() + new_pos = yield self.store.get_max_stream_id_in_current_state_deltas() # Delete any existing entries just in case there are any yield self.store.delete_all_from_user_dir() @@ -284,7 +282,7 @@ def _handle_remove_user(self, room_id, user_id): # is public (i.e. the room_id in the database) return - # TODO: Make this faster? + # XXX: Make this faster? rooms = yield self.store.get_rooms_for_user(user_id) for j_room_id in rooms: is_public = yield self.store.is_room_world_readable_or_publicly_joinable( diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 0df979cb010a..011c711ec126 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -195,6 +195,14 @@ def get_current_state_deltas(self, prev_stream_id): "get_current_state_deltas", self.cursor_to_dict, sql, prev_stream_id ) + def get_max_stream_id_in_current_state_deltas(self): + return self._simple_select_one_onecol( + table="current_state_delta_stream", + keyvalues={}, + retcol="COALESCE(MAX(stream_id), -1)", + desc="get_max_stream_id_in_current_state_deltas", + ) + @defer.inlineCallbacks def search_user_dir(self, search_term, limit): """Searches for users in directory From 5d79d728f5f38463171f0d063713905f8cb9faec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:23:49 +0100 Subject: [PATCH 13/34] Split out directory and search tables --- synapse/storage/schema/delta/42/user_dir.py | 25 +++++---- synapse/storage/user_directory.py | 60 ++++++++++++++------- 2 files changed, 56 insertions(+), 29 deletions(-) diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index 95a7a79fd328..7e3266292801 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -28,30 +28,33 @@ ); INSERT INTO user_directory_stream_pos (stream_id) VALUES (null); -""" - -POSTGRES_TABLE = """ CREATE TABLE user_directory ( user_id TEXT NOT NULL, room_id TEXT NOT NULL, -- A room_id that we know is public display_name TEXT, - avatar_url TEXT, - vector tsvector + avatar_url TEXT ); -CREATE INDEX user_directory_fts_idx ON user_directory USING gin(vector); CREATE INDEX user_directory_room_idx ON user_directory(room_id); CREATE INDEX user_directory_user_idx ON user_directory(user_id); """ -SQLITE_TABLE = """ -CREATE VIRTUAL TABLE user_directory - USING fts4 ( user_id, room_id, display_name, avatar_url, value ); +POSTGRES_TABLE = """ +CREATE TABLE user_directory_search ( + user_id TEXT NOT NULL, + vector tsvector +); -CREATE INDEX user_directory_room_idx ON user_directory(room_id); -CREATE INDEX user_directory_user_idx ON user_directory(user_id); +CREATE INDEX user_directory_search_fts_idx ON user_directory_search USING gin(vector); +CREATE INDEX user_directory_search_user_idx ON user_directory_search(user_id); +""" + + +SQLITE_TABLE = """ +CREATE VIRTUAL TABLE user_directory_search + USING fts4 ( user_id, value ); """ diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 011c711ec126..b1957cb873cc 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -61,9 +61,8 @@ def add_profiles_to_user_dir(self, room_id, users_with_profile): # We weight the loclpart most highly, then display name and finally # server name sql = """ - INSERT INTO user_directory - (user_id, room_id, display_name, avatar_url, vector) - VALUES (?,?,?,?, + INSERT INTO user_directory_search(user_id, vector) + VALUES (?, setweight(to_tsvector('english', ?), 'A') || setweight(to_tsvector('english', ?), 'D') || setweight(to_tsvector('english', COALESCE(?, '')), 'B') @@ -71,21 +70,19 @@ def add_profiles_to_user_dir(self, room_id, users_with_profile): """ args = ( ( - user_id, room_id, p.display_name, p.avatar_url, - get_localpart_from_id(user_id), get_domain_from_id(user_id), - p.display_name, + user_id, get_localpart_from_id(user_id), get_domain_from_id(user_id), + profile.display_name, ) - for user_id, p in users_with_profile.iteritems() + for user_id, profile in users_with_profile.iteritems() ) elif isinstance(self.database_engine, Sqlite3Engine): sql = """ - INSERT INTO user_directory - (user_id, room_id, display_name, avatar_url, value) - VALUES (?,?,?,?,?) + INSERT INTO user_directory_search(user_id, value) + VALUES (?,?) """ args = ( ( - user_id, room_id, p.display_name, p.avatar_url, + user_id, "%s %s" % (user_id, p.display_name,) if p.display_name else user_id ) for user_id, p in users_with_profile.iteritems() @@ -96,6 +93,19 @@ def add_profiles_to_user_dir(self, room_id, users_with_profile): def _add_profiles_to_user_dir_txn(txn): txn.executemany(sql, args) + self._simple_insert_many_txn( + txn, + table="user_directory", + values=[ + { + "user_id": user_id, + "room_id": room_id, + "display_name": profile.display_name, + "avatar_url": profile.avatar_url, + } + for user_id, profile in users_with_profile.iteritems() + ] + ) for user_id in users_with_profile: txn.call_after( self.get_user_in_directory.invalidate, (user_id,) @@ -117,12 +127,23 @@ def update_user_in_user_dir(self, user_id, room_id): @defer.inlineCallbacks def remove_from_user_dir(self, user_id): - yield self._simple_delete( - table="user_directory", - keyvalues={"user_id": user_id}, - desc="remove_from_user_dir", + def _remove_from_user_dir_txn(txn): + self._simple_delete_txn( + txn, + table="user_directory", + keyvalues={"user_id": user_id}, + ) + self._simple_delete_txn( + txn, + table="user_directory_search", + keyvalues={"user_id": user_id}, + ) + txn.call_after( + self.get_user_in_directory.invalidate, (user_id,) + ) + return self.runInteraction( + "remove_from_user_dir", _remove_from_user_dir_txn, ) - self.get_user_in_directory.invalidate((user_id,)) def get_users_in_dir_due_to_room(self, room_id): """Get all user_ids that are in the room directory becuase they're @@ -150,6 +171,7 @@ def delete_all_from_user_dir(self): """ def _delete_all_from_user_dir_txn(txn): txn.execute("DELETE FROM user_directory") + txn.execute("DELETE FROM user_directory_search") txn.call_after(self.get_user_in_directory.invalidate_all) return self.runInteraction( "delete_all_from_user_dir", _delete_all_from_user_dir_txn @@ -225,7 +247,8 @@ def search_user_dir(self, search_term, limit): if isinstance(self.database_engine, PostgresEngine): sql = """ SELECT user_id, display_name, avatar_url - FROM user_directory + FROM user_directory_search + INNER JOIN user_directory USING (user_id) WHERE vector @@ plainto_tsquery('english', ?) ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC LIMIT ? @@ -234,7 +257,8 @@ def search_user_dir(self, search_term, limit): elif isinstance(self.database_engine, Sqlite3Engine): sql = """ SELECT user_id, display_name, avatar_url - FROM user_directory + FROM user_directory_search + INNER JOIN user_directory USING (user_id) WHERE value MATCH ? ORDER BY rank(matchinfo(user_directory)) DESC LIMIT ? From 304880d18545b59a51c5d4b928e563c6d1514fdc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:46:36 +0100 Subject: [PATCH 14/34] Add stream change cache --- synapse/storage/__init__.py | 12 ++++++++++++ synapse/storage/events.py | 4 ++++ synapse/storage/user_directory.py | 4 +++- synapse/util/caches/stream_change_cache.py | 15 +++++++++++++++ 4 files changed, 34 insertions(+), 1 deletion(-) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 11655bf60feb..3c88ba98609e 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -223,6 +223,18 @@ def __init__(self, db_conn, hs): "DeviceListFederationStreamChangeCache", device_list_max, ) + curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict( + db_conn, "current_state_delta_stream", + entity_column="room_id", + stream_column="stream_id", + max_value=events_max, # As we share the stream id with events token + limit=1000, + ) + self._curr_state_delta_stream_cache = StreamChangeCache( + "_curr_state_delta_stream_cache", min_curr_state_delta_id, + prefilled_cache=curr_state_delta_prefill, + ) + cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/events.py b/synapse/storage/events.py index dfb57f9d12d5..77861488d291 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -755,6 +755,10 @@ def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): ] ) + self._curr_state_delta_stream_cache.enttity_has_changed( + room_id, max_stream_order, + ) + # Invalidate the various caches # Figure out the changes of membership to invalidate the diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index b1957cb873cc..15b8ea046075 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -204,7 +204,9 @@ def update_user_directory_stream_pos(self, stream_id): ) def get_current_state_deltas(self, prev_stream_id): - # TODO: Add stream change cache + if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id): + return [] + # TODO: Add limit sql = """ SELECT stream_id, room_id, type, state_key, event_id, prev_event_id diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 70fe00ce0b31..c498aee46c06 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -89,6 +89,21 @@ def get_entities_changed(self, entities, stream_pos): return result + def has_any_entity_changed(self, stream_pos): + """Returns if any entity has changed + """ + assert type(stream_pos) is int + + if stream_pos >= self._earliest_known_stream_pos: + self.metrics.inc_hits() + if stream_pos >= max(self._cache): + return False + else: + return True + else: + self.metrics.inc_misses() + return True + def get_all_entities_changed(self, stream_pos): """Returns all entites that have had new things since the given position. If the position is too old it will return None. From 63c58c2a3fced42c254da1c1ae5e55a977b7141c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 16:17:58 +0100 Subject: [PATCH 15/34] Limit number of things we fetch out of the db --- synapse/storage/user_directory.py | 39 ++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 15b8ea046075..9137fc24eade 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -207,16 +207,37 @@ def get_current_state_deltas(self, prev_stream_id): if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id): return [] - # TODO: Add limit - sql = """ - SELECT stream_id, room_id, type, state_key, event_id, prev_event_id - FROM current_state_delta_stream - WHERE stream_id > ? - ORDER BY stream_id ASC - """ + def get_current_state_deltas_txn(txn): + # First we calculate the max stream id that will give us less than + # N results + sql = """ + SELECT stream_id, count(*) + FROM current_state_delta_stream + WHERE stream_id > ? + GROUP BY stream_id + ORDER BY stream_id ASC + LIMIT 100 + """ + txn.execute(sql, (prev_stream_id,)) + + total = 0 + for max_stream_id, count in txn: + total += count + if total > 50: + break - return self._execute( - "get_current_state_deltas", self.cursor_to_dict, sql, prev_stream_id + # Now actually get the deltas + sql = """ + SELECT stream_id, room_id, type, state_key, event_id, prev_event_id + FROM current_state_delta_stream + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + """ + txn.execute(sql, (prev_stream_id, max_stream_id,)) + return self.cursor_to_dict(txn) + + return self.runInteraction( + "get_current_state_deltas", get_current_state_deltas_txn ) def get_max_stream_id_in_current_state_deltas(self): From 4abcff0177768c43eb64ed7784ca8ebf30f3435c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 16:22:36 +0100 Subject: [PATCH 16/34] Fix typo --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 77861488d291..528f19eb8704 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -755,7 +755,7 @@ def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): ] ) - self._curr_state_delta_stream_cache.enttity_has_changed( + self._curr_state_delta_stream_cache.entity_has_changed( room_id, max_stream_order, ) From f0910617111fe81b79777a2c597b1e4240d61a9a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 16:34:40 +0100 Subject: [PATCH 17/34] Fix tests --- synapse/handlers/user_directory.py | 4 ++-- synapse/storage/user_directory.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 8331f6422edb..75f259ee4e77 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -130,7 +130,7 @@ def _do_initial_spam(self): def _handle_intial_room(self, room_id): """Called when we initially fill out user_directory one room at a time """ - is_in_room = yield self.store.get_is_host_in_room(room_id, self.server_name) + is_in_room = yield self.state.get_is_host_in_room(room_id, self.server_name) if not is_in_room: return @@ -232,7 +232,7 @@ def _handle_deltas(self, deltas): if not change: # Need to check if the server left the room entirely, if so # we might need to remove all the users in that room - is_in_room = yield self.store.get_is_host_in_room( + is_in_room = yield self.state.get_is_host_in_room( room_id, self.server_name, ) if not is_in_room: diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 9137fc24eade..348064436f27 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -221,6 +221,7 @@ def get_current_state_deltas_txn(txn): txn.execute(sql, (prev_stream_id,)) total = 0 + max_stream_id = prev_stream_id for max_stream_id, count in txn: total += count if total > 50: From f9791498ae2ee267aef6965bd04998d7d1bb8d43 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 16:50:57 +0100 Subject: [PATCH 18/34] Typos --- synapse/handlers/user_directory.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 75f259ee4e77..7f8da1a876c3 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -304,10 +304,10 @@ def _get_key_change(self, prev_event_id, event_id, key_name, public_value): `shared` to `world_readable` (`public_value`). Returns: - None if the field in the events either both match `public_value` o + None if the field in the events either both match `public_value` neither do, i.e. there has been no change. True if it didnt match `public_value` but now does - Falsse if it did match `public_value` but now doesn't + False if it did match `public_value` but now doesn't """ prev_event = None event = None @@ -320,18 +320,18 @@ def _get_key_change(self, prev_event_id, event_id, key_name, public_value): if not event and not prev_event: defer.returnValue(None) - prev_hist_vis = None - hist_vis = None + prev_value = None + value = None if prev_event: - prev_hist_vis = prev_event.content.get(key_name, None) + prev_value = prev_event.content.get(key_name, None) if event: - hist_vis = event.content.get(key_name, None) + value = event.content.get(key_name, None) - if hist_vis == public_value and prev_hist_vis != public_value: + if value == public_value and prev_value != public_value: defer.returnValue(True) - elif hist_vis != public_value and prev_hist_vis == public_value: + elif value != public_value and prev_value == public_value: defer.returnValue(False) else: defer.returnValue(None) From b2d8d0710912ab25e327f03bfea69b3c8333b2c8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 17:00:24 +0100 Subject: [PATCH 19/34] Lifts things into separate function --- synapse/handlers/user_directory.py | 110 ++++++++++++++--------------- 1 file changed, 54 insertions(+), 56 deletions(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 7f8da1a876c3..48d3c48181d3 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -162,63 +162,10 @@ def _handle_deltas(self, deltas): # For join rule and visibility changes we need to check if the room # may have become public or not and add/remove the users in said room - if typ == EventTypes.RoomHistoryVisibility: - change = yield self._get_key_change( - prev_event_id, event_id, - key_name="history_visibility", - public_value="world_readable", - ) - - # If change is None, no change. True => become world readable, - # False => was world readable - if change is None: - continue - - # There's been a change to or from being world readable. - - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - room_id - ) - - if change and not is_public: - # If we became world readable but room isn't currently public then - # we ignore the change - continue - elif not change and is_public: - # If we stopped being world readable but are still public, - # ignore the change - continue - - users_with_profile = yield self.state.get_current_user_in_room(room_id) - for user_id, profile in users_with_profile.iteritems(): - if change: - yield self._handle_new_user(room_id, user_id, profile) - else: - yield self._handle_remove_user(room_id, user_id) - elif typ == EventTypes.JoinRules: - change = yield self._get_key_change( - prev_event_id, event_id, - key_name="join_rules", - public_value=JoinRules.PUBLIC, + if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules): + yield self._handle_room_publicity_change( + room_id, prev_event_id, event_id, typ, ) - if change is None: - continue - - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - room_id - ) - - if change and is_public: - continue - elif not change and not is_public: - continue - - users_with_profile = yield self.state.get_current_user_in_room(room_id) - for user_id, profile in users_with_profile.iteritems(): - if change: - yield self._handle_new_user(room_id, user_id, profile) - else: - yield self._handle_remove_user(room_id, user_id) elif typ == EventTypes.Member: change = yield self._get_key_change( prev_event_id, event_id, @@ -255,6 +202,57 @@ def _handle_deltas(self, deltas): else: # The user left yield self._handle_remove_user(room_id, state_key) + def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ): + """Handle a room having potentially changed from/to world_readable/publically + joinable. + + Args: + room_id (str) + prev_event_id (str|None): The previous event before the state change + event_id (str|None): The new event after the state change + typ (str): Type of the event + """ + if typ == EventTypes.RoomHistoryVisibility: + change = yield self._get_key_change( + prev_event_id, event_id, + key_name="history_visibility", + public_value="world_readable", + ) + elif typ == EventTypes.JoinRules: + change = yield self._get_key_change( + prev_event_id, event_id, + key_name="join_rules", + public_value=JoinRules.PUBLIC, + ) + else: + raise Exception("Invalid event type") + # If change is None, no change. True => become world_readable/public, + # False => was world_readable/public + if change is None: + return + + # There's been a change to or from being world readable. + + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + + if change and not is_public: + # If we became world readable but room isn't currently public then + # we ignore the change + return + elif not change and is_public: + # If we stopped being world readable but are still public, + # ignore the change + return + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + for user_id, profile in users_with_profile.iteritems(): + if change: + yield self._handle_new_user(room_id, user_id, profile) + else: + yield self._handle_remove_user(room_id, user_id) + @defer.inlineCallbacks def _handle_new_user(self, room_id, user_id, profile): """Called when we might need to add user to directory From f1378aef9199390ae0130cc6bda5c7f4fa7a2e33 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 17:03:08 +0100 Subject: [PATCH 20/34] Convert to int --- synapse/storage/user_directory.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 348064436f27..71b05026467a 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -204,6 +204,7 @@ def update_user_directory_stream_pos(self, stream_id): ) def get_current_state_deltas(self, prev_stream_id): + prev_stream_id = int(prev_stream_id) if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id): return [] From cc7609aa9fe525cc3096c87df2a7f6d090d500ca Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 17:11:18 +0100 Subject: [PATCH 21/34] Comment briefly on how we keep user_directory up to date --- synapse/handlers/user_directory.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 48d3c48181d3..7130cc6ee3f7 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -29,6 +29,16 @@ class UserDirectoyHandler(object): """Handles querying of and keeping updated the user_directory. N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY + + The user directory is filled with users who this server can see are joined to a + world_readable or publically joinable room. We keep a database table up to date + by streaming changes of the current state and recalculating whether users should + be in the directory or not when necessary. + + For each user in the directory we also store a room_id which is public and that the + user is joined to. This allows us to ignore history_visibility and join_rules changes + for that user in all other public rooms, as we know they'll still be in at least + one public room. """ def __init__(self, hs): From 5dd1b2c525cb786614d1503757bf52a7086f3bf9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 17:29:12 +0100 Subject: [PATCH 22/34] Use unique indices --- synapse/storage/schema/delta/42/user_dir.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index 7e3266292801..c34aa5e7d214 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -37,7 +37,7 @@ ); CREATE INDEX user_directory_room_idx ON user_directory(room_id); -CREATE INDEX user_directory_user_idx ON user_directory(user_id); +CREATE UNIQUE INDEX user_directory_user_idx ON user_directory(user_id); """ @@ -48,7 +48,7 @@ ); CREATE INDEX user_directory_search_fts_idx ON user_directory_search USING gin(vector); -CREATE INDEX user_directory_search_user_idx ON user_directory_search(user_id); +CREATE UNIQUE INDEX user_directory_search_user_idx ON user_directory_search(user_id); """ From f5cc22bdc63e58857f435227b70d145d07aabb77 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 17:30:26 +0100 Subject: [PATCH 23/34] Comment on why arbitrary comments --- synapse/storage/user_directory.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 71b05026467a..2e9175f50ad4 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -210,7 +210,9 @@ def get_current_state_deltas(self, prev_stream_id): def get_current_state_deltas_txn(txn): # First we calculate the max stream id that will give us less than - # N results + # N results. + # We arbitarily limit to 100 stream_id entries to ensure we don't + # select toooo many. sql = """ SELECT stream_id, count(*) FROM current_state_delta_stream @@ -225,7 +227,9 @@ def get_current_state_deltas_txn(txn): max_stream_id = prev_stream_id for max_stream_id, count in txn: total += count - if total > 50: + if total > 100: + # We arbitarily limit to 100 entries to ensure we don't + # select toooo many. break # Now actually get the deltas From a757dd4863d0a467becf5b73ca15eafeb3c2823c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 18:07:12 +0100 Subject: [PATCH 24/34] Use prefix matching --- synapse/storage/user_directory.py | 34 ++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 2e9175f50ad4..ca2be9daf251 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -21,6 +21,8 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import get_domain_from_id, get_localpart_from_id +import re + class UserDirectoryStore(SQLBaseStore): @@ -272,17 +274,17 @@ def search_user_dir(self, search_term, limit): ] } """ - + search_query = _parse_query(self.database_engine, search_term) if isinstance(self.database_engine, PostgresEngine): sql = """ SELECT user_id, display_name, avatar_url FROM user_directory_search INNER JOIN user_directory USING (user_id) - WHERE vector @@ plainto_tsquery('english', ?) - ORDER BY ts_rank_cd(vector, plainto_tsquery('english', ?)) DESC + WHERE vector @@ to_tsquery('english', ?) + ORDER BY ts_rank_cd(vector, to_tsquery('english', ?)) DESC LIMIT ? """ - args = (search_term, search_term, limit + 1,) + args = (search_query, search_query, limit + 1,) elif isinstance(self.database_engine, Sqlite3Engine): sql = """ SELECT user_id, display_name, avatar_url @@ -292,7 +294,7 @@ def search_user_dir(self, search_term, limit): ORDER BY rank(matchinfo(user_directory)) DESC LIMIT ? """ - args = (search_term, limit + 1) + args = (search_query, limit + 1) else: # This should be unreachable. raise Exception("Unrecognized database engine") @@ -307,3 +309,25 @@ def search_user_dir(self, search_term, limit): "limited": limited, "results": results, }) + + +def _parse_query(database_engine, search_term): + """Takes a plain unicode string from the user and converts it into a form + that can be passed to database. + We use this so that we can add prefix matching, which isn't something + that is supported by default. + + We specifically add both a prefix and non prefix matching term so that + exact matches get ranked higher. + """ + + # Pull out the individual words, discarding any non-word characters. + results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) + + if isinstance(database_engine, PostgresEngine): + return " & ".join("%s:* & %s" % (result, result,) for result in results) + elif isinstance(database_engine, Sqlite3Engine): + return " & ".join("%s* & %s" % (result, result,) for result in results) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") From 036362ede6cadc4d6f289dbcabfc5e06d370a587 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 18:17:47 +0100 Subject: [PATCH 25/34] Order by if they have profile info --- synapse/storage/user_directory.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index ca2be9daf251..79161f2745a5 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -274,14 +274,20 @@ def search_user_dir(self, search_term, limit): ] } """ + search_query = _parse_query(self.database_engine, search_term) + if isinstance(self.database_engine, PostgresEngine): + # We order by rank and then if they have profile info sql = """ SELECT user_id, display_name, avatar_url FROM user_directory_search INNER JOIN user_directory USING (user_id) WHERE vector @@ to_tsquery('english', ?) - ORDER BY ts_rank_cd(vector, to_tsquery('english', ?)) DESC + ORDER BY + ts_rank_cd(vector, to_tsquery('english', ?)) DESC, + display_name IS NULL, + avatar_url IS NULL LIMIT ? """ args = (search_query, search_query, limit + 1,) @@ -291,7 +297,10 @@ def search_user_dir(self, search_term, limit): FROM user_directory_search INNER JOIN user_directory USING (user_id) WHERE value MATCH ? - ORDER BY rank(matchinfo(user_directory)) DESC + ORDER BY + rank(matchinfo(user_directory)) DESC, + display_name IS NULL, + avatar_url IS NULL LIMIT ? """ args = (search_query, limit + 1) From 0fe6f3c521498fc92c58c49d8edcc6984471da08 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 11:09:49 +0100 Subject: [PATCH 26/34] Bug fixes and logging - Check if room is public when a user joins before adding to user dir - Fix typo of field name "content.join_rules" -> "content.join_rule" --- synapse/handlers/user_directory.py | 22 +++++++++++++++++++++- synapse/storage/user_directory.py | 2 +- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 7130cc6ee3f7..130ff45ec525 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -170,6 +170,8 @@ def _handle_deltas(self, deltas): event_id = delta["event_id"] prev_event_id = delta["prev_event_id"] + logger.debug("Handling: %r %r, %s", typ, state_key, event_id) + # For join rule and visibility changes we need to check if the room # may have become public or not and add/remove the users in said room if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules): @@ -201,7 +203,14 @@ def _handle_deltas(self, deltas): yield self._handle_remove_user(room_id, user_id) return + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + if change: # The user joined + if not is_public: + return + event = yield self.store.get_event(event_id) profile = ProfileInfo( avatar_url=event.content.get("avatar_url"), @@ -211,7 +220,10 @@ def _handle_deltas(self, deltas): yield self._handle_new_user(room_id, state_key, profile) else: # The user left yield self._handle_remove_user(room_id, state_key) + else: + logger.debug("Ignoring irrelevant type: %r", typ) + @defer.inlineCallbacks def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ): """Handle a room having potentially changed from/to world_readable/publically joinable. @@ -222,6 +234,8 @@ def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ): event_id (str|None): The new event after the state change typ (str): Type of the event """ + logger.debug("Handling change for %s", typ) + if typ == EventTypes.RoomHistoryVisibility: change = yield self._get_key_change( prev_event_id, event_id, @@ -231,7 +245,7 @@ def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ): elif typ == EventTypes.JoinRules: change = yield self._get_key_change( prev_event_id, event_id, - key_name="join_rules", + key_name="join_rule", public_value=JoinRules.PUBLIC, ) else: @@ -239,6 +253,7 @@ def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ): # If change is None, no change. True => become world_readable/public, # False => was world_readable/public if change is None: + logger.debug("No change") return # There's been a change to or from being world readable. @@ -247,6 +262,8 @@ def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ): room_id ) + logger.debug("Change: %r, is_public: %r", change, is_public) + if change and not is_public: # If we became world readable but room isn't currently public then # we ignore the change @@ -326,6 +343,7 @@ def _get_key_change(self, prev_event_id, event_id, key_name, public_value): event = yield self.store.get_event(event_id, allow_none=True) if not event and not prev_event: + logger.debug("Neither event exists: %r %r", prev_event_id, event_id) defer.returnValue(None) prev_value = None @@ -337,6 +355,8 @@ def _get_key_change(self, prev_event_id, event_id, key_name, public_value): if event: value = event.content.get(key_name, None) + logger.debug("prev_value: %r -> value: %r", prev_value, value) + if value == public_value and prev_value != public_value: defer.returnValue(True) elif value != public_value and prev_value == public_value: diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 79161f2745a5..7323d783ac1c 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -38,7 +38,7 @@ def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context): if join_rules_id: join_rule_ev = yield self.get_event(join_rules_id, allow_none=True) if join_rule_ev: - if join_rule_ev.content.get("join_rules") == JoinRules.PUBLIC: + if join_rule_ev.content.get("join_rule") == JoinRules.PUBLIC: defer.returnValue(True) hist_vis_id = current_state_ids.get((EventTypes.RoomHistoryVisibility, "")) From 9c7db2491bd051ca733d9556620ba23ceb52918a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 11:36:50 +0100 Subject: [PATCH 27/34] Fix removing users --- synapse/handlers/user_directory.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 130ff45ec525..85efd61d38a0 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -294,6 +294,7 @@ def _handle_new_user(self, room_id, user_id, profile): yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + @defer.inlineCallbacks def _handle_remove_user(self, room_id, user_id): """Called when we might need to remove user to directory @@ -301,10 +302,13 @@ def _handle_remove_user(self, room_id, user_id): room_id (str): room_id that user left or stopped being public that user_id (str) """ + logger.debug("Maybe removing user %r", user_id) + row = yield self.store.get_user_in_directory(user_id) if not row or row["room_id"] != room_id: # Either the user wasn't in directory or we're still in a room that # is public (i.e. the room_id in the database) + logger.debug("Not removing as row: %r", row) return # XXX: Make this faster? @@ -316,6 +320,7 @@ def _handle_remove_user(self, room_id, user_id): if is_public: yield self.store.update_user_in_user_dir(user_id, j_room_id) + logger.debug("Not removing as found other public room: %r", j_room_id) return yield self.store.remove_from_user_dir(user_id) From 59dbb470654ff812975f888a3ec41537916091ab Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 11:41:29 +0100 Subject: [PATCH 28/34] Remove spurious inlineCallbacks --- synapse/storage/user_directory.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 7323d783ac1c..a251aee46560 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -127,7 +127,6 @@ def update_user_in_user_dir(self, user_id, room_id): ) self.get_user_in_directory.invalidate((user_id,)) - @defer.inlineCallbacks def remove_from_user_dir(self, user_id): def _remove_from_user_dir_txn(txn): self._simple_delete_txn( From 8be6fd95a3a0f9d2924650ede4d19c1c22da8cd4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 13:05:39 +0100 Subject: [PATCH 29/34] Check if host is still in room --- synapse/handlers/user_directory.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 85efd61d38a0..83715e5ffe0c 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -195,6 +195,7 @@ def _handle_deltas(self, deltas): room_id, self.server_name, ) if not is_in_room: + logger.debug("Server left room: %r", room_id) # Fetch all the users that we marked as being in user # directory due to being in the room and then check if # need to remove those users or not @@ -202,6 +203,8 @@ def _handle_deltas(self, deltas): for user_id in user_ids: yield self._handle_remove_user(room_id, user_id) return + else: + logger.debug("Server is still in room: %r", room_id) is_public = yield self.store.is_room_world_readable_or_publicly_joinable( room_id @@ -288,6 +291,7 @@ def _handle_new_user(self, room_id, user_id, profile): room_id (str): room_id that user joined or started being public that user_id (str) """ + logger.debug("Adding user to dir, %r", user_id) row = yield self.store.get_user_in_directory(user_id) if row: return @@ -314,6 +318,13 @@ def _handle_remove_user(self, room_id, user_id): # XXX: Make this faster? rooms = yield self.store.get_rooms_for_user(user_id) for j_room_id in rooms: + is_in_room = yield self.state.get_is_host_in_room( + j_room_id, self.server_name, + ) + + if not is_in_room: + continue + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( j_room_id ) From 7233341eac0ad0a25dacf913d4a54d25994ea185 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 13:11:38 +0100 Subject: [PATCH 30/34] Comments --- synapse/handlers/user_directory.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 83715e5ffe0c..a8525fc86aad 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -346,7 +346,7 @@ def _get_key_change(self, prev_event_id, event_id, key_name, public_value): Returns: None if the field in the events either both match `public_value` - neither do, i.e. there has been no change. + or if neither do, i.e. there has been no change. True if it didnt match `public_value` but now does False if it did match `public_value` but now doesn't """ @@ -366,10 +366,10 @@ def _get_key_change(self, prev_event_id, event_id, key_name, public_value): value = None if prev_event: - prev_value = prev_event.content.get(key_name, None) + prev_value = prev_event.content.get(key_name) if event: - value = event.content.get(key_name, None) + value = event.content.get(key_name) logger.debug("prev_value: %r -> value: %r", prev_value, value) From 02a6108235610304b981939bd2c74ae7f36dd929 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 13:16:40 +0100 Subject: [PATCH 31/34] Tweak search query --- synapse/storage/user_directory.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index a251aee46560..c2ea261289b6 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -333,9 +333,9 @@ def _parse_query(database_engine, search_term): results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) if isinstance(database_engine, PostgresEngine): - return " & ".join("%s:* & %s" % (result, result,) for result in results) + return " & ".join("(%s:* | %s)" % (result, result,) for result in results) elif isinstance(database_engine, Sqlite3Engine): - return " & ".join("%s* & %s" % (result, result,) for result in results) + return " & ".join("(%s* | %s)" % (result, result,) for result in results) else: # This should be unreachable. raise Exception("Unrecognized database engine") From d5477c7afd884f200c55a1c6a187983756f49577 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 13:27:28 +0100 Subject: [PATCH 32/34] Tweak search query --- synapse/storage/user_directory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index c2ea261289b6..4fe30ce72e2c 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -284,7 +284,7 @@ def search_user_dir(self, search_term, limit): INNER JOIN user_directory USING (user_id) WHERE vector @@ to_tsquery('english', ?) ORDER BY - ts_rank_cd(vector, to_tsquery('english', ?)) DESC, + ts_rank_cd(vector, to_tsquery('english', ?), 1) DESC, display_name IS NULL, avatar_url IS NULL LIMIT ? From 21e255a8f1948c2fd298ce2e037d20bdd25f2f69 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 14:50:46 +0100 Subject: [PATCH 33/34] Split the table in two --- synapse/handlers/user_directory.py | 77 +++++++++++++-------- synapse/storage/_base.py | 5 ++ synapse/storage/schema/delta/42/user_dir.py | 10 ++- synapse/storage/user_directory.py | 77 ++++++++++++++++++++- 4 files changed, 138 insertions(+), 31 deletions(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index a8525fc86aad..d795a9f8d5c8 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -50,6 +50,7 @@ def __init__(self, hs): # When start up for the first time we need to populate the user_directory. # This is a set of user_id's we've inserted already self.initially_handled_users = set() + self.initially_handled_users_in_public = set() # The current position in the current_state_delta stream self.pos = None @@ -145,8 +146,6 @@ def _handle_intial_room(self, room_id): return is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) - if not is_public: - return users_with_profile = yield self.state.get_current_user_in_room(room_id) unhandled_users = set(users_with_profile) - self.initially_handled_users @@ -159,6 +158,13 @@ def _handle_intial_room(self, room_id): self.initially_handled_users |= unhandled_users + if is_public: + yield self.store.add_users_to_public_room( + room_id, + user_ids=unhandled_users - self.initially_handled_users_in_public + ) + self.initially_handled_users_in_public != unhandled_users + @defer.inlineCallbacks def _handle_deltas(self, deltas): """Called with the state deltas to process @@ -206,14 +212,7 @@ def _handle_deltas(self, deltas): else: logger.debug("Server is still in room: %r", room_id) - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - room_id - ) - if change: # The user joined - if not is_public: - return - event = yield self.store.get_event(event_id) profile = ProfileInfo( avatar_url=event.content.get("avatar_url"), @@ -276,11 +275,13 @@ def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ): # ignore the change return - users_with_profile = yield self.state.get_current_user_in_room(room_id) - for user_id, profile in users_with_profile.iteritems(): - if change: + if change: + users_with_profile = yield self.state.get_current_user_in_room(room_id) + for user_id, profile in users_with_profile.iteritems(): yield self._handle_new_user(room_id, user_id, profile) - else: + else: + users = yield self.store.get_users_in_public_due_to_room(room_id) + for user_id in users: yield self._handle_remove_user(room_id, user_id) @defer.inlineCallbacks @@ -292,11 +293,21 @@ def _handle_new_user(self, room_id, user_id, profile): user_id (str) """ logger.debug("Adding user to dir, %r", user_id) + row = yield self.store.get_user_in_directory(user_id) - if row: + if not row: + yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + + if not is_public: return - yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + row = yield self.store.get_user_in_public_room(user_id) + if not row: + yield self.store.add_users_to_public_room(room_id, [user_id]) @defer.inlineCallbacks def _handle_remove_user(self, room_id, user_id): @@ -309,15 +320,20 @@ def _handle_remove_user(self, room_id, user_id): logger.debug("Maybe removing user %r", user_id) row = yield self.store.get_user_in_directory(user_id) - if not row or row["room_id"] != room_id: - # Either the user wasn't in directory or we're still in a room that - # is public (i.e. the room_id in the database) - logger.debug("Not removing as row: %r", row) + update_user_dir = row and row["room_id"] == room_id + + row = yield self.store.get_user_in_public_room(user_id) + update_user_in_public = row and row["room_id"] == room_id + + if not update_user_in_public and not update_user_dir: return # XXX: Make this faster? rooms = yield self.store.get_rooms_for_user(user_id) for j_room_id in rooms: + if not update_user_in_public and not update_user_dir: + break + is_in_room = yield self.state.get_is_host_in_room( j_room_id, self.server_name, ) @@ -325,16 +341,23 @@ def _handle_remove_user(self, room_id, user_id): if not is_in_room: continue - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - j_room_id - ) - - if is_public: + if update_user_dir: + update_user_dir = False yield self.store.update_user_in_user_dir(user_id, j_room_id) - logger.debug("Not removing as found other public room: %r", j_room_id) - return - yield self.store.remove_from_user_dir(user_id) + if update_user_in_public: + is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + j_room_id + ) + + if is_public: + yield self.store.update_user_in_public_user_list(user_id, j_room_id) + update_user_in_public = False + + if update_user_dir: + yield self.store.remove_from_user_dir(user_id) + elif update_user_in_public: + yield self.store.remove_from_user_in_public_room(user_id) @defer.inlineCallbacks def _get_key_change(self, prev_event_id, event_id, key_name, public_value): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 58b73af7d2b6..db816346f53d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -425,6 +425,11 @@ def _simple_insert_txn(txn, table, values): txn.execute(sql, vals) + def _simple_insert_many(self, table, values, desc): + return self.runInteraction( + desc, self._simple_insert_many_txn, table, values + ) + @staticmethod def _simple_insert_many_txn(txn, table, values): if not values: diff --git a/synapse/storage/schema/delta/42/user_dir.py b/synapse/storage/schema/delta/42/user_dir.py index c34aa5e7d214..ea6a18196da7 100644 --- a/synapse/storage/schema/delta/42/user_dir.py +++ b/synapse/storage/schema/delta/42/user_dir.py @@ -31,13 +31,21 @@ CREATE TABLE user_directory ( user_id TEXT NOT NULL, - room_id TEXT NOT NULL, -- A room_id that we know is public + room_id TEXT NOT NULL, -- A room_id that we know the user is joined to display_name TEXT, avatar_url TEXT ); CREATE INDEX user_directory_room_idx ON user_directory(room_id); CREATE UNIQUE INDEX user_directory_user_idx ON user_directory(user_id); + +CREATE TABLE users_in_pubic_room ( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL -- A room_id that we know is public +); + +CREATE INDEX users_in_pubic_room_room_idx ON users_in_pubic_room(room_id); +CREATE UNIQUE INDEX users_in_pubic_room_user_idx ON users_in_pubic_room(user_id); """ diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 4fe30ce72e2c..cab0afc5c39c 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -50,12 +50,34 @@ def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context): defer.returnValue(False) - def add_profiles_to_user_dir(self, room_id, users_with_profile): - """Add profiles to the user directory + @defer.inlineCallbacks + def add_users_to_public_room(self, room_id, user_ids): + """Add user to the list of users in public rooms Args: room_id (str): A room_id that all users are in that is world_readable or publically joinable + user_ids (list(str)): Users to add + """ + yield self._simple_insert_many( + table="users_in_pubic_room", + values=[ + { + "user_id": user_id, + "room_id": room_id, + } + for user_id in user_ids + ], + desc="add_users_to_public_room" + ) + for user_id in user_ids: + self.get_user_in_public_room.invalidate((user_id,)) + + def add_profiles_to_user_dir(self, room_id, users_with_profile): + """Add profiles to the user directory + + Args: + room_id (str): A room_id that all users are joined to users_with_profile (dict): Users to add to directory in the form of mapping of user_id -> ProfileInfo """ @@ -125,7 +147,15 @@ def update_user_in_user_dir(self, user_id, room_id): updatevalues={"room_id": room_id}, desc="update_user_in_user_dir", ) - self.get_user_in_directory.invalidate((user_id,)) + + @defer.inlineCallbacks + def update_user_in_public_user_list(self, user_id, room_id): + yield self._simple_update_one( + table="users_in_pubic_room", + keyvalues={"user_id": user_id}, + updatevalues={"room_id": room_id}, + desc="update_user_in_public_user_list", + ) def remove_from_user_dir(self, user_id): def _remove_from_user_dir_txn(txn): @@ -139,13 +169,41 @@ def _remove_from_user_dir_txn(txn): table="user_directory_search", keyvalues={"user_id": user_id}, ) + self._simple_delete_txn( + txn, + table="users_in_pubic_room", + keyvalues={"user_id": user_id}, + ) txn.call_after( self.get_user_in_directory.invalidate, (user_id,) ) + txn.call_after( + self.get_user_in_public_room.invalidate, (user_id,) + ) return self.runInteraction( "remove_from_user_dir", _remove_from_user_dir_txn, ) + @defer.inlineCallbacks + def remove_from_user_in_public_room(self, user_id): + yield self._simple_delete( + table="users_in_pubic_room", + keyvalues={"user_id": user_id}, + desc="remove_from_user_in_public_room", + ) + self.get_user_in_public_room.invalidate((user_id,)) + + def get_users_in_public_due_to_room(self, room_id): + """Get all user_ids that are in the room directory becuase they're + in the given room_id + """ + return self._simple_select_onecol( + table="users_in_pubic_room", + keyvalues={"room_id": room_id}, + retcol="user_id", + desc="get_users_in_public_due_to_room", + ) + def get_users_in_dir_due_to_room(self, room_id): """Get all user_ids that are in the room directory becuase they're in the given room_id @@ -173,6 +231,7 @@ def delete_all_from_user_dir(self): def _delete_all_from_user_dir_txn(txn): txn.execute("DELETE FROM user_directory") txn.execute("DELETE FROM user_directory_search") + txn.execute("DELETE FROM users_in_pubic_room") txn.call_after(self.get_user_in_directory.invalidate_all) return self.runInteraction( "delete_all_from_user_dir", _delete_all_from_user_dir_txn @@ -188,6 +247,16 @@ def get_user_in_directory(self, user_id): desc="get_user_in_directory", ) + @cached() + def get_user_in_public_room(self, user_id): + return self._simple_select_one( + table="users_in_pubic_room", + keyvalues={"user_id": user_id}, + retcols=("room_id",), + allow_none=True, + desc="get_user_in_public_room", + ) + def get_user_directory_stream_pos(self): return self._simple_select_one_onecol( table="user_directory_stream_pos", @@ -282,6 +351,7 @@ def search_user_dir(self, search_term, limit): SELECT user_id, display_name, avatar_url FROM user_directory_search INNER JOIN user_directory USING (user_id) + INNER JOIN users_in_pubic_room USING (user_id) WHERE vector @@ to_tsquery('english', ?) ORDER BY ts_rank_cd(vector, to_tsquery('english', ?), 1) DESC, @@ -295,6 +365,7 @@ def search_user_dir(self, search_term, limit): SELECT user_id, display_name, avatar_url FROM user_directory_search INNER JOIN user_directory USING (user_id) + INNER JOIN users_in_pubic_room USING (user_id) WHERE value MATCH ? ORDER BY rank(matchinfo(user_directory)) DESC, From 4d039aa2ca78730ca5f8bb9043ab75328004d7a1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jun 2017 14:58:48 +0100 Subject: [PATCH 34/34] Fix sqlite --- synapse/storage/user_directory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index cab0afc5c39c..bcf24fa4d026 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -368,7 +368,7 @@ def search_user_dir(self, search_term, limit): INNER JOIN users_in_pubic_room USING (user_id) WHERE value MATCH ? ORDER BY - rank(matchinfo(user_directory)) DESC, + rank(matchinfo(user_directory_search)) DESC, display_name IS NULL, avatar_url IS NULL LIMIT ?