From ebd0fe44e7ab309fe9783e481ef693a57bbe2b1a Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Tue, 29 Aug 2023 08:47:50 +0200 Subject: [PATCH 01/10] Add automatic purge after all users forget a room --- changelog.d/15488.feature | 1 + synapse/config/server.py | 9 + synapse/handlers/pagination.py | 454 ++++++++----------------- synapse/handlers/room.py | 167 +++++---- synapse/handlers/room_member.py | 20 +- synapse/rest/admin/__init__.py | 21 +- synapse/rest/admin/rooms.py | 78 +++-- tests/rest/admin/test_room.py | 159 ++++++++- tests/rest/admin/test_server_notice.py | 20 +- tests/rest/client/test_rooms.py | 6 +- 10 files changed, 501 insertions(+), 434 deletions(-) create mode 100644 changelog.d/15488.feature diff --git a/changelog.d/15488.feature b/changelog.d/15488.feature new file mode 100644 index 000000000000..8684d84192a9 --- /dev/null +++ b/changelog.d/15488.feature @@ -0,0 +1 @@ +Add automatic purge after all users forgotten a room. Also add restore of purge/shutdown rooms after a synapse restart. diff --git a/synapse/config/server.py b/synapse/config/server.py index b46fa5159309..cb93e566782e 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -486,6 +486,15 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: else: self.redaction_retention_period = None + # How long to keep locally forgotten rooms before purging them. + purge_retention_period = config.get("purge_retention_period", "7d") + if purge_retention_period is not None: + self.purge_retention_period: Optional[int] = self.parse_duration( + purge_retention_period + ) + else: + self.purge_retention_period = None + # How long to keep entries in the `users_ips` table. user_ips_max_age = config.get("user_ips_max_age", "28d") if user_ips_max_age is not None: diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index e5ac9096ccf6..fbcd2e680587 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -13,9 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Dict, List, Optional, Set - -import attr +from typing import TYPE_CHECKING, List, Optional, Set, Tuple, cast from twisted.python.failure import Failure @@ -23,16 +21,22 @@ from synapse.api.errors import SynapseError from synapse.api.filtering import Filter from synapse.events.utils import SerializeEventConfig -from synapse.handlers.room import ShutdownRoomResponse +from synapse.handlers.room import ShutdownRoomParams, ShutdownRoomResponse from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME from synapse.logging.opentracing import trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.admin._base import assert_user_is_admin from synapse.streams.config import PaginationConfig -from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType +from synapse.types import ( + JsonDict, + JsonMapping, + Requester, + ScheduledTask, + StreamKeyType, + TaskStatus, +) from synapse.types.state import StateFilter from synapse.util.async_helpers import ReadWriteLock -from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client if TYPE_CHECKING: @@ -53,80 +57,11 @@ PURGE_PAGINATION_LOCK_NAME = "purge_pagination_lock" -@attr.s(slots=True, auto_attribs=True) -class PurgeStatus: - """Object tracking the status of a purge request - - This class contains information on the progress of a purge request, for - return by get_purge_status. - """ - - STATUS_ACTIVE = 0 - STATUS_COMPLETE = 1 - STATUS_FAILED = 2 - - STATUS_TEXT = { - STATUS_ACTIVE: "active", - STATUS_COMPLETE: "complete", - STATUS_FAILED: "failed", - } - - # Save the error message if an error occurs - error: str = "" +PURGE_HISTORY_ACTION_NAME = "purge_history" - # Tracks whether this request has completed. One of STATUS_{ACTIVE,COMPLETE,FAILED}. - status: int = STATUS_ACTIVE +PURGE_ROOM_ACTION_NAME = "purge_room" - def asdict(self) -> JsonDict: - ret = {"status": PurgeStatus.STATUS_TEXT[self.status]} - if self.error: - ret["error"] = self.error - return ret - - -@attr.s(slots=True, auto_attribs=True) -class DeleteStatus: - """Object tracking the status of a delete room request - - This class contains information on the progress of a delete room request, for - return by get_delete_status. - """ - - STATUS_PURGING = 0 - STATUS_COMPLETE = 1 - STATUS_FAILED = 2 - STATUS_SHUTTING_DOWN = 3 - - STATUS_TEXT = { - STATUS_PURGING: "purging", - STATUS_COMPLETE: "complete", - STATUS_FAILED: "failed", - STATUS_SHUTTING_DOWN: "shutting_down", - } - - # Tracks whether this request has completed. - # One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN}. - status: int = STATUS_PURGING - - # Save the error message if an error occurs - error: str = "" - - # Saves the result of an action to give it back to REST API - shutdown_room: ShutdownRoomResponse = { - "kicked_users": [], - "failed_to_kick_users": [], - "local_aliases": [], - "new_room_id": None, - } - - def asdict(self) -> JsonDict: - ret = { - "status": DeleteStatus.STATUS_TEXT[self.status], - "shutdown_room": self.shutdown_room, - } - if self.error: - ret["error"] = self.error - return ret +SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME = "shutdown_and_purge_room" class PaginationHandler: @@ -136,9 +71,6 @@ class PaginationHandler: paginating during a purge. """ - # when to remove a completed deletion/purge from the results map - CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24 # 24 hours - def __init__(self, hs: "HomeServer"): self.hs = hs self.auth = hs.get_auth() @@ -150,17 +82,11 @@ def __init__(self, hs: "HomeServer"): self._room_shutdown_handler = hs.get_room_shutdown_handler() self._relations_handler = hs.get_relations_handler() self._worker_locks = hs.get_worker_locks_handler() + self._task_scheduler = hs.get_task_scheduler() self.pagination_lock = ReadWriteLock() # IDs of rooms in which there currently an active purge *or delete* operation. self._purges_in_progress_by_room: Set[str] = set() - # map from purge id to PurgeStatus - self._purges_by_id: Dict[str, PurgeStatus] = {} - # map from purge id to DeleteStatus - self._delete_by_id: Dict[str, DeleteStatus] = {} - # map from room id to delete ids - # Dict[`room_id`, List[`delete_id`]] - self._delete_by_room: Dict[str, List[str]] = {} self._event_serializer = hs.get_event_client_serializer() self._retention_default_max_lifetime = ( @@ -173,6 +99,7 @@ def __init__(self, hs: "HomeServer"): self._retention_allowed_lifetime_max = ( hs.config.retention.retention_allowed_lifetime_max ) + self._purge_retention_period = hs.config.server.purge_retention_period self._is_master = hs.config.worker.worker_app is None if hs.config.retention.retention_enabled and self._is_master: @@ -189,6 +116,14 @@ def __init__(self, hs: "HomeServer"): job.longest_max_lifetime, ) + self._task_scheduler.register_action( + self._purge_history, PURGE_HISTORY_ACTION_NAME + ) + self._task_scheduler.register_action(self._purge_room, PURGE_ROOM_ACTION_NAME) + self._task_scheduler.register_action( + self._shutdown_and_purge_room, SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME + ) + async def purge_history_for_rooms_in_range( self, min_ms: Optional[int], max_ms: Optional[int] ) -> None: @@ -239,14 +174,6 @@ async def purge_history_for_rooms_in_range( for room_id, retention_policy in rooms.items(): logger.info("[purge] Attempting to purge messages in room %s", room_id) - if room_id in self._purges_in_progress_by_room: - logger.warning( - "[purge] not purging room %s as there's an ongoing purge running" - " for this room", - room_id, - ) - continue - # If max_lifetime is None, it means that the room has no retention policy. # Given we only retrieve such rooms when there's a default retention policy # defined in the server's configuration, we can safely assume that's the @@ -295,27 +222,20 @@ async def purge_history_for_rooms_in_range( (stream, topo, _event_id) = r token = "t%d-%d" % (topo, stream) - purge_id = random_string(16) - - self._purges_by_id[purge_id] = PurgeStatus() - - logger.info( - "Starting purging events in room %s (purge_id %s)" % (room_id, purge_id) - ) + logger.info("Starting purging events in room %s" % (room_id)) # We want to purge everything, including local events, and to run the purge in # the background so that it's not blocking any other operation apart from # other purges in the same room. run_as_background_process( - "_purge_history", - self._purge_history, - purge_id, + PURGE_HISTORY_ACTION_NAME, + self.purge_history, room_id, token, True, ) - def start_purge_history( + async def start_purge_history( self, room_id: str, token: str, delete_local_events: bool = False ) -> str: """Start off a history purge on a room. @@ -329,40 +249,58 @@ def start_purge_history( Returns: unique ID for this purge transaction. """ - if room_id in self._purges_in_progress_by_room: - raise SynapseError( - 400, "History purge already in progress for %s" % (room_id,) - ) - - purge_id = random_string(16) + purge_id = await self._task_scheduler.schedule_task( + PURGE_HISTORY_ACTION_NAME, + resource_id=room_id, + params={"token": token, "delete_local_events": delete_local_events}, + ) # we log the purge_id here so that it can be tied back to the # request id in the log lines. logger.info("[purge] starting purge_id %s", purge_id) - self._purges_by_id[purge_id] = PurgeStatus() - run_as_background_process( - "purge_history", - self._purge_history, - purge_id, - room_id, - token, - delete_local_events, - ) return purge_id async def _purge_history( - self, purge_id: str, room_id: str, token: str, delete_local_events: bool - ) -> None: + self, + task: ScheduledTask, + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: + """ + Scheduler action to purge some history of a room. + """ + if ( + task.resource_id is None + or task.params is None + or "token" not in task.params + or "delete_local_events" not in task.params + ): + return ( + TaskStatus.FAILED, + None, + "Not enough parameters passed to _purge_history", + ) + err = await self.purge_history( + task.resource_id, + task.params["token"], + task.params["delete_local_events"], + ) + if err is not None: + return TaskStatus.FAILED, None, err + return TaskStatus.COMPLETE, None, None + + async def purge_history( + self, + room_id: str, + token: str, + delete_local_events: bool, + ) -> Optional[str]: """Carry out a history purge on a room. Args: - purge_id: The ID for this purge. room_id: The room to purge from token: topological token to delete events before delete_local_events: True to delete local events as well as remote ones """ - self._purges_in_progress_by_room.add(room_id) try: async with self._worker_locks.acquire_read_write_lock( PURGE_PAGINATION_LOCK_NAME, room_id, write=True @@ -371,57 +309,68 @@ async def _purge_history( room_id, token, delete_local_events ) logger.info("[purge] complete") - self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE + return None except Exception: f = Failure() logger.error( "[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject()) ) - self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED - self._purges_by_id[purge_id].error = f.getErrorMessage() - finally: - self._purges_in_progress_by_room.discard(room_id) - - # remove the purge from the list 24 hours after it completes - def clear_purge() -> None: - del self._purges_by_id[purge_id] - - self.hs.get_reactor().callLater( - PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge - ) - - def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]: - """Get the current status of an active purge - - Args: - purge_id: purge_id returned by start_purge_history - """ - return self._purges_by_id.get(purge_id) + return f.getErrorMessage() - def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]: + async def get_delete_task(self, delete_id: str) -> Optional[ScheduledTask]: """Get the current status of an active deleting Args: delete_id: delete_id returned by start_shutdown_and_purge_room + or start_purge_history. """ - return self._delete_by_id.get(delete_id) + return await self._task_scheduler.get_task(delete_id) - def get_delete_ids_by_room(self, room_id: str) -> Optional[StrCollection]: - """Get all active delete ids by room + async def get_delete_tasks_by_room( + self, room_id: str, only_active: Optional[bool] = False + ) -> List[ScheduledTask]: + """Get complete, failed or active delete tasks by room Args: room_id: room_id that is deleted + only_active: if True, completed&failed tasks will be omitted """ - return self._delete_by_room.get(room_id) + statuses = [TaskStatus.ACTIVE] + if not only_active: + statuses += [TaskStatus.COMPLETE, TaskStatus.FAILED] + + return await self._task_scheduler.get_tasks( + actions=[PURGE_ROOM_ACTION_NAME, SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME], + resource_id=room_id, + statuses=statuses, + ) - async def purge_room(self, room_id: str, force: bool = False) -> None: + async def _purge_room( + self, + task: ScheduledTask, + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: + """ + Scheduler action to purge a room. + """ + if not task.resource_id: + raise Exception("No room id passed to purge_room task") + params = task.params if task.params else {} + await self.purge_room(task.resource_id, params.get("force", False)) + return TaskStatus.COMPLETE, None, None + + async def purge_room( + self, + room_id: str, + force: bool, + ) -> None: """Purge the given room from the database. - This function is part the delete room v1 API. Args: room_id: room to be purged force: set true to skip checking for joined users. """ + logger.info("starting purge room_id=%s force=%s", room_id, force) + async with self._worker_locks.acquire_multi_read_write_lock( [ (PURGE_PAGINATION_LOCK_NAME, room_id), @@ -430,13 +379,20 @@ async def purge_room(self, room_id: str, force: bool = False) -> None: write=True, ): # first check that we have no users in this room - if not force: - joined = await self.store.is_host_joined(room_id, self._server_name) - if joined: + joined = await self.store.is_host_joined(room_id, self._server_name) + if joined: + if force: + logger.info( + "force-purging room %s with some local users still joined", + room_id, + ) + else: raise SynapseError(400, "Users are still joined to this room") await self._storage_controllers.purge_events.purge_room(room_id) + logger.info("purge complete for room_id %s", room_id) + @trace async def get_messages( self, @@ -711,169 +667,72 @@ async def get_messages( async def _shutdown_and_purge_room( self, - delete_id: str, - room_id: str, - requester_user_id: str, - new_room_user_id: Optional[str] = None, - new_room_name: Optional[str] = None, - message: Optional[str] = None, - block: bool = False, - purge: bool = True, - force_purge: bool = False, - ) -> None: + task: ScheduledTask, + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: """ - Shuts down and purges a room. - - See `RoomShutdownHandler.shutdown_room` for details of creation of the new room - - Args: - delete_id: The ID for this delete. - room_id: The ID of the room to shut down. - requester_user_id: - User who requested the action. Will be recorded as putting the room on the - blocking list. - new_room_user_id: - If set, a new room will be created with this user ID - as the creator and admin, and all users in the old room will be - moved into that room. If not set, no new room will be created - and the users will just be removed from the old room. - new_room_name: - A string representing the name of the room that new users will - be invited to. Defaults to `Content Violation Notification` - message: - A string containing the first message that will be sent as - `new_room_user_id` in the new room. Ideally this will clearly - convey why the original room was shut down. - Defaults to `Sharing illegal content on this server is not - permitted and rooms in violation will be blocked.` - block: - If set to `true`, this room will be added to a blocking list, - preventing future attempts to join the room. Defaults to `false`. - purge: - If set to `true`, purge the given room from the database. - force_purge: - If set to `true`, the room will be purged from database - also if it fails to remove some users from room. - - Saves a `RoomShutdownHandler.ShutdownRoomResponse` in `DeleteStatus`: + Scheduler action to shutdown and purge a room. """ + if task.resource_id is None or task.params is None: + raise Exception( + "No room id and/or no parameters passed to shutdown_and_purge_room task" + ) - self._purges_in_progress_by_room.add(room_id) - try: - async with self._worker_locks.acquire_read_write_lock( - PURGE_PAGINATION_LOCK_NAME, room_id, write=True - ): - self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN - self._delete_by_id[ - delete_id - ].shutdown_room = await self._room_shutdown_handler.shutdown_room( - room_id=room_id, - requester_user_id=requester_user_id, - new_room_user_id=new_room_user_id, - new_room_name=new_room_name, - message=message, - block=block, - ) - self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING + room_id = task.resource_id - if purge: - logger.info("starting purge room_id %s", room_id) + async def update_result(result: Optional[JsonMapping]) -> None: + await self._task_scheduler.update_task(task.id, result=result) - # first check that we have no users in this room - if not force_purge: - joined = await self.store.is_host_joined( - room_id, self._server_name - ) - if joined: - raise SynapseError( - 400, "Users are still joined to this room" - ) + shutdown_result = ( + cast(ShutdownRoomResponse, task.result) if task.result else None + ) - await self._storage_controllers.purge_events.purge_room(room_id) + shutdown_result = await self._room_shutdown_handler.shutdown_room( + room_id, + cast(ShutdownRoomParams, task.params), + shutdown_result, + update_result, + ) - logger.info("purge complete for room_id %s", room_id) - self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE - except Exception: - f = Failure() - logger.error( - "failed", - exc_info=(f.type, f.value, f.getTracebackObject()), - ) - self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED - self._delete_by_id[delete_id].error = f.getErrorMessage() - finally: - self._purges_in_progress_by_room.discard(room_id) - - # remove the delete from the list 24 hours after it completes - def clear_delete() -> None: - del self._delete_by_id[delete_id] - self._delete_by_room[room_id].remove(delete_id) - if not self._delete_by_room[room_id]: - del self._delete_by_room[room_id] - - self.hs.get_reactor().callLater( - PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_delete + if task.params.get("purge", False): + await self.purge_room( + room_id, + task.params.get("force_purge", False), ) - def start_shutdown_and_purge_room( + return (TaskStatus.COMPLETE, shutdown_result, None) + + async def start_shutdown_and_purge_room( self, room_id: str, - requester_user_id: str, - new_room_user_id: Optional[str] = None, - new_room_name: Optional[str] = None, - message: Optional[str] = None, - block: bool = False, - purge: bool = True, - force_purge: bool = False, + shutdown_params: ShutdownRoomParams, ) -> str: """Start off shut down and purge on a room. Args: room_id: The ID of the room to shut down. - requester_user_id: - User who requested the action and put the room on the - blocking list. - new_room_user_id: - If set, a new room will be created with this user ID - as the creator and admin, and all users in the old room will be - moved into that room. If not set, no new room will be created - and the users will just be removed from the old room. - new_room_name: - A string representing the name of the room that new users will - be invited to. Defaults to `Content Violation Notification` - message: - A string containing the first message that will be sent as - `new_room_user_id` in the new room. Ideally this will clearly - convey why the original room was shut down. - Defaults to `Sharing illegal content on this server is not - permitted and rooms in violation will be blocked.` - block: - If set to `true`, this room will be added to a blocking list, - preventing future attempts to join the room. Defaults to `false`. - purge: - If set to `true`, purge the given room from the database. - force_purge: - If set to `true`, the room will be purged from database - also if it fails to remove some users from room. + shutdown_params: parameters for the shutdown Returns: unique ID for this delete transaction. """ - if room_id in self._purges_in_progress_by_room: - raise SynapseError( - 400, "History purge already in progress for %s" % (room_id,) - ) + if len(await self.get_delete_tasks_by_room(room_id, only_active=True)) > 0: + raise SynapseError(400, "Purge already in progress for %s" % (room_id,)) # This check is double to `RoomShutdownHandler.shutdown_room` # But here the requester get a direct response / error with HTTP request # and do not have to check the purge status + new_room_user_id = shutdown_params["new_room_user_id"] if new_room_user_id is not None: if not self.hs.is_mine_id(new_room_user_id): raise SynapseError( 400, "User must be our own: %s" % (new_room_user_id,) ) - delete_id = random_string(16) + delete_id = await self._task_scheduler.schedule_task( + SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME, + resource_id=room_id, + params=shutdown_params, + ) # we log the delete_id here so that it can be tied back to the # request id in the log lines. @@ -883,19 +742,4 @@ def start_shutdown_and_purge_room( delete_id, ) - self._delete_by_id[delete_id] = DeleteStatus() - self._delete_by_room.setdefault(room_id, []).append(delete_id) - run_as_background_process( - "shutdown_and_purge_room", - self._shutdown_and_purge_room, - delete_id, - room_id, - requester_user_id, - new_room_user_id, - new_room_name, - message, - block, - purge, - force_purge, - ) return delete_id diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 0513e28aabe1..ffd24fa546a8 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -20,7 +20,7 @@ import string from collections import OrderedDict from http import HTTPStatus -from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional, Tuple import attr from typing_extensions import TypedDict @@ -59,6 +59,7 @@ from synapse.streams import EventSource from synapse.types import ( JsonDict, + JsonMapping, MutableStateMap, Requester, RoomAlias, @@ -1750,6 +1751,45 @@ def get_current_key_for_room(self, room_id: str) -> Awaitable[RoomStreamToken]: return self.store.get_current_room_stream_token_for_room_id(room_id) +class ShutdownRoomParams(TypedDict): + """ + Attributes: + requester_user_id: + User who requested the action. Will be recorded as putting the room on the + blocking list. + new_room_user_id: + If set, a new room will be created with this user ID + as the creator and admin, and all users in the old room will be + moved into that room. If not set, no new room will be created + and the users will just be removed from the old room. + new_room_name: + A string representing the name of the room that new users will + be invited to. Defaults to `Content Violation Notification` + message: + A string containing the first message that will be sent as + `new_room_user_id` in the new room. Ideally this will clearly + convey why the original room was shut down. + Defaults to `Sharing illegal content on this server is not + permitted and rooms in violation will be blocked.` + block: + If set to `true`, this room will be added to a blocking list, + preventing future attempts to join the room. Defaults to `false`. + purge: + If set to `true`, purge the given room from the database. + force_purge: + If set to `true`, the room will be purged from database + even if there are still users joined to the room. + """ + + requester_user_id: str + new_room_user_id: Optional[str] + new_room_name: Optional[str] + message: Optional[str] + block: bool + purge: bool + force_purge: bool + + class ShutdownRoomResponse(TypedDict): """ Attributes: @@ -1787,12 +1827,12 @@ def __init__(self, hs: "HomeServer"): async def shutdown_room( self, room_id: str, - requester_user_id: str, - new_room_user_id: Optional[str] = None, - new_room_name: Optional[str] = None, - message: Optional[str] = None, - block: bool = False, - ) -> ShutdownRoomResponse: + params: ShutdownRoomParams, + result: Optional[ShutdownRoomResponse] = None, + update_result_fct: Optional[ + Callable[[Optional[JsonMapping]], Awaitable[None]] + ] = None, + ) -> Optional[ShutdownRoomResponse]: """ Shuts down a room. Moves all local users and room aliases automatically to a new room if `new_room_user_id` is set. Otherwise local users only @@ -1808,48 +1848,22 @@ async def shutdown_room( Args: room_id: The ID of the room to shut down. - requester_user_id: - User who requested the action and put the room on the - blocking list. - new_room_user_id: - If set, a new room will be created with this user ID - as the creator and admin, and all users in the old room will be - moved into that room. If not set, no new room will be created - and the users will just be removed from the old room. - new_room_name: - A string representing the name of the room that new users will - be invited to. Defaults to `Content Violation Notification` - message: - A string containing the first message that will be sent as - `new_room_user_id` in the new room. Ideally this will clearly - convey why the original room was shut down. - Defaults to `Sharing illegal content on this server is not - permitted and rooms in violation will be blocked.` - block: - If set to `True`, users will be prevented from joining the old - room. This option can also be used to pre-emptively block a room, - even if it's unknown to this homeserver. In this case, the room - will be blocked, and no further action will be taken. If `False`, - attempting to delete an unknown room is invalid. - - Defaults to `False`. - - Returns: a dict containing the following keys: - kicked_users: An array of users (`user_id`) that were kicked. - failed_to_kick_users: - An array of users (`user_id`) that that were not kicked. - local_aliases: - An array of strings representing the local aliases that were - migrated from the old room to the new. - new_room_id: - A string representing the room ID of the new room, or None if - no such room was created. - """ + delete_id: The delete ID identifying this delete request + shutdown_params: parameters for the shutdown, cf `ShutdownRoomParams` + shutdown_response: current status of the shutdown, if it was interrupted - if not new_room_name: - new_room_name = self.DEFAULT_ROOM_NAME - if not message: - message = self.DEFAULT_MESSAGE + Returns: a dict matching `ShutdownRoomResponse`. + """ + requester_user_id = params["requester_user_id"] + new_room_user_id = params["new_room_user_id"] + block = params["block"] + + new_room_name = ( + params["new_room_name"] + if params["new_room_name"] + else self.DEFAULT_ROOM_NAME + ) + message = params["message"] if params["message"] else self.DEFAULT_MESSAGE if not RoomID.is_valid(room_id): raise SynapseError(400, "%s is not a legal room ID" % (room_id,)) @@ -1861,6 +1875,17 @@ async def shutdown_room( 403, "Shutdown of this room is forbidden", Codes.FORBIDDEN ) + result = ( + result + if result + else { + "kicked_users": [], + "failed_to_kick_users": [], + "local_aliases": [], + "new_room_id": None, + } + ) + # Action the block first (even if the room doesn't exist yet) if block: # This will work even if the room is already blocked, but that is @@ -1869,14 +1894,10 @@ async def shutdown_room( if not await self.store.get_room(room_id): # if we don't know about the room, there is nothing left to do. - return { - "kicked_users": [], - "failed_to_kick_users": [], - "local_aliases": [], - "new_room_id": None, - } + return result - if new_room_user_id is not None: + new_room_id = result.get("new_room_id") + if new_room_user_id is not None and new_room_id is None: if not self.hs.is_mine_id(new_room_user_id): raise SynapseError( 400, "User must be our own: %s" % (new_room_user_id,) @@ -1896,6 +1917,10 @@ async def shutdown_room( ratelimit=False, ) + result["new_room_id"] = new_room_id + if update_result_fct: + await update_result_fct(result) + logger.info( "Shutting down room %r, joining to new room: %r", room_id, new_room_id ) @@ -1909,12 +1934,9 @@ async def shutdown_room( stream_id, ) else: - new_room_id = None logger.info("Shutting down room %r", room_id) users = await self.store.get_users_in_room(room_id) - kicked_users = [] - failed_to_kick_users = [] for user_id in users: if not self.hs.is_mine_id(user_id): continue @@ -1943,7 +1965,9 @@ async def shutdown_room( stream_id, ) - await self.room_member_handler.forget(target_requester.user, room_id) + await self.room_member_handler.forget( + target_requester.user, room_id, do_not_schedule_purge=True + ) # Join users to new room if new_room_user_id: @@ -1958,15 +1982,23 @@ async def shutdown_room( require_consent=False, ) - kicked_users.append(user_id) + result["kicked_users"].append(user_id) + if update_result_fct: + await update_result_fct(result) except Exception: logger.exception( "Failed to leave old room and join new room for %r", user_id ) - failed_to_kick_users.append(user_id) + result["failed_to_kick_users"].append(user_id) + if update_result_fct: + await update_result_fct(result) # Send message in new room and move aliases if new_room_user_id: + room_creator_requester = create_requester( + new_room_user_id, authenticated_entity=requester_user_id + ) + await self.event_creation_handler.create_and_send_nonmember_event( room_creator_requester, { @@ -1978,18 +2010,15 @@ async def shutdown_room( ratelimit=False, ) - aliases_for_room = await self.store.get_aliases_for_room(room_id) + result["local_aliases"] = list( + await self.store.get_aliases_for_room(room_id) + ) assert new_room_id is not None await self.store.update_aliases_for_room( room_id, new_room_id, requester_user_id ) else: - aliases_for_room = [] + result["local_aliases"] = [] - return { - "kicked_users": kicked_users, - "failed_to_kick_users": failed_to_kick_users, - "local_aliases": list(aliases_for_room), - "new_room_id": new_room_id, - } + return result diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 1d8d4a72e7a2..b5ff3badae92 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -37,6 +37,7 @@ from synapse.event_auth import get_named_level, get_power_level_event from synapse.events import EventBase from synapse.events.snapshot import EventContext +from synapse.handlers.pagination import PURGE_ROOM_ACTION_NAME from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME @@ -176,6 +177,8 @@ def __init__(self, hs: "HomeServer"): self.request_ratelimiter = hs.get_request_ratelimiter() hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room) + self._purge_retention_period = hs.config.server.purge_retention_period + def _on_user_joined_room(self, event_id: str, room_id: str) -> None: """Notify the rate limiter that a room join has occurred. @@ -285,7 +288,9 @@ async def _user_left_room(self, target: UserID, room_id: str) -> None: """ raise NotImplementedError() - async def forget(self, user: UserID, room_id: str) -> None: + async def forget( + self, user: UserID, room_id: str, do_not_schedule_purge: bool = False + ) -> None: user_id = user.to_string() member = await self._storage_controllers.state.get_current_state_event( @@ -305,6 +310,19 @@ async def forget(self, user: UserID, room_id: str) -> None: # the table `current_state_events` and `get_current_state_events` is `None`. await self.store.forget(user_id, room_id) + # If everyone locally has left the room, then there is no reason for us to keep the + # room around and we automatically purge room after a little bit + if ( + not do_not_schedule_purge + and self._purge_retention_period + and await self.store.is_locally_forgotten_room(room_id) + ): + await self.hs.get_task_scheduler().schedule_task( + PURGE_ROOM_ACTION_NAME, + resource_id=room_id, + timestamp=self.clock.time_msec() + self._purge_retention_period, + ) + async def ratelimit_multiple_invites( self, requester: Optional[Requester], diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index 55e752fda85a..3d2c010819d2 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -21,6 +21,7 @@ from typing import TYPE_CHECKING, Optional, Tuple from synapse.api.errors import Codes, NotFoundError, SynapseError +from synapse.handlers.pagination import PURGE_HISTORY_ACTION_NAME from synapse.http.server import HttpServer, JsonResource from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.site import SynapseRequest @@ -94,7 +95,7 @@ UserTokenRestServlet, WhoisRestServlet, ) -from synapse.types import JsonDict, RoomStreamToken +from synapse.types import JsonDict, RoomStreamToken, TaskStatus from synapse.util import SYNAPSE_VERSION if TYPE_CHECKING: @@ -197,7 +198,7 @@ async def on_POST( errcode=Codes.BAD_JSON, ) - purge_id = self.pagination_handler.start_purge_history( + purge_id = await self.pagination_handler.start_purge_history( room_id, token, delete_local_events=delete_local_events ) @@ -216,11 +217,21 @@ async def on_GET( ) -> Tuple[int, JsonDict]: await assert_requester_is_admin(self.auth, request) - purge_status = self.pagination_handler.get_purge_status(purge_id) - if purge_status is None: + purge_task = await self.pagination_handler.get_delete_task(purge_id) + if purge_task is None or purge_task.action != PURGE_HISTORY_ACTION_NAME: raise NotFoundError("purge id '%s' not found" % purge_id) - return HTTPStatus.OK, purge_status.asdict() + result = { + "status": purge_task.status + if purge_task.status == TaskStatus.COMPLETE + or purge_task.status == TaskStatus.FAILED + else "active", + } + if purge_task.error: + result["error"] = purge_task.error + + # TODO active vs purging etc + return HTTPStatus.OK, result ######################################################################################## diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 1d655602650d..436718c8b227 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -19,6 +19,10 @@ from synapse.api.constants import Direction, EventTypes, JoinRules, Membership from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError from synapse.api.filtering import Filter +from synapse.handlers.pagination import ( + PURGE_ROOM_ACTION_NAME, + SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME, +) from synapse.http.servlet import ( ResolveRoomIdMixin, RestServlet, @@ -36,7 +40,7 @@ ) from synapse.storage.databases.main.room import RoomSortOrder from synapse.streams.config import PaginationConfig -from synapse.types import JsonDict, RoomID, UserID, create_requester +from synapse.types import JsonDict, RoomID, ScheduledTask, UserID, create_requester from synapse.types.state import StateFilter from synapse.util import json_decoder @@ -117,20 +121,30 @@ async def on_DELETE( 403, "Shutdown of this room is forbidden", Codes.FORBIDDEN ) - delete_id = self._pagination_handler.start_shutdown_and_purge_room( + delete_id = await self._pagination_handler.start_shutdown_and_purge_room( room_id=room_id, - new_room_user_id=content.get("new_room_user_id"), - new_room_name=content.get("room_name"), - message=content.get("message"), - requester_user_id=requester.user.to_string(), - block=block, - purge=purge, - force_purge=force_purge, + shutdown_params={ + "new_room_user_id": content.get("new_room_user_id"), + "new_room_name": content.get("room_name"), + "message": content.get("message"), + "requester_user_id": requester.user.to_string(), + "block": block, + "purge": purge, + "force_purge": force_purge, + }, ) return HTTPStatus.OK, {"delete_id": delete_id} +def _convert_delete_task_to_response(task: ScheduledTask) -> JsonDict: + return { + "delete_id": task.id, + "status": task.status, + "shutdown_room": task.result, + } + + class DeleteRoomStatusByRoomIdRestServlet(RestServlet): """Get the status of the delete room background task.""" @@ -150,21 +164,16 @@ async def on_GET( HTTPStatus.BAD_REQUEST, "%s is not a legal room ID" % (room_id,) ) - delete_ids = self._pagination_handler.get_delete_ids_by_room(room_id) - if delete_ids is None: - raise NotFoundError("No delete task for room_id '%s' found" % room_id) + delete_tasks = await self._pagination_handler.get_delete_tasks_by_room(room_id) - response = [] - for delete_id in delete_ids: - delete = self._pagination_handler.get_delete_status(delete_id) - if delete: - response += [ - { - "delete_id": delete_id, - **delete.asdict(), - } - ] - return HTTPStatus.OK, {"results": cast(JsonDict, response)} + if delete_tasks: + return HTTPStatus.OK, { + "results": [ + _convert_delete_task_to_response(task) for task in delete_tasks + ], + } + else: + raise NotFoundError("No delete task for room_id '%s' found" % room_id) class DeleteRoomStatusByDeleteIdRestServlet(RestServlet): @@ -181,11 +190,14 @@ async def on_GET( ) -> Tuple[int, JsonDict]: await assert_requester_is_admin(self._auth, request) - delete_status = self._pagination_handler.get_delete_status(delete_id) - if delete_status is None: + delete_task = await self._pagination_handler.get_delete_task(delete_id) + if delete_task is None or ( + delete_task.action != PURGE_ROOM_ACTION_NAME + and delete_task.action != SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME + ): raise NotFoundError("delete id '%s' not found" % delete_id) - return HTTPStatus.OK, cast(JsonDict, delete_status.asdict()) + return HTTPStatus.OK, _convert_delete_task_to_response(delete_task) class ListRoomRestServlet(RestServlet): @@ -349,11 +361,15 @@ async def _delete_room( ret = await room_shutdown_handler.shutdown_room( room_id=room_id, - new_room_user_id=content.get("new_room_user_id"), - new_room_name=content.get("room_name"), - message=content.get("message"), - requester_user_id=requester.user.to_string(), - block=block, + params={ + "new_room_user_id": content.get("new_room_user_id"), + "new_room_name": content.get("room_name"), + "message": content.get("message"), + "requester_user_id": requester.user.to_string(), + "block": block, + "purge": purge, + "force_purge": force_purge, + }, ) # Purge room diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index eb50086c508e..e0b5a0b359ab 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -15,26 +15,34 @@ import time import urllib.parse from typing import List, Optional -from unittest.mock import Mock +from unittest.mock import AsyncMock, Mock from parameterized import parameterized +from twisted.internet.task import deferLater from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin from synapse.api.constants import EventTypes, Membership, RoomTypes from synapse.api.errors import Codes -from synapse.handlers.pagination import PaginationHandler, PurgeStatus +from synapse.handlers.pagination import ( + PURGE_ROOM_ACTION_NAME, + SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME, +) from synapse.rest.client import directory, events, login, room from synapse.server import HomeServer +from synapse.types import UserID from synapse.util import Clock -from synapse.util.stringutils import random_string +from synapse.util.task_scheduler import TaskScheduler from tests import unittest """Tests admin REST events for /rooms paths.""" +ONE_HOUR_IN_S = 3600 + + class DeleteRoomTestCase(unittest.HomeserverTestCase): servlets = [ synapse.rest.admin.register_servlets, @@ -46,6 +54,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.event_creation_handler = hs.get_event_creation_handler() + self.task_scheduler = hs.get_task_scheduler() hs.config.consent.user_consent_version = "1" consent_uri_builder = Mock() @@ -476,6 +485,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.event_creation_handler = hs.get_event_creation_handler() + self.task_scheduler = hs.get_task_scheduler() hs.config.consent.user_consent_version = "1" consent_uri_builder = Mock() @@ -502,6 +512,9 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: ) self.url_status_by_delete_id = "/_synapse/admin/v2/rooms/delete_status/" + self.room_member_handler = hs.get_room_member_handler() + self.pagination_handler = hs.get_pagination_handler() + @parameterized.expand( [ ("DELETE", "/_synapse/admin/v2/rooms/%s"), @@ -661,7 +674,7 @@ def test_delete_expired_status(self) -> None: delete_id1 = channel.json_body["delete_id"] # go ahead - self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2) + self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2) # second task channel = self.make_request( @@ -686,12 +699,14 @@ def test_delete_expired_status(self) -> None: self.assertEqual(2, len(channel.json_body["results"])) self.assertEqual("complete", channel.json_body["results"][0]["status"]) self.assertEqual("complete", channel.json_body["results"][1]["status"]) - self.assertEqual(delete_id1, channel.json_body["results"][0]["delete_id"]) - self.assertEqual(delete_id2, channel.json_body["results"][1]["delete_id"]) + delete_ids = {delete_id1, delete_id2} + self.assertTrue(channel.json_body["results"][0]["delete_id"] in delete_ids) + delete_ids.remove(channel.json_body["results"][0]["delete_id"]) + self.assertTrue(channel.json_body["results"][1]["delete_id"] in delete_ids) # get status after more than clearing time for first task # second task is not cleared - self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2) + self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2) channel = self.make_request( "GET", @@ -705,7 +720,7 @@ def test_delete_expired_status(self) -> None: self.assertEqual(delete_id2, channel.json_body["results"][0]["delete_id"]) # get status after more than clearing time for all tasks - self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2) + self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2) channel = self.make_request( "GET", @@ -721,6 +736,13 @@ def test_delete_same_room_twice(self) -> None: body = {"new_room_user_id": self.admin_user} + # Mock PaginationHandler.purge_room to sleep for 100s, so we have time to do a second call + # before the purge is over. Note that it doesn't purge anymore, but we don't care. + async def purge_room(room_id: str, force: bool) -> None: + await deferLater(self.hs.get_reactor(), 100, lambda: None) + + self.pagination_handler.purge_room = AsyncMock(side_effect=purge_room) # type: ignore[assignment] + # first call to delete room # and do not wait for finish the task first_channel = self.make_request( @@ -728,7 +750,6 @@ def test_delete_same_room_twice(self) -> None: self.url.encode("ascii"), content=body, access_token=self.admin_user_tok, - await_result=False, ) # second call to delete room @@ -742,7 +763,7 @@ def test_delete_same_room_twice(self) -> None: self.assertEqual(400, second_channel.code, msg=second_channel.json_body) self.assertEqual(Codes.UNKNOWN, second_channel.json_body["errcode"]) self.assertEqual( - f"History purge already in progress for {self.room_id}", + f"Purge already in progress for {self.room_id}", second_channel.json_body["error"], ) @@ -751,6 +772,9 @@ def test_delete_same_room_twice(self) -> None: self.assertEqual(200, first_channel.code, msg=first_channel.json_body) self.assertIn("delete_id", first_channel.json_body) + # wait for purge_room to finish + self.pump(1) + # check status after finish the task self._test_result( first_channel.json_body["delete_id"], @@ -972,6 +996,115 @@ def test_shutdown_room_block_peek(self) -> None: # Assert we can no longer peek into the room self._assert_peek(self.room_id, expect_code=403) + @unittest.override_config({"purge_retention_period": "1d"}) + def test_purge_forgotten_room(self) -> None: + # Create a test room + room_id = self.helper.create_room_as( + self.admin_user, + tok=self.admin_user_tok, + ) + + self.helper.leave(room_id, user=self.admin_user, tok=self.admin_user_tok) + self.get_success( + self.room_member_handler.forget( + UserID.from_string(self.admin_user), room_id + ) + ) + + # Test that room is not yet purged + with self.assertRaises(AssertionError): + self._is_purged(room_id) + + # Advance 24 hours in the future, past the `purge_retention_period` + self.reactor.advance(24 * ONE_HOUR_IN_S) + + self._is_purged(room_id) + + def test_scheduled_purge_room(self) -> None: + # Create a test room + room_id = self.helper.create_room_as( + self.admin_user, + tok=self.admin_user_tok, + ) + self.helper.leave(room_id, user=self.admin_user, tok=self.admin_user_tok) + + # Schedule a purge 10 seconds in the future + self.get_success( + self.task_scheduler.schedule_task( + PURGE_ROOM_ACTION_NAME, + resource_id=room_id, + timestamp=self.clock.time_msec() + 10 * 1000, + ) + ) + + # Test that room is not yet purged + with self.assertRaises(AssertionError): + self._is_purged(room_id) + + # Wait for next scheduler run + self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS) + + self._is_purged(room_id) + + def test_schedule_shutdown_room(self) -> None: + # Create a test room + room_id = self.helper.create_room_as( + self.other_user, + tok=self.other_user_tok, + ) + + # Schedule a shutdown 10 seconds in the future + delete_id = self.get_success( + self.task_scheduler.schedule_task( + SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME, + resource_id=room_id, + params={ + "requester_user_id": self.admin_user, + "new_room_user_id": self.admin_user, + "new_room_name": None, + "message": None, + "block": False, + "purge": True, + "force_purge": True, + }, + timestamp=self.clock.time_msec() + 10 * 1000, + ) + ) + + # Test that room is not yet shutdown + self._is_member(room_id, self.other_user) + + # Test that room is not yet purged + with self.assertRaises(AssertionError): + self._is_purged(room_id) + + # Wait for next scheduler run + self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS) + + # Test that all users has been kicked (room is shutdown) + self._has_no_members(room_id) + + self._is_purged(room_id) + + # Retrieve delete results + result = self.make_request( + "GET", + self.url_status_by_delete_id + delete_id, + access_token=self.admin_user_tok, + ) + self.assertEqual(200, result.code, msg=result.json_body) + + # Check that the user is in kicked_users + self.assertIn( + self.other_user, result.json_body["shutdown_room"]["kicked_users"] + ) + + new_room_id = result.json_body["shutdown_room"]["new_room_id"] + self.assertTrue(new_room_id) + + # Check that the user is actually in the new room + self._is_member(new_room_id, self.other_user) + def _is_blocked(self, room_id: str, expect: bool = True) -> None: """Assert that the room is blocked or not""" d = self.store.is_room_blocked(room_id) @@ -1034,7 +1167,6 @@ def _test_result( kicked_user: a user_id which is kicked from the room expect_new_room: if we expect that a new room was created """ - # get information by room_id channel_room_id = self.make_request( "GET", @@ -1957,11 +2089,8 @@ def test_room_messages_purge(self) -> None: self.assertEqual(len(chunk), 2, [event["content"] for event in chunk]) # Purge every event before the second event. - purge_id = random_string(16) - pagination_handler._purges_by_id[purge_id] = PurgeStatus() self.get_success( - pagination_handler._purge_history( - purge_id=purge_id, + pagination_handler.purge_history( room_id=self.room_id, token=second_token_str, delete_local_events=True, diff --git a/tests/rest/admin/test_server_notice.py b/tests/rest/admin/test_server_notice.py index 28b999573e75..dfd14f5751bf 100644 --- a/tests/rest/admin/test_server_notice.py +++ b/tests/rest/admin/test_server_notice.py @@ -22,6 +22,7 @@ from synapse.storage.roommember import RoomsForUser from synapse.types import JsonDict from synapse.util import Clock +from synapse.util.stringutils import random_string from tests import unittest from tests.unittest import override_config @@ -413,11 +414,24 @@ def test_send_server_notice_delete_room(self) -> None: self.assertEqual(messages[0]["content"]["body"], "test msg one") self.assertEqual(messages[0]["sender"], "@notices:test") + random_string(16) + # shut down and purge room self.get_success( - self.room_shutdown_handler.shutdown_room(first_room_id, self.admin_user) - ) - self.get_success(self.pagination_handler.purge_room(first_room_id)) + self.room_shutdown_handler.shutdown_room( + first_room_id, + { + "requester_user_id": self.admin_user, + "new_room_user_id": None, + "new_room_name": None, + "message": None, + "block": False, + "purge": True, + "force_purge": False, + }, + ) + ) + self.get_success(self.pagination_handler.purge_room(first_room_id, force=False)) # user is not member anymore self._check_invite_and_join_status(self.other_user, 0, 0) diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index 53182459e487..10209e6d8c35 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -41,7 +41,6 @@ from synapse.appservice import ApplicationService from synapse.events import EventBase from synapse.events.snapshot import EventContext -from synapse.handlers.pagination import PurgeStatus from synapse.rest import admin from synapse.rest.client import account, directory, login, profile, register, room, sync from synapse.server import HomeServer @@ -2086,11 +2085,8 @@ def test_room_messages_purge(self) -> None: self.assertEqual(len(chunk), 2, [event["content"] for event in chunk]) # Purge every event before the second event. - purge_id = random_string(16) - pagination_handler._purges_by_id[purge_id] = PurgeStatus() self.get_success( - pagination_handler._purge_history( - purge_id=purge_id, + pagination_handler.purge_history( room_id=self.room_id, token=second_token_str, delete_local_events=True, From 8660b83a31a5116fe09600250f5e96e62b2a2675 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Tue, 29 Aug 2023 11:04:23 +0200 Subject: [PATCH 02/10] Fix NOT_SPAM --- synapse/handlers/room.py | 5 ++--- synapse/handlers/room_member.py | 6 +++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index ffd24fa546a8..278ecde12a86 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -54,7 +54,6 @@ from synapse.events.snapshot import UnpersistedEventContext from synapse.events.utils import copy_and_fixup_power_levels_contents from synapse.handlers.relations import BundledAggregations -from synapse.module_api import NOT_SPAM from synapse.rest.admin._base import assert_user_is_admin from synapse.streams import EventSource from synapse.types import ( @@ -455,7 +454,7 @@ async def clone_existing_room( spam_check = await self._spam_checker_module_callbacks.user_may_create_room( user_id ) - if spam_check != NOT_SPAM: + if spam_check != self._spam_checker_module_callbacks.NOT_SPAM: raise SynapseError( 403, "You are not permitted to create rooms", @@ -769,7 +768,7 @@ async def create_room( spam_check = await self._spam_checker_module_callbacks.user_may_create_room( user_id ) - if spam_check != NOT_SPAM: + if spam_check != self._spam_checker_module_callbacks.NOT_SPAM: raise SynapseError( 403, "You are not permitted to create rooms", diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index b5ff3badae92..6e787e4e33a8 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -829,7 +829,7 @@ async def update_membership_locked( spam_check = await self._spam_checker_module_callbacks.user_may_invite( requester.user.to_string(), target_id, room_id ) - if spam_check != NOT_SPAM: + if spam_check != self._spam_checker_module_callbacks.NOT_SPAM: logger.info("Blocking invite due to spam checker") block_invite_result = spam_check @@ -964,7 +964,7 @@ async def update_membership_locked( target.to_string(), room_id, is_invited=inviter is not None ) ) - if spam_check != NOT_SPAM: + if spam_check != self._spam_checker_module_callbacks.NOT_SPAM: raise SynapseError( 403, "Not allowed to join this room", @@ -1582,7 +1582,7 @@ async def do_3pid_invite( room_id=room_id, ) ) - if spam_check != NOT_SPAM: + if spam_check != self._spam_checker_module_callbacks.NOT_SPAM: raise SynapseError( 403, "Cannot send threepid invite", From 20f6905752e74c75821d7196ea80c5f93b90fad4 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Tue, 29 Aug 2023 11:20:28 +0200 Subject: [PATCH 03/10] lint --- synapse/handlers/room_member.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 6e787e4e33a8..863d5d928003 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -44,7 +44,6 @@ from synapse.logging import opentracing from synapse.metrics import event_processing_positions from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.module_api import NOT_SPAM from synapse.types import ( JsonDict, Requester, From 9cfaf7b46ac667c2356dbf5c2715c50a9e575a85 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Tue, 29 Aug 2023 13:49:41 +0200 Subject: [PATCH 04/10] fix worker --- synapse/app/generic_worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index d25e3548e075..f7c80eee210d 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -77,6 +77,7 @@ ) from synapse.storage.databases.main.presence import PresenceStore from synapse.storage.databases.main.profile import ProfileWorkerStore +from synapse.storage.databases.main.purge_events import PurgeEventsStore from synapse.storage.databases.main.push_rule import PushRulesWorkerStore from synapse.storage.databases.main.pusher import PusherWorkerStore from synapse.storage.databases.main.receipts import ReceiptsWorkerStore @@ -134,6 +135,7 @@ class GenericWorkerStore( RelationsWorkerStore, EventFederationWorkerStore, EventPushActionsWorkerStore, + PurgeEventsStore, StateGroupWorkerStore, SignatureWorkerStore, UserErasureWorkerStore, From a366a337042699ccedbd0f591e0a089451fff944 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Tue, 5 Sep 2023 14:38:01 +0200 Subject: [PATCH 05/10] Update synapse/handlers/pagination.py Co-authored-by: Erik Johnston --- synapse/handlers/pagination.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index fbcd2e680587..4573f0112f84 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -222,7 +222,7 @@ async def purge_history_for_rooms_in_range( (stream, topo, _event_id) = r token = "t%d-%d" % (topo, stream) - logger.info("Starting purging events in room %s" % (room_id)) + logger.info("Starting purging events in room %s", room_id) # We want to purge everything, including local events, and to run the purge in # the background so that it's not blocking any other operation apart from From 4afe100d07e8a2a9fc85e3b55f533b03601612cd Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Tue, 5 Sep 2023 15:02:04 +0200 Subject: [PATCH 06/10] Rename and document config --- docs/usage/configuration/config_documentation.md | 11 +++++++++++ synapse/config/server.py | 14 ++++++++------ synapse/handlers/pagination.py | 4 +++- synapse/handlers/room_member.py | 9 ++++++--- tests/rest/admin/test_room.py | 4 ++-- 5 files changed, 30 insertions(+), 12 deletions(-) diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 0b1725816e3d..a07b4898e776 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -934,6 +934,17 @@ Example configuration: redaction_retention_period: 28d ``` --- +### `forgotten_room_retention_period` + +How long to keep locally forgotten rooms before purging them from the DB. + +Defaults to `null`, meaning it's disabled. + +Example configuration: +```yaml +forgotten_room_retention_period: 28d +``` +--- ### `user_ips_max_age` How long to track users' last seen time and IPs in the database. diff --git a/synapse/config/server.py b/synapse/config/server.py index cb93e566782e..72d30da30082 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -486,14 +486,16 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: else: self.redaction_retention_period = None - # How long to keep locally forgotten rooms before purging them. - purge_retention_period = config.get("purge_retention_period", "7d") - if purge_retention_period is not None: - self.purge_retention_period: Optional[int] = self.parse_duration( - purge_retention_period + # How long to keep locally forgotten rooms before purging them from the DB. + forgotten_room_retention_period = config.get( + "forgotten_room_retention_period", None + ) + if forgotten_room_retention_period is not None: + self.forgotten_room_retention_period: Optional[int] = self.parse_duration( + forgotten_room_retention_period ) else: - self.purge_retention_period = None + self.forgotten_room_retention_period = None # How long to keep entries in the `users_ips` table. user_ips_max_age = config.get("user_ips_max_age", "28d") diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 4573f0112f84..622bf2ba5774 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -99,7 +99,9 @@ def __init__(self, hs: "HomeServer"): self._retention_allowed_lifetime_max = ( hs.config.retention.retention_allowed_lifetime_max ) - self._purge_retention_period = hs.config.server.purge_retention_period + self._forgotten_room_retention_period = ( + hs.config.server.forgotten_room_retention_period + ) self._is_master = hs.config.worker.worker_app is None if hs.config.retention.retention_enabled and self._is_master: diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 863d5d928003..3a0fc641b816 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -176,7 +176,9 @@ def __init__(self, hs: "HomeServer"): self.request_ratelimiter = hs.get_request_ratelimiter() hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room) - self._purge_retention_period = hs.config.server.purge_retention_period + self._forgotten_room_retention_period = ( + hs.config.server.forgotten_room_retention_period + ) def _on_user_joined_room(self, event_id: str, room_id: str) -> None: """Notify the rate limiter that a room join has occurred. @@ -313,13 +315,14 @@ async def forget( # room around and we automatically purge room after a little bit if ( not do_not_schedule_purge - and self._purge_retention_period + and self._forgotten_room_retention_period and await self.store.is_locally_forgotten_room(room_id) ): await self.hs.get_task_scheduler().schedule_task( PURGE_ROOM_ACTION_NAME, resource_id=room_id, - timestamp=self.clock.time_msec() + self._purge_retention_period, + timestamp=self.clock.time_msec() + + self._forgotten_room_retention_period, ) async def ratelimit_multiple_invites( diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index e0b5a0b359ab..f8c967e41455 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -996,7 +996,7 @@ def test_shutdown_room_block_peek(self) -> None: # Assert we can no longer peek into the room self._assert_peek(self.room_id, expect_code=403) - @unittest.override_config({"purge_retention_period": "1d"}) + @unittest.override_config({"forgotten_room_retention_period": "1d"}) def test_purge_forgotten_room(self) -> None: # Create a test room room_id = self.helper.create_room_as( @@ -1015,7 +1015,7 @@ def test_purge_forgotten_room(self) -> None: with self.assertRaises(AssertionError): self._is_purged(room_id) - # Advance 24 hours in the future, past the `purge_retention_period` + # Advance 24 hours in the future, past the `forgotten_room_retention_period` self.reactor.advance(24 * ONE_HOUR_IN_S) self._is_purged(room_id) From b82288f0d4bf065756bd1bbe079005e60e139460 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Tue, 5 Sep 2023 15:12:37 +0200 Subject: [PATCH 07/10] Fix mypy --- synapse/rest/admin/__init__.py | 2 +- tests/rest/admin/test_room.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index 0a7869c1b1bb..6c4dc12f0f16 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -221,7 +221,7 @@ async def on_GET( if purge_task is None or purge_task.action != PURGE_HISTORY_ACTION_NAME: raise NotFoundError("purge id '%s' not found" % purge_id) - result = { + result: JsonDict = { "status": purge_task.status if purge_task.status == TaskStatus.COMPLETE or purge_task.status == TaskStatus.FAILED diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index f8c967e41455..6ed451d7c465 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -741,7 +741,7 @@ def test_delete_same_room_twice(self) -> None: async def purge_room(room_id: str, force: bool) -> None: await deferLater(self.hs.get_reactor(), 100, lambda: None) - self.pagination_handler.purge_room = AsyncMock(side_effect=purge_room) # type: ignore[assignment] + self.pagination_handler.purge_room = AsyncMock(side_effect=purge_room) # type: ignore[method-assign] # first call to delete room # and do not wait for finish the task From 981bf54c471ec0fb7eaf9588595e5a29b1108dad Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 6 Sep 2023 12:24:27 +0200 Subject: [PATCH 08/10] Add back check --- synapse/handlers/pagination.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 622bf2ba5774..878f267a4e45 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -161,7 +161,7 @@ async def purge_history_for_rooms_in_range( include_null = False logger.info( - "[purge] Running purge job for %s < max_lifetime <= %s (include NULLs = %s)", + "[purge] Running retention purge job for %s < max_lifetime <= %s (include NULLs = %s)", min_ms, max_ms, include_null, @@ -176,6 +176,14 @@ async def purge_history_for_rooms_in_range( for room_id, retention_policy in rooms.items(): logger.info("[purge] Attempting to purge messages in room %s", room_id) + if len(await self.get_delete_tasks_by_room(room_id, only_active=True)) > 0: + logger.warning( + "[purge] not purging room %s for retention as there's an ongoing purge" + " running for this room", + room_id, + ) + continue + # If max_lifetime is None, it means that the room has no retention policy. # Given we only retrieve such rooms when there's a default retention policy # defined in the server's configuration, we can safely assume that's the From 6f8e34dadfab0fc2f7edb9d5dc4e4ca54784aed8 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 6 Sep 2023 14:33:32 +0200 Subject: [PATCH 09/10] Fix merge --- synapse/handlers/room.py | 2 +- synapse/module_api/__init__.py | 13 ++++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index d02a823a5844..a0c3b168197b 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1780,7 +1780,7 @@ class ShutdownRoomParams(TypedDict): even if there are still users joined to the room. """ - requester_user_id: str + requester_user_id: Optional[str] new_room_user_id: Optional[str] new_room_name: Optional[str] message: Optional[str] diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index d6efe10a28ba..7045b3c4d61a 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -1741,7 +1741,18 @@ async def delete_room(self, room_id: str) -> None: """ # Future extensions to this method might want to e.g. allow use of `force_purge`. # TODO In the future we should make sure this is persistent. - self._hs.get_pagination_handler().start_shutdown_and_purge_room(room_id, None) + await self._hs.get_pagination_handler().start_shutdown_and_purge_room( + room_id, + { + "new_room_user_id": None, + "new_room_name": None, + "message": None, + "requester_user_id": None, + "block": False, + "purge": True, + "force_purge": False, + }, + ) async def set_displayname( self, From c666a546d2a7385a601fb6aeb402611269a1d1b0 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Tue, 12 Sep 2023 15:12:21 +0200 Subject: [PATCH 10/10] Remove useless TODO --- synapse/rest/admin/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index 2e06f44c6ef0..7d0b4b55a0df 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -229,7 +229,6 @@ async def on_GET( if purge_task.error: result["error"] = purge_task.error - # TODO active vs purging etc return HTTPStatus.OK, result