Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for pending kernels #712

Merged
merged 4 commits into from
Nov 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 45 additions & 13 deletions jupyter_client/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import sys
import typing as t
import uuid
from asyncio.futures import Future
from concurrent.futures import Future as CFuture
from contextlib import contextmanager
from enum import Enum

Expand Down Expand Up @@ -58,6 +60,11 @@ class KernelManager(ConnectionFileMixin):
def __init__(self, *args, **kwargs):
super().__init__(**kwargs)
self._shutdown_status = _ShutdownStatus.Unset
try:
self._ready = Future()
except RuntimeError:
# No event loop running, use concurrent future
self._ready = CFuture()

_created_context: Bool = Bool(False)

Expand Down Expand Up @@ -139,6 +146,11 @@ def kernel_spec(self) -> t.Optional[kernelspec.KernelSpec]:
def _default_cache_ports(self) -> bool:
return self.transport == "tcp"

@property
def ready(self) -> Future:
"""A future that resolves when the kernel process has started for the first time"""
return self._ready

@property
def ipykernel(self) -> bool:
return self.kernel_name in {"python", "python2", "python3"}
Expand Down Expand Up @@ -329,12 +341,25 @@ async def _async_start_kernel(self, **kw):
keyword arguments that are passed down to build the kernel_cmd
and launching the kernel (e.g. Popen kwargs).
"""
kernel_cmd, kw = await ensure_async(self.pre_start_kernel(**kw))
done = self._ready.done()

# launch the kernel subprocess
self.log.debug("Starting kernel: %s", kernel_cmd)
await ensure_async(self._launch_kernel(kernel_cmd, **kw))
await ensure_async(self.post_start_kernel(**kw))
try:
kernel_cmd, kw = await ensure_async(self.pre_start_kernel(**kw))

# launch the kernel subprocess
self.log.debug("Starting kernel: %s", kernel_cmd)
await ensure_async(self._launch_kernel(kernel_cmd, **kw))
await ensure_async(self.post_start_kernel(**kw))
if not done:
# Add a small sleep to ensure tests can capture the state before done
await asyncio.sleep(0.01)
self._ready.set_result(None)

except Exception as e:
if not done:
self._ready.set_exception(e)
self.log.exception(self._ready.exception())
raise e

start_kernel = run_sync(_async_start_kernel)

Expand Down Expand Up @@ -427,6 +452,10 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)
Will this kernel be restarted after it is shutdown. When this
is True, connection files will not be cleaned up.
"""
# Shutdown is a no-op for a kernel that had a failed startup
if self._ready.exception():
return

self.shutting_down = True # Used by restarter to prevent race condition
# Stop monitoring for restarting while we shutdown.
self.stop_restarter()
Expand Down Expand Up @@ -473,16 +502,19 @@ async def _async_restart_kernel(self, now: bool = False, newports: bool = False,
"""
if self._launch_args is None:
raise RuntimeError("Cannot restart the kernel. " "No previous call to 'start_kernel'.")
else:
# Stop currently running kernel.
await ensure_async(self.shutdown_kernel(now=now, restart=True))

if newports:
self.cleanup_random_ports()
if not self._ready.done():
raise RuntimeError("Cannot restart the kernel. " "Kernel has not fully started.")

# Stop currently running kernel.
await ensure_async(self.shutdown_kernel(now=now, restart=True))

if newports:
self.cleanup_random_ports()

# Start new kernel.
self._launch_args.update(kw)
await ensure_async(self.start_kernel(**self._launch_args))
# Start new kernel.
self._launch_args.update(kw)
await ensure_async(self.start_kernel(**self._launch_args))

restart_kernel = run_sync(_async_restart_kernel)

Expand Down
56 changes: 32 additions & 24 deletions jupyter_client/multikernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,17 @@ class MultiKernelManager(LoggingConfigurable):
"""A class for managing multiple kernels."""

default_kernel_name = Unicode(
NATIVE_KERNEL_NAME, config=True, help="The name of the default kernel to start"
)
NATIVE_KERNEL_NAME, help="The name of the default kernel to start"
).tag(config=True)

kernel_spec_manager = Instance(KernelSpecManager, allow_none=True)

kernel_manager_class = DottedObjectName(
"jupyter_client.ioloop.IOLoopKernelManager",
config=True,
help="""The kernel manager class. This is configurable to allow
subclassing of the KernelManager for customized behavior.
""",
)
).tag(config=True)

@observe("kernel_manager_class")
def _kernel_manager_class_changed(self, change):
Expand Down Expand Up @@ -91,9 +90,8 @@ def create_kernel_manager(*args, **kwargs) -> KernelManager:

shared_context = Bool(
True,
config=True,
help="Share a single zmq.Context to talk to all my kernels",
)
).tag(config=True)

_created_context = Bool(False)

Expand Down Expand Up @@ -163,8 +161,11 @@ def pre_start_kernel(
async def _add_kernel_when_ready(
self, kernel_id: str, km: KernelManager, kernel_awaitable: t.Awaitable
) -> None:
await kernel_awaitable
self._kernels[kernel_id] = km
try:
await kernel_awaitable
self._kernels[kernel_id] = km
finally:
self._starting_kernels.pop(kernel_id, None)

async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwargs) -> str:
"""Start a new kernel.
Expand All @@ -182,12 +183,16 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg
)
)
kwargs['kernel_id'] = kernel_id # Make kernel_id available to manager and provisioner
fut = asyncio.ensure_future(
self._add_kernel_when_ready(kernel_id, km, ensure_async(km.start_kernel(**kwargs)))
)

starter = ensure_async(km.start_kernel(**kwargs))
fut = asyncio.ensure_future(self._add_kernel_when_ready(kernel_id, km, starter))
self._starting_kernels[kernel_id] = fut
await fut
del self._starting_kernels[kernel_id]

if getattr(self, 'use_pending_kernels', False):
self._kernels[kernel_id] = km
else:
await fut

return kernel_id

start_kernel = run_sync(_async_start_kernel)
Expand All @@ -210,9 +215,13 @@ async def _async_shutdown_kernel(
Will the kernel be restarted?
"""
self.log.info("Kernel shutdown: %s" % kernel_id)

if kernel_id in self._starting_kernels:
try:
await self._starting_kernels[kernel_id]
except Exception:
self.remove_kernel(kernel_id)
return
km = self.get_kernel(kernel_id)

await ensure_async(km.shutdown_kernel(now, restart))
self.remove_kernel(kernel_id)

Expand Down Expand Up @@ -246,18 +255,11 @@ def remove_kernel(self, kernel_id: str) -> KernelManager:
"""
return self._kernels.pop(kernel_id, None)

async def _shutdown_starting_kernel(self, kid: str, now: bool) -> None:
if kid in self._starting_kernels:
await self._starting_kernels[kid]
await ensure_async(self.shutdown_kernel(kid, now=now))

async def _async_shutdown_all(self, now: bool = False) -> None:
"""Shutdown all kernels."""
kids = self.list_kernel_ids()
futs = [ensure_async(self.shutdown_kernel(kid, now=now)) for kid in kids]
futs += [
self._shutdown_starting_kernel(kid, now=now) for kid in self._starting_kernels.keys()
]
kids += list(self._starting_kernels)
futs = [ensure_async(self.shutdown_kernel(kid, now=now)) for kid in set(kids)]
await asyncio.gather(*futs)

shutdown_all = run_sync(_async_shutdown_all)
Expand Down Expand Up @@ -466,6 +468,12 @@ class AsyncMultiKernelManager(MultiKernelManager):
""",
)

use_pending_kernels = Bool(
False,
help="""Whether to make kernels available before the process has started. The
kernel has a `.ready` future which can be awaited before connecting""",
).tag(config=True)

start_kernel = MultiKernelManager._async_start_kernel
shutdown_kernel = MultiKernelManager._async_shutdown_kernel
shutdown_all = MultiKernelManager._async_shutdown_all
4 changes: 4 additions & 0 deletions jupyter_client/tests/test_kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ class TestKernelManager:
def test_lifecycle(self, km):
km.start_kernel(stdout=PIPE, stderr=PIPE)
assert km.is_alive()
is_done = km.ready.done()
assert is_done
km.restart_kernel(now=True)
assert km.is_alive()
km.interrupt_kernel()
Expand Down Expand Up @@ -439,6 +441,8 @@ async def test_lifecycle(self, async_km):
await async_km.start_kernel(stdout=PIPE, stderr=PIPE)
is_alive = await async_km.is_alive()
assert is_alive
is_ready = async_km.ready.done()
assert is_ready
await async_km.restart_kernel(now=True)
is_alive = await async_km.is_alive()
assert is_alive
Expand Down
22 changes: 5 additions & 17 deletions jupyter_client/tests/test_kernelspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,18 @@
import pytest
from jupyter_core import paths

from .utils import install_kernel
from .utils import sample_kernel_json
from .utils import test_env
from jupyter_client import kernelspec

sample_kernel_json = {
"argv": ["cat", "{connection_file}"],
"display_name": "Test kernel",
}


class KernelSpecTests(unittest.TestCase):
def _install_sample_kernel(self, kernels_dir):
"""install a sample kernel in a kernels directory"""
sample_kernel_dir = pjoin(kernels_dir, "sample")
os.makedirs(sample_kernel_dir)
json_file = pjoin(sample_kernel_dir, "kernel.json")
with open(json_file, "w") as f:
json.dump(sample_kernel_json, f)
return sample_kernel_dir

def setUp(self):
self.env_patch = test_env()
self.env_patch.start()
self.sample_kernel_dir = self._install_sample_kernel(
pjoin(paths.jupyter_data_dir(), "kernels")
self.sample_kernel_dir = install_kernel(
pjoin(paths.jupyter_data_dir(), "kernels"), name="sample"
)

self.ksm = kernelspec.KernelSpecManager()
Expand Down Expand Up @@ -87,7 +75,7 @@ def test_find_all_specs(self):
def test_kernel_spec_priority(self):
td = TemporaryDirectory()
self.addCleanup(td.cleanup)
sample_kernel = self._install_sample_kernel(td.name)
sample_kernel = install_kernel(td.name, name="sample")
self.ksm.kernel_dirs.append(td.name)
kernels = self.ksm.find_kernel_specs()
self.assertEqual(kernels["sample"], self.sample_kernel_dir)
Expand Down
Loading