From a877478909762dd046927115049b73dab6a519b5 Mon Sep 17 00:00:00 2001 From: Zach Sailer Date: Tue, 4 Jan 2022 15:55:20 -0800 Subject: [PATCH 01/15] simplify pending state logic and require multikernelmanager to take more responsibility --- jupyter_client/manager.py | 60 +++++++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 25 deletions(-) diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py index 4908177b5..e5485fe9d 100644 --- a/jupyter_client/manager.py +++ b/jupyter_client/manager.py @@ -3,6 +3,7 @@ # Distributed under the terms of the Modified BSD License. import asyncio import os +import functools import re import signal import sys @@ -51,6 +52,32 @@ class _ShutdownStatus(Enum): SigkillRequest = "SigkillRequest" +def in_pending_state(method): + """Sets the kernel to a pending state by + creating a fresh Future for the KernelManager's `ready` + attribute. Once the method is finished, set the Future's results. + """ + @functools.wraps(method) + async def wrapper(self, *args, **kwargs): + # Create a future for the decorated method + try: + self._ready = Future() + except RuntimeError: + # No event loop running, use concurrent future + self._ready = CFuture() + try: + # call wrapped method, await, and set the result or exception. + out = await method(self, *args, **kwargs) + # Add a small sleep to ensure tests can capture the state before done + await asyncio.sleep(0.01) + self._ready.set_result(None) + return out + except Exception as e: + self._ready.set_exception(e) + self.log.exception(self._ready.exception()) + return wrapper + + class KernelManager(ConnectionFileMixin): """Manages a single kernel in a subprocess on this host. @@ -329,6 +356,7 @@ async def _async_post_start_kernel(self, **kw) -> None: post_start_kernel = run_sync(_async_post_start_kernel) + @in_pending_state async def _async_start_kernel(self, **kw): """Starts a kernel on this host in a separate process. @@ -341,25 +369,12 @@ async def _async_start_kernel(self, **kw): keyword arguments that are passed down to build the kernel_cmd and launching the kernel (e.g. Popen kwargs). """ - done = self._ready.done() - - try: - kernel_cmd, kw = await ensure_async(self.pre_start_kernel(**kw)) + kernel_cmd, kw = await ensure_async(self.pre_start_kernel(**kw)) - # launch the kernel subprocess - self.log.debug("Starting kernel: %s", kernel_cmd) - await ensure_async(self._launch_kernel(kernel_cmd, **kw)) - await ensure_async(self.post_start_kernel(**kw)) - if not done: - # Add a small sleep to ensure tests can capture the state before done - await asyncio.sleep(0.01) - self._ready.set_result(None) - - except Exception as e: - if not done: - self._ready.set_exception(e) - self.log.exception(self._ready.exception()) - raise e + # launch the kernel subprocess + self.log.debug("Starting kernel: %s", kernel_cmd) + await ensure_async(self._launch_kernel(kernel_cmd, **kw)) + await ensure_async(self.post_start_kernel(**kw)) start_kernel = run_sync(_async_start_kernel) @@ -434,6 +449,7 @@ async def _async_cleanup_resources(self, restart: bool = False) -> None: cleanup_resources = run_sync(_async_cleanup_resources) + @in_pending_state async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False): """Attempts to stop the kernel process cleanly. @@ -452,10 +468,6 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) Will this kernel be restarted after it is shutdown. When this is True, connection files will not be cleaned up. """ - # Shutdown is a no-op for a kernel that had a failed startup - if self._ready.exception(): - return - self.shutting_down = True # Used by restarter to prevent race condition # Stop monitoring for restarting while we shutdown. self.stop_restarter() @@ -473,6 +485,7 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) await ensure_async(self.cleanup_resources(restart=restart)) + shutdown_kernel = run_sync(_async_shutdown_kernel) async def _async_restart_kernel(self, now: bool = False, newports: bool = False, **kw) -> None: @@ -503,9 +516,6 @@ async def _async_restart_kernel(self, now: bool = False, newports: bool = False, if self._launch_args is None: raise RuntimeError("Cannot restart the kernel. " "No previous call to 'start_kernel'.") - if not self._ready.done(): - raise RuntimeError("Cannot restart the kernel. " "Kernel has not fully started.") - # Stop currently running kernel. await ensure_async(self.shutdown_kernel(now=now, restart=True)) From 2b8790adbfb7e6e24d67314e8dceb172e04e46ee Mon Sep 17 00:00:00 2001 From: Zach Sailer Date: Tue, 4 Jan 2022 16:12:29 -0800 Subject: [PATCH 02/15] remove unnecessary whitespace --- jupyter_client/manager.py | 1 - 1 file changed, 1 deletion(-) diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py index e5485fe9d..2b5f2f600 100644 --- a/jupyter_client/manager.py +++ b/jupyter_client/manager.py @@ -485,7 +485,6 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) await ensure_async(self.cleanup_resources(restart=restart)) - shutdown_kernel = run_sync(_async_shutdown_kernel) async def _async_restart_kernel(self, now: bool = False, newports: bool = False, **kw) -> None: From 8076482581b6342d94f3c28a91107188e9176224 Mon Sep 17 00:00:00 2001 From: Zach Sailer Date: Tue, 4 Jan 2022 16:15:12 -0800 Subject: [PATCH 03/15] precommit hooks --- jupyter_client/manager.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py index 2b5f2f600..0ef009083 100644 --- a/jupyter_client/manager.py +++ b/jupyter_client/manager.py @@ -2,8 +2,8 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. import asyncio -import os import functools +import os import re import signal import sys @@ -57,6 +57,7 @@ def in_pending_state(method): creating a fresh Future for the KernelManager's `ready` attribute. Once the method is finished, set the Future's results. """ + @functools.wraps(method) async def wrapper(self, *args, **kwargs): # Create a future for the decorated method @@ -75,6 +76,7 @@ async def wrapper(self, *args, **kwargs): except Exception as e: self._ready.set_exception(e) self.log.exception(self._ready.exception()) + return wrapper From 73ae431b7042255bcc0fce46c69e3f1a14ea718e Mon Sep 17 00:00:00 2001 From: Zach Sailer Date: Wed, 5 Jan 2022 15:06:25 -0800 Subject: [PATCH 04/15] handle shutdown and restart for pending kernels in the multikernelmanager --- jupyter_client/multikernelmanager.py | 20 ++++++++++++++-- .../tests/test_multikernelmanager.py | 24 ++++++++++++------- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py index bb07a4008..4262d4dc8 100644 --- a/jupyter_client/multikernelmanager.py +++ b/jupyter_client/multikernelmanager.py @@ -214,6 +214,14 @@ async def _async_shutdown_kernel( restart : bool Will the kernel be restarted? """ + kernel = self.get_kernel(kernel_id) + if getattr(self, 'use_pending_kernels', False): + # Make sure the previous kernel started before trying to shutdown. + if not kernel.ready.done(): + raise RuntimeError("Kernel is in a pending state. Cannot shutdown.") + # If the kernel didn't start properly, no need to shutdown. + elif kernel.ready.exception(): + return self.log.info("Kernel shutdown: %s" % kernel_id) if kernel_id in self._starting_kernels: try: @@ -291,8 +299,7 @@ def signal_kernel(self, kernel_id: str, signum: int) -> None: """ self.log.info("Signaled Kernel %s with %s" % (kernel_id, signum)) - @kernel_method - def restart_kernel(self, kernel_id: str, now: bool = False) -> None: + async def _async_restart_kernel(self, kernel_id: str, now: bool = False) -> None: """Restart a kernel by its uuid, keeping the same ports. Parameters @@ -307,7 +314,15 @@ def restart_kernel(self, kernel_id: str, now: bool = False) -> None: In all cases the kernel is restarted, the only difference is whether it is given a chance to perform a clean shutdown or not. """ + kernel = self.get_kernel(kernel_id) + if getattr(self, 'use_pending_kernels', False): + if not kernel.ready.done(): + raise RuntimeError("Kernel is in a pending state. Cannot restart.") + out = await ensure_async(kernel.restart_kernel(now=now)) self.log.info("Kernel restarted: %s" % kernel_id) + return out + + restart_kernel = run_sync(_async_restart_kernel) @kernel_method def is_alive(self, kernel_id: str) -> bool: @@ -475,5 +490,6 @@ class AsyncMultiKernelManager(MultiKernelManager): ).tag(config=True) start_kernel = MultiKernelManager._async_start_kernel + restart_kernel = MultiKernelManager._async_restart_kernel shutdown_kernel = MultiKernelManager._async_shutdown_kernel shutdown_all = MultiKernelManager._async_shutdown_all diff --git a/jupyter_client/tests/test_multikernelmanager.py b/jupyter_client/tests/test_multikernelmanager.py index d9bf7956a..fe90820bc 100644 --- a/jupyter_client/tests/test_multikernelmanager.py +++ b/jupyter_client/tests/test_multikernelmanager.py @@ -29,6 +29,14 @@ TIMEOUT = 30 +async def now(awaitable): + """Use this function ensure that this awaitable + happens before other awaitables defined after it. + """ + (out,) = await asyncio.gather(awaitable) + return out + + class TestKernelManager(TestCase): def setUp(self): self.env_patch = test_env() @@ -357,32 +365,32 @@ async def test_shutdown_all_while_starting(self): @gen_test async def test_use_pending_kernels(self): km = self._get_pending_kernels_km() - kid = await km.start_kernel(stdout=PIPE, stderr=PIPE) + kid = await now(km.start_kernel(stdout=PIPE, stderr=PIPE)) kernel = km.get_kernel(kid) assert not kernel.ready.done() assert kid in km assert kid in km.list_kernel_ids() assert len(km) == 1, f"{len(km)} != {1}" - await kernel.ready - await km.restart_kernel(kid, now=True) + await now(kernel.ready) + await now(km.restart_kernel(kid, now=True)) assert await km.is_alive(kid) assert kid in km.list_kernel_ids() - await km.interrupt_kernel(kid) + await now(km.interrupt_kernel(kid)) k = km.get_kernel(kid) assert isinstance(k, AsyncKernelManager) - await km.shutdown_kernel(kid, now=True) + await now(km.shutdown_kernel(kid, now=True)) assert kid not in km, f"{kid} not in {km}" @gen_test async def test_use_pending_kernels_early_restart(self): km = self._get_pending_kernels_km() - kid = await km.start_kernel(stdout=PIPE, stderr=PIPE) + kid = await now(km.start_kernel(stdout=PIPE, stderr=PIPE)) kernel = km.get_kernel(kid) assert not kernel.ready.done() with pytest.raises(RuntimeError): await km.restart_kernel(kid, now=True) - await kernel.ready - await km.shutdown_kernel(kid, now=True) + await now(kernel.ready) + await now(km.shutdown_kernel(kid, now=True)) assert kid not in km, f"{kid} not in {km}" @gen_test From 5f97e4e878e0f271952b52877b386f2611dd5839 Mon Sep 17 00:00:00 2001 From: Zach Sailer Date: Fri, 7 Jan 2022 16:11:00 -0800 Subject: [PATCH 05/15] work updates to tests --- jupyter_client/manager.py | 1 + jupyter_client/multikernelmanager.py | 60 +++++++++++++++---- .../tests/test_multikernelmanager.py | 55 +++++++++++------ 3 files changed, 86 insertions(+), 30 deletions(-) diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py index 0ef009083..ade92e125 100644 --- a/jupyter_client/manager.py +++ b/jupyter_client/manager.py @@ -89,6 +89,7 @@ class KernelManager(ConnectionFileMixin): def __init__(self, *args, **kwargs): super().__init__(**kwargs) self._shutdown_status = _ShutdownStatus.Unset + # Create a place holder future. try: self._ready = Future() except RuntimeError: diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py index 4262d4dc8..29c199848 100644 --- a/jupyter_client/multikernelmanager.py +++ b/jupyter_client/multikernelmanager.py @@ -97,7 +97,12 @@ def create_kernel_manager(*args, **kwargs) -> KernelManager: context = Instance("zmq.Context") - _starting_kernels = Dict() + _pending_kernels = Dict() + + @property + def _starting_kernels(self): + """A shim for backwards compatibility.""" + return self._pending_kernels @default("context") def _context_default(self) -> zmq.Context: @@ -165,7 +170,22 @@ async def _add_kernel_when_ready( await kernel_awaitable self._kernels[kernel_id] = km finally: - self._starting_kernels.pop(kernel_id, None) + self._pending_kernels.pop(kernel_id, None) + + async def _remove_kernel_when_ready( + self, kernel_id: str, kernel_awaitable: t.Awaitable + ) -> None: + try: + await kernel_awaitable + self.remove_kernel(kernel_id) + finally: + self._pending_kernels.pop(kernel_id, None) + + def _using_pending_kernels(self): + """Returns a boolean; a clearer method for determining if + this multikernelmanager is using pending kernels or not + """ + return getattr(self, 'use_pending_kernels', False) async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwargs) -> str: """Start a new kernel. @@ -186,12 +206,18 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg starter = ensure_async(km.start_kernel(**kwargs)) fut = asyncio.ensure_future(self._add_kernel_when_ready(kernel_id, km, starter)) - self._starting_kernels[kernel_id] = fut + self._pending_kernels[kernel_id] = fut - if getattr(self, 'use_pending_kernels', False): + # Handling a Pending Kernel + if self._using_pending_kernels(): + # If using pending kernels, do not block + # on the kernel start. self._kernels[kernel_id] = km else: await fut + # raise an exception if one occurred during kernel startup. + if km.ready.exception(): + raise km.ready.exception() return kernel_id @@ -215,23 +241,28 @@ async def _async_shutdown_kernel( Will the kernel be restarted? """ kernel = self.get_kernel(kernel_id) - if getattr(self, 'use_pending_kernels', False): + if self._using_pending_kernels(): # Make sure the previous kernel started before trying to shutdown. if not kernel.ready.done(): raise RuntimeError("Kernel is in a pending state. Cannot shutdown.") # If the kernel didn't start properly, no need to shutdown. elif kernel.ready.exception(): + self.remove_kernel(kernel_id) return self.log.info("Kernel shutdown: %s" % kernel_id) - if kernel_id in self._starting_kernels: + if kernel_id in self._pending_kernels: try: - await self._starting_kernels[kernel_id] + await self._pending_kernels[kernel_id] except Exception: self.remove_kernel(kernel_id) return km = self.get_kernel(kernel_id) - await ensure_async(km.shutdown_kernel(now, restart)) - self.remove_kernel(kernel_id) + stopper = ensure_async(km.shutdown_kernel(now, restart)) + fut = asyncio.ensure_future(self._remove_kernel_when_ready(kernel_id, stopper)) + self._pending_kernels[kernel_id] = fut + # Await the kernel if not using pending kernels. + if not self._using_pending_kernels(): + await fut shutdown_kernel = run_sync(_async_shutdown_kernel) @@ -266,13 +297,13 @@ def remove_kernel(self, kernel_id: str) -> KernelManager: async def _async_shutdown_all(self, now: bool = False) -> None: """Shutdown all kernels.""" kids = self.list_kernel_ids() - kids += list(self._starting_kernels) + kids += list(self._pending_kernels) futs = [ensure_async(self.shutdown_kernel(kid, now=now)) for kid in set(kids)] await asyncio.gather(*futs) shutdown_all = run_sync(_async_shutdown_all) - @kernel_method + # @kernel_method def interrupt_kernel(self, kernel_id: str) -> None: """Interrupt (SIGINT) the kernel by its uuid. @@ -281,7 +312,12 @@ def interrupt_kernel(self, kernel_id: str) -> None: kernel_id : uuid The id of the kernel to interrupt. """ + kernel = self.get_kernel(kernel_id) + if not kernel.ready.done(): + raise RuntimeError("Kernel is in a pending state. Cannot interrupt.") + out = kernel.interrupt_kernel() self.log.info("Kernel interrupted: %s" % kernel_id) + return out @kernel_method def signal_kernel(self, kernel_id: str, signum: int) -> None: @@ -315,7 +351,7 @@ async def _async_restart_kernel(self, kernel_id: str, now: bool = False) -> None it is given a chance to perform a clean shutdown or not. """ kernel = self.get_kernel(kernel_id) - if getattr(self, 'use_pending_kernels', False): + if self._using_pending_kernels(): if not kernel.ready.done(): raise RuntimeError("Kernel is in a pending state. Cannot restart.") out = await ensure_async(kernel.restart_kernel(now=now)) diff --git a/jupyter_client/tests/test_multikernelmanager.py b/jupyter_client/tests/test_multikernelmanager.py index fe90820bc..8cd953d60 100644 --- a/jupyter_client/tests/test_multikernelmanager.py +++ b/jupyter_client/tests/test_multikernelmanager.py @@ -4,6 +4,7 @@ import os import sys import uuid +from asyncio import ensure_future from subprocess import PIPE from unittest import TestCase @@ -354,7 +355,7 @@ async def test_shutdown_all_while_starting(self): self.assertNotIn(kid, km) # Start another kernel - kid = await km.start_kernel(stdout=PIPE, stderr=PIPE) + kid = await ensure_future(km.start_kernel(stdout=PIPE, stderr=PIPE)) self.assertIn(kid, km) self.assertEqual(len(km), 1) await km.shutdown_all() @@ -365,52 +366,69 @@ async def test_shutdown_all_while_starting(self): @gen_test async def test_use_pending_kernels(self): km = self._get_pending_kernels_km() - kid = await now(km.start_kernel(stdout=PIPE, stderr=PIPE)) + kid = await ensure_future(km.start_kernel(stdout=PIPE, stderr=PIPE)) kernel = km.get_kernel(kid) assert not kernel.ready.done() assert kid in km assert kid in km.list_kernel_ids() assert len(km) == 1, f"{len(km)} != {1}" - await now(kernel.ready) - await now(km.restart_kernel(kid, now=True)) - assert await km.is_alive(kid) + # Wait for the kernel to start. + await kernel.ready + await km.restart_kernel(kid, now=True) + out = await km.is_alive(kid) + assert out assert kid in km.list_kernel_ids() - await now(km.interrupt_kernel(kid)) + await km.interrupt_kernel(kid) k = km.get_kernel(kid) assert isinstance(k, AsyncKernelManager) - await now(km.shutdown_kernel(kid, now=True)) + await ensure_future(km.shutdown_kernel(kid, now=True)) + # Wait for the kernel to shutdown + await kernel.ready assert kid not in km, f"{kid} not in {km}" @gen_test async def test_use_pending_kernels_early_restart(self): km = self._get_pending_kernels_km() - kid = await now(km.start_kernel(stdout=PIPE, stderr=PIPE)) + kid = await ensure_future(km.start_kernel(stdout=PIPE, stderr=PIPE)) kernel = km.get_kernel(kid) assert not kernel.ready.done() with pytest.raises(RuntimeError): await km.restart_kernel(kid, now=True) - await now(kernel.ready) - await now(km.shutdown_kernel(kid, now=True)) + await kernel.ready + await ensure_future(km.shutdown_kernel(kid, now=True)) + # Wait for the kernel to shutdown + await kernel.ready assert kid not in km, f"{kid} not in {km}" @gen_test async def test_use_pending_kernels_early_shutdown(self): km = self._get_pending_kernels_km() - kid = await km.start_kernel(stdout=PIPE, stderr=PIPE) + kid = await ensure_future(km.start_kernel(stdout=PIPE, stderr=PIPE)) kernel = km.get_kernel(kid) assert not kernel.ready.done() - await km.shutdown_kernel(kid, now=True) + # Try shutting down while the kernel is pending + with pytest.raises(RuntimeError): + await ensure_future(km.shutdown_kernel(kid, now=True)) + await kernel.ready + # Shutdown once the kernel is ready + await ensure_future(km.shutdown_kernel(kid, now=True)) + # Wait for the kernel to shutdown + await kernel.ready assert kid not in km, f"{kid} not in {km}" @gen_test async def test_use_pending_kernels_early_interrupt(self): km = self._get_pending_kernels_km() - kid = await km.start_kernel(stdout=PIPE, stderr=PIPE) + kid = await ensure_future(km.start_kernel(stdout=PIPE, stderr=PIPE)) kernel = km.get_kernel(kid) assert not kernel.ready.done() with pytest.raises(RuntimeError): await km.interrupt_kernel(kid) - await km.shutdown_kernel(kid, now=True) + # Now wait for the kernel to be ready. + await kernel.ready + await ensure_future(km.shutdown_kernel(kid, now=True)) + # Wait for the kernel to shutdown + await kernel.ready assert kid not in km, f"{kid} not in {km}" @gen_test @@ -555,7 +573,7 @@ async def test_bad_kernelspec(self): name="bad", ) with pytest.raises(FileNotFoundError): - await km.start_kernel(kernel_name="bad", stdout=PIPE, stderr=PIPE) + await ensure_future(km.start_kernel(kernel_name="bad", stdout=PIPE, stderr=PIPE)) @gen_test async def test_bad_kernelspec_pending(self): @@ -565,10 +583,11 @@ async def test_bad_kernelspec_pending(self): argv=["non_existent_executable"], name="bad", ) - kernel_id = await km.start_kernel(kernel_name="bad", stdout=PIPE, stderr=PIPE) - assert kernel_id in km._starting_kernels + kernel_id = await ensure_future( + km.start_kernel(kernel_name="bad", stdout=PIPE, stderr=PIPE) + ) with pytest.raises(FileNotFoundError): await km.get_kernel(kernel_id).ready assert kernel_id in km.list_kernel_ids() - await km.shutdown_kernel(kernel_id) + await ensure_future(km.shutdown_kernel(kernel_id)) assert kernel_id not in km.list_kernel_ids() From dfc1a85d45bc425fc7e1961b3de5c38b9ac33ba9 Mon Sep 17 00:00:00 2001 From: Zach Sailer Date: Fri, 7 Jan 2022 16:18:40 -0800 Subject: [PATCH 06/15] ignore mypy error --- jupyter_client/multikernelmanager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py index 29c199848..def1db463 100644 --- a/jupyter_client/multikernelmanager.py +++ b/jupyter_client/multikernelmanager.py @@ -217,7 +217,7 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg await fut # raise an exception if one occurred during kernel startup. if km.ready.exception(): - raise km.ready.exception() + raise km.ready.exception() # type: ignore return kernel_id From 54c86a0d020eaf8099fdfac05ca31bb627e38b1f Mon Sep 17 00:00:00 2001 From: Zach Sailer Date: Mon, 10 Jan 2022 08:33:54 -0800 Subject: [PATCH 07/15] fix regression in KM. be sure to raise an exception when a kernel doesn't start properly --- jupyter_client/manager.py | 1 + jupyter_client/multikernelmanager.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py index ade92e125..bc3190f25 100644 --- a/jupyter_client/manager.py +++ b/jupyter_client/manager.py @@ -76,6 +76,7 @@ async def wrapper(self, *args, **kwargs): except Exception as e: self._ready.set_exception(e) self.log.exception(self._ready.exception()) + raise e return wrapper diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py index def1db463..621a17a83 100644 --- a/jupyter_client/multikernelmanager.py +++ b/jupyter_client/multikernelmanager.py @@ -207,7 +207,6 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg starter = ensure_async(km.start_kernel(**kwargs)) fut = asyncio.ensure_future(self._add_kernel_when_ready(kernel_id, km, starter)) self._pending_kernels[kernel_id] = fut - # Handling a Pending Kernel if self._using_pending_kernels(): # If using pending kernels, do not block From 94595afb902390dda415be169e68aa81e483c3c3 Mon Sep 17 00:00:00 2001 From: Zach Sailer Date: Mon, 10 Jan 2022 09:49:17 -0800 Subject: [PATCH 08/15] ensure shutdown_kernel is backwards compatible --- jupyter_client/multikernelmanager.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py index 621a17a83..98267d49d 100644 --- a/jupyter_client/multikernelmanager.py +++ b/jupyter_client/multikernelmanager.py @@ -239,19 +239,14 @@ async def _async_shutdown_kernel( restart : bool Will the kernel be restarted? """ - kernel = self.get_kernel(kernel_id) - if self._using_pending_kernels(): - # Make sure the previous kernel started before trying to shutdown. - if not kernel.ready.done(): - raise RuntimeError("Kernel is in a pending state. Cannot shutdown.") - # If the kernel didn't start properly, no need to shutdown. - elif kernel.ready.exception(): - self.remove_kernel(kernel_id) - return + # If the kernel in a pending state? self.log.info("Kernel shutdown: %s" % kernel_id) - if kernel_id in self._pending_kernels: + if self._using_pending_kernels() and kernel_id in self._pending_kernels: + raise RuntimeError("Kernel is in a pending state. Cannot shutdown.") + elif kernel_id in self._pending_kernels: + kernel = self._pending_kernels[kernel_id] try: - await self._pending_kernels[kernel_id] + await kernel except Exception: self.remove_kernel(kernel_id) return @@ -296,7 +291,7 @@ def remove_kernel(self, kernel_id: str) -> KernelManager: async def _async_shutdown_all(self, now: bool = False) -> None: """Shutdown all kernels.""" kids = self.list_kernel_ids() - kids += list(self._pending_kernels) + kids += list(self._starting_kernels) futs = [ensure_async(self.shutdown_kernel(kid, now=now)) for kid in set(kids)] await asyncio.gather(*futs) From de8d2c5470d0a52df71b945db3bc1689a2b1cf60 Mon Sep 17 00:00:00 2001 From: Zach Sailer Date: Mon, 10 Jan 2022 10:00:45 -0800 Subject: [PATCH 09/15] in shutdown remove kernels that raise exceptions --- jupyter_client/multikernelmanager.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py index 98267d49d..d5f615d47 100644 --- a/jupyter_client/multikernelmanager.py +++ b/jupyter_client/multikernelmanager.py @@ -239,10 +239,11 @@ async def _async_shutdown_kernel( restart : bool Will the kernel be restarted? """ - # If the kernel in a pending state? self.log.info("Kernel shutdown: %s" % kernel_id) + # If we're using pending kernels, block shutdown when a kernel is pending. if self._using_pending_kernels() and kernel_id in self._pending_kernels: raise RuntimeError("Kernel is in a pending state. Cannot shutdown.") + # If the kernel isn't in a ready state, wait for it to be ready. elif kernel_id in self._pending_kernels: kernel = self._pending_kernels[kernel_id] try: @@ -251,6 +252,10 @@ async def _async_shutdown_kernel( self.remove_kernel(kernel_id) return km = self.get_kernel(kernel_id) + # If a pending kernel raised an exception, remove it. + if km.ready.exception(): + self.remove_kernel(kernel_id) + return stopper = ensure_async(km.shutdown_kernel(now, restart)) fut = asyncio.ensure_future(self._remove_kernel_when_ready(kernel_id, stopper)) self._pending_kernels[kernel_id] = fut From 9bc967a4eee13008d8fa31530e1ac038b208672f Mon Sep 17 00:00:00 2001 From: Zach Sailer Date: Mon, 10 Jan 2022 14:45:46 -0800 Subject: [PATCH 10/15] shutdown all on pending kernels requires waiting for non-pending state --- jupyter_client/multikernelmanager.py | 34 ++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py index d5f615d47..459b86eb8 100644 --- a/jupyter_client/multikernelmanager.py +++ b/jupyter_client/multikernelmanager.py @@ -222,6 +222,22 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg start_kernel = run_sync(_async_start_kernel) + async def _shutdown_kernel_when_ready( + self, + kernel_id: str, + now: t.Optional[bool] = False, + restart: t.Optional[bool] = False, + ) -> None: + """Wait for a pending kernel to be ready + before shutting the kernel down. + """ + # Only do this if using pending kernels + if self._using_pending_kernels(): + kernel = self._kernels[kernel_id] + await kernel.ready + # Once out of a pending state, we can call shutdown. + await ensure_async(self.shutdown_kernel(kernel_id, now=now, restart=restart)) + async def _async_shutdown_kernel( self, kernel_id: str, @@ -243,9 +259,9 @@ async def _async_shutdown_kernel( # If we're using pending kernels, block shutdown when a kernel is pending. if self._using_pending_kernels() and kernel_id in self._pending_kernels: raise RuntimeError("Kernel is in a pending state. Cannot shutdown.") - # If the kernel isn't in a ready state, wait for it to be ready. - elif kernel_id in self._pending_kernels: - kernel = self._pending_kernels[kernel_id] + # If the kernel is still starting, wait for it to be ready. + elif kernel_id in self._starting_kernels: + kernel = self._starting_kernels[kernel_id] try: await kernel except Exception: @@ -262,6 +278,9 @@ async def _async_shutdown_kernel( # Await the kernel if not using pending kernels. if not self._using_pending_kernels(): await fut + # raise an exception if one occurred during kernel shutdown. + if km.ready.exception(): + raise km.ready.exception() # type: ignore shutdown_kernel = run_sync(_async_shutdown_kernel) @@ -296,9 +315,14 @@ def remove_kernel(self, kernel_id: str) -> KernelManager: async def _async_shutdown_all(self, now: bool = False) -> None: """Shutdown all kernels.""" kids = self.list_kernel_ids() - kids += list(self._starting_kernels) - futs = [ensure_async(self.shutdown_kernel(kid, now=now)) for kid in set(kids)] + kids += list(self._pending_kernels) + futs = [ensure_async(self._shutdown_kernel_when_ready(kid, now=now)) for kid in set(kids)] await asyncio.gather(*futs) + # When using "shutdown all", all pending kernels + # should be awaited before exiting this method. + if self._using_pending_kernels(): + for km in self._kernels.values(): + await km.ready shutdown_all = run_sync(_async_shutdown_all) From 2d7626cd27b5bddc868874c7012eaa0f9f83b3d9 Mon Sep 17 00:00:00 2001 From: Zach Sailer Date: Mon, 10 Jan 2022 16:58:44 -0800 Subject: [PATCH 11/15] add docs for pending kernels --- docs/index.rst | 1 + docs/pending-kernels.rst | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+) create mode 100644 docs/pending-kernels.rst diff --git a/docs/index.rst b/docs/index.rst index f550e23bf..4731a328d 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -24,6 +24,7 @@ with Jupyter kernels. kernels wrapperkernels provisioning + pending-kernels .. toctree:: :maxdepth: 2 diff --git a/docs/pending-kernels.rst b/docs/pending-kernels.rst new file mode 100644 index 000000000..4699feb0a --- /dev/null +++ b/docs/pending-kernels.rst @@ -0,0 +1,36 @@ +Pending Kernels +=============== + +*Added in 7.1.0* + +In scenarios where an kernel takes a long time to start (e.g. kernels running remotely), it can be advantageous to immediately return the kernel's model and ID from key methods like ``.start_kernel()`` and ``.shutdown_kernel()``. The kernel will continue its task without blocking other managerial actions. + +This intermediate state is called a **"pending kernel"**. + +How they work +------------- + +When ``.start_kernel()`` or ``.shutdown_kernel()`` is called, a ``Future`` is created under the ``KernelManager.ready`` property. This property can be awaited anytime to ensure that the kernel moves out of its pending state, e.g.: + +.. code-block:: python + + # await a Kernel Manager's `.ready` property to + # block further action until the kernel is out + # of its pending state. + await kernel_manager.ready + +Once the kernel is finished pending, ``.ready.done()`` will be ``True`` and either 1) ``.ready.result()`` will return ``None`` or 2) ``.ready.exception()`` will return a raised exception + +Using pending kernels +--------------------- + +The most common way to interact with pending kernels is through the ``MultiKernelManager``—the object that manages a collection of kernels—by setting its ``use_pending_kernels`` trait to ``True``. Pending kernels are "opt-in"; they are not used by default in the ``MultiKernelManager``. + +When ``use_pending_kernels`` is ``True``, the following changes are made to the ``MultiKernelManager``: + +1. ``start_kernel`` and ``stop_kernel`` return immediately while running the pending task in a background thread. +2. The following methods raise a ``RuntimeError`` if a kernel is pending: + * ``restart_kernel`` + * ``interrupt_kernel`` + * ``shutdown_kernel`` +3. ``shutdown_all`` will wait for all pending kernels to become ready before attempting to shut them down. From a20be0230bf694b1a1689df415b27f887d06a4d8 Mon Sep 17 00:00:00 2001 From: Zach Sailer Date: Tue, 11 Jan 2022 09:28:05 -0800 Subject: [PATCH 12/15] remove commented out line --- jupyter_client/multikernelmanager.py | 1 - 1 file changed, 1 deletion(-) diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py index 459b86eb8..defce51a2 100644 --- a/jupyter_client/multikernelmanager.py +++ b/jupyter_client/multikernelmanager.py @@ -326,7 +326,6 @@ async def _async_shutdown_all(self, now: bool = False) -> None: shutdown_all = run_sync(_async_shutdown_all) - # @kernel_method def interrupt_kernel(self, kernel_id: str) -> None: """Interrupt (SIGINT) the kernel by its uuid. From da3e79dcb49797194f14f1158791b74deeeae5e8 Mon Sep 17 00:00:00 2001 From: Zachary Sailer Date: Thu, 13 Jan 2022 12:19:52 -0800 Subject: [PATCH 13/15] prevent tests from failing fast --- .github/workflows/downstream.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/downstream.yml b/.github/workflows/downstream.yml index 53fd620c6..92e4d3594 100644 --- a/.github/workflows/downstream.yml +++ b/.github/workflows/downstream.yml @@ -12,7 +12,7 @@ jobs: strategy: matrix: python-version: ["3.9"] - + fail-fast: false steps: - name: Checkout uses: actions/checkout@v2 From 5841560cc6fe2cdd124899ea2a3336f508bafdc5 Mon Sep 17 00:00:00 2001 From: Zachary Sailer Date: Thu, 13 Jan 2022 12:30:40 -0800 Subject: [PATCH 14/15] continue on errors in github workflow --- .github/workflows/downstream.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/downstream.yml b/.github/workflows/downstream.yml index 92e4d3594..48127493f 100644 --- a/.github/workflows/downstream.yml +++ b/.github/workflows/downstream.yml @@ -22,12 +22,14 @@ jobs: - name: Test IPykernel uses: jupyterlab/maintainer-tools/.github/actions/downstream-test@v1 + continue-on-error: true with: package_name: ipykernel package_spec: "pyqt5 ipykernel[test]" - name: Test NBClient uses: jupyterlab/maintainer-tools/.github/actions/downstream-test@v1 + continue-on-error: true with: package_name: nbclient env_values: IPYKERNEL_CELL_NAME=\ @@ -40,6 +42,7 @@ jobs: - name: Test nbconvert uses: jupyterlab/maintainer-tools/.github/actions/downstream-test@v1 + continue-on-error: true with: package_name: nbconvert @@ -52,6 +55,7 @@ jobs: - name: Setup conda ${{ matrix.python-version }} uses: conda-incubator/setup-miniconda@v2 + continue-on-error: true with: auto-update-conda: true activate-environment: jupyter_kernel_test From a3824983228b0380693ca40bdbea6e22f9670a15 Mon Sep 17 00:00:00 2001 From: Zachary Sailer Date: Thu, 13 Jan 2022 13:24:07 -0800 Subject: [PATCH 15/15] Remove continue-on-error in github workflow. --- .github/workflows/downstream.yml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/.github/workflows/downstream.yml b/.github/workflows/downstream.yml index 48127493f..3f765554c 100644 --- a/.github/workflows/downstream.yml +++ b/.github/workflows/downstream.yml @@ -12,7 +12,6 @@ jobs: strategy: matrix: python-version: ["3.9"] - fail-fast: false steps: - name: Checkout uses: actions/checkout@v2 @@ -22,14 +21,12 @@ jobs: - name: Test IPykernel uses: jupyterlab/maintainer-tools/.github/actions/downstream-test@v1 - continue-on-error: true with: package_name: ipykernel package_spec: "pyqt5 ipykernel[test]" - name: Test NBClient uses: jupyterlab/maintainer-tools/.github/actions/downstream-test@v1 - continue-on-error: true with: package_name: nbclient env_values: IPYKERNEL_CELL_NAME=\ @@ -42,7 +39,6 @@ jobs: - name: Test nbconvert uses: jupyterlab/maintainer-tools/.github/actions/downstream-test@v1 - continue-on-error: true with: package_name: nbconvert @@ -55,7 +51,6 @@ jobs: - name: Setup conda ${{ matrix.python-version }} uses: conda-incubator/setup-miniconda@v2 - continue-on-error: true with: auto-update-conda: true activate-environment: jupyter_kernel_test