Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix race in federation sender that delayed device updates. (#6799)
Browse files Browse the repository at this point in the history
* commit '6b9e1014c':
  Fix race in federation sender that delayed device updates. (#6799)
  • Loading branch information
anoadragon453 committed Mar 23, 2020
2 parents abf0e75 + 6b9e101 commit 70e37d5
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 30 deletions.
1 change: 1 addition & 0 deletions changelog.d/6799.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix race in federation sender worker that delayed sending of device updates.
20 changes: 19 additions & 1 deletion synapse/app/federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams._base import ReceiptsStream
from synapse.replication.tcp.streams._base import (
DeviceListsStream,
ReceiptsStream,
ToDeviceStream,
)
from synapse.server import HomeServer
from synapse.storage.database import Database
from synapse.types import ReadReceipt
Expand Down Expand Up @@ -256,6 +260,20 @@ def process_replication_rows(self, stream_name, token, rows):
"process_receipts_for_federation", self._on_new_receipts, rows
)

# ... as well as device updates and messages
elif stream_name == DeviceListsStream.NAME:
hosts = set(row.destination for row in rows)
for host in hosts:
self.federation_sender.send_device_messages(host)

elif stream_name == ToDeviceStream.NAME:
# The to_device stream includes stuff to be pushed to both local
# clients and remote servers, so we ignore entities that start with
# '@' (since they'll be local users rather than destinations).
hosts = set(row.entity for row in rows if not row.entity.startswith("@"))
for host in hosts:
self.federation_sender.send_device_messages(host)

@defer.inlineCallbacks
def _on_new_receipts(self, rows):
"""
Expand Down
32 changes: 3 additions & 29 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ def __init__(self, hs):

self.edus = SortedDict() # stream position -> Edu

self.device_messages = SortedDict() # stream position -> destination

self.pos = 1
self.pos_time = SortedDict()

Expand All @@ -92,7 +90,6 @@ def register(name, queue):
"keyed_edu",
"keyed_edu_changed",
"edus",
"device_messages",
"pos_time",
"presence_destinations",
]:
Expand Down Expand Up @@ -171,12 +168,6 @@ def _clear_queue_before_pos(self, position_to_delete):
for key in keys[:i]:
del self.edus[key]

# Delete things out of device map
keys = self.device_messages.keys()
i = self.device_messages.bisect_left(position_to_delete)
for key in keys[:i]:
del self.device_messages[key]

def notify_new_events(self, current_id):
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
Expand Down Expand Up @@ -249,9 +240,8 @@ def send_presence_to_destinations(self, states, destinations):

def send_device_messages(self, destination):
"""As per FederationSender"""
pos = self._next_pos()
self.device_messages[pos] = destination
self.notifier.on_new_replication_data()
# We don't need to replicate this as it gets sent down a different
# stream.

def get_current_token(self):
return self.pos - 1
Expand Down Expand Up @@ -339,14 +329,6 @@ async def get_replication_rows(
for (pos, edu) in edus:
rows.append((pos, EduRow(edu)))

# Fetch changed device messages
i = self.device_messages.bisect_right(from_token)
j = self.device_messages.bisect_right(to_token) + 1
device_messages = {v: k for k, v in self.device_messages.items()[i:j]}

for (destination, pos) in iteritems(device_messages):
rows.append((pos, DeviceRow(destination=destination)))

# Sort rows based on pos
rows.sort()

Expand Down Expand Up @@ -504,7 +486,6 @@ def add_to_buffer(self, buff):
"presence_destinations", # list of tuples of UserPresenceState and destinations
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
"device_destinations", # set of destinations
),
)

Expand All @@ -523,11 +504,7 @@ def process_rows_for_federation(transaction_queue, rows):
# them into the appropriate collection and then send them off.

buff = ParsedFederationStreamData(
presence=[],
presence_destinations=[],
keyed_edus={},
edus={},
device_destinations=set(),
presence=[], presence_destinations=[], keyed_edus={}, edus={},
)

# Parse the rows in the stream and add to the buffer
Expand Down Expand Up @@ -555,6 +532,3 @@ def process_rows_for_federation(transaction_queue, rows):
for destination, edu_list in iteritems(buff.edus):
for edu in edu_list:
transaction_queue.send_edu(edu, None)

for destination in buff.device_destinations:
transaction_queue.send_device_messages(destination)

0 comments on commit 70e37d5

Please sign in to comment.