From b16ce0118345e5139af2ebc33576529854d2a4de Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 21 Dec 2021 13:45:58 -0500 Subject: [PATCH 01/13] Remove unneeded room_id. --- synapse/storage/databases/main/events.py | 4 +--- synapse/storage/databases/main/relations.py | 11 ++++------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index cce230559773..b5a38882d29d 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1783,9 +1783,7 @@ def _handle_event_relations( ) if rel_type == RelationTypes.REPLACE: - txn.call_after( - self.store.get_applicable_edit.invalidate, (parent_id, event.room_id) - ) + txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,)) if rel_type == RelationTypes.THREAD: txn.call_after( diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index c6c4bd18da3e..052ee705fad4 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -326,9 +326,7 @@ def _get_aggregation_groups_for_event_txn( ) @cached() - async def get_applicable_edit( - self, event_id: str, room_id: str - ) -> Optional[EventBase]: + async def get_applicable_edit(self, event_id: str) -> Optional[EventBase]: """Get the most recent edit (if any) that has happened for the given event. @@ -336,7 +334,6 @@ async def get_applicable_edit( Args: event_id: The original event ID - room_id: The original event's room ID Returns: The most recent edit, if any. @@ -355,17 +352,17 @@ async def get_applicable_edit( original.event_id = relates_to_id AND edit.type = original.type AND edit.sender = original.sender + AND edit.room_id = original.room_id WHERE relates_to_id = ? AND relation_type = ? - AND edit.room_id = ? AND edit.type = 'm.room.message' ORDER by edit.origin_server_ts DESC, edit.event_id DESC LIMIT 1 """ def _get_applicable_edit_txn(txn: LoggingTransaction) -> Optional[str]: - txn.execute(sql, (event_id, RelationTypes.REPLACE, room_id)) + txn.execute(sql, (event_id, RelationTypes.REPLACE)) row = txn.fetchone() if row: return row[0] @@ -591,7 +588,7 @@ async def _get_bundled_aggregation_for_event( edit = None if event.type == EventTypes.Message: - edit = await self.get_applicable_edit(event_id, room_id) + edit = await self.get_applicable_edit(event_id) if edit: aggregations[RelationTypes.REPLACE] = edit From 4344f1085a3e48ff2be7047ff2e84564f72bc74d Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 21 Dec 2021 14:12:50 -0500 Subject: [PATCH 02/13] Fetch multiple edits at once. --- synapse/storage/databases/main/relations.py | 86 ++++++++++++++++----- 1 file changed, 66 insertions(+), 20 deletions(-) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 052ee705fad4..ded8c95fe38d 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -44,6 +44,7 @@ RelationPaginationToken, ) from synapse.util.caches.descriptors import cached +from synapse.util.caches.lrucache import LruCache if TYPE_CHECKING: from synapse.server import HomeServer @@ -63,6 +64,11 @@ def __init__( self._msc1849_enabled = hs.config.experimental.msc1849_enabled self._msc3440_enabled = hs.config.experimental.msc3440_enabled + self.get_applicable_edit: LruCache[str, Optional[EventBase]] = LruCache( + cache_name="get_applicable_edit", + max_size=hs.config.caches.event_cache_size, # TODO + ) + @cached(tree=True) async def get_relations_for_event( self, @@ -325,28 +331,49 @@ def _get_aggregation_groups_for_event_txn( "get_aggregation_groups_for_event", _get_aggregation_groups_for_event_txn ) - @cached() - async def get_applicable_edit(self, event_id: str) -> Optional[EventBase]: + async def _get_applicable_edits( + self, event_ids: Iterable[str] + ) -> Dict[str, EventBase]: """Get the most recent edit (if any) that has happened for the given - event. + events. Correctly handles checking whether edits were allowed to happen. Args: - event_id: The original event ID + event_ids: The original event IDs Returns: - The most recent edit, if any. + A map of the most recent edit for each event. A missing event implies + there is no edits. """ + # A map of the original event IDs to the edit events. + edits_by_original = {} + + # Check if an edit for this event is currently cached. + event_ids_to_check = [] + for event_id in event_ids: + if event_id not in self.get_applicable_edit: + event_ids_to_check.append(event_id) + else: + edit_event = self.get_applicable_edit[event_id] + if edit_event: + edits_by_original[event_id] = edit_event + + # If all events were cached, all done. + if not event_ids_to_check: + return edits_by_original + # We only allow edits for `m.room.message` events that have the same sender # and event type. We can't assert these things during regular event auth so # we have to do the checks post hoc. # Fetches latest edit that has the same type and sender as the # original, and is an `m.room.message`. + # + # TODO Should this ensure it does not return results for state events / redacted events? sql = """ - SELECT edit.event_id FROM events AS edit + SELECT original.event_id, edit.event_id FROM events AS edit INNER JOIN event_relations USING (event_id) INNER JOIN events AS original ON original.event_id = relates_to_id @@ -354,28 +381,46 @@ async def get_applicable_edit(self, event_id: str) -> Optional[EventBase]: AND edit.sender = original.sender AND edit.room_id = original.room_id WHERE - relates_to_id = ? + %s AND relation_type = ? AND edit.type = 'm.room.message' ORDER by edit.origin_server_ts DESC, edit.event_id DESC - LIMIT 1 """ - def _get_applicable_edit_txn(txn: LoggingTransaction) -> Optional[str]: - txn.execute(sql, (event_id, RelationTypes.REPLACE)) - row = txn.fetchone() - if row: - return row[0] - return None - - edit_id = await self.db_pool.runInteraction( + def _get_applicable_edit_txn(txn: LoggingTransaction) -> Dict[str, str]: + clause, args = make_in_list_sql_clause( + txn.database_engine, "relates_to_id", event_ids_to_check + ) + args.append(RelationTypes.REPLACE) + + txn.execute(sql % (clause,), args) + rows = txn.fetchall() + result = {} + for original_event_id, edit_event_id in rows: + # Only consider the latest edit (by origin server ts). + if original_event_id not in result: + result[original_event_id] = edit_event_id + return result + + edit_ids = await self.db_pool.runInteraction( "get_applicable_edit", _get_applicable_edit_txn ) - if not edit_id: - return None + edits = await self.get_events(edit_ids.values()) # type: ignore[attr-defined] + + # Add the newly checked events to the cache. If an edit exists, add it to + # the results. + for original_event_id in event_ids_to_check: + # There might not be an edit or the event might not be known. In + # either case, cache the None. + edit_event_id = edit_ids.get(original_event_id) + edit_event = edits.get(edit_event_id) + + self.get_applicable_edit.set(original_event_id, edit_event) + if edit_event: + edits_by_original[original_event_id] = edit_event - return await self.get_event(edit_id, allow_none=True) # type: ignore[attr-defined] + return edits_by_original @cached() async def get_thread_summary( @@ -588,7 +633,8 @@ async def _get_bundled_aggregation_for_event( edit = None if event.type == EventTypes.Message: - edit = await self.get_applicable_edit(event_id) + edits = await self._get_applicable_edits([event_id]) + edit = edits.get(event_id) if edit: aggregations[RelationTypes.REPLACE] = edit From 7e97e55dead56b2f022f8635387c1668ade14dde Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 21 Dec 2021 14:21:04 -0500 Subject: [PATCH 03/13] Fetch event edits in groups. --- synapse/storage/databases/main/relations.py | 28 +++++++++++---------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index ded8c95fe38d..305e36341fdf 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -16,6 +16,7 @@ from typing import ( TYPE_CHECKING, Any, + Collection, Dict, Iterable, List, @@ -28,7 +29,7 @@ import attr from frozendict import frozendict -from synapse.api.constants import EventTypes, RelationTypes +from synapse.api.constants import RelationTypes from synapse.events import EventBase from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( @@ -631,14 +632,6 @@ async def _get_bundled_aggregation_for_event( if references.chunk: aggregations[RelationTypes.REFERENCE] = references.to_dict() - edit = None - if event.type == EventTypes.Message: - edits = await self._get_applicable_edits([event_id]) - edit = edits.get(event_id) - - if edit: - aggregations[RelationTypes.REPLACE] = edit - # If this event is the start of a thread, include a summary of the replies. if self._msc3440_enabled: ( @@ -656,7 +649,7 @@ async def _get_bundled_aggregation_for_event( return aggregations async def get_bundled_aggregations( - self, events: Iterable[EventBase] + self, events: Collection[EventBase] ) -> Dict[str, Dict[str, Any]]: """Generate bundled aggregations for events. @@ -671,12 +664,21 @@ async def get_bundled_aggregations( if not self._msc1849_enabled: return {} - # TODO Parallelize. - results = {} + # event ID -> bundled aggregation in non-serialized form. + results: Dict[str, Dict[str, Any]] = {} + + event_ids = [event.event_id for event in events] + + # Fetch any edits. + edits = await self._get_applicable_edits(event_ids) + for event_id, edit in edits.items(): + results.setdefault(event_id, {})[RelationTypes.REPLACE] = edit + + # Fetch other relations per event. for event in events: event_result = await self._get_bundled_aggregation_for_event(event) if event_result is not None: - results[event.event_id] = event_result + results.setdefault(event.event_id, {}).update(event_result) return results From d4a41c8c5d1302cf1487b2e716706ea6cb006150 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 29 Dec 2021 10:45:55 -0500 Subject: [PATCH 04/13] Newsfragment --- changelog.d/11660.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/11660.misc diff --git a/changelog.d/11660.misc b/changelog.d/11660.misc new file mode 100644 index 000000000000..47e085e4d931 --- /dev/null +++ b/changelog.d/11660.misc @@ -0,0 +1 @@ +Improve performance when fetching bundled aggregations for multiple events. From e9908c8cd4ef032b9a7a3f4ce335e9583953165b Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 14 Jan 2022 10:35:38 -0500 Subject: [PATCH 05/13] edit -> edits --- synapse/storage/databases/main/relations.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 305e36341fdf..ba5f1459ecd6 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -388,7 +388,7 @@ async def _get_applicable_edits( ORDER by edit.origin_server_ts DESC, edit.event_id DESC """ - def _get_applicable_edit_txn(txn: LoggingTransaction) -> Dict[str, str]: + def _get_applicable_edits_txn(txn: LoggingTransaction) -> Dict[str, str]: clause, args = make_in_list_sql_clause( txn.database_engine, "relates_to_id", event_ids_to_check ) @@ -404,7 +404,7 @@ def _get_applicable_edit_txn(txn: LoggingTransaction) -> Dict[str, str]: return result edit_ids = await self.db_pool.runInteraction( - "get_applicable_edit", _get_applicable_edit_txn + "get_applicable_edits", _get_applicable_edits_txn ) edits = await self.get_events(edit_ids.values()) # type: ignore[attr-defined] From 7ac2a9d555a240b12eb92a518402e77c03b84738 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 14 Jan 2022 10:37:20 -0500 Subject: [PATCH 06/13] Rename cache. --- synapse/storage/databases/main/events.py | 2 +- synapse/storage/databases/main/relations.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index b5a38882d29d..8b1211cf4fc1 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1783,7 +1783,7 @@ def _handle_event_relations( ) if rel_type == RelationTypes.REPLACE: - txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,)) + txn.call_after(self.store.applicable_edit_cache.invalidate, (parent_id,)) if rel_type == RelationTypes.THREAD: txn.call_after( diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index ba5f1459ecd6..214bbe878d65 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -65,8 +65,8 @@ def __init__( self._msc1849_enabled = hs.config.experimental.msc1849_enabled self._msc3440_enabled = hs.config.experimental.msc3440_enabled - self.get_applicable_edit: LruCache[str, Optional[EventBase]] = LruCache( - cache_name="get_applicable_edit", + self.applicable_edit_cache: LruCache[str, Optional[EventBase]] = LruCache( + cache_name="applicable_edit_cache", max_size=hs.config.caches.event_cache_size, # TODO ) @@ -354,10 +354,10 @@ async def _get_applicable_edits( # Check if an edit for this event is currently cached. event_ids_to_check = [] for event_id in event_ids: - if event_id not in self.get_applicable_edit: + if event_id not in self.applicable_edit_cache: event_ids_to_check.append(event_id) else: - edit_event = self.get_applicable_edit[event_id] + edit_event = self.applicable_edit_cache[event_id] if edit_event: edits_by_original[event_id] = edit_event @@ -417,7 +417,7 @@ def _get_applicable_edits_txn(txn: LoggingTransaction) -> Dict[str, str]: edit_event_id = edit_ids.get(original_event_id) edit_event = edits.get(edit_event_id) - self.get_applicable_edit.set(original_event_id, edit_event) + self.applicable_edit_cache.set(original_event_id, edit_event) if edit_event: edits_by_original[original_event_id] = edit_event From aacf47e77462a1a303572a927d4c6e8282f7ef02 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 14 Jan 2022 10:39:32 -0500 Subject: [PATCH 07/13] Use the previous cache size. --- synapse/storage/databases/main/relations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 214bbe878d65..8950a383de5f 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -67,7 +67,7 @@ def __init__( self.applicable_edit_cache: LruCache[str, Optional[EventBase]] = LruCache( cache_name="applicable_edit_cache", - max_size=hs.config.caches.event_cache_size, # TODO + max_size=1000, ) @cached(tree=True) From 813218006b5f752f298ed0b1afac0aa799655055 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 19 Jan 2022 07:57:54 -0500 Subject: [PATCH 08/13] Do not attempt to find edits for state/redacted events. --- synapse/storage/databases/main/relations.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 74a20c8464fa..ae06e6cb2988 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -371,8 +371,6 @@ async def _get_applicable_edits( # Fetches latest edit that has the same type and sender as the # original, and is an `m.room.message`. - # - # TODO Should this ensure it does not return results for state events / redacted events? sql = """ SELECT original.event_id, edit.event_id FROM events AS edit INNER JOIN event_relations USING (event_id) @@ -641,9 +639,6 @@ async def _get_bundled_aggregation_for_event( The bundled aggregations for an event, if bundled aggregations are enabled and the event can have bundled aggregations. """ - # State events and redacted events do not get bundled aggregations. - if event.is_state() or event.internal_metadata.is_redacted(): - return None # Do not bundle aggregations for an event which represents an edit or an # annotation. It does not make sense for them to have related events. @@ -706,6 +701,13 @@ async def get_bundled_aggregations( if not self._msc1849_enabled: return {} + # State events and redacted events do not get bundled aggregations. + events = [ + event + for event in events + if event.is_state() or event.internal_metadata.is_redacted() + ] + # event ID -> bundled aggregation in non-serialized form. results: Dict[str, Dict[str, Any]] = {} From e00cec088c072a8f97616dd3a82ae125c793ba12 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 19 Jan 2022 08:35:20 -0500 Subject: [PATCH 09/13] Fix logic. --- synapse/storage/databases/main/relations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index ae06e6cb2988..5f248aeed709 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -705,7 +705,7 @@ async def get_bundled_aggregations( events = [ event for event in events - if event.is_state() or event.internal_metadata.is_redacted() + if not event.is_state() and not event.internal_metadata.is_redacted() ] # event ID -> bundled aggregation in non-serialized form. From 8400c20086e7747336cc1cc174628ae182788bd7 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 25 Jan 2022 10:42:51 -0500 Subject: [PATCH 10/13] Directly select latest edit. --- synapse/storage/databases/main/relations.py | 58 ++++++++++++++------- 1 file changed, 38 insertions(+), 20 deletions(-) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 5f248aeed709..0157a7e75af8 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -39,6 +39,7 @@ make_in_list_sql_clause, ) from synapse.storage.databases.main.stream import generate_pagination_where_clause +from synapse.storage.engines import PostgresEngine from synapse.storage.relations import ( AggregationPaginationToken, PaginationChunk, @@ -371,20 +372,42 @@ async def _get_applicable_edits( # Fetches latest edit that has the same type and sender as the # original, and is an `m.room.message`. - sql = """ - SELECT original.event_id, edit.event_id FROM events AS edit - INNER JOIN event_relations USING (event_id) - INNER JOIN events AS original ON - original.event_id = relates_to_id - AND edit.type = original.type - AND edit.sender = original.sender - AND edit.room_id = original.room_id - WHERE - %s - AND relation_type = ? - AND edit.type = 'm.room.message' - ORDER by edit.origin_server_ts DESC, edit.event_id DESC - """ + if isinstance(self.database_engine, PostgresEngine): + # The `DISTINCT ON` clause will pick the *first* row it encounters, + # so ordering by origin server ts + event ID desc will ensure we get + # the latest edit. + sql = """ + SELECT DISTINCT ON (original.event_id) original.event_id, edit.origin_server_ts, edit.event_id FROM events AS edit + INNER JOIN event_relations USING (event_id) + INNER JOIN events AS original ON + original.event_id = relates_to_id + AND edit.type = original.type + AND edit.sender = original.sender + AND edit.room_id = original.room_id + WHERE + %s + AND relation_type = ? + AND edit.type = 'm.room.message' + ORDER by original.event_id DESC, edit.origin_server_ts DESC, edit.event_id DESC + """ + else: + # SQLite has special handling for bare columns when using MIN/MAX + # with a `GROUP BY` clause where it picks the value from a row that + # matches the MIN/MAX. + sql = """ + SELECT original.event_id, MAX(edit.origin_server_ts), MAX(edit.event_id) FROM events AS edit + INNER JOIN event_relations USING (event_id) + INNER JOIN events AS original ON + original.event_id = relates_to_id + AND edit.type = original.type + AND edit.sender = original.sender + AND edit.room_id = original.room_id + WHERE + %s + AND relation_type = ? + AND edit.type = 'm.room.message' + GROUP BY (original.event_id) + """ def _get_applicable_edits_txn(txn: LoggingTransaction) -> Dict[str, str]: clause, args = make_in_list_sql_clause( @@ -394,12 +417,7 @@ def _get_applicable_edits_txn(txn: LoggingTransaction) -> Dict[str, str]: txn.execute(sql % (clause,), args) rows = txn.fetchall() - result = {} - for original_event_id, edit_event_id in rows: - # Only consider the latest edit (by origin server ts). - if original_event_id not in result: - result[original_event_id] = edit_event_id - return result + return {row[0]: row[2] for row in rows} edit_ids = await self.db_pool.runInteraction( "get_applicable_edits", _get_applicable_edits_txn From ffad611fbb85be1d715dc5f5872d54a80bdaaa24 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 26 Jan 2022 09:22:33 -0500 Subject: [PATCH 11/13] Use cachedList instead of homegrown solution. --- synapse/storage/databases/main/events.py | 2 +- synapse/storage/databases/main/relations.py | 73 +++++++++------------ 2 files changed, 31 insertions(+), 44 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index c07ffa522b94..5af811f82a27 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1801,7 +1801,7 @@ def _handle_event_relations( ) if rel_type == RelationTypes.REPLACE: - txn.call_after(self.store.applicable_edit_cache.invalidate, (parent_id,)) + txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,)) if rel_type == RelationTypes.THREAD: txn.call_after( diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 80cbab29baf1..7e45923d0eff 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -13,7 +13,17 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union, cast +from typing import ( + TYPE_CHECKING, + Collection, + Dict, + Iterable, + List, + Optional, + Tuple, + Union, + cast, +) import attr from frozendict import frozendict @@ -35,8 +45,7 @@ RelationPaginationToken, ) from synapse.types import JsonDict -from synapse.util.caches.descriptors import cached -from synapse.util.caches.lrucache import LruCache +from synapse.util.caches.descriptors import cached, cachedList if TYPE_CHECKING: from synapse.server import HomeServer @@ -80,11 +89,6 @@ def __init__( self._msc1849_enabled = hs.config.experimental.msc1849_enabled self._msc3440_enabled = hs.config.experimental.msc3440_enabled - self.applicable_edit_cache: LruCache[str, Optional[EventBase]] = LruCache( - cache_name="applicable_edit_cache", - max_size=1000, - ) - @cached(tree=True) async def get_relations_for_event( self, @@ -347,9 +351,14 @@ def _get_aggregation_groups_for_event_txn( "get_aggregation_groups_for_event", _get_aggregation_groups_for_event_txn ) + @cached() + def get_applicable_edit(self, event_id: str) -> Optional[EventBase]: + raise NotImplementedError() + + @cachedList(cached_method_name="get_applicable_edit", list_name="event_ids") async def _get_applicable_edits( - self, event_ids: Iterable[str] - ) -> Dict[str, EventBase]: + self, event_ids: Collection[str] + ) -> Dict[str, Optional[EventBase]]: """Get the most recent edit (if any) that has happened for the given events. @@ -359,27 +368,10 @@ async def _get_applicable_edits( event_ids: The original event IDs Returns: - A map of the most recent edit for each event. A missing event implies - there is no edits. + A map of the most recent edit for each event. If there are no edits, + the event will map to None. """ - # A map of the original event IDs to the edit events. - edits_by_original = {} - - # Check if an edit for this event is currently cached. - event_ids_to_check = [] - for event_id in event_ids: - if event_id not in self.applicable_edit_cache: - event_ids_to_check.append(event_id) - else: - edit_event = self.applicable_edit_cache[event_id] - if edit_event: - edits_by_original[event_id] = edit_event - - # If all events were cached, all done. - if not event_ids_to_check: - return edits_by_original - # We only allow edits for `m.room.message` events that have the same sender # and event type. We can't assert these things during regular event auth so # we have to do the checks post hoc. @@ -425,7 +417,7 @@ async def _get_applicable_edits( def _get_applicable_edits_txn(txn: LoggingTransaction) -> Dict[str, str]: clause, args = make_in_list_sql_clause( - txn.database_engine, "relates_to_id", event_ids_to_check + txn.database_engine, "relates_to_id", event_ids ) args.append(RelationTypes.REPLACE) @@ -439,19 +431,14 @@ def _get_applicable_edits_txn(txn: LoggingTransaction) -> Dict[str, str]: edits = await self.get_events(edit_ids.values()) # type: ignore[attr-defined] - # Add the newly checked events to the cache. If an edit exists, add it to - # the results. - for original_event_id in event_ids_to_check: - # There might not be an edit or the event might not be known. In - # either case, cache the None. - edit_event_id = edit_ids.get(original_event_id) - edit_event = edits.get(edit_event_id) - - self.applicable_edit_cache.set(original_event_id, edit_event) - if edit_event: - edits_by_original[original_event_id] = edit_event - - return edits_by_original + # Map to the original event IDs to the edit events. + # + # There might not be an edit event due to there being no edits or + # due to the event not being known, either case is treated the same. + return { + original_event_id: edits.get(edit_ids.get(original_event_id)) + for original_event_id in event_ids + } @cached() async def get_thread_summary( From 5ce3e07dc0713ac41b92530174e661b84a775556 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 26 Jan 2022 13:19:26 -0500 Subject: [PATCH 12/13] Use less efficient, but correct query for SQLite. --- synapse/storage/databases/main/relations.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 7e45923d0eff..50564e1264b8 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -383,7 +383,7 @@ async def _get_applicable_edits( # so ordering by origin server ts + event ID desc will ensure we get # the latest edit. sql = """ - SELECT DISTINCT ON (original.event_id) original.event_id, edit.origin_server_ts, edit.event_id FROM events AS edit + SELECT DISTINCT ON (original.event_id) original.event_id, edit.event_id FROM events AS edit INNER JOIN event_relations USING (event_id) INNER JOIN events AS original ON original.event_id = relates_to_id @@ -397,11 +397,12 @@ async def _get_applicable_edits( ORDER by original.event_id DESC, edit.origin_server_ts DESC, edit.event_id DESC """ else: - # SQLite has special handling for bare columns when using MIN/MAX - # with a `GROUP BY` clause where it picks the value from a row that - # matches the MIN/MAX. + # SQLite uses a simplified query which returns all edits for an + # original event. The results are then de-duplicated when turned into + # a dict. Due to the chosen ordering, the latest edit stomps on + # earlier edits. sql = """ - SELECT original.event_id, MAX(edit.origin_server_ts), MAX(edit.event_id) FROM events AS edit + SELECT original.event_id, edit.event_id FROM events AS edit INNER JOIN event_relations USING (event_id) INNER JOIN events AS original ON original.event_id = relates_to_id @@ -412,7 +413,7 @@ async def _get_applicable_edits( %s AND relation_type = ? AND edit.type = 'm.room.message' - GROUP BY (original.event_id) + ORDER by edit.origin_server_ts, edit.event_id """ def _get_applicable_edits_txn(txn: LoggingTransaction) -> Dict[str, str]: @@ -422,8 +423,7 @@ def _get_applicable_edits_txn(txn: LoggingTransaction) -> Dict[str, str]: args.append(RelationTypes.REPLACE) txn.execute(sql % (clause,), args) - rows = txn.fetchall() - return {row[0]: row[2] for row in rows} + return dict(txn.fetchall()) edit_ids = await self.db_pool.runInteraction( "get_applicable_edits", _get_applicable_edits_txn From 9a7cc1a885e37b2a04ec16ba8c4abd789a928910 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 26 Jan 2022 13:29:06 -0500 Subject: [PATCH 13/13] Lint --- synapse/storage/databases/main/relations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 50564e1264b8..2a54afdedb19 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -423,7 +423,7 @@ def _get_applicable_edits_txn(txn: LoggingTransaction) -> Dict[str, str]: args.append(RelationTypes.REPLACE) txn.execute(sql % (clause,), args) - return dict(txn.fetchall()) + return dict(cast(Iterable[Tuple[str, str]], txn.fetchall())) edit_ids = await self.db_pool.runInteraction( "get_applicable_edits", _get_applicable_edits_txn