Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Steven Silvester authored and github-actions[bot] committed Oct 25, 2021
1 parent 09967c1 commit 3c6e373
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 41 deletions.
7 changes: 7 additions & 0 deletions jupyter_client/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,14 @@ async def _async_start_kernel(self, **kw):
await ensure_async(self._launch_kernel(kernel_cmd, **kw))
await ensure_async(self.post_start_kernel(**kw))
if not done:
# Add a small sleep to ensure tests can capture the state before done
await asyncio.sleep(0.01)
self._ready.set_result(None)

except Exception as e:
if not done:
self._ready.set_exception(e)
self.log.exception(self._ready.exception())
raise e

start_kernel = run_sync(_async_start_kernel)
Expand Down Expand Up @@ -449,6 +452,10 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)
Will this kernel be restarted after it is shutdown. When this
is True, connection files will not be cleaned up.
"""
# Shutdown is a no-op for a kernel that had a failed startup
if self._ready.exception():
return

self.shutting_down = True # Used by restarter to prevent race condition
# Stop monitoring for restarting while we shutdown.
self.stop_restarter()
Expand Down
46 changes: 22 additions & 24 deletions jupyter_client/multikernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,6 @@ class MultiKernelManager(LoggingConfigurable):
""",
).tag(config=True)

use_pending_kernels = Bool(
False,
help="""Whether to make kernels available before the process has started. The
kernel has a `.ready` future which can be awaited before connecting""",
).tag(config=True)

@observe("kernel_manager_class")
def _kernel_manager_class_changed(self, change):
self.kernel_manager_factory = self._create_kernel_manager_factory()
Expand Down Expand Up @@ -167,8 +161,11 @@ def pre_start_kernel(
async def _add_kernel_when_ready(
self, kernel_id: str, km: KernelManager, kernel_awaitable: t.Awaitable
) -> None:
await kernel_awaitable
self._kernels[kernel_id] = km
try:
await kernel_awaitable
self._kernels[kernel_id] = km
finally:
self._starting_kernels.pop(kernel_id, None)

async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwargs) -> str:
"""Start a new kernel.
Expand All @@ -188,15 +185,13 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg
kwargs['kernel_id'] = kernel_id # Make kernel_id available to manager and provisioner

starter = ensure_async(km.start_kernel(**kwargs))
fut = asyncio.ensure_future(self._add_kernel_when_ready(kernel_id, km, starter))
self._starting_kernels[kernel_id] = fut

if self.use_pending_kernels:
asyncio.create_task(starter)
if getattr(self, 'use_pending_kernels', False):
self._kernels[kernel_id] = km
else:
fut = asyncio.ensure_future(self._add_kernel_when_ready(kernel_id, km, starter))
self._starting_kernels[kernel_id] = fut
await fut
del self._starting_kernels[kernel_id]

return kernel_id

Expand All @@ -220,9 +215,13 @@ async def _async_shutdown_kernel(
Will the kernel be restarted?
"""
self.log.info("Kernel shutdown: %s" % kernel_id)

if kernel_id in self._starting_kernels:
try:
await self._starting_kernels[kernel_id]
except Exception:
self.remove_kernel(kernel_id)
return
km = self.get_kernel(kernel_id)
await km.ready
await ensure_async(km.shutdown_kernel(now, restart))
self.remove_kernel(kernel_id)

Expand Down Expand Up @@ -256,18 +255,11 @@ def remove_kernel(self, kernel_id: str) -> KernelManager:
"""
return self._kernels.pop(kernel_id, None)

async def _shutdown_starting_kernel(self, kid: str, now: bool) -> None:
if kid in self._starting_kernels:
await self._starting_kernels[kid]
await ensure_async(self.shutdown_kernel(kid, now=now))

async def _async_shutdown_all(self, now: bool = False) -> None:
"""Shutdown all kernels."""
kids = self.list_kernel_ids()
futs = [ensure_async(self.shutdown_kernel(kid, now=now)) for kid in kids]
futs += [
self._shutdown_starting_kernel(kid, now=now) for kid in self._starting_kernels.keys()
]
kids += list(self._starting_kernels)
futs = [ensure_async(self.shutdown_kernel(kid, now=now)) for kid in set(kids)]
await asyncio.gather(*futs)

shutdown_all = run_sync(_async_shutdown_all)
Expand Down Expand Up @@ -476,6 +468,12 @@ class AsyncMultiKernelManager(MultiKernelManager):
""",
)

use_pending_kernels = Bool(
False,
help="""Whether to make kernels available before the process has started. The
kernel has a `.ready` future which can be awaited before connecting""",
).tag(config=True)

start_kernel = MultiKernelManager._async_start_kernel
shutdown_kernel = MultiKernelManager._async_shutdown_kernel
shutdown_all = MultiKernelManager._async_shutdown_all
4 changes: 4 additions & 0 deletions jupyter_client/tests/test_kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ class TestKernelManager:
def test_lifecycle(self, km):
km.start_kernel(stdout=PIPE, stderr=PIPE)
assert km.is_alive()
is_done = km.ready.done()
assert is_done
km.restart_kernel(now=True)
assert km.is_alive()
km.interrupt_kernel()
Expand Down Expand Up @@ -439,6 +441,8 @@ async def test_lifecycle(self, async_km):
await async_km.start_kernel(stdout=PIPE, stderr=PIPE)
is_alive = await async_km.is_alive()
assert is_alive
is_ready = async_km.ready.done()
assert is_ready
await async_km.restart_kernel(now=True)
is_alive = await async_km.is_alive()
assert is_alive
Expand Down
22 changes: 5 additions & 17 deletions jupyter_client/tests/test_kernelspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,18 @@
import pytest
from jupyter_core import paths

from .utils import install_kernel
from .utils import sample_kernel_json
from .utils import test_env
from jupyter_client import kernelspec

sample_kernel_json = {
"argv": ["cat", "{connection_file}"],
"display_name": "Test kernel",
}


class KernelSpecTests(unittest.TestCase):
def _install_sample_kernel(self, kernels_dir):
"""install a sample kernel in a kernels directory"""
sample_kernel_dir = pjoin(kernels_dir, "sample")
os.makedirs(sample_kernel_dir)
json_file = pjoin(sample_kernel_dir, "kernel.json")
with open(json_file, "w") as f:
json.dump(sample_kernel_json, f)
return sample_kernel_dir

def setUp(self):
self.env_patch = test_env()
self.env_patch.start()
self.sample_kernel_dir = self._install_sample_kernel(
pjoin(paths.jupyter_data_dir(), "kernels")
self.sample_kernel_dir = install_kernel(
pjoin(paths.jupyter_data_dir(), "kernels"), name="sample"
)

self.ksm = kernelspec.KernelSpecManager()
Expand Down Expand Up @@ -87,7 +75,7 @@ def test_find_all_specs(self):
def test_kernel_spec_priority(self):
td = TemporaryDirectory()
self.addCleanup(td.cleanup)
sample_kernel = self._install_sample_kernel(td.name)
sample_kernel = install_kernel(td.name, name="sample")
self.ksm.kernel_dirs.append(td.name)
kernels = self.ksm.find_kernel_specs()
self.assertEqual(kernels["sample"], self.sample_kernel_dir)
Expand Down
98 changes: 98 additions & 0 deletions jupyter_client/tests/test_multikernelmanager.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
"""Tests for the notebook kernel and session manager."""
import asyncio
import concurrent.futures
import os
import sys
import uuid
from subprocess import PIPE
from unittest import TestCase

import pytest
from jupyter_core import paths
from tornado.testing import AsyncTestCase
from tornado.testing import gen_test
from traitlets.config.loader import Config

from ..localinterfaces import localhost
from .utils import AsyncKMSubclass
from .utils import AsyncMKMSubclass
from .utils import install_kernel
from .utils import skip_win32
from .utils import SyncKMSubclass
from .utils import SyncMKMSubclass
from .utils import test_env
from jupyter_client import AsyncKernelManager
from jupyter_client import KernelManager
from jupyter_client.multikernelmanager import AsyncMultiKernelManager
Expand All @@ -26,6 +30,10 @@


class TestKernelManager(TestCase):
def setUp(self):
self.env_patch = test_env()
self.env_patch.start()
super().setUp()

# static so picklable for multiprocessing on Windows
@staticmethod
Expand Down Expand Up @@ -58,6 +66,7 @@ def _run_lifecycle(km, test_kid=None):
else:
kid = km.start_kernel(stdout=PIPE, stderr=PIPE)
assert km.is_alive(kid)
assert km.get_kernel(kid).ready.done()
assert kid in km
assert kid in km.list_kernel_ids()
assert len(km) == 1, f"{len(km)} != {1}"
Expand Down Expand Up @@ -220,6 +229,10 @@ def test_subclass_callables(self):


class TestAsyncKernelManager(AsyncTestCase):
def setUp(self):
self.env_patch = test_env()
self.env_patch.start()
super().setUp()

# static so picklable for multiprocessing on Windows
@staticmethod
Expand All @@ -243,6 +256,13 @@ def _get_ipc_km():
km = AsyncMultiKernelManager(config=c)
return km

@staticmethod
def _get_pending_kernels_km():
c = Config()
c.AsyncMultiKernelManager.use_pending_kernels = True
km = AsyncMultiKernelManager(config=c)
return km

# static so picklable for multiprocessing on Windows
@staticmethod
async def _run_lifecycle(km, test_kid=None):
Expand Down Expand Up @@ -334,6 +354,57 @@ async def test_shutdown_all_while_starting(self):
# shutdown again is okay, because we have no kernels
await km.shutdown_all()

@gen_test
async def test_use_pending_kernels(self):
km = self._get_pending_kernels_km()
kid = await km.start_kernel(stdout=PIPE, stderr=PIPE)
kernel = km.get_kernel(kid)
assert not kernel.ready.done()
assert kid in km
assert kid in km.list_kernel_ids()
assert len(km) == 1, f"{len(km)} != {1}"
await kernel.ready
await km.restart_kernel(kid, now=True)
assert await km.is_alive(kid)
assert kid in km.list_kernel_ids()
await km.interrupt_kernel(kid)
k = km.get_kernel(kid)
assert isinstance(k, AsyncKernelManager)
await km.shutdown_kernel(kid, now=True)
assert kid not in km, f"{kid} not in {km}"

@gen_test
async def test_use_pending_kernels_early_restart(self):
km = self._get_pending_kernels_km()
kid = await km.start_kernel(stdout=PIPE, stderr=PIPE)
kernel = km.get_kernel(kid)
assert not kernel.ready.done()
with pytest.raises(RuntimeError):
await km.restart_kernel(kid, now=True)
await kernel.ready
await km.shutdown_kernel(kid, now=True)
assert kid not in km, f"{kid} not in {km}"

@gen_test
async def test_use_pending_kernels_early_shutdown(self):
km = self._get_pending_kernels_km()
kid = await km.start_kernel(stdout=PIPE, stderr=PIPE)
kernel = km.get_kernel(kid)
assert not kernel.ready.done()
await km.shutdown_kernel(kid, now=True)
assert kid not in km, f"{kid} not in {km}"

@gen_test
async def test_use_pending_kernels_early_interrupt(self):
km = self._get_pending_kernels_km()
kid = await km.start_kernel(stdout=PIPE, stderr=PIPE)
kernel = km.get_kernel(kid)
assert not kernel.ready.done()
with pytest.raises(RuntimeError):
await km.interrupt_kernel(kid)
await km.shutdown_kernel(kid, now=True)
assert kid not in km, f"{kid} not in {km}"

@gen_test
async def test_tcp_cinfo(self):
km = self._get_tcp_km()
Expand Down Expand Up @@ -466,3 +537,30 @@ async def test_subclass_callables(self):
assert mkm.call_count("cleanup_resources") == 0

assert kid not in mkm, f"{kid} not in {mkm}"

@gen_test
async def test_bad_kernelspec(self):
km = self._get_tcp_km()
install_kernel(
os.path.join(paths.jupyter_data_dir(), "kernels"),
argv=["non_existent_executable"],
name="bad",
)
with pytest.raises(FileNotFoundError):
await km.start_kernel(kernel_name="bad", stdout=PIPE, stderr=PIPE)

@gen_test
async def test_bad_kernelspec_pending(self):
km = self._get_pending_kernels_km()
install_kernel(
os.path.join(paths.jupyter_data_dir(), "kernels"),
argv=["non_existent_executable"],
name="bad",
)
kernel_id = await km.start_kernel(kernel_name="bad", stdout=PIPE, stderr=PIPE)
assert kernel_id in km._starting_kernels
with pytest.raises(FileNotFoundError):
await km.get_kernel(kernel_id).ready
assert kernel_id in km.list_kernel_ids()
await km.shutdown_kernel(kernel_id)
assert kernel_id not in km.list_kernel_ids()
21 changes: 21 additions & 0 deletions jupyter_client/tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Testing utils for jupyter_client tests
"""
import json
import os
import sys
from tempfile import TemporaryDirectory
Expand All @@ -19,6 +20,26 @@
skip_win32 = pytest.mark.skipif(sys.platform.startswith("win"), reason="Windows")


sample_kernel_json = {
"argv": ["cat", "{connection_file}"],
"display_name": "Test kernel",
}


def install_kernel(kernels_dir, argv=None, name="test", display_name=None):
"""install a kernel in a kernels directory"""
kernel_dir = pjoin(kernels_dir, name)
os.makedirs(kernel_dir)
kernel_json = {
"argv": argv or sample_kernel_json["argv"],
"display_name": display_name or sample_kernel_json["display_name"],
}
json_file = pjoin(kernel_dir, "kernel.json")
with open(json_file, "w") as f:
json.dump(kernel_json, f)
return kernel_dir


class test_env(object):
"""Set Jupyter path variables to a temporary directory
Expand Down

0 comments on commit 3c6e373

Please sign in to comment.