Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Support federation in the new spaces summary API (MSC2946) #10569

Merged
merged 12 commits into from
Aug 16, 2021
1 change: 1 addition & 0 deletions changelog.d/10569.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add pagination to the spaces summary based on updates to [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946).
68 changes: 68 additions & 0 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1290,6 +1290,74 @@ async def send_request(destination: str) -> FederationSpaceSummaryResult:
failover_on_unknown_endpoint=True,
)

async def get_room_hierarchy(
self,
destinations: Iterable[str],
room_id: str,
suggested_only: bool,
) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[str]]:
"""
Call other servers to get a hierarchy of the given room.

Performs simple data validates and parsing of the response.

Args:
destinations: The remote servers. We will try them in turn, omitting any
that have been blacklisted.
room_id: ID of the space to be queried
suggested_only: If true, ask the remote server to only return children
with the "suggested" flag set

Returns:
A tuple of:
The room as a JSON dictionary.
A list of children rooms, as JSON dictionaries.
A list of inaccessible children room IDs.

Raises:
SynapseError if we were unable to get a valid summary from any of the
remote servers
"""

async def send_request(
destination: str,
) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[str]]:
res = await self.transport_layer.get_room_hierarchy(
destination=destination,
room_id=room_id,
suggested_only=suggested_only,
)

room = res.get("room")
if not isinstance(room, dict):
raise InvalidResponseError("'room' must be a dict")

# TODO Validate children_state of the room.
clokep marked this conversation as resolved.
Show resolved Hide resolved

children = res.get("children", [])
if not isinstance(children, Sequence):
raise InvalidResponseError("'children' must be a list")
if any(not isinstance(r, dict) for r in children):
raise InvalidResponseError("Invalid room in 'children' list")

inaccessible_children = res.get("inaccessible_children", [])
if not isinstance(inaccessible_children, Sequence):
raise InvalidResponseError("'inaccessible_children' must be a list")
if any(not isinstance(r, str) for r in inaccessible_children):
raise InvalidResponseError(
"Invalid room ID in 'inaccessible_children' list"
)

return room, children, inaccessible_children

# TODO Fallback to the old federation API and translate the results.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be a follow-up.

return await self._try_destination_list(
"fetch room hierarchy",
destinations,
send_request,
failover_on_unknown_endpoint=True,
)


@attr.s(frozen=True, slots=True, auto_attribs=True)
class FederationSpaceSummaryEventResult:
Expand Down
22 changes: 22 additions & 0 deletions synapse/federation/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,28 @@ async def get_space_summary(
destination=destination, path=path, data=params
)

async def get_room_hierarchy(
self,
destination: str,
room_id: str,
suggested_only: bool,
) -> JsonDict:
"""
Args:
destination: The remote server
room_id: The room ID to ask about.
suggested_only: if True, only suggested rooms will be returned
"""
path = _create_path(
FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc2946/hierarchy/%s", room_id
)

return await self.client.get_json(
destination=destination,
path=path,
args={"suggested_only": "true" if suggested_only else "false"},
)


def _create_path(federation_prefix: str, path: str, *args: str) -> str:
"""
Expand Down
28 changes: 28 additions & 0 deletions synapse/federation/transport/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1936,6 +1936,33 @@ async def on_POST(
)


class FederationRoomHierarchyServlet(BaseFederationServlet):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

single python files should not contain 2158 lines and 59 class definitions. But that is a problem to solve another day.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #10590

PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc2946"
PATH = "/hierarchy/(?P<room_id>[^/]*)"

def __init__(
self,
hs: HomeServer,
authenticator: Authenticator,
ratelimiter: FederationRateLimiter,
server_name: str,
):
super().__init__(hs, authenticator, ratelimiter, server_name)
self.handler = hs.get_space_summary_handler()

async def on_GET(
self,
origin: str,
content: Literal[None],
query: Mapping[bytes, Sequence[bytes]],
room_id: str,
) -> Tuple[int, JsonDict]:
suggested_only = parse_boolean_from_args(query, "suggested_only", default=False)
return 200, await self.handler.get_federation_hierarchy(
origin, room_id, suggested_only
)


class RoomComplexityServlet(BaseFederationServlet):
"""
Indicates to other servers how complex (and therefore likely
Expand Down Expand Up @@ -1999,6 +2026,7 @@ async def on_GET(
FederationVersionServlet,
RoomComplexityServlet,
FederationSpaceSummaryServlet,
FederationRoomHierarchyServlet,
FederationV1SendKnockServlet,
FederationMakeKnockServlet,
)
Expand Down
189 changes: 168 additions & 21 deletions synapse/handlers/space_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,17 @@ async def _get_room_hierarchy(

logger.debug("Processing room %s", room_id)

# A map of summaries for children rooms that might be returned over
# federation. The rationale for caching these and *maybe* using them
# is to prefer any information local to the homeserver before trusting
# data received over federation.
children_room_entries: Dict[str, JsonDict] = {}
# A set of room IDs which are children that did not have information
# returned over federation and are known to be inaccessible to the
# current server. We should not reach out over federation to try to
# summarise these rooms.
inaccessible_children: Set[str] = set()

is_in_room = await self._store.is_host_joined(room_id, self._server_name)
if is_in_room:
room_entry = await self._summarize_local_room(
Expand All @@ -359,27 +370,50 @@ async def _get_room_hierarchy(
# TODO Handle max children.
max_children=None,
)
else:
# If a previous call got information for this room *and* it is
# not a space (or the max-depth has been achieved), include it.
if queue_entry.remote_room and (
queue_entry.remote_room.get("room_type") != RoomTypes.SPACE
clokep marked this conversation as resolved.
Show resolved Hide resolved
or (max_depth is not None and current_depth >= max_depth)
):
room_entry = _RoomEntry(
queue_entry.room_id, queue_entry.remote_room
)
else:
(
room_entry,
children_room_entries,
inaccessible_children,
) = await self._summarize_remote_room_hiearchy(
queue_entry,
suggested_only,
)

if room_entry:
rooms_result.append(room_entry.as_json())

# Add the child to the queue. We have already validated
# that the vias are a list of server names.
#
# If the current depth is the maximum depth, do not queue
# more entries.
if max_depth is None or current_depth < max_depth:
room_queue.extendleft(
_RoomQueueEntry(
ev["state_key"], ev["content"]["via"], current_depth + 1
)
for ev in reversed(room_entry.children)
)
processed_rooms.add(room_id)

processed_rooms.add(room_id)
else:
# TODO Federation.
pass
if room_entry:
# Remote rooms need their accessibility checked separately.
if not is_in_room and not await self._is_remote_room_accessible(
requester, queue_entry.room_id, room_entry.room
):
continue

rooms_result.append(room_entry.as_json())

# If this room is not at the max-depth, we might want to include
# children.
if max_depth is None or current_depth < max_depth:
room_queue.extendleft(
_RoomQueueEntry(
ev["state_key"],
ev["content"]["via"],
current_depth + 1,
children_room_entries.get(ev["state_key"]),
)
for ev in reversed(room_entry.children)
if ev["state_key"] not in inaccessible_children
)

result: JsonDict = {"rooms": rooms_result}

Expand Down Expand Up @@ -459,6 +493,64 @@ async def federation_space_summary(

return {"rooms": rooms_result, "events": events_result}

async def get_federation_hierarchy(
self,
origin: str,
requested_room_id: str,
suggested_only: bool,
):
"""
Implementation of the room hierarchy Federation API.

This is similar to get_room_hierarchy, but does not recurse into the space.
It also considers whether anyone on the server may be able to access the
room, as opposed to whether a specific user can.

Args:
origin: The server requesting the spaces summary.
requested_room_id: The room ID to start the hierarchy at (the "root" room).
suggested_only: whether we should only return children with the "suggested"
flag set.

Returns:
The JSON hierarchy dictionary.
"""
root_room_entry = await self._summarize_local_room(
None, origin, requested_room_id, suggested_only, max_children=None
)
if root_room_entry is None:
# Room is inaccessible to the requesting server.
raise SynapseError(404, "Unknown room: %s" % (requested_room_id,))

children_rooms_result: List[JsonDict] = []
inaccessible_children: List[str] = []

# Iterate through each child and potentially add it, but not it's children,
clokep marked this conversation as resolved.
Show resolved Hide resolved
# to the response.
for child_room in root_room_entry.children:
room_id = child_room.get("state_key")
assert isinstance(room_id, str)
# If the room is unknown, skip it.
if not await self._store.is_host_joined(room_id, self._server_name):
continue

room_entry = await self._summarize_local_room(
None, origin, room_id, suggested_only, max_children=0
)
# If the room is accessible, include it in the results. Otherwise,
# note that the requesting server shouldn't bother trying to
# summarize it.
if room_entry:
children_rooms_result.append(room_entry.room)
else:
inaccessible_children.append(room_id)

return {
"room": root_room_entry.as_json(),
clokep marked this conversation as resolved.
Show resolved Hide resolved
"children": children_rooms_result,
"inaccessible_children": inaccessible_children,
}

async def _summarize_local_room(
self,
requester: Optional[str],
Expand Down Expand Up @@ -492,8 +584,9 @@ async def _summarize_local_room(

room_entry = await self._build_room_entry(room_id, for_federation=bool(origin))

# If the room is not a space, return just the room information.
if room_entry.get("room_type") != RoomTypes.SPACE:
# If the room is not a space or the children don't matter, return just
# the room information.
if room_entry.get("room_type") != RoomTypes.SPACE or max_children == 0:
return _RoomEntry(room_id, room_entry)

# Otherwise, look for child rooms/spaces.
Expand Down Expand Up @@ -589,6 +682,59 @@ async def _summarize_remote_room(

return results

async def _summarize_remote_room_hiearchy(
self, room: "_RoomQueueEntry", suggested_only: bool
) -> Tuple[Optional["_RoomEntry"], Dict[str, JsonDict], Set[str]]:
"""
Request room entries and a list of event entries for a given room by querying a remote server.

Args:
room: The room to summarize.
suggested_only: True if only suggested children should be returned.
Otherwise, all children are returned.

Returns:
A tuple of:
The room entry.
Partial room data return over federation.
A set of inaccessible children room IDs.
"""
room_id = room.room_id
logger.info("Requesting summary for %s via %s", room_id, room.via)

via = itertools.islice(room.via, MAX_SERVERS_PER_SPACE)
try:
(
room_response,
children,
inaccessible_children,
) = await self._federation_client.get_room_hierarchy(
via,
room_id,
suggested_only=suggested_only,
)
except Exception as e:
logger.warning(
"Unable to get hierarchy of %s via federation: %s",
room_id,
e,
exc_info=logger.isEnabledFor(logging.DEBUG),
)
return None, {}, set()

# Map the children to their room ID.
children_by_room_id = {
c["room_id"]: c
for c in children
if "room_id" in c and isinstance(c["room_id"], str)
}

return (
_RoomEntry(room_id, room_response, room_response.pop("children_state", ())),
children_by_room_id,
set(inaccessible_children),
)

async def _is_local_room_accessible(
self, room_id: str, requester: Optional[str], origin: Optional[str]
) -> bool:
Expand Down Expand Up @@ -842,6 +988,7 @@ class _RoomQueueEntry:
room_id: str
via: Sequence[str]
depth: int = 0
remote_room: Optional[JsonDict] = None


@attr.s(frozen=True, slots=True, auto_attribs=True)
Expand Down
Loading