Further improvements to pending kernels management #732

Merged
merged 15 commits into from
Jan 14, 2022
1 change: 0 additions & 1 deletion .github/workflows/downstream.yml
Expand Up @@ -12,7 +12,6 @@ jobs:
strategy:
matrix:
python-version: ["3.9"]

steps:
- name: Checkout
uses: actions/checkout@v2
1 change: 1 addition & 0 deletions docs/index.rst
Expand Up @@ -24,6 +24,7 @@ with Jupyter kernels.
kernels
wrapperkernels
provisioning
pending-kernels

.. toctree::
:maxdepth: 2
36 changes: 36 additions & 0 deletions docs/pending-kernels.rst
@@ -0,0 +1,36 @@
Pending Kernels
===============

*Added in 7.1.0*

In scenarios where an kernel takes a long time to start (e.g. kernels running remotely), it can be advantageous to immediately return the kernel's model and ID from key methods like ``.start_kernel()`` and ``.shutdown_kernel()``. The kernel will continue its task without blocking other managerial actions.
Review comment (Member):

Suggested change
In scenarios where an kernel takes a long time to start (e.g. kernels running remotely), it can be advantageous to immediately return the kernel's model and ID from key methods like ``.start_kernel()`` and ``.shutdown_kernel()``. The kernel will continue its task without blocking other managerial actions.
In scenarios where a kernel takes a long time to start (e.g. kernels running remotely), it can be advantageous to immediately return the kernel's model and ID from key methods like ``.start_kernel()`` and ``.shutdown_kernel()``. The kernel will continue its task without blocking other managerial actions.


This intermediate state is called a **"pending kernel"**.

How they work
-------------

When ``.start_kernel()`` or ``.shutdown_kernel()`` is called, a new ``Future`` is created and exposed through the ``KernelManager.ready`` property. This property can be awaited at any time to ensure that the kernel has moved out of its pending state, e.g.:

.. code-block:: python

    # await a Kernel Manager's `.ready` property to
    # block further action until the kernel is out
    # of its pending state.
    await kernel_manager.ready

Once the kernel is no longer pending, ``.ready.done()`` will be ``True`` and either 1) ``.ready.result()`` will return ``None`` or 2) ``.ready.exception()`` will return the exception that was raised.
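
As a rough illustration of the three states described above (pending, finished with a result, finished with an exception), the sketch below uses a plain ``asyncio`` future. ``ToyKernelManager`` and ``inspect_ready`` are hypothetical names made up for this example; they are not part of ``jupyter_client`` and only mimic the shape of the ``ready`` property.

```python
import asyncio


class ToyKernelManager:
    """Hypothetical stand-in for a kernel manager; not the jupyter_client class."""

    def __init__(self):
        self.ready = None

    async def start_kernel(self, fail=False):
        # Create a fresh future that represents the pending state.
        self.ready = asyncio.get_running_loop().create_future()
        await asyncio.sleep(0.01)  # simulate a slow startup
        if fail:
            self.ready.set_exception(RuntimeError("startup failed"))
        else:
            self.ready.set_result(None)


async def inspect_ready(fail):
    km = ToyKernelManager()
    task = asyncio.ensure_future(km.start_kernel(fail=fail))
    await asyncio.sleep(0)  # let start_kernel run until its first await
    was_pending = not km.ready.done()
    await task
    # Once done, exception() is None on success and the raised error otherwise.
    error = km.ready.exception()
    result = None if error else km.ready.result()
    return was_pending, km.ready.done(), result, error
```

Calling ``asyncio.run(inspect_ready(False))`` exercises the success path and ``asyncio.run(inspect_ready(True))`` the failure path.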

Using pending kernels
---------------------

The most common way to interact with pending kernels is through the ``MultiKernelManager``—the object that manages a collection of kernels—by setting its ``use_pending_kernels`` trait to ``True``. Pending kernels are "opt-in"; they are not used by default in the ``MultiKernelManager``.

When ``use_pending_kernels`` is ``True``, the following changes are made to the ``MultiKernelManager``:

1. ``start_kernel`` and ``shutdown_kernel`` return immediately while the pending task runs in the background.
2. The following methods raise a ``RuntimeError`` if a kernel is pending:

   * ``restart_kernel``
   * ``interrupt_kernel``
   * ``shutdown_kernel``

3. ``shutdown_all`` will wait for all pending kernels to become ready before attempting to shut them down.
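
The opt-in behaviour listed above can be sketched with a small toy model. Everything here (``ToyMultiKernelManager``, ``_slow_start``, ``demo``) is hypothetical and only imitates the documented rules: with ``use_pending_kernels`` enabled, starting returns right away, and interrupting a kernel that is still pending raises ``RuntimeError``. It does not launch real kernels or use the ``jupyter_client`` API.

```python
import asyncio
import uuid


class ToyMultiKernelManager:
    """Hypothetical model of the opt-in behaviour; not jupyter_client's API."""

    def __init__(self, use_pending_kernels=False):
        self.use_pending_kernels = use_pending_kernels
        self._ready = {}  # kernel_id -> asyncio.Task tracking startup

    async def _slow_start(self):
        await asyncio.sleep(0.01)  # simulate a remote kernel starting

    async def start_kernel(self):
        kernel_id = str(uuid.uuid4())
        self._ready[kernel_id] = asyncio.ensure_future(self._slow_start())
        if not self.use_pending_kernels:
            # Default behaviour: block until the kernel has started.
            await self._ready[kernel_id]
        return kernel_id

    def interrupt_kernel(self, kernel_id):
        if not self._ready[kernel_id].done():
            raise RuntimeError("Kernel is in a pending state. Cannot interrupt.")
        return "interrupted"


async def demo():
    mkm = ToyMultiKernelManager(use_pending_kernels=True)
    kid = await mkm.start_kernel()  # returns before startup finishes
    try:
        mkm.interrupt_kernel(kid)
        raised = False
    except RuntimeError:
        raised = True
    await mkm._ready[kid]  # wait for the kernel to leave the pending state
    return raised, mkm.interrupt_kernel(kid)
```

In this toy, the first interrupt attempt fails because the kernel is still pending, and the second succeeds once the startup task has been awaited.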
63 changes: 38 additions & 25 deletions jupyter_client/manager.py
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import asyncio
import functools
import os
import re
import signal
Expand Down Expand Up @@ -51,6 +52,35 @@ class _ShutdownStatus(Enum):
SigkillRequest = "SigkillRequest"


def in_pending_state(method):
    """Sets the kernel to a pending state by
    creating a fresh Future for the KernelManager's `ready`
    attribute. Once the method is finished, set the Future's results.
    """

    @functools.wraps(method)
    async def wrapper(self, *args, **kwargs):
        # Create a future for the decorated method
        try:
            self._ready = Future()
        except RuntimeError:
            # No event loop running, use concurrent future
            self._ready = CFuture()
        try:
            # call wrapped method, await, and set the result or exception.
            out = await method(self, *args, **kwargs)
            # Add a small sleep to ensure tests can capture the state before done
            await asyncio.sleep(0.01)
            self._ready.set_result(None)
            return out
        except Exception as e:
            self._ready.set_exception(e)
            self.log.exception(self._ready.exception())
            raise e

    return wrapper
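
A stripped-down, self-contained sketch of the pattern this decorator implements may help when reading the rest of the diff. It is a simplification under stated assumptions: the logging call, the short sleep, and the ``CFuture`` fallback are omitted, the future is stored on a public ``ready`` attribute, and ``ToyManager`` is a hypothetical class invented for the example.

```python
import asyncio
import functools


def in_pending_state(method):
    """Toy version of the decorator: reset `ready`, run, record the outcome."""

    @functools.wraps(method)
    async def wrapper(self, *args, **kwargs):
        # A fresh future marks the start of a new pending state.
        self.ready = asyncio.get_running_loop().create_future()
        try:
            out = await method(self, *args, **kwargs)
            self.ready.set_result(None)
            return out
        except Exception as e:
            # Record the failure on the future, then re-raise to the caller.
            self.ready.set_exception(e)
            raise

    return wrapper


class ToyManager:
    """Hypothetical class used only to exercise the decorator."""

    ready = None

    @in_pending_state
    async def start(self):
        return "started"

    @in_pending_state
    async def shutdown(self):
        raise OSError("could not stop")


async def demo():
    m = ToyManager()
    out = await m.start()
    first = m.ready
    try:
        await m.shutdown()
    except OSError:
        pass
    # Each decorated call replaced `ready` with a new future.
    return out, first is not m.ready, first.exception(), type(m.ready.exception())
```

The point of the sketch is that every decorated call installs a new future, so the outcome of an earlier call is not carried over into the next one.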


class KernelManager(ConnectionFileMixin):
"""Manages a single kernel in a subprocess on this host.

Expand All @@ -60,6 +90,7 @@ class KernelManager(ConnectionFileMixin):
def __init__(self, *args, **kwargs):
super().__init__(**kwargs)
self._shutdown_status = _ShutdownStatus.Unset
# Create a place holder future.
try:
self._ready = Future()
except RuntimeError:
Expand Down Expand Up @@ -329,6 +360,7 @@ async def _async_post_start_kernel(self, **kw) -> None:

post_start_kernel = run_sync(_async_post_start_kernel)

@in_pending_state
async def _async_start_kernel(self, **kw):
"""Starts a kernel on this host in a separate process.

Expand All @@ -341,25 +373,12 @@ async def _async_start_kernel(self, **kw):
keyword arguments that are passed down to build the kernel_cmd
and launching the kernel (e.g. Popen kwargs).
"""
done = self._ready.done()

try:
kernel_cmd, kw = await ensure_async(self.pre_start_kernel(**kw))

# launch the kernel subprocess
self.log.debug("Starting kernel: %s", kernel_cmd)
await ensure_async(self._launch_kernel(kernel_cmd, **kw))
await ensure_async(self.post_start_kernel(**kw))
if not done:
# Add a small sleep to ensure tests can capture the state before done
await asyncio.sleep(0.01)
self._ready.set_result(None)
kernel_cmd, kw = await ensure_async(self.pre_start_kernel(**kw))

except Exception as e:
if not done:
self._ready.set_exception(e)
self.log.exception(self._ready.exception())
raise e
# launch the kernel subprocess
self.log.debug("Starting kernel: %s", kernel_cmd)
await ensure_async(self._launch_kernel(kernel_cmd, **kw))
await ensure_async(self.post_start_kernel(**kw))

start_kernel = run_sync(_async_start_kernel)

Expand Down Expand Up @@ -434,6 +453,7 @@ async def _async_cleanup_resources(self, restart: bool = False) -> None:

cleanup_resources = run_sync(_async_cleanup_resources)

@in_pending_state
async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False):
"""Attempts to stop the kernel process cleanly.

Expand All @@ -452,10 +472,6 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)
Will this kernel be restarted after it is shutdown. When this
is True, connection files will not be cleaned up.
"""
# Shutdown is a no-op for a kernel that had a failed startup
if self._ready.exception():
return

self.shutting_down = True # Used by restarter to prevent race condition
# Stop monitoring for restarting while we shutdown.
self.stop_restarter()
Expand Down Expand Up @@ -503,9 +519,6 @@ async def _async_restart_kernel(self, now: bool = False, newports: bool = False,
if self._launch_args is None:
raise RuntimeError("Cannot restart the kernel. " "No previous call to 'start_kernel'.")

if not self._ready.done():
raise RuntimeError("Cannot restart the kernel. " "Kernel has not fully started.")

# Stop currently running kernel.
await ensure_async(self.shutdown_kernel(now=now, restart=True))

102 changes: 88 additions & 14 deletions jupyter_client/multikernelmanager.py
Expand Up @@ -97,7 +97,12 @@ def create_kernel_manager(*args, **kwargs) -> KernelManager:

context = Instance("zmq.Context")

_starting_kernels = Dict()
_pending_kernels = Dict()

@property
def _starting_kernels(self):
"""A shim for backwards compatibility."""
return self._pending_kernels
Comment on lines +102 to +105
Review comment (Member):

Should this be marked as deprecated?
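
For context on the question above, one common way to mark such a compatibility shim as deprecated is to emit a ``DeprecationWarning`` from the property. The sketch below is only an illustration of that general technique with the standard ``warnings`` module; ``ToyManager`` is a hypothetical class, and nothing here claims to be what the PR ultimately adopted.

```python
import warnings


class ToyManager:
    """Hypothetical class; only illustrates a deprecated alias property."""

    def __init__(self):
        self._pending_kernels = {}

    @property
    def _starting_kernels(self):
        """Deprecated alias for ``_pending_kernels``."""
        warnings.warn(
            "_starting_kernels is deprecated; use _pending_kernels",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller's line
        )
        return self._pending_kernels
```

Existing callers keep working through the alias, while the warning points them to the new attribute name.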


@default("context")
def _context_default(self) -> zmq.Context:
Expand Down Expand Up @@ -165,7 +170,22 @@ async def _add_kernel_when_ready(
await kernel_awaitable
self._kernels[kernel_id] = km
finally:
self._starting_kernels.pop(kernel_id, None)
self._pending_kernels.pop(kernel_id, None)

    async def _remove_kernel_when_ready(
        self, kernel_id: str, kernel_awaitable: t.Awaitable
    ) -> None:
        try:
            await kernel_awaitable
            self.remove_kernel(kernel_id)
        finally:
            self._pending_kernels.pop(kernel_id, None)

    def _using_pending_kernels(self):
        """Returns a boolean; a clearer method for determining if
        this multikernelmanager is using pending kernels or not
        """
        return getattr(self, 'use_pending_kernels', False)

async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwargs) -> str:
"""Start a new kernel.
Expand All @@ -186,17 +206,38 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg

starter = ensure_async(km.start_kernel(**kwargs))
fut = asyncio.ensure_future(self._add_kernel_when_ready(kernel_id, km, starter))
self._starting_kernels[kernel_id] = fut

if getattr(self, 'use_pending_kernels', False):
self._pending_kernels[kernel_id] = fut
# Handling a Pending Kernel
if self._using_pending_kernels():
# If using pending kernels, do not block
# on the kernel start.
self._kernels[kernel_id] = km
else:
await fut
# raise an exception if one occurred during kernel startup.
if km.ready.exception():
raise km.ready.exception() # type: ignore

return kernel_id

start_kernel = run_sync(_async_start_kernel)

async def _shutdown_kernel_when_ready(
self,
kernel_id: str,
now: t.Optional[bool] = False,
restart: t.Optional[bool] = False,
) -> None:
"""Wait for a pending kernel to be ready
before shutting the kernel down.
"""
# Only do this if using pending kernels
if self._using_pending_kernels():
kernel = self._kernels[kernel_id]
await kernel.ready
# Once out of a pending state, we can call shutdown.
await ensure_async(self.shutdown_kernel(kernel_id, now=now, restart=restart))

async def _async_shutdown_kernel(
self,
kernel_id: str,
Expand All @@ -215,15 +256,31 @@ async def _async_shutdown_kernel(
Will the kernel be restarted?
"""
self.log.info("Kernel shutdown: %s" % kernel_id)
if kernel_id in self._starting_kernels:
# If we're using pending kernels, block shutdown when a kernel is pending.
if self._using_pending_kernels() and kernel_id in self._pending_kernels:
raise RuntimeError("Kernel is in a pending state. Cannot shutdown.")
# If the kernel is still starting, wait for it to be ready.
elif kernel_id in self._starting_kernels:
Review comment (Member):

Is the use of _starting_kernels intentional? Technically, this could also contain kernels that are shutting down now, but only when pending kernels are enabled - so I suspect this was for hinting that, in this case, the kernel can only be starting.

Perhaps we could be more explicit (and save an existence check [nit]) via:

        if kernel_id in self._pending_kernels:
            if self._using_pending_kernels():
                 raise RuntimeError("Kernel is in a pending state. Cannot shutdown.")
            else:  # kernel is still starting, wait for its startup
                kernel = self._pending_kernels[kernel_id]
                try:
                    await kernel
                except Exception:
                    self.remove_kernel(kernel_id)

kernel = self._starting_kernels[kernel_id]
try:
await self._starting_kernels[kernel_id]
await kernel
except Exception:
self.remove_kernel(kernel_id)
Review comment (Member):

Do we want to update the _pending_kernels list as well here?

return
km = self.get_kernel(kernel_id)
await ensure_async(km.shutdown_kernel(now, restart))
self.remove_kernel(kernel_id)
# If a pending kernel raised an exception, remove it.
if km.ready.exception():
self.remove_kernel(kernel_id)
return
stopper = ensure_async(km.shutdown_kernel(now, restart))
fut = asyncio.ensure_future(self._remove_kernel_when_ready(kernel_id, stopper))
self._pending_kernels[kernel_id] = fut
# Await the kernel if not using pending kernels.
if not self._using_pending_kernels():
await fut
# raise an exception if one occurred during kernel shutdown.
if km.ready.exception():
raise km.ready.exception() # type: ignore

shutdown_kernel = run_sync(_async_shutdown_kernel)

Expand Down Expand Up @@ -258,13 +315,17 @@ def remove_kernel(self, kernel_id: str) -> KernelManager:
async def _async_shutdown_all(self, now: bool = False) -> None:
"""Shutdown all kernels."""
kids = self.list_kernel_ids()
kids += list(self._starting_kernels)
futs = [ensure_async(self.shutdown_kernel(kid, now=now)) for kid in set(kids)]
kids += list(self._pending_kernels)
futs = [ensure_async(self._shutdown_kernel_when_ready(kid, now=now)) for kid in set(kids)]
await asyncio.gather(*futs)
# When using "shutdown all", all pending kernels
# should be awaited before exiting this method.
if self._using_pending_kernels():
for km in self._kernels.values():
await km.ready

shutdown_all = run_sync(_async_shutdown_all)

@kernel_method
def interrupt_kernel(self, kernel_id: str) -> None:
"""Interrupt (SIGINT) the kernel by its uuid.

Expand All @@ -273,7 +334,12 @@ def interrupt_kernel(self, kernel_id: str) -> None:
kernel_id : uuid
The id of the kernel to interrupt.
"""
kernel = self.get_kernel(kernel_id)
if not kernel.ready.done():
raise RuntimeError("Kernel is in a pending state. Cannot interrupt.")
out = kernel.interrupt_kernel()
self.log.info("Kernel interrupted: %s" % kernel_id)
return out

@kernel_method
def signal_kernel(self, kernel_id: str, signum: int) -> None:
Expand All @@ -291,8 +357,7 @@ def signal_kernel(self, kernel_id: str, signum: int) -> None:
"""
self.log.info("Signaled Kernel %s with %s" % (kernel_id, signum))

@kernel_method
def restart_kernel(self, kernel_id: str, now: bool = False) -> None:
async def _async_restart_kernel(self, kernel_id: str, now: bool = False) -> None:
"""Restart a kernel by its uuid, keeping the same ports.

Parameters
Expand All @@ -307,7 +372,15 @@ def restart_kernel(self, kernel_id: str, now: bool = False) -> None:
In all cases the kernel is restarted, the only difference is whether
it is given a chance to perform a clean shutdown or not.
"""
kernel = self.get_kernel(kernel_id)
if self._using_pending_kernels():
if not kernel.ready.done():
raise RuntimeError("Kernel is in a pending state. Cannot restart.")
out = await ensure_async(kernel.restart_kernel(now=now))
self.log.info("Kernel restarted: %s" % kernel_id)
return out

restart_kernel = run_sync(_async_restart_kernel)

@kernel_method
def is_alive(self, kernel_id: str) -> bool:
Expand Down Expand Up @@ -475,5 +548,6 @@ class AsyncMultiKernelManager(MultiKernelManager):
).tag(config=True)

start_kernel = MultiKernelManager._async_start_kernel
restart_kernel = MultiKernelManager._async_restart_kernel
shutdown_kernel = MultiKernelManager._async_shutdown_kernel
shutdown_all = MultiKernelManager._async_shutdown_all