From fd2616497c5cc3fa56fe589395fa2ba51e617947 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Fri, 12 May 2023 16:19:38 -0500
Subject: [PATCH 01/26] Process previously failed backfill events in the
 background

Fix https://github.com/matrix-org/synapse/issues/13623

Follow-up to https://github.com/matrix-org/synapse/issues/13621 and
https://github.com/matrix-org/synapse/issues/13622

Part of making `/messages` faster:
https://github.com/matrix-org/synapse/issues/13356
---
 synapse/handlers/federation_event.py          | 35 +++++++++++++++++++
 .../databases/main/event_federation.py        | 31 ++++++++++++++++
 2 files changed, 66 insertions(+)

diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 06343d40e44e..c7a8f41addb9 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -953,6 +953,41 @@ async def _process_pulled_event(
             )
             return

+        # Check if we've already tried to process this event
+        failed_pull_attempt_info = await self._store.get_event_failed_pull_attempt_info(
+            event.room_id, event_id
+        )
+        if failed_pull_attempt_info:
+            # Process previously failed backfill events in the background
+            # to not waste something that is bound to fail again.
+            run_as_background_process(
+                "_try_process_pulled_event",
+                self._try_process_pulled_event,
+                origin,
+                event,
+                backfilled,
+            )
+        else:
+            # Otherwise, we can optimistically try to process and wait for the event to
+            # be fully persisted.
+            await self._try_process_pulled_event(origin, event, backfilled)
+
+    async def _try_process_pulled_event(
+        self, origin: str, event: EventBase, backfilled: bool
+    ) -> None:
+        """
+        Handles all of the async tasks necessary to process a pulled event. You should
+        not use this method directly, instead use `_process_pulled_event` which will
+        handle all of the quick sync checks that should happen before-hand.
+
+        Args:
+            origin: The server we received this event from
+            event: The received event
+            backfilled: True if this is part of a historical batch of events (inhibits
+                notification to clients, and validation of device keys.)
+ """ + event_id = event.event_id + try: try: context = await self._compute_event_context_with_maybe_missing_prevs( diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index ac19de183cb6..541432fc7c06 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -108,6 +108,15 @@ class BackfillQueueNavigationItem: type: str +@attr.s(frozen=True, slots=True, auto_attribs=True) +class EventFailedPullAttemptInfo: + event_id: str + room_id: str + num_attempts: int + last_attempt_ts: int + last_cause: str + + class _NoChainCoverIndex(Exception): def __init__(self, room_id: str): super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,)) @@ -1583,6 +1592,28 @@ def _record_event_failed_pull_attempt_upsert_txn( txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause)) + @trace + async def get_event_failed_pull_attempt_info( + self, + room_id: str, + event_id: str, + ) -> Optional[EventFailedPullAttemptInfo]: + res = await self.db_pool.simple_select_one( + table="event_failed_pull_attempts", + keyvalues={"room_id": room_id, "event_id": event_id}, + retcols=["num_attempts", "last_attempt_ts", "last_cause"], + allow_none=True, + desc="get_event_failed_pull_attempt_info", + ) + + return EventFailedPullAttemptInfo( + event_id=event_id, + room_id=room_id, + num_attempts=res["num_attempts"], + last_attempt_ts=res["last_attempt_ts"], + last_cause=res["last_cause"], + ) + @trace async def get_event_ids_to_not_pull_from_backoff( self, From c5dc7464dee0e9761ff3b9358fce15a00a1c9b59 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 12 May 2023 16:37:36 -0500 Subject: [PATCH 02/26] Add changelog --- changelog.d/15585.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15585.feature diff --git a/changelog.d/15585.feature b/changelog.d/15585.feature new file mode 100644 index 000000000000..1adcfb69ee5a --- /dev/null +++ b/changelog.d/15585.feature @@ -0,0 +1 @@ +Process previously failed backfill events in the background to avoid blocking requests for something that is bound to fail again. From 8fc47d816e23685b41a96664d806b7b8c42fc804 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 12 May 2023 16:37:52 -0500 Subject: [PATCH 03/26] Add consideration --- synapse/handlers/federation_event.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index c7a8f41addb9..12207854cab5 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -957,6 +957,7 @@ async def _process_pulled_event( failed_pull_attempt_info = await self._store.get_event_failed_pull_attempt_info( event.room_id, event_id ) + # TODO: Should we only do this for `backfilled=true`? if failed_pull_attempt_info: # Process previously failed backfill events in the background # to not waste something that is bound to fail again. 
From b5d95f74c5fd21a43599f04c29d1670a714a1581 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Fri, 12 May 2023 16:43:55 -0500
Subject: [PATCH 04/26] Fix lints

---
 synapse/storage/databases/main/event_federation.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 541432fc7c06..59021ba26df4 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1606,6 +1606,9 @@ async def get_event_failed_pull_attempt_info(
             desc="get_event_failed_pull_attempt_info",
         )

+        if res is None:
+            return None
+
         return EventFailedPullAttemptInfo(
             event_id=event_id,
             room_id=room_id,

From e13f5a9c58b4070ad731887dd3511da0bf2cef6e Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Tue, 16 May 2023 01:08:07 -0500
Subject: [PATCH 05/26] Always check for failed attempts

This function is used for backfill and when another server sends us a
PDU. In either case, we're trying to avoid the costly state calculations
to see if it's allowed, so I think we should always do this check.

See https://github.com/matrix-org/synapse/pull/15585#discussion_r1194656433
---
 synapse/handlers/federation_event.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 12207854cab5..94a0a139652f 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -957,10 +957,9 @@ async def _process_pulled_event(
         failed_pull_attempt_info = await self._store.get_event_failed_pull_attempt_info(
             event.room_id, event_id
         )
-        # TODO: Should we only do this for `backfilled=true`?
         if failed_pull_attempt_info:
-            # Process previously failed backfill events in the background
-            # to not waste something that is bound to fail again.
+            # Process previously failed backfill events in the background to not waste
+            # time on something that is bound to fail again.
             run_as_background_process(
                 "_try_process_pulled_event",
                 self._try_process_pulled_event,

From 70f5911e95e23bc0d07b69b367beaa55ec398041 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Tue, 16 May 2023 01:31:47 -0500
Subject: [PATCH 06/26] Add comments and concern about maybe queue

---
 synapse/handlers/federation_event.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 94a0a139652f..39844e697461 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -953,13 +953,19 @@ async def _process_pulled_event(
             )
             return

-        # Check if we've already tried to process this event
+        # Check if we've already tried to process this event at some point in the past.
+        # We aren't concerned with the exponential backoff here, just whether it has
+        # failed before.
         failed_pull_attempt_info = await self._store.get_event_failed_pull_attempt_info(
             event.room_id, event_id
         )
         if failed_pull_attempt_info:
             # Process previously failed backfill events in the background to not waste
             # time on something that is bound to fail again.
+            #
+            # TODO: Are we concerned with processing too many events in parallel since
+            # we just fire and forget this off to the background? Should we instead have
+            # a background queue to chew through?
             run_as_background_process(
                 "_try_process_pulled_event",
                 self._try_process_pulled_event,
                 origin,
                 event,
                 backfilled,

From 45934fe0fb218e6cfbfa5dda3cef6be7930b8e13 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Tue, 16 May 2023 02:57:21 -0500
Subject: [PATCH 07/26] Process all failed events as a sequential task in the
 background

To avoid events stacking up in parallel exhausting our resources.

See https://github.com/matrix-org/synapse/pull/15585#discussion_r1194689547
---
 synapse/handlers/federation_event.py          | 83 ++++++++-----------
 .../databases/main/event_federation.py        | 54 ++++++------
 2 files changed, 61 insertions(+), 76 deletions(-)

diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 39844e697461..78438f79f4d4 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -890,12 +890,42 @@ async def _process_pulled_events(
             # Continue on with the events that are new to us.
             new_events.append(event)

-        # We want to sort these by depth so we process them and
-        # tell clients about them in order.
-        sorted_events = sorted(new_events, key=lambda x: x.depth)
-        for ev in sorted_events:
-            with nested_logging_context(ev.event_id):
-                await self._process_pulled_event(origin, ev, backfilled=backfilled)
+        @trace
+        async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None:
+            # We want to sort these by depth so we process them and
+            # tell clients about them in order.
+            sorted_events = sorted(new_events, key=lambda x: x.depth)
+            for ev in sorted_events:
+                with nested_logging_context(ev.event_id):
+                    await self._process_pulled_event(origin, ev, backfilled=backfilled)
+
+        # Check if we've already tried to process these events at some point in the
+        # past. We aren't concerned with the exponential backoff here, just whether it
+        # has failed to be processed before.
+        new_event_dict = {event.event_id: event for event in new_events}
+        (
+            event_ids_with_failed_pull_attempts,
+            fresh_event_ids,
+        ) = await self._store.separate_event_ids_with_failed_pull_attempts(
+            new_event_dict.keys()
+        )
+
+        # Process previously failed backfill events in the background to not waste
+        # time on something that is bound to fail again.
+        run_as_background_process(
+            "_process_new_pulled_events",
+            _process_new_pulled_events,
+            [
+                new_event_dict[event_id]
+                for event_id in event_ids_with_failed_pull_attempts
+            ],
+        )
+
+        # We can optimistically try to process and wait for the event to be fully
+        # persisted.
+        await _process_new_pulled_events(
+            [new_event_dict[event_id] for event_id in fresh_event_ids]
+        )

     @trace
     @tag_args
@@ -953,47 +983,6 @@ async def _process_pulled_event(
             )
             return

-        # Check if we've already tried to process this event at some point in the past.
-        # We aren't concerned with the exponential backoff here, just whether it has
-        # failed before.
-        failed_pull_attempt_info = await self._store.get_event_failed_pull_attempt_info(
-            event.room_id, event_id
-        )
-        if failed_pull_attempt_info:
-            # Process previously failed backfill events in the background to not waste
-            # time on something that is bound to fail again.
-            #
-            # TODO: Are we concerned with processing too many events in parallel since
-            # we just fire and forget this off to the background? Should we instead have
-            # a background queue to chew through?
-            run_as_background_process(
-                "_try_process_pulled_event",
-                self._try_process_pulled_event,
-                origin,
-                event,
-                backfilled,
-            )
-        else:
-            # Otherwise, we can optimistically try to process and wait for the event to
-            # be fully persisted.
-            await self._try_process_pulled_event(origin, event, backfilled)
-
-    async def _try_process_pulled_event(
-        self, origin: str, event: EventBase, backfilled: bool
-    ) -> None:
-        """
-        Handles all of the async tasks necessary to process a pulled event. You should
-        not use this method directly, instead use `_process_pulled_event` which will
-        handle all of the quick sync checks that should happen before-hand.
-
-        Args:
-            origin: The server we received this event from
-            event: The received event
-            backfilled: True if this is part of a historical batch of events (inhibits
-                notification to clients, and validation of device keys.)
-        """
-        event_id = event.event_id
-
-        try:
             try:
                 context = await self._compute_event_context_with_maybe_missing_prevs(
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 59021ba26df4..4a0f41755ef0 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -108,15 +108,6 @@ class BackfillQueueNavigationItem:
     type: str


-@attr.s(frozen=True, slots=True, auto_attribs=True)
-class EventFailedPullAttemptInfo:
-    event_id: str
-    room_id: str
-    num_attempts: int
-    last_attempt_ts: int
-    last_cause: str
-
-
 class _NoChainCoverIndex(Exception):
     def __init__(self, room_id: str):
         super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,))
@@ -1592,30 +1583,35 @@ def _record_event_failed_pull_attempt_upsert_txn(

         txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))

+    # TODO: Add tests for this function
     @trace
-    async def get_event_failed_pull_attempt_info(
-        self,
-        room_id: str,
-        event_id: str,
-    ) -> Optional[EventFailedPullAttemptInfo]:
-        res = await self.db_pool.simple_select_one(
-            table="event_failed_pull_attempts",
-            keyvalues={"room_id": room_id, "event_id": event_id},
-            retcols=["num_attempts", "last_attempt_ts", "last_cause"],
-            allow_none=True,
-            desc="get_event_failed_pull_attempt_info",
-        )
+    async def separate_event_ids_with_failed_pull_attempts(
+        self, event_ids: Collection[str]
+    ) -> Tuple[Collection[str], Collection[str]]:
+        """
+        Separate the given list of events into

-        if res is None:
-            return None
+        Args:
+            event_ids: A list of events to separate
+
+        Returns:
+            A tuple with two lists that events separated into based on whether they have
+            failed pull attempts or not (event_ids_with_failed_pull_attempts,
+            fresh_event_ids).
+ """ - return EventFailedPullAttemptInfo( - event_id=event_id, - room_id=room_id, - num_attempts=res["num_attempts"], - last_attempt_ts=res["last_attempt_ts"], - last_cause=res["last_cause"], + rows = await self.db_pool.simple_select_many_batch( + table="event_failed_pull_attempts", + column="event_id", + iterable=event_ids, + keyvalues={}, + retcols=("event_id",), + desc="separate_event_ids_with_failed_pull_attempts", ) + event_ids_with_failed_pull_attempts = {str(row["event_id"]) for row in rows} + fresh_event_ids = set(event_ids) - event_ids_with_failed_pull_attempts + + return (event_ids_with_failed_pull_attempts, fresh_event_ids) @trace async def get_event_ids_to_not_pull_from_backoff( From 93de8560a2ff00bf7c0d8ff3ed2718e5a0d176c4 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 16 May 2023 16:27:33 -0500 Subject: [PATCH 08/26] Better comments --- synapse/handlers/federation_event.py | 2 +- synapse/storage/databases/main/event_federation.py | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 78438f79f4d4..951a5920f481 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -922,7 +922,7 @@ async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None: ) # We can optimistically try to process and wait for the event to be fully - # persisted. + # persisted if we've never tried before. await _process_new_pulled_events( [new_event_dict[event_id] for event_id in fresh_event_ids] ) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 4a0f41755ef0..efa404ab02e7 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1589,15 +1589,16 @@ async def separate_event_ids_with_failed_pull_attempts( self, event_ids: Collection[str] ) -> Tuple[Collection[str], Collection[str]]: """ - Separate the given list of events into + Separate the given list of events into two lists based on whether they have any + failed pull attempts or not. Args: event_ids: A list of events to separate Returns: - A tuple with two lists that events separated into based on whether they have - failed pull attempts or not (event_ids_with_failed_pull_attempts, - fresh_event_ids). + A tuple with two lists where the given event_ids are separated based on + whether they have any failed pull attempts or not + (event_ids_with_failed_pull_attempts, fresh_event_ids). 
""" rows = await self.db_pool.simple_select_many_batch( From 631d7dba1a00adebebf29577207f38c3c1ae9cce Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 16 May 2023 16:51:04 -0500 Subject: [PATCH 09/26] Add test for `separate_event_ids_with_failed_pull_attempts` --- .../databases/main/event_federation.py | 1 - tests/storage/test_event_federation.py | 43 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index efa404ab02e7..19ede57af740 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1583,7 +1583,6 @@ def _record_event_failed_pull_attempt_upsert_txn( txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause)) - # TODO: Add tests for this function @trace async def separate_event_ids_with_failed_pull_attempts( self, event_ids: Collection[str] diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index 81e50bdd5523..f6244279aa78 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -1134,6 +1134,49 @@ def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_ backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] self.assertEqual(backfill_event_ids, ["insertion_eventA"]) + def test_separate_event_ids_with_failed_pull_attempts(self) -> None: + """ + Test to make sure TODO + """ + # Create the room + user_id = self.register_user("alice", "test") + tok = self.login("alice", "test") + room_id = self.helper.create_room_as(room_creator=user_id, tok=tok) + + self.get_success( + self.store.record_event_failed_pull_attempt( + room_id, "$failed_event_id", "fake cause" + ) + ) + self.get_success( + self.store.record_event_failed_pull_attempt( + room_id, "$failed_event_id2", "fake cause" + ) + ) + + (event_ids_with_failed_pull_attempts, fresh_event_ids) = self.get_success( + self.store.separate_event_ids_with_failed_pull_attempts( + event_ids=[ + "$failed_event_id", + "$fresh_event_id", + "$failed_event_id2", + "$fresh_event_id2", + ] + ) + ) + + self.assertEqual( + event_ids_with_failed_pull_attempts, + # We expect a 2^1 hour backoff after a single failed attempt. + {"$failed_event_id", "$failed_event_id2"}, + ) + + self.assertEqual( + fresh_event_ids, + # We expect a 2^1 hour backoff after a single failed attempt. + {"$fresh_event_id", "$fresh_event_id2"}, + ) + def test_get_event_ids_to_not_pull_from_backoff(self) -> None: """ Test to make sure only event IDs we should backoff from are returned. From beeccc33dd6124bb34c6fd2b1539023aabbbc835 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 17 May 2023 02:22:56 -0500 Subject: [PATCH 10/26] Avoid doing extra work if the list is empty --- synapse/handlers/federation_event.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 951a5920f481..33dffd0857ef 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -912,20 +912,21 @@ async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None: # Process previously failed backfill events in the background to not waste # time on something that is bound to fail again. 
-        run_as_background_process(
-            "_process_new_pulled_events",
-            _process_new_pulled_events,
-            [
-                new_event_dict[event_id]
-                for event_id in event_ids_with_failed_pull_attempts
-            ],
-        )
+        events_with_failed_pull_attempts = [
+            new_event_dict[event_id] for event_id in event_ids_with_failed_pull_attempts
+        ]
+        if len(events_with_failed_pull_attempts) > 0:
+            run_as_background_process(
+                "_process_new_pulled_events",
+                _process_new_pulled_events,
+                events_with_failed_pull_attempts,
+            )

         # We can optimistically try to process and wait for the event to be fully
         # persisted if we've never tried before.
-        await _process_new_pulled_events(
-            [new_event_dict[event_id] for event_id in fresh_event_ids]
-        )
+        fresh_events = [new_event_dict[event_id] for event_id in fresh_event_ids]
+        if len(fresh_events) > 0:
+            await _process_new_pulled_events(fresh_events)

From 7eabc60a59b08d8950172c382b9f1dfc47d8170f Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 17 May 2023 02:44:04 -0500
Subject: [PATCH 11/26] Make sure to retain the same order they were given in
 case the depth is the same

---
 synapse/handlers/federation_event.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 33dffd0857ef..cde854627d0f 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -902,7 +902,9 @@ async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None:
         # Check if we've already tried to process these events at some point in the
         # past. We aren't concerned with the exponential backoff here, just whether it
         # has failed to be processed before.
-        new_event_dict = {event.event_id: event for event in new_events}
+        new_event_dict = collections.OrderedDict(
+            (event.event_id, event) for event in new_events
+        )
         (
             event_ids_with_failed_pull_attempts,
             fresh_event_ids,
@@ -917,7 +919,7 @@ async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None:
         ]
         if len(events_with_failed_pull_attempts) > 0:
             run_as_background_process(
-                "_process_new_pulled_events",
+                "_process_new_pulled_events_with_failed_pull_attempts",
                 _process_new_pulled_events,
                 events_with_failed_pull_attempts,
             )

From 7583c2cd31fa30b9750301f1435f4822a6d02078 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 17 May 2023 02:48:20 -0500
Subject: [PATCH 12/26] Add comments why OrderedDict

---
 synapse/handlers/federation_event.py | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index cde854627d0f..a35bf0d78b1f 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -899,12 +899,17 @@ async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None:
                 with nested_logging_context(ev.event_id):
                     await self._process_pulled_event(origin, ev, backfilled=backfilled)

-        # Check if we've already tried to process these events at some point in the
-        # past. We aren't concerned with the exponential backoff here, just whether it
-        # has failed to be processed before.
+        # We use an `OrderedDict` because even though we sort by depth when we process
+        # the list, it's still important that we maintain the order of the given events
+        # in case the depth of two events is the same. MSC2716 relies on events at the
+        # same depth and `/backfill` gives a carefully crafted order that we should try
+        # to maintain.
         new_event_dict = collections.OrderedDict(
             (event.event_id, event) for event in new_events
         )
+        # Check if we've already tried to process these events at some point in the
+        # past. We aren't concerned with the exponential backoff here, just whether it
+        # has failed to be processed before.
         (
             event_ids_with_failed_pull_attempts,
             fresh_event_ids,

From e10131825f5fb62c8afcc298ccbf6492adbc5d89 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 17 May 2023 02:48:42 -0500
Subject: [PATCH 13/26] Make test more robust around ordering

---
 tests/storage/test_event_federation.py | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py
index f6244279aa78..8d150ef23aff 100644
--- a/tests/storage/test_event_federation.py
+++ b/tests/storage/test_event_federation.py
@@ -1143,9 +1143,17 @@ def test_separate_event_ids_with_failed_pull_attempts(self) -> None:
         tok = self.login("alice", "test")
         room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)

+        # We purposely record the failed pull attempt for `$failed_event_id3` first
+        # to make sure we return results in the order of the event_ids passed in instead of
+        # the order in which we find things in the database.
         self.get_success(
             self.store.record_event_failed_pull_attempt(
-                room_id, "$failed_event_id", "fake cause"
+                room_id, "$failed_event_id3", "fake cause"
+            )
+        )
+        self.get_success(
+            self.store.record_event_failed_pull_attempt(
+                room_id, "$failed_event_id1", "fake cause"
             )
         )
         self.get_success(
             self.store.record_event_failed_pull_attempt(
                 room_id, "$failed_event_id2", "fake cause"
             )
         )

         (event_ids_with_failed_pull_attempts, fresh_event_ids) = self.get_success(
             self.store.separate_event_ids_with_failed_pull_attempts(
                 event_ids=[
-                    "$failed_event_id",
-                    "$fresh_event_id",
+                    "$failed_event_id1",
+                    "$fresh_event_id1",
                     "$failed_event_id2",
                     "$fresh_event_id2",
+                    "$failed_event_id3",
+                    "$fresh_event_id3",
                 ]
             )
         )

         self.assertEqual(
             event_ids_with_failed_pull_attempts,
             # We expect a 2^1 hour backoff after a single failed attempt.
-            {"$failed_event_id", "$failed_event_id2"},
+            {"$failed_event_id1", "$failed_event_id2", "$failed_event_id3"},
         )

         self.assertEqual(
             fresh_event_ids,
             # We expect a 2^1 hour backoff after a single failed attempt.
- {"$fresh_event_id", "$fresh_event_id2"}, + {"$fresh_event_id1", "$fresh_event_id2", "$fresh_event_id3"}, ) def test_get_event_ids_to_not_pull_from_backoff(self) -> None: From 899fc34a5dd451a2c392b83218dbf6f1027fe164 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 17 May 2023 03:00:52 -0500 Subject: [PATCH 14/26] Add test description --- tests/storage/test_event_federation.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index 8d150ef23aff..5a1a79c978e1 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -1136,7 +1136,8 @@ def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_ def test_separate_event_ids_with_failed_pull_attempts(self) -> None: """ - Test to make sure TODO + Test to make sure we properly separate event_ids based on whether they have any + failed pull attempts """ # Create the room user_id = self.register_user("alice", "test") From b5aec4f25ce2d051cba3d6d147900c9a1b7a273d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 17 May 2023 03:28:03 -0500 Subject: [PATCH 15/26] Same order separated results --- .../databases/main/event_federation.py | 19 ++++++++++-- tests/storage/test_event_federation.py | 30 ++++++++++--------- 2 files changed, 32 insertions(+), 17 deletions(-) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 19ede57af740..5c50c5fb074f 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1597,7 +1597,8 @@ async def separate_event_ids_with_failed_pull_attempts( Returns: A tuple with two lists where the given event_ids are separated based on whether they have any failed pull attempts or not - (event_ids_with_failed_pull_attempts, fresh_event_ids). + (event_ids_with_failed_pull_attempts, fresh_event_ids). Lists are ordered + the same as the given event_ids. 
""" rows = await self.db_pool.simple_select_many_batch( @@ -1608,8 +1609,20 @@ async def separate_event_ids_with_failed_pull_attempts( retcols=("event_id",), desc="separate_event_ids_with_failed_pull_attempts", ) - event_ids_with_failed_pull_attempts = {str(row["event_id"]) for row in rows} - fresh_event_ids = set(event_ids) - event_ids_with_failed_pull_attempts + event_ids_with_failed_pull_attempts_from_database = [ + str(row["event_id"]) for row in rows + ] + # We want to maintain the order of the given event_ids so re-construct things + event_ids_with_failed_pull_attempts = [ + event_id + for event_id in event_ids + if event_id in event_ids_with_failed_pull_attempts_from_database + ] + fresh_event_ids = [ + event_id + for event_id in event_ids + if event_id not in event_ids_with_failed_pull_attempts + ] return (event_ids_with_failed_pull_attempts, fresh_event_ids) diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index 5a1a79c978e1..56186d2f7b39 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -1144,34 +1144,36 @@ def test_separate_event_ids_with_failed_pull_attempts(self) -> None: tok = self.login("alice", "test") room_id = self.helper.create_room_as(room_creator=user_id, tok=tok) - # We purposely record the failed pull attempt for `$failed_event_id3` first - # to make sure we return results in the order of the event_ids passed in instead of - # the order in which we find things in the database. + # We purposely record the failed pull attempt for `$failed_event_id3` first to + # make sure we return results in the order of the event_ids passed in instead of + # the order in which we find things in the database or the unordered collections + # we might accidentally use. They also purposely have reverse prefixed a-c in + # front to better test dubious sorting happening somewhere. self.get_success( self.store.record_event_failed_pull_attempt( - room_id, "$failed_event_id3", "fake cause" + room_id, "$a_failed_event_id3", "fake cause" ) ) self.get_success( self.store.record_event_failed_pull_attempt( - room_id, "$failed_event_id1", "fake cause" + room_id, "$c_failed_event_id1", "fake cause" ) ) self.get_success( self.store.record_event_failed_pull_attempt( - room_id, "$failed_event_id2", "fake cause" + room_id, "$b_failed_event_id2", "fake cause" ) ) (event_ids_with_failed_pull_attempts, fresh_event_ids) = self.get_success( self.store.separate_event_ids_with_failed_pull_attempts( event_ids=[ - "$failed_event_id1", - "$fresh_event_id1", - "$failed_event_id2", - "$fresh_event_id2", - "$failed_event_id3", - "$fresh_event_id3", + "$c_failed_event_id1", + "$c_fresh_event_id1", + "$b_failed_event_id2", + "$b_fresh_event_id2", + "$a_failed_event_id3", + "$a_fresh_event_id3", ] ) ) @@ -1179,13 +1181,13 @@ def test_separate_event_ids_with_failed_pull_attempts(self) -> None: self.assertEqual( event_ids_with_failed_pull_attempts, # We expect a 2^1 hour backoff after a single failed attempt. - {"$failed_event_id1", "$failed_event_id2", "$failed_event_id3"}, + ["$c_failed_event_id1", "$b_failed_event_id2", "$a_failed_event_id3"], ) self.assertEqual( fresh_event_ids, # We expect a 2^1 hour backoff after a single failed attempt. 
-            {"$fresh_event_id", "$fresh_event_id2"},
+            {"$fresh_event_id1", "$fresh_event_id2", "$fresh_event_id3"},
         )

     def test_get_event_ids_to_not_pull_from_backoff(self) -> None:

From 899fc34a5dd451a2c392b83218dbf6f1027fe164 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 17 May 2023 03:00:52 -0500
Subject: [PATCH 14/26] Add test description

---
 tests/storage/test_event_federation.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py
index 8d150ef23aff..5a1a79c978e1 100644
--- a/tests/storage/test_event_federation.py
+++ b/tests/storage/test_event_federation.py
@@ -1136,7 +1136,8 @@ def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_

     def test_separate_event_ids_with_failed_pull_attempts(self) -> None:
         """
-        Test to make sure TODO
+        Test to make sure we properly separate event_ids based on whether they have any
+        failed pull attempts
         """
         # Create the room
         user_id = self.register_user("alice", "test")
         tok = self.login("alice", "test")
         room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)

From b5aec4f25ce2d051cba3d6d147900c9a1b7a273d Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 17 May 2023 03:28:03 -0500
Subject: [PATCH 15/26] Same order separated results

---
 .../databases/main/event_federation.py        | 19 ++++++++++--
 tests/storage/test_event_federation.py        | 30 ++++++++++---------
 2 files changed, 32 insertions(+), 17 deletions(-)

diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 19ede57af740..5c50c5fb074f 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1597,7 +1597,8 @@ async def separate_event_ids_with_failed_pull_attempts(
         Returns:
             A tuple with two lists where the given event_ids are separated based on
             whether they have any failed pull attempts or not
-            (event_ids_with_failed_pull_attempts, fresh_event_ids).
+            (event_ids_with_failed_pull_attempts, fresh_event_ids). Lists are ordered
+            the same as the given event_ids.
-        fresh_events = [new_event_dict[event_id] for event_id in fresh_event_ids]
+        fresh_events = [
+            event
+            for event in new_events
+            if event.event_id not in event_ids_with_failed_pull_attempts
+        ]
         if len(fresh_events) > 0:
             await _process_new_pulled_events(fresh_events)
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 5c50c5fb074f..14318929cda8 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -46,7 +46,7 @@
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.signatures import SignatureWorkerStore
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
-from synapse.types import JsonDict
+from synapse.types import JsonDict, StrCollection
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.lrucache import LruCache
@@ -1584,9 +1584,9 @@ def _record_event_failed_pull_attempt_upsert_txn(
         txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))

     @trace
-    async def separate_event_ids_with_failed_pull_attempts(
-        self, event_ids: Collection[str]
-    ) -> Tuple[Collection[str], Collection[str]]:
+    async def get_event_ids_with_failed_pull_attempts(
+        self, event_ids: StrCollection
+    ) -> StrCollection:
         """
         Separate the given list of events into two lists based on whether they have any
         failed pull attempts or not.
@@ -1607,24 +1607,20 @@ async def separate_event_ids_with_failed_pull_attempts(
             iterable=event_ids,
             keyvalues={},
             retcols=("event_id",),
-            desc="separate_event_ids_with_failed_pull_attempts",
+            desc="get_event_ids_with_failed_pull_attempts",
         )
         event_ids_with_failed_pull_attempts_from_database = [
             str(row["event_id"]) for row in rows
         ]
-        # We want to maintain the order of the given event_ids so re-construct things
+        # We want to maintain the order of the given `event_ids` so re-construct things
+        # since there are no guarantees from the database implementation/query.
         event_ids_with_failed_pull_attempts = [
             event_id
             for event_id in event_ids
             if event_id in event_ids_with_failed_pull_attempts_from_database
         ]
-        fresh_event_ids = [
-            event_id
-            for event_id in event_ids
-            if event_id not in event_ids_with_failed_pull_attempts
-        ]

-        return (event_ids_with_failed_pull_attempts, fresh_event_ids)
+        return event_ids_with_failed_pull_attempts

diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py
index 56186d2f7b39..035ca4a5ba64 100644
--- a/tests/storage/test_event_federation.py
+++ b/tests/storage/test_event_federation.py
@@ -1134,21 +1134,21 @@ def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
         self.assertEqual(backfill_event_ids, ["insertion_eventA"])

-    def test_separate_event_ids_with_failed_pull_attempts(self) -> None:
+    def test_get_event_ids_with_failed_pull_attempts(self) -> None:
         """
-        Test to make sure we properly separate event_ids based on whether they have any
-        failed pull attempts
+        Test to make sure we properly get event_ids based on whether they have any
+        failed pull attempts.
""" # Create the room user_id = self.register_user("alice", "test") tok = self.login("alice", "test") room_id = self.helper.create_room_as(room_creator=user_id, tok=tok) - # We purposely record the failed pull attempt for `$failed_event_id3` first to - # make sure we return results in the order of the event_ids passed in instead of - # the order in which we find things in the database or the unordered collections - # we might accidentally use. They also purposely have reverse prefixed a-c in - # front to better test dubious sorting happening somewhere. + # We purposely record the failed pull attempt for `$c_failed_event_id3` first to + # make sure we return results in the order of the `event_ids` passed in instead + # of the order in which we find things in the database or the unordered + # collections we might accidentally use. They also purposely have reverse + # prefixed a-c in front to better test dubious sorting happening somewhere. self.get_success( self.store.record_event_failed_pull_attempt( room_id, "$a_failed_event_id3", "fake cause" @@ -1165,8 +1165,8 @@ def test_separate_event_ids_with_failed_pull_attempts(self) -> None: ) ) - (event_ids_with_failed_pull_attempts, fresh_event_ids) = self.get_success( - self.store.separate_event_ids_with_failed_pull_attempts( + event_ids_with_failed_pull_attempts = self.get_success( + self.store.get_event_ids_with_failed_pull_attempts( event_ids=[ "$c_failed_event_id1", "$c_fresh_event_id1", @@ -1180,16 +1180,9 @@ def test_separate_event_ids_with_failed_pull_attempts(self) -> None: self.assertEqual( event_ids_with_failed_pull_attempts, - # We expect a 2^1 hour backoff after a single failed attempt. ["$c_failed_event_id1", "$b_failed_event_id2", "$a_failed_event_id3"], ) - self.assertEqual( - fresh_event_ids, - # We expect a 2^1 hour backoff after a single failed attempt. - ["$c_fresh_event_id1", "$b_fresh_event_id2", "$a_fresh_event_id3"], - ) - def test_get_event_ids_to_not_pull_from_backoff(self) -> None: """ Test to make sure only event IDs we should backoff from are returned. From d4b8ff7b46419a00fe3f19e626b40eea8607689b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 17 May 2023 15:43:31 -0500 Subject: [PATCH 17/26] Update comment doc --- .../storage/databases/main/event_federation.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 14318929cda8..dcce043b11e2 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1588,17 +1588,15 @@ async def get_event_ids_with_failed_pull_attempts( self, event_ids: StrCollection ) -> StrCollection: """ - Separate the given list of events into two lists based on whether they have any - failed pull attempts or not. + Filter the given list of `event_ids` and return events which have any failed + pull attempts. Args: - event_ids: A list of events to separate + event_ids: A list of events to filter down. Returns: - A tuple with two lists where the given event_ids are separated based on - whether they have any failed pull attempts or not - (event_ids_with_failed_pull_attempts, fresh_event_ids). Lists are ordered - the same as the given event_ids. + A filtered down list of `event_ids` that have previous failed pull attempts + (order is maintained). 
""" rows = await self.db_pool.simple_select_many_batch( @@ -1612,8 +1610,8 @@ async def get_event_ids_with_failed_pull_attempts( event_ids_with_failed_pull_attempts_from_database = [ str(row["event_id"]) for row in rows ] - # We want to maintain the order of the given `event_ids` so re-construct things - # since there is no gurantees from the database implementation/query. + # We want to maintain the order of the given `event_ids` so we re-construct the + # list since there is no gurantees from the database implementation/query. event_ids_with_failed_pull_attempts = [ event_id for event_id in event_ids From d843557baf069b3ab03b05f04f2967dce4e28e7e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 18 May 2023 12:08:33 -0500 Subject: [PATCH 18/26] Use List See https://github.com/matrix-org/synapse/pull/15585#discussion_r1197029802 --- synapse/handlers/federation_event.py | 2 +- synapse/storage/databases/main/event_federation.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 9f6c1461a4d0..991bcbf80383 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -865,7 +865,7 @@ async def _process_pulled_events( [event.event_id for event in events] ) - new_events: Collection[EventBase] = [] + new_events: List[EventBase] = [] for event in events: event_id = event.event_id diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index dcce043b11e2..9b3d925b2b60 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -46,7 +46,7 @@ from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.signatures import SignatureWorkerStore from synapse.storage.engines import PostgresEngine, Sqlite3Engine -from synapse.types import JsonDict, StrCollection +from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.descriptors import cached from synapse.util.caches.lrucache import LruCache @@ -1585,8 +1585,8 @@ def _record_event_failed_pull_attempt_upsert_txn( @trace async def get_event_ids_with_failed_pull_attempts( - self, event_ids: StrCollection - ) -> StrCollection: + self, event_ids: List[str] + ) -> List[str]: """ Filter the given list of `event_ids` and return events which have any failed pull attempts. 
From c4e1533488e800786218b133590f3db78062f0bc Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 22 May 2023 19:44:21 -0500 Subject: [PATCH 19/26] Trace differentiaed events --- synapse/handlers/federation_event.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index fc7814b66172..7b6d87547735 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -920,6 +920,14 @@ async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None: for event in new_events if event.event_id in event_ids_with_failed_pull_attempts ] + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "events_with_failed_pull_attempts", + str(event_ids_with_failed_pull_attempts), + ) + set_tag( + SynapseTags.RESULT_PREFIX + "events_with_failed_pull_attempts.length", + str(len(events_with_failed_pull_attempts)), + ) if len(events_with_failed_pull_attempts) > 0: run_as_background_process( "_process_new_pulled_events_with_failed_pull_attempts", @@ -934,6 +942,14 @@ async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None: for event in new_events if event.event_id not in event_ids_with_failed_pull_attempts ] + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "fresh_events", + str([event.event_id for event in fresh_events]), + ) + set_tag( + SynapseTags.RESULT_PREFIX + "fresh_events.length", + str(len(fresh_events)), + ) if len(fresh_events) > 0: await _process_new_pulled_events(fresh_events) From ec230a3687a5d36441d061f0930816fd72ad6f10 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 24 May 2023 13:25:12 -0500 Subject: [PATCH 20/26] Prefer plain language Co-authored-by: Patrick Cloke --- synapse/handlers/federation_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 7b6d87547735..9ff22fec754d 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -914,7 +914,7 @@ async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None: ) # Process previously failed backfill events in the background to not waste - # time on something that is bound to fail again. + # time on something that is likely to fail again. events_with_failed_pull_attempts = [ event for event in new_events From 22a69bec89e5b7cf47bf5c6043f528855cd6831e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 24 May 2023 14:40:30 -0500 Subject: [PATCH 21/26] Use a `set` for efficient lookups Since we only use the result from `get_event_ids_with_failed_pull_attempts(...)` in lookups, we don't need to care about the order. 
See:

- https://github.com/matrix-org/synapse/pull/15585#discussion_r1204479216
- https://github.com/matrix-org/synapse/pull/15585#discussion_r1204483214
---
 .../databases/main/event_federation.py        | 20 ++++----------
 tests/storage/test_event_federation.py        | 26 +++++--------------
 2 files changed, 12 insertions(+), 34 deletions(-)

diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 9b3d925b2b60..eac020db703a 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -46,7 +46,7 @@
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.signatures import SignatureWorkerStore
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
-from synapse.types import JsonDict
+from synapse.types import JsonDict, StrCollection
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.lrucache import LruCache
@@ -1585,8 +1585,8 @@ def _record_event_failed_pull_attempt_upsert_txn(

     @trace
     async def get_event_ids_with_failed_pull_attempts(
-        self, event_ids: List[str]
-    ) -> List[str]:
+        self, event_ids: StrCollection
+    ) -> Set[str]:
         """
         Filter the given list of `event_ids` and return events which have any failed
         pull attempts.
@@ -1595,8 +1595,7 @@ async def get_event_ids_with_failed_pull_attempts(
             event_ids: A list of events to filter down.

         Returns:
-            A filtered down list of `event_ids` that have previous failed pull attempts
-            (order is maintained).
+            A filtered down list of `event_ids` that have previous failed pull attempts.
         """

         rows = await self.db_pool.simple_select_many_batch(
             table="event_failed_pull_attempts",
             column="event_id",
             iterable=event_ids,
             keyvalues={},
             retcols=("event_id",),
             desc="get_event_ids_with_failed_pull_attempts",
         )
-        event_ids_with_failed_pull_attempts_from_database = [
-            str(row["event_id"]) for row in rows
-        ]
-        # We want to maintain the order of the given `event_ids` so we re-construct the
-        # list since there are no guarantees from the database implementation/query.
-        event_ids_with_failed_pull_attempts = [
-            event_id
-            for event_id in event_ids
-            if event_id in event_ids_with_failed_pull_attempts_from_database
-        ]
+        event_ids_with_failed_pull_attempts = {str(row["event_id"]) for row in rows}

         return event_ids_with_failed_pull_attempts

diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py
index 035ca4a5ba64..4b8d8328d742 100644
--- a/tests/storage/test_event_federation.py
+++ b/tests/storage/test_event_federation.py
@@ -1144,43 +1144,31 @@ def test_get_event_ids_with_failed_pull_attempts(self) -> None:
         tok = self.login("alice", "test")
         room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)

-        # We purposely record the failed pull attempt for `$a_failed_event_id3` first to
-        # make sure we return results in the order of the `event_ids` passed in instead
-        # of the order in which we find things in the database or the unordered
-        # collections we might accidentally use. They also purposely have reverse
-        # prefixed a-c in front to better test dubious sorting happening somewhere.
self.get_success( self.store.record_event_failed_pull_attempt( - room_id, "$a_failed_event_id3", "fake cause" + room_id, "$failed_event_id1", "fake cause" ) ) self.get_success( self.store.record_event_failed_pull_attempt( - room_id, "$c_failed_event_id1", "fake cause" - ) - ) - self.get_success( - self.store.record_event_failed_pull_attempt( - room_id, "$b_failed_event_id2", "fake cause" + room_id, "$failed_event_id2", "fake cause" ) ) event_ids_with_failed_pull_attempts = self.get_success( self.store.get_event_ids_with_failed_pull_attempts( event_ids=[ - "$c_failed_event_id1", - "$c_fresh_event_id1", - "$b_failed_event_id2", - "$b_fresh_event_id2", - "$a_failed_event_id3", - "$a_fresh_event_id3", + "$failed_event_id1", + "$fresh_event_id1", + "$failed_event_id2", + "$fresh_event_id2", ] ) ) self.assertEqual( event_ids_with_failed_pull_attempts, - ["$c_failed_event_id1", "$b_failed_event_id2", "$a_failed_event_id3"], + {"$failed_event_id1", "$failed_event_id2"}, ) def test_get_event_ids_to_not_pull_from_backoff(self) -> None: From 65febed40e03f2c02b8fabf681becf740717abc6 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 24 May 2023 14:50:21 -0500 Subject: [PATCH 22/26] Add some context See https://github.com/matrix-org/synapse/pull/15585#discussion_r1204514303 --- synapse/handlers/federation_event.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 9ff22fec754d..d5a7aa79101f 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -897,8 +897,10 @@ async def _process_pulled_events( @trace async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None: - # We want to sort these by depth so we process them and - # tell clients about them in order. + # We want to sort these by depth so we process them and tell clients about + # them in order. It's also more efficient to backfill this way (`depth` + # ascending) because one backfill event is likely to be the `prev_event` of + # the next event we're going to process. 
sorted_events = sorted(new_events, key=lambda x: x.depth) for ev in sorted_events: with nested_logging_context(ev.event_id): From 6474b4ea6002cae550f7f60a9e79bc31dcd6492e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 24 May 2023 15:09:20 -0500 Subject: [PATCH 23/26] Use dedicated `partition` function to separate list See https://matrix.to/#/!vcyiEtMVHIhWXcJAfl:sw1v.org/$3ygksoSclc6ZlWNCKqysasK3oU_lrLkdeOL6nveDliA?via=matrix.org&via=element.io&via=pixie.town --- synapse/handlers/federation_event.py | 36 ++++++++++++---------------- synapse/util/iterutils.py | 27 +++++++++++++++++++++ 2 files changed, 42 insertions(+), 21 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index d5a7aa79101f..7f0ac9bb5619 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -88,7 +88,7 @@ ) from synapse.types.state import StateFilter from synapse.util.async_helpers import Linearizer, concurrently_execute -from synapse.util.iterutils import batch_iter +from synapse.util.iterutils import batch_iter, partition from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import shortstr @@ -915,13 +915,9 @@ async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None: ) ) - # Process previously failed backfill events in the background to not waste - # time on something that is likely to fail again. - events_with_failed_pull_attempts = [ - event - for event in new_events - if event.event_id in event_ids_with_failed_pull_attempts - ] + events_with_failed_pull_attempts, fresh_events = partition( + new_events, lambda e: e.event_id in event_ids_with_failed_pull_attempts + ) set_tag( SynapseTags.FUNC_ARG_PREFIX + "events_with_failed_pull_attempts", str(event_ids_with_failed_pull_attempts), @@ -930,6 +926,17 @@ async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None: SynapseTags.RESULT_PREFIX + "events_with_failed_pull_attempts.length", str(len(events_with_failed_pull_attempts)), ) + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "fresh_events", + str([event.event_id for event in fresh_events]), + ) + set_tag( + SynapseTags.RESULT_PREFIX + "fresh_events.length", + str(len(fresh_events)), + ) + + # Process previously failed backfill events in the background to not waste + # time on something that is likely to fail again. if len(events_with_failed_pull_attempts) > 0: run_as_background_process( "_process_new_pulled_events_with_failed_pull_attempts", @@ -939,19 +946,6 @@ async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None: # We can optimistically try to process and wait for the event to be fully # persisted if we've never tried before. 
-        fresh_events = [
-            event
-            for event in new_events
-            if event.event_id not in event_ids_with_failed_pull_attempts
-        ]
-        set_tag(
-            SynapseTags.FUNC_ARG_PREFIX + "fresh_events",
-            str([event.event_id for event in fresh_events]),
-        )
-        set_tag(
-            SynapseTags.RESULT_PREFIX + "fresh_events.length",
-            str(len(fresh_events)),
-        )
         if len(fresh_events) > 0:
             await _process_new_pulled_events(fresh_events)

diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py
index 4938ddf70321..a0efb96d3b46 100644
--- a/synapse/util/iterutils.py
+++ b/synapse/util/iterutils.py
@@ -15,11 +15,13 @@
 import heapq
 from itertools import islice
 from typing import (
+    Callable,
     Collection,
     Dict,
     Generator,
     Iterable,
     Iterator,
+    List,
     Mapping,
     Set,
     Sized,
@@ -71,6 +73,31 @@ def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]:
     return (iseq[i : i + maxlen] for i in range(0, len(iseq), maxlen))


+def partition(
+    iterable: Iterable[T], predicate: Callable[[T], bool]
+) -> Tuple[List[T], List[T]]:
+    """
+    Separate a given iterable into two lists based on the result of a predicate function.
+
+    Args:
+        iterable: the iterable to partition (separate)
+        predicate: a function that takes an item from the iterable and returns a boolean
+
+    Returns:
+        A tuple of two lists, the first containing all items for which the predicate
+        returned True, the second containing all items for which the predicate returned
+        False
+    """
+    true_results = []
+    false_results = []
+    for item in iterable:
+        if predicate(item):
+            true_results.append(item)
+        else:
+            false_results.append(item)
+    return true_results, false_results
+
+
 def sorted_topologically(
     nodes: Iterable[T],
     graph: Mapping[T, Collection[T]],

From 15527f79d99dac03e5ac13cc40ce66c88500dd1f Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 24 May 2023 15:37:53 -0500
Subject: [PATCH 24/26] Add context for why source order for MSC2716

See https://github.com/matrix-org/synapse/pull/15585#discussion_r1204514303
---
 synapse/handlers/federation_event.py | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 7f0ac9bb5619..42141d367057 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -915,6 +915,13 @@ async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None:
             )
         )

+        # We construct the event lists in source order from the `/backfill` response
+        # because it's a) easiest, but also b) the order in which we process things
+        # matters for MSC2716 historical batches because many historical events are
+        # all at the same `depth` and we rely on the tenuous sort that the other
+        # server gave us and hope they're doing their best. The brittle nature of this
+        # ordering for historical messages over federation is one of the reasons why
+        # we don't want to continue on MSC2716 until we have online topological ordering.
         events_with_failed_pull_attempts, fresh_events = partition(
             new_events, lambda e: e.event_id in event_ids_with_failed_pull_attempts
         )

From d59615fd814b107c73905a2b5e460eaac6c8b341 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 24 May 2023 16:11:31 -0500
Subject: [PATCH 25/26] Add sanity check test that failed pull attempt events
 are still processed, even though it happens in the background

See https://github.com/matrix-org/synapse/pull/15585#discussion_r1194696304
---
 tests/handlers/test_federation_event.py | 95 +++++++++++++++++++++++++
 1 file changed, 95 insertions(+)

diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py
index c067e5bfe3dc..23f1b33b2fda 100644
--- a/tests/handlers/test_federation_event.py
+++ b/tests/handlers/test_federation_event.py
@@ -664,6 +664,101 @@ async def get_room_state(
             StoreError,
         )

+    def test_backfill_process_previously_failed_pull_attempt_event_in_the_background(
+        self,
+    ) -> None:
+        """
+        Sanity check that events that already have failed pull attempts are still
+        processed, even though the processing happens in the background.
+        """
+        OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
+        main_store = self.hs.get_datastores().main
+
+        # Create the room
+        user_id = self.register_user("kermit", "test")
+        tok = self.login("kermit", "test")
+        room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
+        room_version = self.get_success(main_store.get_room_version(room_id))
+
+        # Allow the remote user to send state events
+        self.helper.send_state(
+            room_id,
+            "m.room.power_levels",
+            {"events_default": 0, "state_default": 0},
+            tok=tok,
+        )
+
+        # Add the remote user to the room
+        member_event = self.get_success(
+            event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join")
+        )
+
+        initial_state_map = self.get_success(
+            main_store.get_partial_current_state_ids(room_id)
+        )
+
+        auth_event_ids = [
+            initial_state_map[("m.room.create", "")],
+            initial_state_map[("m.room.power_levels", "")],
+            member_event.event_id,
+        ]
+
+        # Create a regular event that should process
+        pulled_event = make_event_from_dict(
+            self.add_hashes_and_signatures_from_other_server(
+                {
+                    "type": "test_regular_type",
+                    "room_id": room_id,
+                    "sender": OTHER_USER,
+                    "prev_events": [
+                        member_event.event_id,
+                    ],
+                    "auth_events": auth_event_ids,
+                    "origin_server_ts": 1,
+                    "depth": 12,
+                    "content": {"body": "pulled_event"},
+                }
+            ),
+            room_version,
+        )
+
+        # Record a failed pull attempt for this event which will cause us to backfill it
+        # in the background from here on out.
+ self.get_success( + main_store.record_event_failed_pull_attempt( + room_id, pulled_event.event_id, "fake cause" + ) + ) + + # We expect an outbound request to /backfill, so stub that out + self.mock_federation_transport_client.backfill.return_value = make_awaitable( + { + "origin": self.OTHER_SERVER_NAME, + "origin_server_ts": 123, + "pdus": [ + pulled_event.get_pdu_json(), + ], + } + ) + + # The function under test: try to backfill and process the pulled event + with LoggingContext("test"): + self.get_success( + self.hs.get_federation_event_handler().backfill( + self.OTHER_SERVER_NAME, + room_id, + limit=1, + extremities=["$some_extremity"], + ) + ) + + # Ensure `run_as_background_process(...)` has a chance to run (essentially + # `wait_for_background_processes()`) + self.reactor.pump((0.1,)) + + # Make sure we processed and persisted the pulled event + self.get_success(main_store.get_event(pulled_event.event_id, allow_none=False)) + def test_process_pulled_event_with_rejected_missing_state(self) -> None: """Ensure that we correctly handle pulled events with missing state containing a rejected state event From 95ffa7cf975b84b569527ed4a97f76a2988d81ed Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 24 May 2023 22:22:07 -0500 Subject: [PATCH 26/26] Use obvious type See https://github.com/matrix-org/synapse/pull/15585#discussion_r1204866135 --- synapse/storage/databases/main/event_federation.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index eac020db703a..2681917d0b6a 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1606,7 +1606,9 @@ async def get_event_ids_with_failed_pull_attempts( retcols=("event_id",), desc="get_event_ids_with_failed_pull_attempts", ) - event_ids_with_failed_pull_attempts = {str(row["event_id"]) for row in rows} + event_ids_with_failed_pull_attempts: Set[str] = { + row["event_id"] for row in rows + } return event_ids_with_failed_pull_attempts
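As a closing note, the `partition` helper added to `synapse/util/iterutils.py` in patch 23 is a generic utility. A minimal usage sketch follows; it is standalone, with the helper inlined verbatim from the patch so it runs on its own, and the event IDs are made up for illustration:

    from typing import Callable, Iterable, List, Tuple, TypeVar

    T = TypeVar("T")

    def partition(
        iterable: Iterable[T], predicate: Callable[[T], bool]
    ) -> Tuple[List[T], List[T]]:
        # Copied from the patch: split items into (matches, non-matches)
        # based on the predicate, preserving the input order in each list.
        true_results = []
        false_results = []
        for item in iterable:
            if predicate(item):
                true_results.append(item)
            else:
                false_results.append(item)
        return true_results, false_results

    # Mirrors how _process_pulled_events uses it: split pulled events into
    # ones with prior failed pull attempts vs. fresh ones.
    failed_ids = {"$failed_event_id1"}
    event_ids = ["$failed_event_id1", "$fresh_event_id1", "$fresh_event_id2"]
    with_failures, fresh = partition(event_ids, lambda e: e in failed_ids)
    assert with_failures == ["$failed_event_id1"]
    assert fresh == ["$fresh_event_id1", "$fresh_event_id2"]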