This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Process previously failed backfill events in the background #15585

Merged
Changes from all commits
31 commits
fd26164
Process previously failed backfill events in the background
MadLittleMods May 12, 2023
c5dc746
Add changelog
MadLittleMods May 12, 2023
8fc47d8
Add consideration
MadLittleMods May 12, 2023
b5d95f7
Fix lints
MadLittleMods May 12, 2023
ebc93be
Merge branch 'develop' into madlittlemods/process-previously-failed-e…
MadLittleMods May 16, 2023
e13f5a9
Always check for failed attempts
MadLittleMods May 16, 2023
70f5911
Add comments and concern about maybe queue
MadLittleMods May 16, 2023
45934fe
Process all failed events as a sequential task in the background
MadLittleMods May 16, 2023
b1998d7
Merge branch 'develop' into madlittlemods/process-previously-failed-e…
MadLittleMods May 16, 2023
93de856
Better comments
MadLittleMods May 16, 2023
631d7db
Add test for `separate_event_ids_with_failed_pull_attempts`
MadLittleMods May 16, 2023
beeccc3
Avoid doing extra work if the list is empty
MadLittleMods May 17, 2023
7eabc60
Make sure to retain the same order they were given in case the depth …
MadLittleMods May 17, 2023
7583c2c
Add comments why OrderedDict
MadLittleMods May 17, 2023
e101318
Make test more robust around ordering
MadLittleMods May 17, 2023
899fc34
Add test description
MadLittleMods May 17, 2023
b5aec4f
Same order separated results
MadLittleMods May 17, 2023
6edd126
Refactor to get_event_ids_with_failed_pull_attempts(...)
MadLittleMods May 17, 2023
d4b8ff7
Update comment doc
MadLittleMods May 17, 2023
6a0ec9d
Merge branch 'develop' into madlittlemods/process-previously-failed-e…
MadLittleMods May 18, 2023
d843557
Use List
MadLittleMods May 18, 2023
75bec52
Merge branch 'develop' into madlittlemods/process-previously-failed-e…
MadLittleMods May 23, 2023
c4e1533
Trace differentiaed events
MadLittleMods May 23, 2023
ec230a3
Prefer plain language
MadLittleMods May 24, 2023
22a69be
Use a `set` for efficient lookups
MadLittleMods May 24, 2023
65febed
Add some context
MadLittleMods May 24, 2023
6474b4e
Use dedicated `partition` function to separate list
MadLittleMods May 24, 2023
15527f7
Add context for why source order for MSC2716
MadLittleMods May 24, 2023
d59615f
Add sanity check test that failed pull attempt events are still proce…
MadLittleMods May 24, 2023
95ffa7c
Use obvious type
MadLittleMods May 25, 2023
50acf6a
Merge branch 'develop' into madlittlemods/process-previously-failed-e…
MadLittleMods May 25, 2023
1 change: 1 addition & 0 deletions changelog.d/15585.feature
@@ -0,0 +1 @@
Process previously failed backfill events in the background to avoid blocking requests for something that is bound to fail again.
70 changes: 62 additions & 8 deletions synapse/handlers/federation_event.py
@@ -88,7 +88,7 @@
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.iterutils import batch_iter
from synapse.util.iterutils import batch_iter, partition
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr

@@ -865,7 +865,7 @@ async def _process_pulled_events(
[event.event_id for event in events]
)

new_events = []
new_events: List[EventBase] = []
for event in events:
event_id = event.event_id

@@ -895,12 +895,66 @@ async def _process_pulled_events(
str(len(new_events)),
)

# We want to sort these by depth so we process them and
# tell clients about them in order.
sorted_events = sorted(new_events, key=lambda x: x.depth)
for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
@trace
async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None:
# We want to sort these by depth so we process them and tell clients about
# them in order. It's also more efficient to backfill this way (`depth`
# ascending) because one backfill event is likely to be the `prev_event` of
# the next event we're going to process.
sorted_events = sorted(new_events, key=lambda x: x.depth)
for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)

# Check if we've already tried to process these events at some point in the
# past. We aren't concerned with the exponential backoff here, just whether
# they have failed to be processed before.
event_ids_with_failed_pull_attempts = (
await self._store.get_event_ids_with_failed_pull_attempts(
[event.event_id for event in new_events]
)
)

# We construct the event lists in source order from the `/backfill` response
# because it's a) easiest, but also b) because the order in which we process
# things matters for MSC2716 historical batches: many historical events are
# all at the same `depth` and we rely on the tenuous sort that the other
# server gave us, hoping they're doing their best. The brittle nature of this
# ordering for historical messages over federation is one of the reasons why
# we don't want to continue on MSC2716 until we have online topological
# ordering.
events_with_failed_pull_attempts, fresh_events = partition(
new_events, lambda e: e.event_id in event_ids_with_failed_pull_attempts
)
set_tag(
SynapseTags.FUNC_ARG_PREFIX + "events_with_failed_pull_attempts",
str(event_ids_with_failed_pull_attempts),
)
set_tag(
SynapseTags.RESULT_PREFIX + "events_with_failed_pull_attempts.length",
str(len(events_with_failed_pull_attempts)),
)
set_tag(
SynapseTags.FUNC_ARG_PREFIX + "fresh_events",
str([event.event_id for event in fresh_events]),
)
set_tag(
SynapseTags.RESULT_PREFIX + "fresh_events.length",
str(len(fresh_events)),
)

# Process previously failed backfill events in the background to not waste
# time on something that is likely to fail again.
if len(events_with_failed_pull_attempts) > 0:
run_as_background_process(
"_process_new_pulled_events_with_failed_pull_attempts",
_process_new_pulled_events,
events_with_failed_pull_attempts,
)

# We can optimistically try to process and wait for the event to be fully
# persisted if we've never tried before.
if len(fresh_events) > 0:
await _process_new_pulled_events(fresh_events)

@trace
@tag_args
31 changes: 30 additions & 1 deletion synapse/storage/databases/main/event_federation.py
@@ -46,7 +46,7 @@
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import JsonDict
from synapse.types import JsonDict, StrCollection
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
@@ -1583,6 +1583,35 @@ def _record_event_failed_pull_attempt_upsert_txn(

txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))

@trace
async def get_event_ids_with_failed_pull_attempts(
self, event_ids: StrCollection
) -> Set[str]:
"""
Filter the given list of `event_ids` down to the IDs of events which have
any failed pull attempts.

Args:
event_ids: A list of events to filter down.

Returns:
The set of `event_ids` that have previous failed pull attempts.
"""

rows = await self.db_pool.simple_select_many_batch(
table="event_failed_pull_attempts",
column="event_id",
iterable=event_ids,
keyvalues={},
retcols=("event_id",),
desc="get_event_ids_with_failed_pull_attempts",
)
event_ids_with_failed_pull_attempts: Set[str] = {
row["event_id"] for row in rows
}

return event_ids_with_failed_pull_attempts
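
For illustration only (not part of the diff; the event IDs below are made up), a call to the new store method might look like:

# Illustrative sketch: filter a batch of event IDs down to the ones that
# have previously failed to be pulled over federation.
failed_ids = await store.get_event_ids_with_failed_pull_attempts(
    ["$event_a", "$event_b", "$event_c"]
)
# `failed_ids` is a set, e.g. {"$event_a"}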

@trace
async def get_event_ids_to_not_pull_from_backoff(
self,
27 changes: 27 additions & 0 deletions synapse/util/iterutils.py
@@ -15,11 +15,13 @@
import heapq
from itertools import islice
from typing import (
Callable,
Collection,
Dict,
Generator,
Iterable,
Iterator,
List,
Mapping,
Set,
Sized,
@@ -71,6 +73,31 @@ def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]:
return (iseq[i : i + maxlen] for i in range(0, len(iseq), maxlen))


def partition(
iterable: Iterable[T], predicate: Callable[[T], bool]
) -> Tuple[List[T], List[T]]:
"""
Separate a given iterable into two lists based on the result of a predicate function.

Args:
iterable: the iterable to partition (separate)
predicate: a function that takes an item from the iterable and returns a boolean

Returns:
A tuple of two lists, the first containing all items for which the predicate
returned True, the second containing all items for which the predicate returned
False
"""
true_results = []
false_results = []
for item in iterable:
if predicate(item):
true_results.append(item)
else:
false_results.append(item)
return true_results, false_results
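
A quick usage sketch of the new partition helper (illustrative only, not part of the diff):

# Illustrative sketch: split a list into items matching and not matching
# the predicate.
evens, odds = partition([1, 2, 3, 4, 5], lambda n: n % 2 == 0)
# evens == [2, 4], odds == [1, 3, 5]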


def sorted_topologically(
nodes: Iterable[T],
graph: Mapping[T, Collection[T]],
95 changes: 95 additions & 0 deletions tests/handlers/test_federation_event.py
@@ -664,6 +664,101 @@ async def get_room_state(
StoreError,
)

def test_backfill_process_previously_failed_pull_attempt_event_in_the_background(
self,
) -> None:
"""
Sanity check that events with existing failed pull attempts are still
processed, even though they are handled in the background.
"""
OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
main_store = self.hs.get_datastores().main

# Create the room
user_id = self.register_user("kermit", "test")
tok = self.login("kermit", "test")
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
room_version = self.get_success(main_store.get_room_version(room_id))

# Allow the remote user to send state events
self.helper.send_state(
room_id,
"m.room.power_levels",
{"events_default": 0, "state_default": 0},
tok=tok,
)

# Add the remote user to the room
member_event = self.get_success(
event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join")
)

initial_state_map = self.get_success(
main_store.get_partial_current_state_ids(room_id)
)

auth_event_ids = [
initial_state_map[("m.room.create", "")],
initial_state_map[("m.room.power_levels", "")],
member_event.event_id,
]

# Create a regular event that should process
pulled_event = make_event_from_dict(
self.add_hashes_and_signatures_from_other_server(
{
"type": "test_regular_type",
"room_id": room_id,
"sender": OTHER_USER,
"prev_events": [
member_event.event_id,
],
"auth_events": auth_event_ids,
"origin_server_ts": 1,
"depth": 12,
"content": {"body": "pulled_event"},
}
),
room_version,
)

# Record a failed pull attempt for this event which will cause us to backfill it
# in the background from here on out.
self.get_success(
main_store.record_event_failed_pull_attempt(
room_id, pulled_event.event_id, "fake cause"
)
)

# We expect an outbound request to /backfill, so stub that out
self.mock_federation_transport_client.backfill.return_value = make_awaitable(
{
"origin": self.OTHER_SERVER_NAME,
"origin_server_ts": 123,
"pdus": [
pulled_event.get_pdu_json(),
],
}
)

# The function under test: try to backfill and process the pulled event
with LoggingContext("test"):
self.get_success(
self.hs.get_federation_event_handler().backfill(
self.OTHER_SERVER_NAME,
room_id,
limit=1,
extremities=["$some_extremity"],
)
)

# Ensure `run_as_background_process(...)` has a chance to run (essentially
# `wait_for_background_processes()`)
self.reactor.pump((0.1,))
Comment on lines +755 to +757

@MadLittleMods (Contributor, Author) May 24, 2023

Continuing from previous comment,

The test passes even without this pump(). I'm uncertain about the exact intricacies and details of run_as_background_process(...) that allow it to pass. It's possible that the test passes due to its time-sensitive nature and, without the pump(), might become flaky. However, these are merely theoretical assumptions.

But I want to ensure that this test is robust to any work that may happen in run_as_background_process(...) in the future. Ideally, we would have an idiomatic wait_for_background_processes() that would indicate clear intentions and handle the complexity of whatever waiting is necessary.
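
As a rough illustration of that idea (purely hypothetical: Synapse has no wait_for_background_processes() helper at the time of writing, and the name and pumping strategy below are assumptions), such a method on the test case might look like:

# Hypothetical helper, not real Synapse test infrastructure.
def wait_for_background_processes(self) -> None:
    """Advance the fake reactor so queued background processes can run."""
    # Pump in small increments so any timers scheduled by
    # run_as_background_process(...) get a chance to fire.
    for _ in range(10):
        self.reactor.pump((0.1,))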


# Make sure we processed and persisted the pulled event
self.get_success(main_store.get_event(pulled_event.event_id, allow_none=False))

def test_process_pulled_event_with_rejected_missing_state(self) -> None:
"""Ensure that we correctly handle pulled events with missing state containing a
rejected state event
37 changes: 37 additions & 0 deletions tests/storage/test_event_federation.py
@@ -1134,6 +1134,43 @@ def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertEqual(backfill_event_ids, ["insertion_eventA"])

def test_get_event_ids_with_failed_pull_attempts(self) -> None:
"""
Test to make sure we properly get event_ids based on whether they have any
failed pull attempts.
"""
# Create the room
user_id = self.register_user("alice", "test")
tok = self.login("alice", "test")
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)

self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "$failed_event_id1", "fake cause"
)
)
self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "$failed_event_id2", "fake cause"
)
)

event_ids_with_failed_pull_attempts = self.get_success(
self.store.get_event_ids_with_failed_pull_attempts(
event_ids=[
"$failed_event_id1",
"$fresh_event_id1",
"$failed_event_id2",
"$fresh_event_id2",
]
)
)

self.assertEqual(
event_ids_with_failed_pull_attempts,
{"$failed_event_id1", "$failed_event_id2"},
)

def test_get_event_ids_to_not_pull_from_backoff(self) -> None:
"""
Test to make sure only event IDs we should back off from are returned.