Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Spaces summary: call out to other servers #9653

Merged
merged 4 commits into from
Mar 24, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9653.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add initial experimental support for a "space summary" API.
85 changes: 85 additions & 0 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
List,
Mapping,
Optional,
Sequence,
Tuple,
TypeVar,
Union,
)

import attr
from prometheus_client import Counter

from twisted.internet import defer
Expand Down Expand Up @@ -1042,3 +1044,86 @@ async def get_room_complexity(
# If we don't manage to find it, return None. It's not an error if a
# server doesn't give it to us.
return None

async def get_space_summary(
self,
destinations: Iterable[str],
room_id: str,
suggested_only: bool,
max_rooms_per_space: Optional[int],
exclude_rooms: List[str],
) -> "FederationSpaceSummaryResult":
async def send_request(destination: str) -> FederationSpaceSummaryResult:
res = await self.transport_layer.get_space_summary(
destination=destination,
room_id=room_id,
suggested_only=suggested_only,
max_rooms_per_space=max_rooms_per_space,
exclude_rooms=exclude_rooms,
)

try:
return FederationSpaceSummaryResult.from_json_dict(res)
except ValueError as e:
raise InvalidResponseError(str(e))

return await self._try_destination_list(
"fetch space summary", destinations, send_request
)


@attr.s(frozen=True, slots=True)
class FederationSpaceSummaryEventResult:
event_type = attr.ib(type=str)
state_key = attr.ib(type=str)
via = attr.ib(type=Sequence[str])

# the raw data, including the above keys
data = attr.ib(type=JsonDict)

@classmethod
def from_json_dict(cls, d: JsonDict):
event_type = d.get("type")
if not isinstance(event_type, str):
raise ValueError("Invalid event: 'event_type' must be a str")

state_key = d.get("state_key")
if not isinstance(state_key, str):
raise ValueError("Invalid event: 'state_key' must be a str")

content = d.get("content")
if not isinstance(content, dict):
raise ValueError("Invalid event: 'content' must be a dict")

via = content.get("via")
if not isinstance(via, Sequence):
raise ValueError("Invalid event: 'via' must be a list")
if any(not isinstance(v, str) for v in via):
raise ValueError("Invalid event: 'via' must be a list of strings")

return cls(event_type, state_key, via, d)


@attr.s(frozen=True, slots=True)
class FederationSpaceSummaryResult:
rooms = attr.ib(type=Sequence[JsonDict])
events = attr.ib(type=Sequence[FederationSpaceSummaryEventResult])

@classmethod
def from_json_dict(cls, d: JsonDict):
rooms = d.get("rooms")
if not isinstance(rooms, Sequence):
raise ValueError("'rooms' must be a list")
if any(not isinstance(r, dict) for r in rooms):
raise ValueError("Invalid room in 'rooms' list")

events = d.get("events")
if not isinstance(events, Sequence):
raise ValueError("'events' must be a list")
if any(not isinstance(e, dict) for e in events):
raise ValueError("Invalid event in 'events' list")
parsed_events = [
FederationSpaceSummaryEventResult.from_json_dict(e) for e in events
]

return cls(rooms, parsed_events)
35 changes: 34 additions & 1 deletion synapse/federation/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import logging
import urllib
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional

from synapse.api.constants import Membership
from synapse.api.errors import Codes, HttpResponseException, SynapseError
Expand All @@ -26,6 +26,7 @@
FEDERATION_V2_PREFIX,
)
from synapse.logging.utils import log_function
from synapse.types import JsonDict

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -978,6 +979,38 @@ def get_room_complexity(self, destination, room_id):

return self.client.get_json(destination=destination, path=path)

async def get_space_summary(
self,
destination: str,
room_id: str,
suggested_only: bool,
max_rooms_per_space: Optional[int],
exclude_rooms: List[str],
) -> JsonDict:
"""
Args:
destination: The remote server
room_id: The room ID to ask about.
suggested_only: if True, only suggested rooms will be returned
max_rooms_per_space: an optional limit to the number of children to be
returned per space
exclude_rooms: a list of any rooms we can skip
"""
path = _create_path(
FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc2946/spaces/%s", room_id
)

params = {
"suggested_only": suggested_only,
"exclude_rooms": exclude_rooms,
}
if max_rooms_per_space is not None:
params["max_rooms_per_space"] = max_rooms_per_space

return await self.client.post_json(
destination=destination, path=path, data=params
)


def _create_path(federation_prefix, path, *args):
"""
Expand Down
135 changes: 119 additions & 16 deletions synapse/handlers/space_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import itertools
import logging
from collections import deque
from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence, Set, Tuple
from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence, Set, Tuple, cast

import attr

Expand All @@ -38,6 +38,9 @@
# max number of events to return per room.
MAX_ROOMS_PER_SPACE = 50

# max number of federation servers to hit per room
MAX_SERVERS_PER_SPACE = 3


class SpaceSummaryHandler:
def __init__(self, hs: "HomeServer"):
Expand All @@ -47,6 +50,8 @@ def __init__(self, hs: "HomeServer"):
self._state_handler = hs.get_state_handler()
self._store = hs.get_datastore()
self._event_serializer = hs.get_event_client_serializer()
self._server_name = hs.hostname
self._federation_client = hs.get_federation_client()

async def get_space_summary(
self,
Expand Down Expand Up @@ -78,35 +83,81 @@ async def get_space_summary(
await self._auth.check_user_in_room_or_world_readable(room_id, requester)

# the queue of rooms to process
room_queue = deque((_RoomQueueEntry(room_id),))
room_queue = deque((_RoomQueueEntry(room_id, ()),))

# rooms we have already processed
processed_rooms = set() # type: Set[str]

# events we have already processed. We don't necessarily have their event ids,
# so instead we key on (room id, state key)
processed_events = set() # type: Set[Tuple[str, str]]

rooms_result = [] # type: List[JsonDict]
events_result = [] # type: List[JsonDict]

while room_queue and len(rooms_result) < MAX_ROOMS:
queue_entry = room_queue.popleft()
room_id = queue_entry.room_id
if room_id in processed_rooms:
# already done this room
continue

logger.debug("Processing room %s", room_id)
processed_rooms.add(room_id)

is_in_room = await self._store.is_host_joined(room_id, self._server_name)

# The client-specified max_rooms_per_space limit doesn't apply to the
# room_id specified in the request, so we ignore it if this is the
# first room we are processing.
max_children = max_rooms_per_space if processed_rooms else None

rooms, events = await self._summarize_local_room(
requester, room_id, suggested_only, max_children
if is_in_room:
rooms, events = await self._summarize_local_room(
requester, room_id, suggested_only, max_children
)
else:
rooms, events = await self._summarize_remote_room(
queue_entry,
suggested_only,
max_children,
exclude_rooms=processed_rooms,
)

logger.debug(
"Query of %s returned rooms %s, events %s",
queue_entry.room_id,
[room.get("room_id") for room in rooms],
["%s->%s" % (ev["room_id"], ev["state_key"]) for ev in events],
)

rooms_result.extend(rooms)
events_result.extend(events)

# add any children that we haven't already processed to the queue
for edge_event in events:
if edge_event["state_key"] not in processed_rooms:
room_queue.append(_RoomQueueEntry(edge_event["state_key"]))
# any rooms returned don't need visiting again
processed_rooms.update(cast(str, room.get("room_id")) for room in rooms)

# the room we queried may or may not have been returned, but don't process
# it again, anyway.
processed_rooms.add(room_id)

# XXX: is it ok that we blindly iterate through any events returned by
# a remote server, whether or not they actually link to any rooms in our
# tree?
Comment on lines +142 to +144
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really sure of the implications of this, we're not persisting any of this as state, so joining the room should get the proper data?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it should.

The concern here is that a malicious or buggy remote server could return a load of events (and rooms, for that matter) which are nothing to do with the requested room. That means that we'll end up spending resources looking for rooms we don't actually care about, and will return that data to the client (which might either ignore it, or (incorrectly) add it to the UI). Still, there shouldn't be any lasting effect here.

for ev in events:
# remote servers might return events we have already processed
# (eg, Dendrite returns inward pointers as well as outward ones), so
# we need to filter them out, to avoid returning duplicate links to the
# client.
ev_key = (ev["room_id"], ev["state_key"])
if ev_key in processed_events:
continue
events_result.append(ev)

# add the child to the queue. we have already validated
# that the vias are a list of server names.
room_queue.append(
_RoomQueueEntry(ev["state_key"], ev["content"]["via"])
)
processed_events.add(ev_key)

return {"rooms": rooms_result, "events": events_result}

Expand Down Expand Up @@ -149,20 +200,23 @@ async def federation_space_summary(

while room_queue and len(rooms_result) < MAX_ROOMS:
room_id = room_queue.popleft()
if room_id in processed_rooms:
# already done this room
continue
clokep marked this conversation as resolved.
Show resolved Hide resolved

logger.debug("Processing room %s", room_id)
processed_rooms.add(room_id)

rooms, events = await self._summarize_local_room(
None, room_id, suggested_only, max_rooms_per_space
)

processed_rooms.add(room_id)

rooms_result.extend(rooms)
events_result.extend(events)

# add any children that we haven't already processed to the queue
for edge_event in events:
if edge_event["state_key"] not in processed_rooms:
room_queue.append(edge_event["state_key"])
# add any children to the queue
room_queue.extend(edge_event["state_key"] for edge_event in events)

return {"rooms": rooms_result, "events": events_result}

Expand Down Expand Up @@ -200,6 +254,43 @@ async def _summarize_local_room(
)
return (room_entry,), events_result

async def _summarize_remote_room(
self,
room: "_RoomQueueEntry",
suggested_only: bool,
max_children: Optional[int],
exclude_rooms: Iterable[str],
) -> Tuple[Sequence[JsonDict], Sequence[JsonDict]]:
room_id = room.room_id
logger.info("Requesting summary for %s via %s", room_id, room.via)

# we need to make the exclusion list json-serialisable
exclude_rooms = list(exclude_rooms)
clokep marked this conversation as resolved.
Show resolved Hide resolved

via = itertools.islice(room.via, MAX_SERVERS_PER_SPACE)
try:
res = await self._federation_client.get_space_summary(
via,
room.room_id,
clokep marked this conversation as resolved.
Show resolved Hide resolved
suggested_only=suggested_only,
max_rooms_per_space=max_children,
exclude_rooms=exclude_rooms,
)
except Exception as e:
logger.warning(
"Unable to get summary of %s via federation: %s",
room_id,
e,
exc_info=logger.isEnabledFor(logging.DEBUG),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why only send this if the debug logging is enabled?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

basically because in 90% of cases, it's going to be overly verbose: it'll be due to servers not supporting the endpoint, or having left the room, or being down, or whatever - none of which merit a stacktrace. On the other hand, if it's going wrong for reasons we don't understand, it'll be useful to be able to see the stacktrace.

)
return (), ()

return res.rooms, tuple(
ev.data
for ev in res.events
if ev.event_type == EventTypes.MSC1772_SPACE_CHILD
)

async def _is_room_accessible(self, room_id: str, requester: Optional[str]) -> bool:
# if we have an authenticated requesting user, first check if they are in the
# room
Expand Down Expand Up @@ -276,12 +367,24 @@ async def _get_child_events(self, room_id: str) -> Iterable[EventBase]:
)

# filter out any events without a "via" (which implies it has been redacted)
return (e for e in events if e.content.get("via"))
return (e for e in events if _has_valid_via(e))


@attr.s(frozen=True, slots=True)
class _RoomQueueEntry:
room_id = attr.ib(type=str)
via = attr.ib(type=Sequence[str])


def _has_valid_via(e: EventBase) -> bool:
via = e.content.get("via")
if not via or not isinstance(via, Sequence):
return False
for v in via:
if not isinstance(v, str):
logger.debug("Ignoring edge event %s with invalid via entry", e.event_id)
return False
return True


def _is_suggested_child_event(edge_event: EventBase) -> bool:
Expand Down