Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Spaces summary: call out to other servers #9653

Merged
merged 4 commits into from
Mar 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9653.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add initial experimental support for a "space summary" API.
180 changes: 170 additions & 10 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
List,
Mapping,
Optional,
Sequence,
Tuple,
TypeVar,
Union,
)

import attr
from prometheus_client import Counter

from twisted.internet import defer
Expand Down Expand Up @@ -455,6 +457,7 @@ async def _try_destination_list(
description: str,
destinations: Iterable[str],
callback: Callable[[str], Awaitable[T]],
failover_on_unknown_endpoint: bool = False,
) -> T:
"""Try an operation on a series of servers, until it succeeds

Expand All @@ -474,6 +477,10 @@ async def _try_destination_list(
next server tried. Normally the stacktrace is logged but this is
suppressed if the exception is an InvalidResponseError.

failover_on_unknown_endpoint: if True, we will try other servers if it looks
like a server doesn't support the endpoint. This is typically useful
if the endpoint in question is new or experimental.

Returns:
The result of callback, if it succeeds

Expand All @@ -493,16 +500,31 @@ async def _try_destination_list(
except UnsupportedRoomVersionError:
raise
except HttpResponseException as e:
if not 500 <= e.code < 600:
raise e.to_synapse_error()
else:
logger.warning(
"Failed to %s via %s: %i %s",
description,
destination,
e.code,
e.args[0],
)
synapse_error = e.to_synapse_error()
failover = False

if 500 <= e.code < 600:
failover = True

elif failover_on_unknown_endpoint:
# there is no good way to detect an "unknown" endpoint. Dendrite
# returns a 404 (with no body); synapse returns a 400
# with M_UNRECOGNISED.
if e.code == 404 or (
e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED
):
failover = True

if not failover:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be if failover? I'm having some trouble following this logic. Previously the 500 error would raise a SynapseError, but now it won't (since failover is True in that case).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously the 500 error would raise a SynapseError

I don't think that's right. Previously anything except a 500 error would raise a SynapseError.

I agree the logic here isn't super easy to follow. I'm very open to suggestions for how to reorganise it to make it clearer.

I was actually contemplating this logic in the shower this morning... Really, there ought to be a defined list of error codes that a well-behaved server can return for each endpoint - and anything else constitutes a misbehaving server whose response should be discounted. The trouble is that implementing that means getting into the weeds with refactoring the other code that uses _try_destination_list.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yeah I had missed the not in front of the first clause. 😢

I think the newer version is a bit clearer with the failover variable. I was just failing to see the equivalence to the old one.

I don't have any suggestions of how to make it better now. 👍

raise synapse_error from e

logger.warning(
"Failed to %s via %s: %i %s",
description,
destination,
e.code,
e.args[0],
)
except Exception:
logger.warning(
"Failed to %s via %s", description, destination, exc_info=True
Expand Down Expand Up @@ -1042,3 +1064,141 @@ async def get_room_complexity(
# If we don't manage to find it, return None. It's not an error if a
# server doesn't give it to us.
return None

async def get_space_summary(
self,
destinations: Iterable[str],
room_id: str,
suggested_only: bool,
max_rooms_per_space: Optional[int],
exclude_rooms: List[str],
) -> "FederationSpaceSummaryResult":
"""
Call other servers to get a summary of the given space


Args:
destinations: The remote servers. We will try them in turn, omitting any
that have been blacklisted.

room_id: ID of the space to be queried

suggested_only: If true, ask the remote server to only return children
with the "suggested" flag set

max_rooms_per_space: A limit on the number of children to return for each
space

exclude_rooms: A list of room IDs to tell the remote server to skip

Returns:
a parsed FederationSpaceSummaryResult

Raises:
SynapseError if we were unable to get a valid summary from any of the
remote servers
"""

async def send_request(destination: str) -> FederationSpaceSummaryResult:
res = await self.transport_layer.get_space_summary(
destination=destination,
room_id=room_id,
suggested_only=suggested_only,
max_rooms_per_space=max_rooms_per_space,
exclude_rooms=exclude_rooms,
)

try:
return FederationSpaceSummaryResult.from_json_dict(res)
except ValueError as e:
raise InvalidResponseError(str(e))

return await self._try_destination_list(
"fetch space summary",
destinations,
send_request,
failover_on_unknown_endpoint=True,
)


@attr.s(frozen=True, slots=True)
class FederationSpaceSummaryEventResult:
"""Represents a single event in the result of a successful get_space_summary call.

It's essentially just a serialised event object, but we do a bit of parsing and
validation in `from_json_dict` and store some of the validated properties in
object attributes.
"""

event_type = attr.ib(type=str)
state_key = attr.ib(type=str)
via = attr.ib(type=Sequence[str])

# the raw data, including the above keys
data = attr.ib(type=JsonDict)

@classmethod
def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryEventResult":
"""Parse an event within the result of a /spaces/ request

Args:
d: json object to be parsed

Raises:
ValueError if d is not a valid event
"""

event_type = d.get("type")
if not isinstance(event_type, str):
raise ValueError("Invalid event: 'event_type' must be a str")

state_key = d.get("state_key")
if not isinstance(state_key, str):
raise ValueError("Invalid event: 'state_key' must be a str")

content = d.get("content")
if not isinstance(content, dict):
raise ValueError("Invalid event: 'content' must be a dict")

via = content.get("via")
if not isinstance(via, Sequence):
raise ValueError("Invalid event: 'via' must be a list")
if any(not isinstance(v, str) for v in via):
raise ValueError("Invalid event: 'via' must be a list of strings")

return cls(event_type, state_key, via, d)


@attr.s(frozen=True, slots=True)
class FederationSpaceSummaryResult:
"""Represents the data returned by a successful get_space_summary call."""

rooms = attr.ib(type=Sequence[JsonDict])
events = attr.ib(type=Sequence[FederationSpaceSummaryEventResult])

@classmethod
def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryResult":
"""Parse the result of a /spaces/ request

Args:
d: json object to be parsed

Raises:
ValueError if d is not a valid /spaces/ response
"""
rooms = d.get("rooms")
if not isinstance(rooms, Sequence):
raise ValueError("'rooms' must be a list")
if any(not isinstance(r, dict) for r in rooms):
raise ValueError("Invalid room in 'rooms' list")

events = d.get("events")
if not isinstance(events, Sequence):
raise ValueError("'events' must be a list")
if any(not isinstance(e, dict) for e in events):
raise ValueError("Invalid event in 'events' list")
parsed_events = [
FederationSpaceSummaryEventResult.from_json_dict(e) for e in events
]

return cls(rooms, parsed_events)
35 changes: 34 additions & 1 deletion synapse/federation/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import logging
import urllib
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional

from synapse.api.constants import Membership
from synapse.api.errors import Codes, HttpResponseException, SynapseError
Expand All @@ -26,6 +26,7 @@
FEDERATION_V2_PREFIX,
)
from synapse.logging.utils import log_function
from synapse.types import JsonDict

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -978,6 +979,38 @@ def get_room_complexity(self, destination, room_id):

return self.client.get_json(destination=destination, path=path)

async def get_space_summary(
self,
destination: str,
room_id: str,
suggested_only: bool,
max_rooms_per_space: Optional[int],
exclude_rooms: List[str],
) -> JsonDict:
"""
Args:
destination: The remote server
room_id: The room ID to ask about.
suggested_only: if True, only suggested rooms will be returned
max_rooms_per_space: an optional limit to the number of children to be
returned per space
exclude_rooms: a list of any rooms we can skip
"""
path = _create_path(
FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc2946/spaces/%s", room_id
)

params = {
"suggested_only": suggested_only,
"exclude_rooms": exclude_rooms,
}
if max_rooms_per_space is not None:
params["max_rooms_per_space"] = max_rooms_per_space

return await self.client.post_json(
destination=destination, path=path, data=params
)


def _create_path(federation_prefix, path, *args):
"""
Expand Down
Loading