-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Rewrite userdir to be faster #4537
Changes from 57 commits
07b82a2
f993fd6
e8dd750
cf079f3
e818649
35b33d1
c123c71
1b271f2
ee98058
994243e
b845be4
487bdc0
e7e94d7
be4f84b
84a0240
060c5fb
766b86d
1b3bc5b
cf03ec7
c3b168f
bd66799
2b3f166
c6a68b4
f21dcc7
b11de34
829fb4a
2cd5abc
e224f9c
ebe8bb5
aa13be6
3c4d418
ff78918
4716266
d3e216a
75174bb
f385f75
a88c1d6
28239aa
a689842
06a93b0
fab3b33
2574889
a50de76
960b3f0
d64fbc6
7c5b66e
59334f5
42d0d3e
8110905
d0d8a6e
be857fd
8e64d86
e652138
43b71b3
6c46504
95ed0fa
385a075
1e7d2a4
eed6db8
2bf1d6a
72e74f4
1335416
c628aaa
843d287
f35b3cf
8e963dd
2847e65
ea0cc09
765c458
a8c48b5
3408e8d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
The user directory has been rewritten to make it faster, with less chance of falling behind on a large server. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,7 +15,7 @@ | |
|
||
import logging | ||
|
||
from six import iteritems | ||
from six import iteritems, iterkeys | ||
|
||
from twisted.internet import defer | ||
|
||
|
@@ -51,14 +51,14 @@ class UserDirectoryHandler(object): | |
INITIAL_USER_SLEEP_MS = 10 | ||
|
||
def __init__(self, hs): | ||
self.hs = hs | ||
self.store = hs.get_datastore() | ||
self.state = hs.get_state_handler() | ||
self.server_name = hs.hostname | ||
self.clock = hs.get_clock() | ||
self.notifier = hs.get_notifier() | ||
self.is_mine_id = hs.is_mine_id | ||
self.update_user_directory = hs.config.update_user_directory | ||
self.search_all_users = hs.config.user_directory_search_all_users | ||
|
||
# When start up for the first time we need to populate the user_directory. | ||
# This is a set of user_id's we've inserted already | ||
|
@@ -140,7 +140,6 @@ def handle_user_deactivated(self, user_id): | |
# FIXME(#3714): We should probably do this in the same worker as all | ||
# the other changes. | ||
yield self.store.remove_from_user_dir(user_id) | ||
yield self.store.remove_from_user_in_public_room(user_id) | ||
|
||
@defer.inlineCallbacks | ||
def _unsafe_process(self): | ||
|
@@ -196,7 +195,7 @@ def _do_initial_spam(self): | |
|
||
logger.info("Processed all rooms.") | ||
|
||
if self.search_all_users: | ||
if self.hs.config.user_directory_search_all_users: | ||
num_processed_users = 0 | ||
user_ids = yield self.store.get_all_local_users() | ||
logger.info( | ||
|
@@ -216,8 +215,6 @@ def _do_initial_spam(self): | |
|
||
self.initially_handled_users = None | ||
self.initially_handled_users_in_public = None | ||
hawkowl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.initially_handled_users_share = None | ||
self.initially_handled_users_share_private_room = None | ||
|
||
yield self.store.update_user_directory_stream_pos(new_pos) | ||
|
||
|
@@ -238,23 +235,15 @@ def _handle_initial_room(self, room_id): | |
unhandled_users = user_ids - self.initially_handled_users | ||
|
||
yield self.store.add_profiles_to_user_dir( | ||
room_id, | ||
{user_id: users_with_profile[user_id] for user_id in unhandled_users}, | ||
) | ||
|
||
self.initially_handled_users |= unhandled_users | ||
|
||
if is_public: | ||
yield self.store.add_users_to_public_room( | ||
room_id, user_ids=user_ids - self.initially_handled_users_in_public | ||
) | ||
self.initially_handled_users_in_public |= user_ids | ||
hawkowl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# We now go and figure out the new users who share rooms with user entries | ||
# We sleep aggressively here as otherwise it can starve resources. | ||
# We also batch up inserts/updates, but try to avoid too many at once. | ||
to_insert = set() | ||
to_update = set() | ||
count = 0 | ||
for user_id in user_ids: | ||
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0: | ||
|
@@ -277,44 +266,18 @@ def _handle_initial_room(self, room_id): | |
count += 1 | ||
|
||
user_set = (user_id, other_user_id) | ||
richvdh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if user_set in self.initially_handled_users_share_private_room: | ||
continue | ||
|
||
if user_set in self.initially_handled_users_share: | ||
if is_public: | ||
continue | ||
to_update.add(user_set) | ||
else: | ||
to_insert.add(user_set) | ||
|
||
if is_public: | ||
self.initially_handled_users_share.add(user_set) | ||
else: | ||
self.initially_handled_users_share_private_room.add(user_set) | ||
to_insert.add(user_set) | ||
|
||
if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE: | ||
yield self.store.add_users_who_share_room( | ||
room_id, not is_public, to_insert | ||
) | ||
to_insert.clear() | ||
|
||
if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE: | ||
yield self.store.update_users_who_share_room( | ||
room_id, not is_public, to_update | ||
) | ||
to_update.clear() | ||
|
||
if to_insert: | ||
yield self.store.add_users_who_share_room(room_id, not is_public, to_insert) | ||
to_insert.clear() | ||
|
||
if to_update: | ||
yield self.store.update_users_who_share_room( | ||
room_id, not is_public, to_update | ||
) | ||
to_update.clear() | ||
|
||
@defer.inlineCallbacks | ||
def _handle_deltas(self, deltas): | ||
"""Called with the state deltas to process | ||
|
@@ -356,6 +319,7 @@ def _handle_deltas(self, deltas): | |
user_ids = yield self.store.get_users_in_dir_due_to_room( | ||
room_id | ||
) | ||
|
||
for user_id in user_ids: | ||
yield self._handle_remove_user(room_id, user_id) | ||
return | ||
|
@@ -436,14 +400,15 @@ def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ): | |
# ignore the change | ||
return | ||
|
||
if change: | ||
users_with_profile = yield self.state.get_current_user_in_room(room_id) | ||
for user_id, profile in iteritems(users_with_profile): | ||
yield self._handle_new_user(room_id, user_id, profile) | ||
else: | ||
users = yield self.store.get_users_in_public_due_to_room(room_id) | ||
for user_id in users: | ||
yield self._handle_remove_user(room_id, user_id) | ||
users_with_profile = yield self.state.get_current_user_in_room(room_id) | ||
|
||
# Remove every user from the sharing tables for that room. | ||
for user_id in iterkeys(users_with_profile): | ||
yield self.store.remove_user_who_share_room(user_id, room_id) | ||
|
||
# Then, re-add them to the tables. | ||
for user_id, profile in iteritems(users_with_profile): | ||
yield self._handle_new_user(room_id, user_id, profile) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it feels like we're (still) repeating ourselves here. Suppose there are 3 local users in the room: A, B, C. First we call _handle_new_user with A. That will add entries: (A, A), (A, B), (A, C), (B, A), (C, A). So most of those entries are being added twice. (happy if you want to say that's an existing problem, to be ignored for now, but since we're rewriting this it seems a reasonable time to consider it, or at least add a comment) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Without the adding yourself, that drops one of those, but yes, it does repeat that, but that's an existing problem. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Ironically the "adding yourself" was the only entry which wasn't being redone.
Fair enough. I'd still be happier if you could add a TODO or something so that I know I'm not going mad next time I look at this code. |
||
|
||
@defer.inlineCallbacks | ||
def _handle_local_user(self, user_id): | ||
|
@@ -457,7 +422,7 @@ def _handle_local_user(self, user_id): | |
|
||
row = yield self.store.get_user_in_directory(user_id) | ||
if not row: | ||
yield self.store.add_profiles_to_user_dir(None, {user_id: profile}) | ||
yield self.store.add_profiles_to_user_dir({user_id: profile}) | ||
|
||
@defer.inlineCallbacks | ||
def _handle_new_user(self, room_id, user_id, profile): | ||
|
@@ -471,55 +436,30 @@ def _handle_new_user(self, room_id, user_id, profile): | |
|
||
row = yield self.store.get_user_in_directory(user_id) | ||
if not row: | ||
yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) | ||
yield self.store.add_profiles_to_user_dir({user_id: profile}) | ||
|
||
is_public = yield self.store.is_room_world_readable_or_publicly_joinable( | ||
room_id | ||
) | ||
|
||
if is_public: | ||
row = yield self.store.get_user_in_public_room(user_id) | ||
if not row: | ||
yield self.store.add_users_to_public_room(room_id, [user_id]) | ||
else: | ||
logger.debug("Not adding new user to public dir, %r", user_id) | ||
|
||
# Now we update users who share rooms with users. We do this by getting | ||
# all the current users in the room and seeing which aren't already | ||
# marked in the database as sharing with `user_id` | ||
|
||
# Now we update users who share rooms with users. | ||
users_with_profile = yield self.state.get_current_user_in_room(room_id) | ||
|
||
to_insert = set() | ||
to_update = set() | ||
|
||
is_appservice = self.store.get_if_app_services_interested_in_user(user_id) | ||
|
||
# First, if they're our user then we need to update for every user | ||
if self.is_mine_id(user_id) and not is_appservice: | ||
# Returns a map of other_user_id -> shared_private. We only need | ||
# to update mappings if for users that either don't share a room | ||
# already (aren't in the map) or, if the room is private, those that | ||
# only share a public room. | ||
user_ids_shared = yield self.store.get_users_who_share_room_from_dir( | ||
user_id | ||
) | ||
if self.is_mine_id(user_id): | ||
|
||
for other_user_id in users_with_profile: | ||
if user_id == other_user_id: | ||
continue | ||
is_appservice = self.store.get_if_app_services_interested_in_user(user_id) | ||
|
||
# We don't care about appservice users. | ||
if not is_appservice: | ||
# Our users are always in a room with themselves | ||
to_insert.add((user_id, user_id)) | ||
|
||
for other_user_id in users_with_profile: | ||
if user_id == other_user_id: | ||
continue | ||
|
||
shared_is_private = user_ids_shared.get(other_user_id) | ||
if shared_is_private is True: | ||
# We've already marked in the database they share a private room | ||
continue | ||
elif shared_is_private is False: | ||
# They already share a public room, so only update if this is | ||
# a private room | ||
if not is_public: | ||
to_update.add((user_id, other_user_id)) | ||
elif shared_is_private is None: | ||
# This is the first time they both share a room | ||
to_insert.add((user_id, other_user_id)) | ||
|
||
# Next we need to update for every local user in the room | ||
|
@@ -531,29 +471,11 @@ def _handle_new_user(self, room_id, user_id, profile): | |
other_user_id | ||
) | ||
if self.is_mine_id(other_user_id) and not is_appservice: | ||
shared_is_private = yield self.store.get_if_users_share_a_room( | ||
other_user_id, user_id | ||
) | ||
if shared_is_private is True: | ||
# We've already marked in the database they share a private room | ||
continue | ||
elif shared_is_private is False: | ||
# They already share a public room, so only update if this is | ||
# a private room | ||
if not is_public: | ||
to_update.add((other_user_id, user_id)) | ||
elif shared_is_private is None: | ||
# This is the first time they both share a room | ||
to_insert.add((other_user_id, user_id)) | ||
to_insert.add((other_user_id, user_id)) | ||
|
||
if to_insert: | ||
yield self.store.add_users_who_share_room(room_id, not is_public, to_insert) | ||
|
||
if to_update: | ||
yield self.store.update_users_who_share_room( | ||
room_id, not is_public, to_update | ||
) | ||
|
||
@defer.inlineCallbacks | ||
def _handle_remove_user(self, room_id, user_id): | ||
"""Called when we might need to remove user to directory | ||
|
@@ -562,84 +484,16 @@ def _handle_remove_user(self, room_id, user_id): | |
room_id (str): room_id that user left or stopped being public that | ||
user_id (str) | ||
""" | ||
logger.debug("Maybe removing user %r", user_id) | ||
|
||
row = yield self.store.get_user_in_directory(user_id) | ||
update_user_dir = row and row["room_id"] == room_id | ||
|
||
row = yield self.store.get_user_in_public_room(user_id) | ||
update_user_in_public = row and row["room_id"] == room_id | ||
logger.debug("Removing user %r", user_id) | ||
|
||
if update_user_in_public or update_user_dir: | ||
# XXX: Make this faster? | ||
rooms = yield self.store.get_rooms_for_user(user_id) | ||
for j_room_id in rooms: | ||
if not update_user_in_public and not update_user_dir: | ||
break | ||
|
||
is_in_room = yield self.store.is_host_joined( | ||
j_room_id, self.server_name | ||
) | ||
|
||
if not is_in_room: | ||
continue | ||
|
||
if update_user_dir: | ||
update_user_dir = False | ||
yield self.store.update_user_in_user_dir(user_id, j_room_id) | ||
|
||
is_public = yield self.store.is_room_world_readable_or_publicly_joinable( | ||
j_room_id | ||
) | ||
# Remove user from sharing tables | ||
yield self.store.remove_user_who_share_room(user_id, room_id) | ||
|
||
if update_user_in_public and is_public: | ||
yield self.store.update_user_in_public_user_list(user_id, j_room_id) | ||
update_user_in_public = False | ||
# Are they still in a room with members? If not, remove them entirely. | ||
hawkowl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
users_in_room_with = yield self.store.get_users_who_share_room_from_dir(user_id) | ||
|
||
if update_user_dir: | ||
if len(users_in_room_with) == 0: | ||
yield self.store.remove_from_user_dir(user_id) | ||
elif update_user_in_public: | ||
yield self.store.remove_from_user_in_public_room(user_id) | ||
|
||
# Now handle users_who_share_rooms. | ||
|
||
# Get a list of user tuples that were in the DB due to this room and | ||
# users (this includes tuples where the other user matches `user_id`) | ||
user_tuples = yield self.store.get_users_in_share_dir_with_room_id( | ||
user_id, room_id | ||
) | ||
|
||
for user_id, other_user_id in user_tuples: | ||
# For each user tuple get a list of rooms that they still share, | ||
# trying to find a private room, and update the entry in the DB | ||
rooms = yield self.store.get_rooms_in_common_for_users( | ||
user_id, other_user_id | ||
) | ||
|
||
# If they dont share a room anymore, remove the mapping | ||
if not rooms: | ||
yield self.store.remove_user_who_share_room(user_id, other_user_id) | ||
continue | ||
|
||
found_public_share = None | ||
for j_room_id in rooms: | ||
is_public = yield self.store.is_room_world_readable_or_publicly_joinable( | ||
j_room_id | ||
) | ||
|
||
if is_public: | ||
found_public_share = j_room_id | ||
else: | ||
found_public_share = None | ||
yield self.store.update_users_who_share_room( | ||
room_id, not is_public, [(user_id, other_user_id)] | ||
) | ||
break | ||
|
||
if found_public_share: | ||
yield self.store.update_users_who_share_room( | ||
room_id, not is_public, [(user_id, other_user_id)] | ||
) | ||
|
||
@defer.inlineCallbacks | ||
def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a rule, we try to avoid generic
self.hs
references, in favour of pulling things out of hs in the constructor. It makes dependencies more explicit and gives a better chance of detecting broken references early.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It also makes it impossible to reload or edit config during tests :/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, if you want to argue that the advantages of this style outweigh the disadvantages, it's a topic for a retro or something. For now please put it back as it was.