Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for pending kernels #712

Merged
merged 4 commits into from
Nov 22, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ jobs:
pip freeze

- name: Check types
if: ${{ matrix.python-version != '3.6' }}
run: mypy jupyter_client --exclude '\/tests|kernelspecapp|ioloop|runapp' --install-types --non-interactive

- name: Run the tests
Expand Down
36 changes: 29 additions & 7 deletions jupyter_client/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import sys
import typing as t
import uuid
from asyncio.futures import Future
from concurrent.futures import Future as CFuture
from contextlib import contextmanager
from enum import Enum

Expand Down Expand Up @@ -58,6 +60,11 @@ class KernelManager(ConnectionFileMixin):
def __init__(self, *args, **kwargs):
super().__init__(**kwargs)
self._shutdown_status = _ShutdownStatus.Unset
try:
self._ready = Future()
except RuntimeError:
# No event loop running, use concurrent future
self._ready = CFuture()

_created_context: Bool = Bool(False)

Expand Down Expand Up @@ -139,6 +146,11 @@ def kernel_spec(self) -> t.Optional[kernelspec.KernelSpec]:
def _default_cache_ports(self) -> bool:
return self.transport == "tcp"

@property
def ready(self) -> Future:
"""A future that resolves when the kernel process has started for the first time"""
return self._ready

@property
def ipykernel(self) -> bool:
return self.kernel_name in {"python", "python2", "python3"}
Expand Down Expand Up @@ -329,12 +341,22 @@ async def _async_start_kernel(self, **kw):
keyword arguments that are passed down to build the kernel_cmd
and launching the kernel (e.g. Popen kwargs).
"""
kernel_cmd, kw = await ensure_async(self.pre_start_kernel(**kw))
done = self._ready.done()

try:
kernel_cmd, kw = await ensure_async(self.pre_start_kernel(**kw))

# launch the kernel subprocess
self.log.debug("Starting kernel: %s", kernel_cmd)
await ensure_async(self._launch_kernel(kernel_cmd, **kw))
await ensure_async(self.post_start_kernel(**kw))
if not done:
self._ready.set_result(None)

# launch the kernel subprocess
self.log.debug("Starting kernel: %s", kernel_cmd)
await ensure_async(self._launch_kernel(kernel_cmd, **kw))
await ensure_async(self.post_start_kernel(**kw))
except Exception as e:
if not done:
self._ready.set_exception(e)
raise e

start_kernel = run_sync(_async_start_kernel)

Expand Down Expand Up @@ -471,8 +493,8 @@ async def _async_restart_kernel(self, now: bool = False, newports: bool = False,
Any options specified here will overwrite those used to launch the
kernel.
"""
if self._launch_args is None:
raise RuntimeError("Cannot restart the kernel. " "No previous call to 'start_kernel'.")
if not self._ready.done():
raise RuntimeError("Cannot restart the kernel. " "Kernel has been not fully started.")
else:
# Stop currently running kernel.
await ensure_async(self.shutdown_kernel(now=now, restart=True))
Expand Down
36 changes: 23 additions & 13 deletions jupyter_client/multikernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,23 @@ class MultiKernelManager(LoggingConfigurable):
"""A class for managing multiple kernels."""

default_kernel_name = Unicode(
NATIVE_KERNEL_NAME, config=True, help="The name of the default kernel to start"
)
NATIVE_KERNEL_NAME, help="The name of the default kernel to start"
).tag(config=True)

kernel_spec_manager = Instance(KernelSpecManager, allow_none=True)

kernel_manager_class = DottedObjectName(
"jupyter_client.ioloop.IOLoopKernelManager",
config=True,
help="""The kernel manager class. This is configurable to allow
subclassing of the KernelManager for customized behavior.
""",
)
).tag(config=True)

use_pending_kernels = Bool(
False,
help="""Whether to make kernels available before the process has started. The
kernel has a `.ready` future which can be awaited before connecting""",
).tag(config=True)

@observe("kernel_manager_class")
def _kernel_manager_class_changed(self, change):
Expand Down Expand Up @@ -91,9 +96,8 @@ def create_kernel_manager(*args, **kwargs) -> KernelManager:

shared_context = Bool(
True,
config=True,
help="Share a single zmq.Context to talk to all my kernels",
)
).tag(config=True)

_created_context = Bool(False)

Expand Down Expand Up @@ -182,12 +186,18 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg
)
)
kwargs['kernel_id'] = kernel_id # Make kernel_id available to manager and provisioner
fut = asyncio.ensure_future(
self._add_kernel_when_ready(kernel_id, km, ensure_async(km.start_kernel(**kwargs)))
)
self._starting_kernels[kernel_id] = fut
await fut
del self._starting_kernels[kernel_id]

starter = ensure_async(km.start_kernel(**kwargs))

if self.use_pending_kernels:
asyncio.create_task(starter)
blink1073 marked this conversation as resolved.
Show resolved Hide resolved
self._kernels[kernel_id] = km
else:
fut = asyncio.ensure_future(self._add_kernel_when_ready(kernel_id, km, starter))
self._starting_kernels[kernel_id] = fut
await fut
del self._starting_kernels[kernel_id]

return kernel_id

start_kernel = run_sync(_async_start_kernel)
Expand All @@ -212,7 +222,7 @@ async def _async_shutdown_kernel(
self.log.info("Kernel shutdown: %s" % kernel_id)

km = self.get_kernel(kernel_id)

await km.ready
await ensure_async(km.shutdown_kernel(now, restart))
self.remove_kernel(kernel_id)

Expand Down