Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove check current state membership up to date #13745

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13745.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove old queries to join room memberships to current state events. Contributed by Nick @ Beeper (@fizzadar).
202 changes: 47 additions & 155 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@

from synapse.api.constants import EventTypes, Membership
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
Expand Down Expand Up @@ -91,16 +88,6 @@ def __init__(
# at a time. Keyed by room_id.
self._joined_host_linearizer = Linearizer("_JoinedHostsCache")

# Is the current_state_events.membership up to date? Or is the
# background update still running?
self._current_state_events_membership_up_to_date = False

txn = db_conn.cursor(
txn_name="_check_safe_current_state_events_membership_updated"
)
self._check_safe_current_state_events_membership_updated_txn(txn)
txn.close()

if (
self.hs.config.worker.run_background_tasks
and self.hs.config.metrics.metrics_flags.known_servers
Expand Down Expand Up @@ -157,34 +144,6 @@ def _transact(txn: LoggingTransaction) -> int:
self._known_servers_count = max([count, 1])
return self._known_servers_count

def _check_safe_current_state_events_membership_updated_txn(
self, txn: LoggingTransaction
) -> None:
"""Checks if it is safe to assume the new current_state_events
membership column is up to date
"""

pending_update = self.db_pool.simple_select_one_txn(
txn,
table="background_updates",
keyvalues={"update_name": _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME},
retcols=["update_name"],
allow_none=True,
)

self._current_state_events_membership_up_to_date = not pending_update

# If the update is still running, reschedule to run.
if pending_update:
self._clock.call_later(
15.0,
run_as_background_process,
"_check_safe_current_state_events_membership_updated",
self.db_pool.runInteraction,
"_check_safe_current_state_events_membership_updated",
self._check_safe_current_state_events_membership_updated_txn,
)

@cached(max_entries=100000, iterable=True)
async def get_users_in_room(self, room_id: str) -> List[str]:
"""
Expand All @@ -205,31 +164,14 @@ def get_users_in_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[s
`get_current_hosts_in_room()` and so we can re-use the cache but it's
not horrible to have here either.
"""
# If we can assume current_state_events.membership is up to date
# then we can avoid a join, which is a Very Good Thing given how
# frequently this function gets called.
if self._current_state_events_membership_up_to_date:
sql = """
SELECT c.state_key FROM current_state_events as c
/* Get the depth of the event from the events table */
INNER JOIN events AS e USING (event_id)
WHERE c.type = 'm.room.member' AND c.room_id = ? AND membership = ?
/* Sorted by lowest depth first */
ORDER BY e.depth ASC;
"""
else:
sql = """
SELECT c.state_key FROM room_memberships as m
/* Get the depth of the event from the events table */
INNER JOIN events AS e USING (event_id)
INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
/* Sorted by lowest depth first */
ORDER BY e.depth ASC;
"""
sql = """
SELECT c.state_key FROM current_state_events as c
/* Get the depth of the event from the events table */
INNER JOIN events AS e USING (event_id)
WHERE c.type = 'm.room.member' AND c.room_id = ? AND membership = ?
/* Sorted by lowest depth first */
ORDER BY e.depth ASC;
"""

txn.execute(sql, (room_id, Membership.JOIN))
return [r[0] for r in txn]
Expand Down Expand Up @@ -346,28 +288,14 @@ def _get_room_summary_txn(
# We do this all in one transaction to keep the cache small.
# FIXME: get rid of this when we have room_stats

# If we can assume current_state_events.membership is up to date
# then we can avoid a join, which is a Very Good Thing given how
# frequently this function gets called.
if self._current_state_events_membership_up_to_date:
# Note, rejected events will have a null membership field, so
# we we manually filter them out.
sql = """
SELECT count(*), membership FROM current_state_events
WHERE type = 'm.room.member' AND room_id = ?
AND membership IS NOT NULL
GROUP BY membership
"""
else:
sql = """
SELECT count(*), m.membership FROM room_memberships as m
INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ?
GROUP BY m.membership
"""
# Note, rejected events will have a null membership field, so
# we we manually filter them out.
sql = """
SELECT count(*), membership FROM current_state_events
WHERE type = 'm.room.member' AND room_id = ?
AND membership IS NOT NULL
GROUP BY membership
"""

txn.execute(sql, (room_id,))
res: Dict[str, MemberSummary] = {}
Expand All @@ -376,30 +304,18 @@ def _get_room_summary_txn(

# we order by membership and then fairly arbitrarily by event_id so
# heroes are consistent
if self._current_state_events_membership_up_to_date:
# Note, rejected events will have a null membership field, so
# we we manually filter them out.
sql = """
SELECT state_key, membership, event_id
FROM current_state_events
WHERE type = 'm.room.member' AND room_id = ?
AND membership IS NOT NULL
ORDER BY
CASE membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
event_id ASC
LIMIT ?
"""
else:
sql = """
SELECT c.state_key, m.membership, c.event_id
FROM room_memberships as m
INNER JOIN current_state_events as c USING (room_id, event_id)
WHERE c.type = 'm.room.member' AND c.room_id = ?
ORDER BY
CASE m.membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
c.event_id ASC
LIMIT ?
"""
# Note, rejected events will have a null membership field, so
# we we manually filter them out.
Fizzadar marked this conversation as resolved.
Show resolved Hide resolved
sql = """
SELECT state_key, membership, event_id
FROM current_state_events
WHERE type = 'm.room.member' AND room_id = ?
AND membership IS NOT NULL
ORDER BY
CASE membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
event_id ASC
LIMIT ?
"""

# 6 is 5 (number of heroes) plus 1, in case one of them is the calling user.
txn.execute(sql, (room_id, Membership.JOIN, Membership.INVITE, 6))
Expand Down Expand Up @@ -642,27 +558,15 @@ def _get_rooms_for_user_with_stream_ordering_txn(
# We use `current_state_events` here and not `local_current_membership`
# as a) this gets called with remote users and b) this only gets called
# for rooms the server is participating in.
if self._current_state_events_membership_up_to_date:
sql = """
SELECT room_id, e.instance_name, e.stream_ordering
FROM current_state_events AS c
INNER JOIN events AS e USING (room_id, event_id)
WHERE
c.type = 'm.room.member'
AND c.state_key = ?
AND c.membership = ?
"""
else:
sql = """
SELECT room_id, e.instance_name, e.stream_ordering
FROM current_state_events AS c
INNER JOIN room_memberships AS m USING (room_id, event_id)
INNER JOIN events AS e USING (room_id, event_id)
WHERE
c.type = 'm.room.member'
AND c.state_key = ?
AND m.membership = ?
"""
sql = """
Fizzadar marked this conversation as resolved.
Show resolved Hide resolved
SELECT room_id, e.instance_name, e.stream_ordering
FROM current_state_events AS c
INNER JOIN events AS e USING (room_id, event_id)
WHERE
c.type = 'm.room.member'
AND c.state_key = ?
AND c.membership = ?
"""

txn.execute(sql, (user_id, Membership.JOIN))
return frozenset(
Expand Down Expand Up @@ -700,27 +604,15 @@ def _get_rooms_for_users_with_stream_ordering_txn(
user_ids,
)

if self._current_state_events_membership_up_to_date:
sql = f"""
SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
FROM current_state_events AS c
INNER JOIN events AS e USING (room_id, event_id)
WHERE
c.type = 'm.room.member'
AND c.membership = ?
AND {clause}
"""
else:
sql = f"""
SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
FROM current_state_events AS c
INNER JOIN room_memberships AS m USING (room_id, event_id)
INNER JOIN events AS e USING (room_id, event_id)
WHERE
c.type = 'm.room.member'
AND m.membership = ?
AND {clause}
"""
sql = f"""
SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
FROM current_state_events AS c
INNER JOIN events AS e USING (room_id, event_id)
WHERE
c.type = 'm.room.member'
AND c.membership = ?
AND {clause}
"""

txn.execute(sql, [Membership.JOIN] + args)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright 2022 Beeper
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
Forces through the `current_state_events_membership` background job so checks
for its completion can be removed.

Note the background job must still remain defined in the database class.
"""


def run_upgrade(cur, database_engine, *args, **kwargs):
cur.execute("SELECT update_name FROM background_updates")
rows = cur.fetchall()
for row in rows:
if row[0] == "current_state_events_membership":
break
# No pending background job so nothing to do here
else:
return

# Populate membership field for all current_state_events, this may take
# a while but was originally handled via a background update in 2019.
cur.execute(
"""
UPDATE current_state_events
SET membership = (
SELECT membership FROM room_memberships
WHERE event_id = current_state_events.event_id
)
"""
)

# Finally, delete the background job because we've handled it above
cur.execute(
"""
DELETE FROM background_updates
WHERE update_name = 'current_state_events_membership'
"""
)