From 8648b156ae089c3643c5d09464914fcc2dd1f460 Mon Sep 17 00:00:00 2001 From: Brianna Major Date: Mon, 18 Dec 2023 10:41:31 -0500 Subject: [PATCH 1/4] MAINT: Simplify Viewers class --- itkwidgets/cell_watcher.py | 35 ++++++----------------------------- 1 file changed, 6 insertions(+), 29 deletions(-) diff --git a/itkwidgets/cell_watcher.py b/itkwidgets/cell_watcher.py index acc6361d..8d3f8a19 100644 --- a/itkwidgets/cell_watcher.py +++ b/itkwidgets/cell_watcher.py @@ -19,41 +19,18 @@ def data(self): @property def not_created(self): # Return a list of names of viewers that have not been created yet - names = [] - for key, val in self.data.items(): - name = val['name'] - if not val['status']: - name = name if name is not None else key - names.append(name) - return names + return [k for k in self.data.keys() if not self.viewer_ready(k)] - @property - def not_named(self): - # Return a list of names of viewers that have not been named yet - return any([k for k, v in self.data.items() if v['name'] is None]) - - @property - def viewer_objects(self): - # Return a list of created viewers - return list(self.data.keys()) - - def add_viewer(self, view): - self.data[view] = {'name': None, 'status': False} - - def set_name(self, view, name): - if view not in self.data.keys(): - self.add_viewer(view) - self.data[view]['name'] = name + def add_viewer(self, view: str) -> None: + self.data[view] = {"ready": False} def update_viewer_status(self, view, status): if view not in self.data.keys(): self.add_viewer(view) - self.data[view]['status'] = status + self.data[view]["ready"] = status - def viewer_ready(self, view): - if viewer := self.data.get(view): - return viewer['status'] - return False + def viewer_ready(self, view: str) -> bool: + return self.data.get(view, {}).get("ready", False) class CellWatcher(object): From cf6fe9dc3e349ef99b86c5d446e34fe8548e6681 Mon Sep 17 00:00:00 2001 From: Brianna Major Date: Mon, 18 Dec 2023 10:42:13 -0500 Subject: [PATCH 2/4] DOC: Update Viewers class docstrings --- itkwidgets/cell_watcher.py | 46 ++++++++++++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/itkwidgets/cell_watcher.py b/itkwidgets/cell_watcher.py index 8d3f8a19..2cc75213 100644 --- a/itkwidgets/cell_watcher.py +++ b/itkwidgets/cell_watcher.py @@ -1,6 +1,7 @@ import asyncio import sys from inspect import isawaitable, iscoroutinefunction +from typing import Dict, List from IPython import get_ipython from queue import Queue from imjoy_rpc.utils import FuturePromise @@ -9,27 +10,64 @@ class Viewers(object): + """This class is designed to track each instance of the Viewer class that + is instantiated as well as whether or not that instance is available for + updates or requests. + """ def __init__(self): self._data = {} @property - def data(self): + def data(self) -> Dict[str, Dict[str, bool]]: + """Get the underlying data dict containg all viewer data + + :return: The data object that contains all created Viewer information. + :rtype: Dict[str, Dict[str, bool]] + """ return self._data @property - def not_created(self): - # Return a list of names of viewers that have not been created yet + def not_created(self) -> List[str]: + """Return a list of all unavailable viewers + + :return: A list of names of viewers that have not yet been created. + :rtype: List[str] + """ return [k for k in self.data.keys() if not self.viewer_ready(k)] def add_viewer(self, view: str) -> None: + """Add a new Viewer object to track. + + :param view: The unique string identifier for the Viewer object + :type view: str + """ self.data[view] = {"ready": False} - def update_viewer_status(self, view, status): + def update_viewer_status(self, view: str, status: bool) -> None: + """Update a Viewer's 'ready' status. + + :param view: The unique string identifier for the Viewer object + :type view: str + :param status: Boolean value indicating whether or not the viewer is + available for requests or updates. This should be false when the plugin + API is not yet available or new data is not yet rendered. + :type status: bool + """ if view not in self.data.keys(): self.add_viewer(view) self.data[view]["ready"] = status def viewer_ready(self, view: str) -> bool: + """Request the 'ready' status of a viewer. + + :param view: The unique string identifier for the Viewer object + :type view: str + + :return: Boolean value indicating whether or not the viewer is + available for requests or updates. This will be false when the plugin + API is not yet available or new data is not yet rendered. + :rtype: bool + """ return self.data.get(view, {}).get("ready", False) From 3c7b60b3bc0cccb9639b5e8d3322cd357633f4b9 Mon Sep 17 00:00:00 2001 From: Brianna Major Date: Tue, 19 Dec 2023 08:44:29 -0500 Subject: [PATCH 3/4] DOC: Add docstrings to CellWatcher class --- itkwidgets/cell_watcher.py | 170 ++++++++++++++++++++++++++++--------- 1 file changed, 131 insertions(+), 39 deletions(-) diff --git a/itkwidgets/cell_watcher.py b/itkwidgets/cell_watcher.py index 2cc75213..51d31146 100644 --- a/itkwidgets/cell_watcher.py +++ b/itkwidgets/cell_watcher.py @@ -1,10 +1,12 @@ import asyncio import sys from inspect import isawaitable, iscoroutinefunction -from typing import Dict, List +from typing import Callable, Dict, List from IPython import get_ipython +from IPython.core.interactiveshell import ExecutionResult from queue import Queue from imjoy_rpc.utils import FuturePromise +from zmq.eventloop.zmqstream import ZMQStream background_tasks = set() @@ -72,13 +74,23 @@ def viewer_ready(self, view: str) -> bool: class CellWatcher(object): + """A singleton class used in interactive Jupyter notebooks in order to + support asynchronous network communication that would otherwise be blocked + by the IPython kernel. + """ + def __new__(cls): - if not hasattr(cls, '_instance'): + """Create a singleton class.""" + if not hasattr(cls, "_instance"): cls._instance = super(CellWatcher, cls).__new__(cls) cls._instance.setup() return cls._instance - def setup(self): + def setup(self) -> None: + """Perform the initial setup, including intercepting 'execute_request' + handlers so that we can handle them internally before the IPython + kernel does. + """ self.viewers = Viewers() self.shell = get_ipython() self.kernel = self.shell.kernel @@ -102,22 +114,52 @@ def setup(self): # Call self.post_run_cell every time the post_run_cell signal is emitted # post_run_cell runs after interactive execution (e.g. a cell in a notebook) - self.shell.events.register('post_run_cell', self.post_run_cell) + self.shell.events.register("post_run_cell", self.post_run_cell) + + def add_viewer(self, view: str) -> None: + """Add a new Viewer object to track. - def add_viewer(self, view): + :param view: The unique string identifier for the Viewer object + :type view: str + """ # Track all Viewer instances self.viewers.add_viewer(view) - def update_viewer_status(self, view, status): + def update_viewer_status(self, view: str, status: bool) -> None: + """Update a Viewer's 'ready' status. If the last cell run failed + because the viewer was unavailable try to run the cell again. + + :param view: The unique string identifier for the Viewer object + :type view: str + :param status: Boolean value indicating whether or not the viewer is + available for requests or updates. This should be false when the plugin + API is not yet available or new data is not yet rendered. + :type status: bool + """ self.viewers.update_viewer_status(view, status) - if self.waiting_on_viewer: + if status and self.waiting_on_viewer: # Might be ready now, try again self.create_task(self.execute_next_request) - def viewer_ready(self, view): + def viewer_ready(self, view: str) -> bool: + """Request the 'ready' status of a viewer. + + :param view: The unique string identifier for the Viewer object + :type view: str + + :return: Boolean value indicating whether or not the viewer is + available for requests or updates. This will be false when the plugin + API is not yet available or new data is not yet rendered. + :rtype: bool + """ return self.viewers.viewer_ready(view) - def _task_cleanup(self, task): + def _task_cleanup(self, task: asyncio.Task) -> None: + """Callback to discard references to tasks once they've completed. + + :param task: Completed task that no longer needs a strong reference + :type task: asyncio.Task + """ global background_tasks try: # "Handle" exceptions here to prevent further errors. Exceptions @@ -127,7 +169,12 @@ def _task_cleanup(self, task): except: background_tasks.discard(task) - def create_task(self, fn): + def create_task(self, fn: Callable) -> None: + """Create a task from the function passed in. + + :param fn: Coroutine to run concurrently as a Task + :type fn: Callable + """ global background_tasks # The event loop only keeps weak references to tasks. # Gather them into a set to avoid garbage collection mid-task. @@ -135,32 +182,70 @@ def create_task(self, fn): background_tasks.add(task) task.add_done_callback(self._task_cleanup) - def capture_event(self, stream, ident, parent): + def capture_event(self, stream: ZMQStream, ident: list, parent: dict) -> None: + """Capture execute_request messages so that we can queue and process + them concurrently as tasks to prevent blocking. + + :param stream: Class to manage event-based messaging on a zmq socket + :type stream: ZMQStream + :param ident: ZeroMQ routing prefix, which can be zero or more socket + identities + :type ident: list + :param parent: A dictonary of dictionaries representing a complete + message as defined by the Jupyter message specification + :type parent: dict + """ self._events.put((stream, ident, parent)) if self._events.qsize() == 1 and self.ready_to_run_next_cell(): # We've added a new task to an empty queue. # Begin executing tasks again. self.create_task(self.execute_next_request) - async def capture_event_async(self, stream, ident, parent): + async def capture_event_async( + self, stream: ZMQStream, ident: list, parent: dict + ) -> None: + """Capture execute_request messages so that we can queue and process + them concurrently as tasks to prevent blocking. + Asynchronous for ipykernel 6+. + + :param stream: Class to manage event-based messaging on a zmq socket + :type stream: ZMQStream + :param ident: ZeroMQ routing prefix, which can be zero or more socket + identities + :type ident: list + :param parent: A dictonary of dictionaries representing a complete + message as defined by the Jupyter message specification + :type parent: dict + """ # ipykernel 6+ self.capture_event(stream, ident, parent) @property - def all_getters_resolved(self): - # Check if all of the getter/setter futures have resolved + def all_getters_resolved(self) -> bool: + """Determine if all tasks representing asynchronous network calls that + fetch values have resolved. + + :return: Whether or not all tasks for the current cell have resolved + :rtype: bool + """ getters_resolved = [f.done() for f in self.results.values()] return all(getters_resolved) - def ready_to_run_next_cell(self): - # Any itk_viewer objects need to be available and all getters/setters - # need to be resolved + def ready_to_run_next_cell(self) -> bool: + """Determine if we are ready to run the next cell in the queue. + + :return: If created Viewer objects are available and all futures are + resolved. + :rtype: bool + """ self.waiting_on_viewer = len(self.viewers.not_created) return self.all_getters_resolved and not self.waiting_on_viewer - async def execute_next_request(self): - # Modeled after the approach used in jupyter-ui-poll - # https://github.com/Kirill888/jupyter-ui-poll/blob/f65b81f95623c699ed7fd66a92be6d40feb73cde/jupyter_ui_poll/_poll.py#L75-L101 + async def execute_next_request(self) -> None: + """Grab the next request if needed and then run the cell if it it ready + to be run. Modeled after the approach used in jupyter-ui-poll. + :ref: https://github.com/Kirill888/jupyter-ui-poll/blob/f65b81f95623c699ed7fd66a92be6d40feb73cde/jupyter_ui_poll/_poll.py#L75-L101 + """ if self._events.empty(): self.abort_all = False @@ -172,7 +257,8 @@ async def execute_next_request(self): # Continue processing the remaining queued tasks await self._execute_next_request() - async def _execute_next_request(self): + async def _execute_next_request(self) -> None: + """Run the cell with the ipykernel shell_handler for execute_request""" # Here we actually run the queued cell as it would have been run stream, ident, parent = self.current_request @@ -202,32 +288,38 @@ async def _execute_next_request(self): # Continue processing the remaining queued tasks self.create_task(self.execute_next_request) - def update_namespace(self): - # Update the namespace variables with the results from the getters + def update_namespace(self) -> None: + """Update the namespace variables with the results from the getters""" # FIXME: This is a temporary "fix" and does not handle updating output keys = [k for k in self.shell.user_ns.keys()] - try: - for key in keys: - value = self.shell.user_ns[key] - if asyncio.isfuture(value) and (isinstance(value, FuturePromise) or isinstance(value, asyncio.Task)): - # Getters/setters return futures - # They should all be resolved now, so use the result - self.shell.user_ns[key] = value.result() - self.results.clear() - except Exception as e: - self.results.clear() - self.abort_all = True - self.create_task(self._execute_next_request) - raise e - - def _callback(self, *args, **kwargs): + for key in keys: + value = self.shell.user_ns[key] + if asyncio.isfuture(value) and ( + isinstance(value, FuturePromise) or isinstance(value, asyncio.Task) + ): + # Functions that need to return values from asynchronous + # network requests return futures. They should all be resolved + # now, so use the result. + self.shell.user_ns[key] = value.result() + self.results.clear() + + def _callback(self, *args, **kwargs) -> None: + """After each future resolves check to see if they are all resolved. If + so, update the namespace and run the next cell in the queue. + """ # After each getter/setter resolves check if they've all resolved if self.all_getters_resolved: self.update_namespace() self.current_request = None self.create_task(self.execute_next_request) - def post_run_cell(self, response): + def post_run_cell(self, response: ExecutionResult) -> None: + """Runs after interactive execution (e.g. a cell in a notebook). Set + the abort flag if there are errors produced by cell execution. + + :param response: The response message produced by cell execution + :type response: ExecutionResult + """ # Abort remaining cells on error in execution if response.error_in_exec is not None: self.abort_all = True From e22b2f4926f57f2223b722adbc7f7fafd68ae80d Mon Sep 17 00:00:00 2001 From: Brianna Major Date: Tue, 19 Dec 2023 11:09:36 -0500 Subject: [PATCH 4/4] DOC: Improve Viewers.data return description --- itkwidgets/cell_watcher.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/itkwidgets/cell_watcher.py b/itkwidgets/cell_watcher.py index 51d31146..57d7d5f6 100644 --- a/itkwidgets/cell_watcher.py +++ b/itkwidgets/cell_watcher.py @@ -23,7 +23,9 @@ def __init__(self): def data(self) -> Dict[str, Dict[str, bool]]: """Get the underlying data dict containg all viewer data - :return: The data object that contains all created Viewer information. + :return: A dict of key, value pairs mapping the unique Viewer name to a + dictionary containing a 'ready' key and a boolean value reflecting the + ready state of the Viewer. :rtype: Dict[str, Dict[str, bool]] """ return self._data