Skip to content

Commit

Permalink
Adopt jupyter_kernel_mgmt as kernel management framework
Browse files Browse the repository at this point in the history
- Initial work towards using jupyter_kernel_mgmt in jupyter server
- Cherry-pick Notebook PR jupyter/notebook#4837
- Remove jupyter_client, use KernelFinder in Gateway
- Minor refactor to preserve MappingKernelManager behavior
- Session is no longer a Configurable, removed from classes list.
  Also removed some of the Gateway classes that shouldn't be there either.
- Get gateway functionality working
- Fix SessionHandler call to start kernel
- Initial support for async kernel management
- Plumb launch parameters
- Adjust kernel management with recent async updates
- Don't get child_watcher if on Windows
- Fix gateway kernelspec tests to updated JKM call
  Also fixed windows testing by increasing delay during cleanup of session
  and kernel tests - otherwise the temp directory could not be cleaned up,
  resulting in downstream side-affects.
- Require JKM >= 0.5, bump core min release
- Remove install of special patch branches for jkm
- Merge pytest PR, encode/decode kernel type
- Merge/convert missing sessions tests for JKM
- Add session and kernel equality methods
- Fix display of connection count when culling kernel
- Stop ping_callback if stream or socket closed exceptions occur
- Protect against KeyError, raise 404 if kernel_id is not found

Co-authored-by: Thomas Kluyver <[email protected]>
  • Loading branch information
kevin-bates and takluyver committed Jan 16, 2020
1 parent 5f99e53 commit 7f9df58
Show file tree
Hide file tree
Showing 22 changed files with 760 additions and 698 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- Adopt Jupyter Kernel Management and Jupyter Protocol for kernel management framework per [JEP #45](https://github.com/jupyter/enhancement-proposals/pull/45) ([#112](https://github.com/jupyter/jupyter_server/pull/112)).

## [0.2.1] - 2020-1-10

### Added
Expand Down Expand Up @@ -56,4 +59,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [Batch 8](https://github.com/jupyter/jupyter_server/pull/106)

### Security
- Added a "secure_write to function for cookie/token saves ([#77](https://github.com/jupyter/jupyter_server/pull/77))
- Added a "secure_write to function for cookie/token saves ([#77](https://github.com/jupyter/jupyter_server/pull/77))
5 changes: 3 additions & 2 deletions appveyor.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# miniconda bootstrap from conda-forge recipe

matrix:
fast_finish: true

Expand All @@ -24,12 +25,12 @@ platform:
build: false

install:
- cmd: call %CONDA_INSTALL_LOCN%\Scripts\activate.bat
- cmd: call %CONDA_INSTALL_LOCN%\\Scripts\\activate.bat
- cmd: set CONDA_PY=%CONDA_PY%
- cmd: set CONDA_PY_SPEC=%CONDA_PY_SPEC%
- cmd: conda config --set show_channel_urls true
- cmd: conda config --add channels conda-forge
- cmd: conda update --yes --quiet conda
- cmd: conda update -y -q conda
- cmd: conda info -a
- cmd: conda create -y -q -n test-env-%CONDA_PY% python=%CONDA_PY_SPEC% pip pyzmq tornado jupyter_client nbformat nbconvert nose
- cmd: conda activate test-env-%CONDA_PY%
Expand Down
10 changes: 5 additions & 5 deletions jupyter_server/base/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,11 @@ def contents_js_source(self):
#---------------------------------------------------------------
# Manager objects
#---------------------------------------------------------------


@property
def kernel_finder(self):
return self.settings['kernel_finder']

@property
def kernel_manager(self):
return self.settings['kernel_manager']
Expand All @@ -261,10 +265,6 @@ def session_manager(self):
@property
def terminal_manager(self):
return self.settings['terminal_manager']

@property
def kernel_spec_manager(self):
return self.settings['kernel_spec_manager']

@property
def config_manager(self):
Expand Down
182 changes: 10 additions & 172 deletions jupyter_server/base/zmqhandlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,81 +4,11 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

import json
import struct
import sys
import tornado

from urllib.parse import urlparse
from tornado import gen, ioloop, web
from tornado.websocket import WebSocketHandler

from jupyter_client.session import Session
from jupyter_client.jsonutil import date_default, extract_dates
from ipython_genutils.py3compat import cast_unicode

from .handlers import JupyterHandler
from jupyter_server.utils import maybe_future


def serialize_binary_message(msg):
"""serialize a message as a binary blob
Header:
4 bytes: number of msg parts (nbufs) as 32b int
4 * nbufs bytes: offset for each buffer as integer as 32b int
Offsets are from the start of the buffer, including the header.
Returns
-------
The message serialized to bytes.
"""
# don't modify msg or buffer list in-place
msg = msg.copy()
buffers = list(msg.pop('buffers'))
if sys.version_info < (3, 4):
buffers = [x.tobytes() for x in buffers]
bmsg = json.dumps(msg, default=date_default).encode('utf8')
buffers.insert(0, bmsg)
nbufs = len(buffers)
offsets = [4 * (nbufs + 1)]
for buf in buffers[:-1]:
offsets.append(offsets[-1] + len(buf))
offsets_buf = struct.pack('!' + 'I' * (nbufs + 1), nbufs, *offsets)
buffers.insert(0, offsets_buf)
return b''.join(buffers)


def deserialize_binary_message(bmsg):
"""deserialize a message from a binary blog

Header:
4 bytes: number of msg parts (nbufs) as 32b int
4 * nbufs bytes: offset for each buffer as integer as 32b int
Offsets are from the start of the buffer, including the header.
Returns
-------
message dictionary
"""
nbufs = struct.unpack('!i', bmsg[:4])[0]
offsets = list(struct.unpack('!' + 'I' * nbufs, bmsg[4:4*(nbufs+1)]))
offsets.append(None)
bufs = []
for start, stop in zip(offsets[:-1], offsets[1:]):
bufs.append(bmsg[start:stop])
msg = json.loads(bufs[0].decode('utf8'))
msg['header'] = extract_dates(msg['header'])
msg['parent_header'] = extract_dates(msg['parent_header'])
msg['buffers'] = bufs[1:]
return msg
from tornado import ioloop
from tornado.iostream import StreamClosedError
from tornado.websocket import WebSocketClosedError

# ping interval for keeping websockets alive (30 seconds)
WS_PING_INTERVAL = 30000
Expand Down Expand Up @@ -182,107 +112,15 @@ def send_ping(self):
self.close()
return

self.ping(b'')
try:
self.ping(b'')
except (StreamClosedError, WebSocketClosedError):
# websocket has been closed, stop pinging
self.ping_callback.stop()
return

self.last_ping = now

def on_pong(self, data):
self.last_pong = ioloop.IOLoop.current().time()


class ZMQStreamHandler(WebSocketMixin, WebSocketHandler):

if tornado.version_info < (4,1):
"""Backport send_error from tornado 4.1 to 4.0"""
def send_error(self, *args, **kwargs):
if self.stream is None:
super(WebSocketHandler, self).send_error(*args, **kwargs)
else:
# If we get an uncaught exception during the handshake,
# we have no choice but to abruptly close the connection.
# TODO: for uncaught exceptions after the handshake,
# we can close the connection more gracefully.
self.stream.close()


def _reserialize_reply(self, msg_or_list, channel=None):
"""Reserialize a reply message using JSON.
msg_or_list can be an already-deserialized msg dict or the zmq buffer list.
If it is the zmq list, it will be deserialized with self.session.
This takes the msg list from the ZMQ socket and serializes the result for the websocket.
This method should be used by self._on_zmq_reply to build messages that can
be sent back to the browser.
"""
if isinstance(msg_or_list, dict):
# already unpacked
msg = msg_or_list
else:
idents, msg_list = self.session.feed_identities(msg_or_list)
msg = self.session.deserialize(msg_list)
if channel:
msg['channel'] = channel
if msg['buffers']:
buf = serialize_binary_message(msg)
return buf
else:
smsg = json.dumps(msg, default=date_default)
return cast_unicode(smsg)

def _on_zmq_reply(self, stream, msg_list):
# Sometimes this gets triggered when the on_close method is scheduled in the
# eventloop but hasn't been called.
if self.ws_connection is None or stream.closed():
self.log.warning("zmq message arrived on closed channel")
self.close()
return
channel = getattr(stream, 'channel', None)
try:
msg = self._reserialize_reply(msg_list, channel=channel)
except Exception:
self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
else:
self.write_message(msg, binary=isinstance(msg, bytes))


class AuthenticatedZMQStreamHandler(ZMQStreamHandler, JupyterHandler):

def set_default_headers(self):
"""Undo the set_default_headers in JupyterHandler
which doesn't make sense for websockets
"""
pass

def pre_get(self):
"""Run before finishing the GET request
Extend this method to add logic that should fire before
the websocket finishes completing.
"""
# authenticate the request before opening the websocket
if self.get_current_user() is None:
self.log.warning("Couldn't authenticate WebSocket connection")
raise web.HTTPError(403)

if self.get_argument('session_id', False):
self.session.session = cast_unicode(self.get_argument('session_id'))
else:
self.log.warning("No session ID specified")

@gen.coroutine
def get(self, *args, **kwargs):
# pre_get can be a coroutine in subclasses
# assign and yield in two step to avoid tornado 3 issues
res = self.pre_get()
yield maybe_future(res)
res = super(AuthenticatedZMQStreamHandler, self).get(*args, **kwargs)
yield maybe_future(res)

def initialize(self):
self.log.debug("Initializing websocket connection %s", self.request.path)
self.session = Session(config=self.config)

def get_compression_options(self):
return self.settings.get('websocket_compression_options', None)
8 changes: 4 additions & 4 deletions jupyter_server/gateway/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from tornado.escape import url_escape, json_decode, utf8

from ipython_genutils.py3compat import cast_unicode
from jupyter_client.session import Session
from jupyter_protocol.session import Session, new_id_bytes
from traitlets.config.configurable import LoggingConfigurable

from .managers import GatewayClient
Expand Down Expand Up @@ -58,7 +58,7 @@ def authenticate(self):

def initialize(self):
self.log.debug("Initializing websocket connection %s", self.request.path)
self.session = Session(config=self.config)
self.session = Session(key=new_id_bytes())
self.gateway = GatewayWebSocketClient(gateway_url=GatewayClient.instance().url)

@gen.coroutine
Expand Down Expand Up @@ -231,8 +231,8 @@ class GatewayResourceHandler(APIHandler):
@web.authenticated
@gen.coroutine
def get(self, kernel_name, path, include_body=True):
ksm = self.kernel_spec_manager
kernel_spec_res = yield ksm.get_kernel_spec_resource(kernel_name, path)
kf = self.kernel_finder
kernel_spec_res = yield kf.get_kernel_spec_resource(kernel_name, path)
if kernel_spec_res is None:
self.log.warning("Kernelspec resource '{}' for '{}' not found. Gateway may not support"
" resource serving.".format(path, kernel_name))
Expand Down
41 changes: 25 additions & 16 deletions jupyter_server/gateway/managers.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

import asyncio
import os
import json
import logging

from jupyter_kernel_mgmt.discovery import KernelFinder
from socket import gaierror
from tornado import gen, web
from tornado.concurrent import Future
from tornado.escape import json_encode, json_decode, url_escape
from tornado.httpclient import HTTPClient, AsyncHTTPClient, HTTPError

from ..services.kernels.kernelmanager import MappingKernelManager
from ..services.sessions.sessionmanager import SessionManager

from jupyter_client.kernelspec import KernelSpecManager
from ..utils import url_path_join

from ..utils import url_path_join, maybe_future
from traitlets import Instance, Unicode, Float, Bool, default, validate, TraitError
from traitlets.config import SingletonConfigurable

Expand Down Expand Up @@ -496,14 +497,16 @@ def shutdown_all(self, now=False):
self.remove_kernel(kernel_id)


class GatewayKernelSpecManager(KernelSpecManager):

def __init__(self, **kwargs):
super(GatewayKernelSpecManager, self).__init__(**kwargs)
class GatewayKernelFinder(KernelFinder):
def __init__(self, parent, providers=[]):
super(GatewayKernelFinder, self).__init__(providers=providers)
self.base_endpoint = url_path_join(GatewayClient.instance().url,
GatewayClient.instance().kernelspecs_endpoint)
self.base_resource_endpoint = url_path_join(GatewayClient.instance().url,
GatewayClient.instance().kernelspecs_resource_endpoint)
# Because KernelFinder is not a taitlet/Configurable, we need to simulate a configurable
self.parent = parent
self.log = logging.getLogger(__name__)

def _get_kernelspecs_endpoint_url(self, kernel_name=None):
"""Builds a url for the kernels endpoint
Expand All @@ -517,9 +520,16 @@ def _get_kernelspecs_endpoint_url(self, kernel_name=None):

return self.base_endpoint

@gen.coroutine
def get_all_specs(self):
fetched_kspecs = yield self.list_kernel_specs()
@asyncio.coroutine
def find_kernels(self):
remote_kspecs = yield from self.get_all_specs()

# convert to list of 2 tuples
for kernel_type, attributes in remote_kspecs.items():
yield kernel_type, attributes

async def get_all_specs(self):
fetched_kspecs = await self.list_kernel_specs()

# get the default kernel name and compare to that of this server.
# If different log a warning and reset the default. However, the
Expand All @@ -535,16 +545,15 @@ def get_all_specs(self):
km.default_kernel_name = remote_default_kernel_name

remote_kspecs = fetched_kspecs.get('kernelspecs')
raise gen.Return(remote_kspecs)
return remote_kspecs

@gen.coroutine
def list_kernel_specs(self):
async def list_kernel_specs(self):
"""Get a list of kernel specs."""
kernel_spec_url = self._get_kernelspecs_endpoint_url()
self.log.debug("Request list kernel specs at: %s", kernel_spec_url)
response = yield gateway_request(kernel_spec_url, method='GET')
response = await gateway_request(kernel_spec_url, method='GET')
kernel_specs = json_decode(response.body)
raise gen.Return(kernel_specs)
return kernel_specs

@gen.coroutine
def get_kernel_spec(self, kernel_name, **kwargs):
Expand Down
Loading

0 comments on commit 7f9df58

Please sign in to comment.