Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cell watcher docstrings #716

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 181 additions & 72 deletions itkwidgets/cell_watcher.py
Original file line number Diff line number Diff line change
@@ -1,69 +1,98 @@
import asyncio
import sys
from inspect import isawaitable, iscoroutinefunction
from typing import Callable, Dict, List
from IPython import get_ipython
from IPython.core.interactiveshell import ExecutionResult
from queue import Queue
from imjoy_rpc.utils import FuturePromise
from zmq.eventloop.zmqstream import ZMQStream

background_tasks = set()


class Viewers(object):
"""This class is designed to track each instance of the Viewer class that
is instantiated as well as whether or not that instance is available for
updates or requests.
"""
def __init__(self):
self._data = {}

@property
def data(self):
def data(self) -> Dict[str, Dict[str, bool]]:
"""Get the underlying data dict containing all viewer data

:return: A dict of key, value pairs mapping the unique Viewer name to a
dictionary containing a 'ready' key and a boolean value reflecting the
ready state of the Viewer.
:rtype: Dict[str, Dict[str, bool]]
"""
return self._data

@property
def not_created(self):
# Return a list of names of viewers that have not been created yet
names = []
for key, val in self.data.items():
name = val['name']
if not val['status']:
name = name if name is not None else key
names.append(name)
return names

@property
def not_named(self):
# Return a list of names of viewers that have not been named yet
return any([k for k, v in self.data.items() if v['name'] is None])

@property
def viewer_objects(self):
# Return a list of created viewers
return list(self.data.keys())

def add_viewer(self, view):
self.data[view] = {'name': None, 'status': False}

def set_name(self, view, name):
def not_created(self) -> List[str]:
"""Return a list of all unavailable viewers

:return: A list of names of viewers that have not yet been created.
:rtype: List[str]
"""
return [k for k in self.data.keys() if not self.viewer_ready(k)]

def add_viewer(self, view: str) -> None:
"""Add a new Viewer object to track.

:param view: The unique string identifier for the Viewer object
:type view: str
"""
self.data[view] = {"ready": False}

def update_viewer_status(self, view: str, status: bool) -> None:
"""Update a Viewer's 'ready' status.

:param view: The unique string identifier for the Viewer object
:type view: str
:param status: Boolean value indicating whether or not the viewer is
available for requests or updates. This should be false when the plugin
API is not yet available or new data is not yet rendered.
:type status: bool
"""
if view not in self.data.keys():
self.add_viewer(view)
self.data[view]['name'] = name
self.data[view]["ready"] = status

def update_viewer_status(self, view, status):
if view not in self.data.keys():
self.add_viewer(view)
self.data[view]['status'] = status
def viewer_ready(self, view: str) -> bool:
"""Request the 'ready' status of a viewer.

:param view: The unique string identifier for the Viewer object
:type view: str

def viewer_ready(self, view):
if viewer := self.data.get(view):
return viewer['status']
return False
:return: Boolean value indicating whether or not the viewer is
available for requests or updates. This will be false when the plugin
API is not yet available or new data is not yet rendered.
:rtype: bool
"""
return self.data.get(view, {}).get("ready", False)


class CellWatcher(object):
"""A singleton class used in interactive Jupyter notebooks in order to
support asynchronous network communication that would otherwise be blocked
by the IPython kernel.
"""

def __new__(cls):
if not hasattr(cls, '_instance'):
"""Create a singleton class."""
if not hasattr(cls, "_instance"):
cls._instance = super(CellWatcher, cls).__new__(cls)
cls._instance.setup()
return cls._instance

def setup(self):
def setup(self) -> None:
"""Perform the initial setup, including intercepting 'execute_request'
handlers so that we can handle them internally before the IPython
kernel does.
"""
self.viewers = Viewers()
self.shell = get_ipython()
self.kernel = self.shell.kernel
Expand All @@ -87,22 +116,52 @@ def setup(self):

# Call self.post_run_cell every time the post_run_cell signal is emitted
# post_run_cell runs after interactive execution (e.g. a cell in a notebook)
self.shell.events.register('post_run_cell', self.post_run_cell)
self.shell.events.register("post_run_cell", self.post_run_cell)

def add_viewer(self, view: str) -> None:
"""Add a new Viewer object to track.

def add_viewer(self, view):
:param view: The unique string identifier for the Viewer object
:type view: str
"""
# Track all Viewer instances
self.viewers.add_viewer(view)

def update_viewer_status(self, view, status):
def update_viewer_status(self, view: str, status: bool) -> None:
"""Update a Viewer's 'ready' status. If the last cell run failed
because the viewer was unavailable try to run the cell again.

:param view: The unique string identifier for the Viewer object
:type view: str
:param status: Boolean value indicating whether or not the viewer is
available for requests or updates. This should be false when the plugin
API is not yet available or new data is not yet rendered.
:type status: bool
"""
self.viewers.update_viewer_status(view, status)
if self.waiting_on_viewer:
if status and self.waiting_on_viewer:
# Might be ready now, try again
self.create_task(self.execute_next_request)

def viewer_ready(self, view):
def viewer_ready(self, view: str) -> bool:
"""Request the 'ready' status of a viewer.

:param view: The unique string identifier for the Viewer object
:type view: str

:return: Boolean value indicating whether or not the viewer is
available for requests or updates. This will be false when the plugin
API is not yet available or new data is not yet rendered.
:rtype: bool
"""
return self.viewers.viewer_ready(view)

def _task_cleanup(self, task):
def _task_cleanup(self, task: asyncio.Task) -> None:
"""Callback to discard references to tasks once they've completed.

:param task: Completed task that no longer needs a strong reference
:type task: asyncio.Task
"""
global background_tasks
try:
# "Handle" exceptions here to prevent further errors. Exceptions
Expand All @@ -112,40 +171,83 @@ def _task_cleanup(self, task):
except:
background_tasks.discard(task)

def create_task(self, fn):
def create_task(self, fn: Callable) -> None:
"""Create a task from the function passed in.

:param fn: Coroutine to run concurrently as a Task
:type fn: Callable
"""
global background_tasks
# The event loop only keeps weak references to tasks.
# Gather them into a set to avoid garbage collection mid-task.
task = asyncio.create_task(fn())
background_tasks.add(task)
task.add_done_callback(self._task_cleanup)

def capture_event(self, stream, ident, parent):
def capture_event(self, stream: ZMQStream, ident: list, parent: dict) -> None:
"""Capture execute_request messages so that we can queue and process
them concurrently as tasks to prevent blocking.

:param stream: Class to manage event-based messaging on a zmq socket
:type stream: ZMQStream
:param ident: ZeroMQ routing prefix, which can be zero or more socket
identities
:type ident: list
:param parent: A dictionary of dictionaries representing a complete
message as defined by the Jupyter message specification
:type parent: dict
"""
self._events.put((stream, ident, parent))
if self._events.qsize() == 1 and self.ready_to_run_next_cell():
# We've added a new task to an empty queue.
# Begin executing tasks again.
self.create_task(self.execute_next_request)

async def capture_event_async(self, stream, ident, parent):
async def capture_event_async(
self, stream: ZMQStream, ident: list, parent: dict
) -> None:
"""Capture execute_request messages so that we can queue and process
them concurrently as tasks to prevent blocking.
Asynchronous for ipykernel 6+.

:param stream: Class to manage event-based messaging on a zmq socket
:type stream: ZMQStream
:param ident: ZeroMQ routing prefix, which can be zero or more socket
identities
:type ident: list
:param parent: A dictionary of dictionaries representing a complete
message as defined by the Jupyter message specification
:type parent: dict
"""
# ipykernel 6+
self.capture_event(stream, ident, parent)

@property
def all_getters_resolved(self):
# Check if all of the getter/setter futures have resolved
def all_getters_resolved(self) -> bool:
"""Determine if all tasks representing asynchronous network calls that
fetch values have resolved.

:return: Whether or not all tasks for the current cell have resolved
:rtype: bool
"""
getters_resolved = [f.done() for f in self.results.values()]
return all(getters_resolved)

def ready_to_run_next_cell(self):
# Any itk_viewer objects need to be available and all getters/setters
# need to be resolved
def ready_to_run_next_cell(self) -> bool:
"""Determine if we are ready to run the next cell in the queue.

:return: True if all created Viewer objects are available and all
futures are resolved.
:rtype: bool
"""
self.waiting_on_viewer = len(self.viewers.not_created)
return self.all_getters_resolved and not self.waiting_on_viewer

async def execute_next_request(self):
# Modeled after the approach used in jupyter-ui-poll
# https://github.com/Kirill888/jupyter-ui-poll/blob/f65b81f95623c699ed7fd66a92be6d40feb73cde/jupyter_ui_poll/_poll.py#L75-L101
async def execute_next_request(self) -> None:
"""Grab the next request if needed and then run the cell if it is ready
to be run. Modeled after the approach used in jupyter-ui-poll.
:ref: https://github.com/Kirill888/jupyter-ui-poll/blob/f65b81f95623c699ed7fd66a92be6d40feb73cde/jupyter_ui_poll/_poll.py#L75-L101
"""
if self._events.empty():
self.abort_all = False

Expand All @@ -157,7 +259,8 @@ async def execute_next_request(self):
# Continue processing the remaining queued tasks
await self._execute_next_request()

async def _execute_next_request(self):
async def _execute_next_request(self) -> None:
"""Run the cell with the ipykernel shell_handler for execute_request"""
# Here we actually run the queued cell as it would have been run
stream, ident, parent = self.current_request

Expand Down Expand Up @@ -187,32 +290,38 @@ async def _execute_next_request(self):
# Continue processing the remaining queued tasks
self.create_task(self.execute_next_request)

def update_namespace(self):
# Update the namespace variables with the results from the getters
def update_namespace(self) -> None:
"""Update the namespace variables with the results from the getters"""
# FIXME: This is a temporary "fix" and does not handle updating output
keys = [k for k in self.shell.user_ns.keys()]
try:
for key in keys:
value = self.shell.user_ns[key]
if asyncio.isfuture(value) and (isinstance(value, FuturePromise) or isinstance(value, asyncio.Task)):
# Getters/setters return futures
# They should all be resolved now, so use the result
self.shell.user_ns[key] = value.result()
self.results.clear()
except Exception as e:
self.results.clear()
self.abort_all = True
self.create_task(self._execute_next_request)
raise e

def _callback(self, *args, **kwargs):
for key in keys:
value = self.shell.user_ns[key]
if asyncio.isfuture(value) and (
isinstance(value, FuturePromise) or isinstance(value, asyncio.Task)
):
# Functions that need to return values from asynchronous
# network requests return futures. They should all be resolved
# now, so use the result.
self.shell.user_ns[key] = value.result()
self.results.clear()

def _callback(self, *args, **kwargs) -> None:
"""After each future resolves check to see if they are all resolved. If
so, update the namespace and run the next cell in the queue.
"""
# After each getter/setter resolves check if they've all resolved
if self.all_getters_resolved:
self.update_namespace()
self.current_request = None
self.create_task(self.execute_next_request)

def post_run_cell(self, response):
def post_run_cell(self, response: ExecutionResult) -> None:
"""Runs after interactive execution (e.g. a cell in a notebook). Set
the abort flag if there are errors produced by cell execution.

:param response: The response message produced by cell execution
:type response: ExecutionResult
"""
# Abort remaining cells on error in execution
if response.error_in_exec is not None:
self.abort_all = True