diff --git a/jupyter_collaboration/app.py b/jupyter_collaboration/app.py index 23f96132..b23c4cdd 100644 --- a/jupyter_collaboration/app.py +++ b/jupyter_collaboration/app.py @@ -103,10 +103,5 @@ def initialize_handlers(self): async def stop_extension(self): # Cancel tasks and clean up - await asyncio.wait( - [ - asyncio.create_task(self.ywebsocket_server.clean()), - asyncio.create_task(self.file_loaders.clear()), - ], - timeout=3, - ) + await self.ywebsocket_server.clean() + await self.file_loaders.clean() diff --git a/jupyter_collaboration/handlers.py b/jupyter_collaboration/handlers.py index 403bfd5c..5788b9b8 100644 --- a/jupyter_collaboration/handlers.py +++ b/jupyter_collaboration/handlers.py @@ -20,7 +20,7 @@ from .loaders import FileLoaderMapping from .rooms import DocumentRoom, TransientRoom -from .utils import JUPYTER_COLLABORATION_EVENTS_URI, LogLevel, decode_file_path +from .utils import JUPYTER_COLLABORATION_EVENTS_URI, LogLevel, decode_file_path, cancel_task from .websocketserver import JupyterWebsocketServer YFILE = YDOCS["file"] @@ -127,7 +127,10 @@ def __aiter__(self): async def __anext__(self): # needed to be compatible with WebsocketServer (async for message in websocket) - message = await self._message_queue.get() + try: + message = await self._message_queue.get() + except asyncio.CancelledError: + message = None if not message: raise StopAsyncIteration() return message @@ -160,7 +163,7 @@ async def open(self, room_id): # cancel the deletion of the room if it was scheduled if self.room.cleaner is not None: - self.room.cleaner.cancel() + await cancel_task(self.room.cleaner) try: # Initialize the room @@ -189,8 +192,11 @@ async def recv(self): """ Receive a message from the client. """ - message = await self._message_queue.get() - return message + try: + message = await self._message_queue.get() + return message + except asyncio.CancelledError: + return None def on_message(self, message): """ diff --git a/jupyter_collaboration/loaders.py b/jupyter_collaboration/loaders.py index eba3fc8a..1c350a25 100644 --- a/jupyter_collaboration/loaders.py +++ b/jupyter_collaboration/loaders.py @@ -14,7 +14,7 @@ from jupyter_server.utils import ensure_async from jupyter_server_fileid.manager import BaseFileIdManager -from .utils import OutOfBandChanges +from .utils import OutOfBandChanges, cancel_task class FileLoader: @@ -73,9 +73,7 @@ async def clean(self) -> None: Stops the watch task. """ if self._watcher is not None: - if not self._watcher.cancelled(): - self._watcher.cancel() - await self._watcher + await cancel_task(self._watcher) def observe( self, id: str, callback: Callable[[str, dict[str, Any]], Coroutine[Any, Any, None]] @@ -241,18 +239,16 @@ def __getitem__(self, file_id: str) -> FileLoader: return file - async def __delitem__(self, file_id: str) -> None: + def __delitem__(self, file_id: str) -> None: """Delete a loader for a given file.""" - await self.remove(file_id) + self.remove(file_id) - async def clear(self) -> None: + async def clean(self) -> None: """Clear all loaders.""" tasks = [] for id in list(self.__dict): loader = self.__dict.pop(id) - tasks.append(loader.clean()) - - await asyncio.gather(*tasks) + await loader.clean() async def remove(self, file_id: str) -> None: """Remove the loader for a given file.""" diff --git a/jupyter_collaboration/rooms.py b/jupyter_collaboration/rooms.py index 32657a6e..ed8d5eae 100644 --- a/jupyter_collaboration/rooms.py +++ b/jupyter_collaboration/rooms.py @@ -13,7 +13,7 @@ from ypy_websocket.ystore import BaseYStore, YDocNotFound from .loaders import FileLoader -from .utils import JUPYTER_COLLABORATION_EVENTS_URI, LogLevel, OutOfBandChanges +from .utils import JUPYTER_COLLABORATION_EVENTS_URI, LogLevel, OutOfBandChanges, cancel_task YFILE = YDOCS["file"] @@ -158,20 +158,23 @@ def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = No self._logger.emit(schema_id=JUPYTER_COLLABORATION_EVENTS_URI, data=data) - def _clean(self) -> None: + async def _clean(self) -> None: """ Cleans the rooms. Cancels the save task and unsubscribes from the file. """ - super()._clean() - # TODO: Should we cancel or wait ? - if self._saving_document: - self._saving_document.cancel() + await super()._clean() self._document.unobserve() self._file.unobserve(self.room_id) + if self.cleaner: + await cancel_task(self.cleaner) + + if self._saving_document: + await cancel_task(self._saving_document) + async def _broadcast_updates(self): # FIXME should be upstreamed try: diff --git a/jupyter_collaboration/utils.py b/jupyter_collaboration/utils.py index 5362b474..37462a6d 100644 --- a/jupyter_collaboration/utils.py +++ b/jupyter_collaboration/utils.py @@ -1,6 +1,7 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. +import asyncio import pathlib from enum import Enum from typing import Tuple @@ -59,3 +60,11 @@ def encode_file_path(format: str, file_type: str, file_id: str) -> str: path (str): File path. """ return f"{format}:{file_type}:{file_id}" + + +async def cancel_task(task: asyncio.Task): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass diff --git a/jupyter_collaboration/websocketserver.py b/jupyter_collaboration/websocketserver.py index f74255ee..5db01905 100644 --- a/jupyter_collaboration/websocketserver.py +++ b/jupyter_collaboration/websocketserver.py @@ -11,6 +11,8 @@ from ypy_websocket.websocket_server import WebsocketServer, YRoom from ypy_websocket.ystore import BaseYStore +from .utils import cancel_task + class RoomNotFound(LookupError): pass @@ -40,46 +42,10 @@ def __init__( self.monitor_task: asyncio.Task | None = None async def clean(self): - # TODO: should we wait for any save task? - self.log.info("Deleting all rooms.") - # FIXME some clean up should be upstreamed and the following does not - # prevent hanging stop process - it also requires some thinking about - # should the ystore write action be cancelled; I guess not as it could - # results in corrupted data. - # room_tasks = list() - # for name, room in list(self.rooms.items()): - # for task in room.background_tasks: - # task.cancel() # FIXME should be upstreamed - # room_tasks.append(task) - # if room_tasks: - # _, pending = await asyncio.wait(room_tasks, timeout=3) - # if pending: - # msg = f"{len(pending)} room task(s) are pending." - # self.log.warning(msg) - # self.log.debug("Pending tasks: %r", pending) - - tasks = [] - for name, room in list(self.rooms.items()): - try: - self.delete_room(name=name) - except Exception as e: # Capture exception as room may be auto clean - msg = f"Failed to delete room {name}" - self.log.debug(msg, exc_info=e) - else: - tasks.append(room._broadcast_task) # FIXME should be upstreamed + await super().clean() + if self.monitor_task is not None: - self.monitor_task.cancel() - tasks.append(self.monitor_task) - for task in self.background_tasks: - task.cancel() # FIXME should be upstreamed - tasks.append(task) - - if tasks: - _, pending = await asyncio.wait(tasks, timeout=3) - if pending: - msg = f"{len(pending)} task(s) are pending." - self.log.warning(msg) - self.log.debug("Pending tasks: %r", pending) + await cancel_task(self.monitor_task) def room_exists(self, path: str) -> bool: """ @@ -139,10 +105,7 @@ async def _monitor(self): This method runs in a coroutine for debugging purposes. """ while True: - try: - await asyncio.sleep(60) - except asyncio.CancelledError: - break + await asyncio.sleep(60) clients_nb = sum(len(room.clients) for room in self.rooms.values()) self.log.info("Processed %s Y patches in one minute", self.ypatch_nb) self.log.info("Connected Y users: %s", clients_nb)