Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Avoid rebuilding Edu objects in worker mode #4770

Merged
merged 1 commit into from
Mar 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/4770.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimise EDU transmission for the federation_sender worker.
14 changes: 7 additions & 7 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,12 @@ def notify_new_events(self, current_id):
# stream.
pass

def send_edu(self, destination, edu_type, content, key=None):
def build_and_send_edu(self, destination, edu_type, content, key=None):
"""As per TransactionQueue"""
if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
return

pos = self._next_pos()

edu = Edu(
Expand Down Expand Up @@ -465,15 +469,11 @@ def process_rows_for_federation(transaction_queue, rows):

for destination, edu_map in iteritems(buff.keyed_edus):
for key, edu in edu_map.items():
transaction_queue.send_edu(
edu.destination, edu.edu_type, edu.content, key=key,
)
transaction_queue.send_edu(edu, key)

for destination, edu_list in iteritems(buff.edus):
for edu in edu_list:
transaction_queue.send_edu(
edu.destination, edu.edu_type, edu.content, key=None,
)
transaction_queue.send_edu(edu, None)

for destination in buff.device_destinations:
transaction_queue.send_device_messages(destination)
31 changes: 24 additions & 7 deletions synapse/federation/transaction_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,26 +361,43 @@ def _process_presence_inner(self, states):

self._attempt_new_transaction(destination)

def send_edu(self, destination, edu_type, content, key=None):
def build_and_send_edu(self, destination, edu_type, content, key=None):
"""Construct an Edu object, and queue it for sending

Args:
destination (str): name of server to send to
edu_type (str): type of EDU to send
content (dict): content of EDU
key (Any|None): clobbering key for this edu
"""
if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
return

edu = Edu(
origin=self.server_name,
destination=destination,
edu_type=edu_type,
content=content,
)

if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
return
self.send_edu(edu, key)

def send_edu(self, edu, key):
"""Queue an EDU for sending

Args:
edu (Edu): edu to send
key (Any|None): clobbering key for this edu
"""
if key:
self.pending_edus_keyed_by_dest.setdefault(
destination, {}
edu.destination, {}
)[(edu.edu_type, key)] = edu
else:
self.pending_edus_by_dest.setdefault(destination, []).append(edu)
self.pending_edus_by_dest.setdefault(edu.destination, []).append(edu)

self._attempt_new_transaction(destination)
self._attempt_new_transaction(edu.destination)

def send_device_messages(self, destination):
if destination == self.server_name:
Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ def send_presence_invite(self, observer_user, observed_user):
if self.is_mine(observed_user):
yield self.invite_presence(observed_user, observer_user)
else:
yield self.federation.send_edu(
yield self.federation.build_and_send_edu(
destination=observed_user.domain,
edu_type="m.presence_invite",
content={
Expand All @@ -836,7 +836,7 @@ def invite_presence(self, observed_user, observer_user):
if self.is_mine(observer_user):
yield self.accept_presence(observed_user, observer_user)
else:
self.federation.send_edu(
self.federation.build_and_send_edu(
destination=observer_user.domain,
edu_type="m.presence_accept",
content={
Expand All @@ -848,7 +848,7 @@ def invite_presence(self, observed_user, observer_user):
state_dict = yield self.get_state(observed_user, as_event=False)
state_dict = format_user_presence_state(state_dict, self.clock.time_msec())

self.federation.send_edu(
self.federation.build_and_send_edu(
destination=observer_user.domain,
edu_type="m.presence",
content={
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def _push_remotes(self, receipt):
logger.debug("Sending receipt to: %r", remotedomains)

for domain in remotedomains:
self.federation.send_edu(
self.federation.build_and_send_edu(
destination=domain,
edu_type="m.receipt",
content={
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def _push_remote(self, member, typing):
for domain in set(get_domain_from_id(u) for u in users):
if domain != self.server_name:
logger.debug("sending typing update to %s", domain)
self.federation.send_edu(
self.federation.build_and_send_edu(
destination=domain,
edu_type="m.typing",
content={
Expand Down