Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add type hints to for the federation sender. #9681

Merged
merged 8 commits into from
Mar 29, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,22 @@

import logging
from collections import namedtuple
from typing import TYPE_CHECKING, Dict, List, Optional, Sized, Tuple, Type
from typing import (
TYPE_CHECKING,
Dict,
Hashable,
Iterable,
List,
Optional,
Sized,
Tuple,
Type,
)

from sortedcontainers import SortedDict

from twisted.internet import defer

from synapse.api.presence import UserPresenceState
from synapse.federation.sender import FederationSender
from synapse.federation.sender import AbstractFederationSender, FederationSender
from synapse.metrics import LaterGauge
from synapse.replication.tcp.streams.federation import FederationStream
from synapse.types import JsonDict, ReadReceipt
Expand All @@ -52,7 +60,7 @@
logger = logging.getLogger(__name__)


class FederationRemoteSendQueue:
class FederationRemoteSendQueue(AbstractFederationSender):
"""A drop in replacement for FederationSender"""

def __init__(self, hs: "HomeServer"):
Expand All @@ -77,7 +85,7 @@ def __init__(self, hs: "HomeServer"):
# Stream position -> (user_id, destinations)
self.presence_destinations = (
SortedDict()
) # type: SortedDict[int, Tuple[str, List[str]]]
) # type: SortedDict[int, Tuple[str, Iterable[str]]]

# (destination, key) -> EDU
self.keyed_edu = {} # type: Dict[Tuple[str, tuple], Edu]
Expand Down Expand Up @@ -204,7 +212,7 @@ def build_and_send_edu(
destination: str,
edu_type: str,
content: JsonDict,
key: Optional[tuple] = None,
key: Optional[Hashable] = None,
) -> None:
"""As per FederationSender"""
if destination == self.server_name:
Expand All @@ -229,14 +237,13 @@ def build_and_send_edu(

self.notifier.on_new_replication_data()

def send_read_receipt(self, receipt: ReadReceipt) -> defer.Deferred:
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""As per FederationSender

Args:
receipt:
"""
# nothing to do here: the replication listener will handle it.
return defer.succeed(None)

def send_presence(self, states: List[UserPresenceState]) -> None:
"""As per FederationSender
Expand All @@ -256,7 +263,7 @@ def send_presence(self, states: List[UserPresenceState]) -> None:
self.notifier.on_new_replication_data()

def send_presence_to_destinations(
self, states: List[UserPresenceState], destinations: List[str]
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""As per FederationSender

Expand All @@ -276,6 +283,9 @@ def send_device_messages(self, destination: str) -> None:
# We don't need to replicate this as it gets sent down a different
# stream.

def wake_destination(self, server: str) -> None:
pass

def get_current_token(self) -> int:
return self.pos - 1

Expand Down
106 changes: 93 additions & 13 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
import logging
from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple

from prometheus_client import Counter

from twisted.internet import defer

import synapse
import synapse.metrics
from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
Expand All @@ -40,9 +40,12 @@
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import ReadReceipt, RoomStreamToken
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure, measure_func

if TYPE_CHECKING:
from synapse.server import HomeServer

logger = logging.getLogger(__name__)

sent_pdus_destination_dist_count = Counter(
Expand All @@ -65,8 +68,85 @@
CATCH_UP_STARTUP_INTERVAL_SEC = 5


class FederationSender:
def __init__(self, hs: "synapse.server.HomeServer"):
class AbstractFederationSender(metaclass=abc.ABCMeta):
@abc.abstractmethod
def notify_new_events(self, max_token: RoomStreamToken) -> None:
"""This gets called when we have some new events we might want to
send out to other servers.
"""

@abc.abstractmethod
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""Send a RR to any other servers in the room

Args:
receipt: receipt to be sent
"""

@abc.abstractmethod
def send_presence(self, states: List[UserPresenceState]) -> None:
"""Send the new presence states to the appropriate destinations.

This actually queues up the presence states ready for sending and
triggers a background task to process them and send out the transactions.
"""

@abc.abstractmethod
def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""Send the given presence states to the given destinations.

Args:
destinations:
"""

@abc.abstractmethod
def build_and_send_edu(
self,
destination: str,
edu_type: str,
content: JsonDict,
key: Optional[Hashable] = None,
) -> None:
"""Construct an Edu object, and queue it for sending

Args:
destination: name of server to send to
edu_type: type of EDU to send
content: content of EDU
key: clobbering key for this edu
"""

@abc.abstractmethod
def send_device_messages(self, destination: str) -> None:
pass

@abc.abstractmethod
def wake_destination(self, destination: str) -> None:
"""Called when we want to retry sending transactions to a remote.

This is mainly useful if the remote server has been down and we think it
might have come back.
"""

@abc.abstractmethod
def get_current_token(self) -> int:
# Dummy implementation for case where federation sender isn't offloaded
# to a worker.
return 0

@abc.abstractmethod
async def get_replication_rows(
self, instance_name: str, from_token: int, to_token: int, target_row_count: int
) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
# Dummy implementation for case where federation sender isn't offloaded
# to a worker.
return [], 0, False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW I tend to do raise NotImplementedError so that I don't have to make up return values

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like I had changed it to pass in a future commit, but raising NotImplementedError makes more sense. 👍



class FederationSender(AbstractFederationSender):
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.server_name = hs.hostname

Expand Down Expand Up @@ -432,7 +512,7 @@ def _flush_rrs_for_room(self, room_id: str) -> None:
queue.flush_read_receipts_for_room(room_id)

@preserve_fn # the caller should not yield on this
async def send_presence(self, states: List[UserPresenceState]):
async def send_presence(self, states: List[UserPresenceState]) -> None:
"""Send the new presence states to the appropriate destinations.

This actually queues up the presence states ready for sending and
Expand Down Expand Up @@ -494,7 +574,7 @@ def send_presence_to_destinations(
self._get_per_destination_queue(destination).send_presence(states)

@measure_func("txnqueue._process_presence")
async def _process_presence_inner(self, states: List[UserPresenceState]):
async def _process_presence_inner(self, states: List[UserPresenceState]) -> None:
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
"""
Expand All @@ -516,9 +596,9 @@ def build_and_send_edu(
self,
destination: str,
edu_type: str,
content: dict,
content: JsonDict,
key: Optional[Hashable] = None,
):
) -> None:
"""Construct an Edu object, and queue it for sending

Args:
Expand All @@ -545,7 +625,7 @@ def build_and_send_edu(

self.send_edu(edu, key)

def send_edu(self, edu: Edu, key: Optional[Hashable]):
def send_edu(self, edu: Edu, key: Optional[Hashable]) -> None:
"""Queue an EDU for sending

Args:
Expand All @@ -563,7 +643,7 @@ def send_edu(self, edu: Edu, key: Optional[Hashable]):
else:
queue.send_edu(edu)

def send_device_messages(self, destination: str):
def send_device_messages(self, destination: str) -> None:
if destination == self.server_name:
logger.warning("Not sending device update to ourselves")
return
Expand All @@ -575,7 +655,7 @@ def send_device_messages(self, destination: str):

self._get_per_destination_queue(destination).attempt_new_transaction()

def wake_destination(self, destination: str):
def wake_destination(self, destination: str) -> None:
"""Called when we want to retry sending transactions to a remote.

This is mainly useful if the remote server has been down and we think it
Expand Down Expand Up @@ -607,7 +687,7 @@ async def get_replication_rows(
# to a worker.
return [], 0, False

async def _wake_destinations_needing_catchup(self):
async def _wake_destinations_needing_catchup(self) -> None:
"""
Wakes up destinations that need catch-up and are not currently being
backed off from.
Expand Down
6 changes: 3 additions & 3 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,16 +312,16 @@ class FederationAckCommand(Command):

NAME = "FEDERATION_ACK"

def __init__(self, instance_name, token):
def __init__(self, instance_name: str, token: int):
self.instance_name = instance_name
self.token = token

@classmethod
def from_line(cls, line):
def from_line(cls, line: str) -> "FederationAckCommand":
instance_name, token = line.split(" ")
return cls(instance_name, int(token))

def to_line(self):
def to_line(self) -> str:
return "%s %s" % (self.instance_name, self.token)


Expand Down