diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py index fdae9a701..4908177b5 100644 --- a/jupyter_client/manager.py +++ b/jupyter_client/manager.py @@ -8,6 +8,8 @@ import sys import typing as t import uuid +from asyncio.futures import Future +from concurrent.futures import Future as CFuture from contextlib import contextmanager from enum import Enum @@ -58,6 +60,11 @@ class KernelManager(ConnectionFileMixin): def __init__(self, *args, **kwargs): super().__init__(**kwargs) self._shutdown_status = _ShutdownStatus.Unset + try: + self._ready = Future() + except RuntimeError: + # No event loop running, use concurrent future + self._ready = CFuture() _created_context: Bool = Bool(False) @@ -139,6 +146,11 @@ def kernel_spec(self) -> t.Optional[kernelspec.KernelSpec]: def _default_cache_ports(self) -> bool: return self.transport == "tcp" + @property + def ready(self) -> Future: + """A future that resolves when the kernel process has started for the first time""" + return self._ready + @property def ipykernel(self) -> bool: return self.kernel_name in {"python", "python2", "python3"} @@ -329,12 +341,25 @@ async def _async_start_kernel(self, **kw): keyword arguments that are passed down to build the kernel_cmd and launching the kernel (e.g. Popen kwargs). """ - kernel_cmd, kw = await ensure_async(self.pre_start_kernel(**kw)) + done = self._ready.done() - # launch the kernel subprocess - self.log.debug("Starting kernel: %s", kernel_cmd) - await ensure_async(self._launch_kernel(kernel_cmd, **kw)) - await ensure_async(self.post_start_kernel(**kw)) + try: + kernel_cmd, kw = await ensure_async(self.pre_start_kernel(**kw)) + + # launch the kernel subprocess + self.log.debug("Starting kernel: %s", kernel_cmd) + await ensure_async(self._launch_kernel(kernel_cmd, **kw)) + await ensure_async(self.post_start_kernel(**kw)) + if not done: + # Add a small sleep to ensure tests can capture the state before done + await asyncio.sleep(0.01) + self._ready.set_result(None) + + except Exception as e: + if not done: + self._ready.set_exception(e) + self.log.exception(self._ready.exception()) + raise e start_kernel = run_sync(_async_start_kernel) @@ -427,6 +452,10 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) Will this kernel be restarted after it is shutdown. When this is True, connection files will not be cleaned up. """ + # Shutdown is a no-op for a kernel that had a failed startup + if self._ready.exception(): + return + self.shutting_down = True # Used by restarter to prevent race condition # Stop monitoring for restarting while we shutdown. self.stop_restarter() @@ -473,16 +502,19 @@ async def _async_restart_kernel(self, now: bool = False, newports: bool = False, """ if self._launch_args is None: raise RuntimeError("Cannot restart the kernel. " "No previous call to 'start_kernel'.") - else: - # Stop currently running kernel. - await ensure_async(self.shutdown_kernel(now=now, restart=True)) - if newports: - self.cleanup_random_ports() + if not self._ready.done(): + raise RuntimeError("Cannot restart the kernel. " "Kernel has not fully started.") + + # Stop currently running kernel. + await ensure_async(self.shutdown_kernel(now=now, restart=True)) + + if newports: + self.cleanup_random_ports() - # Start new kernel. - self._launch_args.update(kw) - await ensure_async(self.start_kernel(**self._launch_args)) + # Start new kernel. + self._launch_args.update(kw) + await ensure_async(self.start_kernel(**self._launch_args)) restart_kernel = run_sync(_async_restart_kernel) diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py index 1536e7c63..bb07a4008 100644 --- a/jupyter_client/multikernelmanager.py +++ b/jupyter_client/multikernelmanager.py @@ -52,18 +52,17 @@ class MultiKernelManager(LoggingConfigurable): """A class for managing multiple kernels.""" default_kernel_name = Unicode( - NATIVE_KERNEL_NAME, config=True, help="The name of the default kernel to start" - ) + NATIVE_KERNEL_NAME, help="The name of the default kernel to start" + ).tag(config=True) kernel_spec_manager = Instance(KernelSpecManager, allow_none=True) kernel_manager_class = DottedObjectName( "jupyter_client.ioloop.IOLoopKernelManager", - config=True, help="""The kernel manager class. This is configurable to allow subclassing of the KernelManager for customized behavior. """, - ) + ).tag(config=True) @observe("kernel_manager_class") def _kernel_manager_class_changed(self, change): @@ -91,9 +90,8 @@ def create_kernel_manager(*args, **kwargs) -> KernelManager: shared_context = Bool( True, - config=True, help="Share a single zmq.Context to talk to all my kernels", - ) + ).tag(config=True) _created_context = Bool(False) @@ -163,8 +161,11 @@ def pre_start_kernel( async def _add_kernel_when_ready( self, kernel_id: str, km: KernelManager, kernel_awaitable: t.Awaitable ) -> None: - await kernel_awaitable - self._kernels[kernel_id] = km + try: + await kernel_awaitable + self._kernels[kernel_id] = km + finally: + self._starting_kernels.pop(kernel_id, None) async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwargs) -> str: """Start a new kernel. @@ -182,12 +183,16 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg ) ) kwargs['kernel_id'] = kernel_id # Make kernel_id available to manager and provisioner - fut = asyncio.ensure_future( - self._add_kernel_when_ready(kernel_id, km, ensure_async(km.start_kernel(**kwargs))) - ) + + starter = ensure_async(km.start_kernel(**kwargs)) + fut = asyncio.ensure_future(self._add_kernel_when_ready(kernel_id, km, starter)) self._starting_kernels[kernel_id] = fut - await fut - del self._starting_kernels[kernel_id] + + if getattr(self, 'use_pending_kernels', False): + self._kernels[kernel_id] = km + else: + await fut + return kernel_id start_kernel = run_sync(_async_start_kernel) @@ -210,9 +215,13 @@ async def _async_shutdown_kernel( Will the kernel be restarted? """ self.log.info("Kernel shutdown: %s" % kernel_id) - + if kernel_id in self._starting_kernels: + try: + await self._starting_kernels[kernel_id] + except Exception: + self.remove_kernel(kernel_id) + return km = self.get_kernel(kernel_id) - await ensure_async(km.shutdown_kernel(now, restart)) self.remove_kernel(kernel_id) @@ -246,18 +255,11 @@ def remove_kernel(self, kernel_id: str) -> KernelManager: """ return self._kernels.pop(kernel_id, None) - async def _shutdown_starting_kernel(self, kid: str, now: bool) -> None: - if kid in self._starting_kernels: - await self._starting_kernels[kid] - await ensure_async(self.shutdown_kernel(kid, now=now)) - async def _async_shutdown_all(self, now: bool = False) -> None: """Shutdown all kernels.""" kids = self.list_kernel_ids() - futs = [ensure_async(self.shutdown_kernel(kid, now=now)) for kid in kids] - futs += [ - self._shutdown_starting_kernel(kid, now=now) for kid in self._starting_kernels.keys() - ] + kids += list(self._starting_kernels) + futs = [ensure_async(self.shutdown_kernel(kid, now=now)) for kid in set(kids)] await asyncio.gather(*futs) shutdown_all = run_sync(_async_shutdown_all) @@ -466,6 +468,12 @@ class AsyncMultiKernelManager(MultiKernelManager): """, ) + use_pending_kernels = Bool( + False, + help="""Whether to make kernels available before the process has started. The + kernel has a `.ready` future which can be awaited before connecting""", + ).tag(config=True) + start_kernel = MultiKernelManager._async_start_kernel shutdown_kernel = MultiKernelManager._async_shutdown_kernel shutdown_all = MultiKernelManager._async_shutdown_all diff --git a/jupyter_client/tests/test_kernelmanager.py b/jupyter_client/tests/test_kernelmanager.py index 210639172..ed7f7cb51 100644 --- a/jupyter_client/tests/test_kernelmanager.py +++ b/jupyter_client/tests/test_kernelmanager.py @@ -188,6 +188,8 @@ class TestKernelManager: def test_lifecycle(self, km): km.start_kernel(stdout=PIPE, stderr=PIPE) assert km.is_alive() + is_done = km.ready.done() + assert is_done km.restart_kernel(now=True) assert km.is_alive() km.interrupt_kernel() @@ -439,6 +441,8 @@ async def test_lifecycle(self, async_km): await async_km.start_kernel(stdout=PIPE, stderr=PIPE) is_alive = await async_km.is_alive() assert is_alive + is_ready = async_km.ready.done() + assert is_ready await async_km.restart_kernel(now=True) is_alive = await async_km.is_alive() assert is_alive diff --git a/jupyter_client/tests/test_kernelspec.py b/jupyter_client/tests/test_kernelspec.py index 6953d4b18..4c99028ce 100644 --- a/jupyter_client/tests/test_kernelspec.py +++ b/jupyter_client/tests/test_kernelspec.py @@ -18,30 +18,18 @@ import pytest from jupyter_core import paths +from .utils import install_kernel +from .utils import sample_kernel_json from .utils import test_env from jupyter_client import kernelspec -sample_kernel_json = { - "argv": ["cat", "{connection_file}"], - "display_name": "Test kernel", -} - class KernelSpecTests(unittest.TestCase): - def _install_sample_kernel(self, kernels_dir): - """install a sample kernel in a kernels directory""" - sample_kernel_dir = pjoin(kernels_dir, "sample") - os.makedirs(sample_kernel_dir) - json_file = pjoin(sample_kernel_dir, "kernel.json") - with open(json_file, "w") as f: - json.dump(sample_kernel_json, f) - return sample_kernel_dir - def setUp(self): self.env_patch = test_env() self.env_patch.start() - self.sample_kernel_dir = self._install_sample_kernel( - pjoin(paths.jupyter_data_dir(), "kernels") + self.sample_kernel_dir = install_kernel( + pjoin(paths.jupyter_data_dir(), "kernels"), name="sample" ) self.ksm = kernelspec.KernelSpecManager() @@ -87,7 +75,7 @@ def test_find_all_specs(self): def test_kernel_spec_priority(self): td = TemporaryDirectory() self.addCleanup(td.cleanup) - sample_kernel = self._install_sample_kernel(td.name) + sample_kernel = install_kernel(td.name, name="sample") self.ksm.kernel_dirs.append(td.name) kernels = self.ksm.find_kernel_specs() self.assertEqual(kernels["sample"], self.sample_kernel_dir) diff --git a/jupyter_client/tests/test_multikernelmanager.py b/jupyter_client/tests/test_multikernelmanager.py index 7f45ff11f..d9bf7956a 100644 --- a/jupyter_client/tests/test_multikernelmanager.py +++ b/jupyter_client/tests/test_multikernelmanager.py @@ -1,12 +1,14 @@ """Tests for the notebook kernel and session manager.""" import asyncio import concurrent.futures +import os import sys import uuid from subprocess import PIPE from unittest import TestCase import pytest +from jupyter_core import paths from tornado.testing import AsyncTestCase from tornado.testing import gen_test from traitlets.config.loader import Config @@ -14,9 +16,11 @@ from ..localinterfaces import localhost from .utils import AsyncKMSubclass from .utils import AsyncMKMSubclass +from .utils import install_kernel from .utils import skip_win32 from .utils import SyncKMSubclass from .utils import SyncMKMSubclass +from .utils import test_env from jupyter_client import AsyncKernelManager from jupyter_client import KernelManager from jupyter_client.multikernelmanager import AsyncMultiKernelManager @@ -26,6 +30,10 @@ class TestKernelManager(TestCase): + def setUp(self): + self.env_patch = test_env() + self.env_patch.start() + super().setUp() # static so picklable for multiprocessing on Windows @staticmethod @@ -58,6 +66,7 @@ def _run_lifecycle(km, test_kid=None): else: kid = km.start_kernel(stdout=PIPE, stderr=PIPE) assert km.is_alive(kid) + assert km.get_kernel(kid).ready.done() assert kid in km assert kid in km.list_kernel_ids() assert len(km) == 1, f"{len(km)} != {1}" @@ -220,6 +229,10 @@ def test_subclass_callables(self): class TestAsyncKernelManager(AsyncTestCase): + def setUp(self): + self.env_patch = test_env() + self.env_patch.start() + super().setUp() # static so picklable for multiprocessing on Windows @staticmethod @@ -243,6 +256,13 @@ def _get_ipc_km(): km = AsyncMultiKernelManager(config=c) return km + @staticmethod + def _get_pending_kernels_km(): + c = Config() + c.AsyncMultiKernelManager.use_pending_kernels = True + km = AsyncMultiKernelManager(config=c) + return km + # static so picklable for multiprocessing on Windows @staticmethod async def _run_lifecycle(km, test_kid=None): @@ -334,6 +354,57 @@ async def test_shutdown_all_while_starting(self): # shutdown again is okay, because we have no kernels await km.shutdown_all() + @gen_test + async def test_use_pending_kernels(self): + km = self._get_pending_kernels_km() + kid = await km.start_kernel(stdout=PIPE, stderr=PIPE) + kernel = km.get_kernel(kid) + assert not kernel.ready.done() + assert kid in km + assert kid in km.list_kernel_ids() + assert len(km) == 1, f"{len(km)} != {1}" + await kernel.ready + await km.restart_kernel(kid, now=True) + assert await km.is_alive(kid) + assert kid in km.list_kernel_ids() + await km.interrupt_kernel(kid) + k = km.get_kernel(kid) + assert isinstance(k, AsyncKernelManager) + await km.shutdown_kernel(kid, now=True) + assert kid not in km, f"{kid} not in {km}" + + @gen_test + async def test_use_pending_kernels_early_restart(self): + km = self._get_pending_kernels_km() + kid = await km.start_kernel(stdout=PIPE, stderr=PIPE) + kernel = km.get_kernel(kid) + assert not kernel.ready.done() + with pytest.raises(RuntimeError): + await km.restart_kernel(kid, now=True) + await kernel.ready + await km.shutdown_kernel(kid, now=True) + assert kid not in km, f"{kid} not in {km}" + + @gen_test + async def test_use_pending_kernels_early_shutdown(self): + km = self._get_pending_kernels_km() + kid = await km.start_kernel(stdout=PIPE, stderr=PIPE) + kernel = km.get_kernel(kid) + assert not kernel.ready.done() + await km.shutdown_kernel(kid, now=True) + assert kid not in km, f"{kid} not in {km}" + + @gen_test + async def test_use_pending_kernels_early_interrupt(self): + km = self._get_pending_kernels_km() + kid = await km.start_kernel(stdout=PIPE, stderr=PIPE) + kernel = km.get_kernel(kid) + assert not kernel.ready.done() + with pytest.raises(RuntimeError): + await km.interrupt_kernel(kid) + await km.shutdown_kernel(kid, now=True) + assert kid not in km, f"{kid} not in {km}" + @gen_test async def test_tcp_cinfo(self): km = self._get_tcp_km() @@ -466,3 +537,30 @@ async def test_subclass_callables(self): assert mkm.call_count("cleanup_resources") == 0 assert kid not in mkm, f"{kid} not in {mkm}" + + @gen_test + async def test_bad_kernelspec(self): + km = self._get_tcp_km() + install_kernel( + os.path.join(paths.jupyter_data_dir(), "kernels"), + argv=["non_existent_executable"], + name="bad", + ) + with pytest.raises(FileNotFoundError): + await km.start_kernel(kernel_name="bad", stdout=PIPE, stderr=PIPE) + + @gen_test + async def test_bad_kernelspec_pending(self): + km = self._get_pending_kernels_km() + install_kernel( + os.path.join(paths.jupyter_data_dir(), "kernels"), + argv=["non_existent_executable"], + name="bad", + ) + kernel_id = await km.start_kernel(kernel_name="bad", stdout=PIPE, stderr=PIPE) + assert kernel_id in km._starting_kernels + with pytest.raises(FileNotFoundError): + await km.get_kernel(kernel_id).ready + assert kernel_id in km.list_kernel_ids() + await km.shutdown_kernel(kernel_id) + assert kernel_id not in km.list_kernel_ids() diff --git a/jupyter_client/tests/utils.py b/jupyter_client/tests/utils.py index 58d963ddf..ead0e2cdf 100644 --- a/jupyter_client/tests/utils.py +++ b/jupyter_client/tests/utils.py @@ -1,6 +1,7 @@ """Testing utils for jupyter_client tests """ +import json import os import sys from tempfile import TemporaryDirectory @@ -19,6 +20,26 @@ skip_win32 = pytest.mark.skipif(sys.platform.startswith("win"), reason="Windows") +sample_kernel_json = { + "argv": ["cat", "{connection_file}"], + "display_name": "Test kernel", +} + + +def install_kernel(kernels_dir, argv=None, name="test", display_name=None): + """install a kernel in a kernels directory""" + kernel_dir = pjoin(kernels_dir, name) + os.makedirs(kernel_dir) + kernel_json = { + "argv": argv or sample_kernel_json["argv"], + "display_name": display_name or sample_kernel_json["display_name"], + } + json_file = pjoin(kernel_dir, "kernel.json") + with open(json_file, "w") as f: + json.dump(kernel_json, f) + return kernel_dir + + class test_env(object): """Set Jupyter path variables to a temporary directory