diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 5c9c5ece2..d3c8d30ca 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -112,7 +112,7 @@ jobs: pip freeze - name: Check types - run: mypy jupyter_client --exclude '\/tests|kernelspecapp|ioloop|runapp' + run: mypy jupyter_client --exclude '\/tests|kernelspecapp|ioloop|runapp' --install-types --non-interactive - name: Run the tests run: pytest --cov jupyter_client -v jupyter_client diff --git a/docs/api/index.rst b/docs/api/index.rst index 125646f1e..aace26093 100644 --- a/docs/api/index.rst +++ b/docs/api/index.rst @@ -10,3 +10,4 @@ jupyter_client API kernelspec manager client + provisioners diff --git a/docs/api/manager.rst b/docs/api/manager.rst index 659f95db0..66194fddb 100644 --- a/docs/api/manager.rst +++ b/docs/api/manager.rst @@ -9,12 +9,11 @@ manager - starting, stopping, signalling The name of the kernel to launch (see :ref:`kernelspecs`). - .. automethod:: start_kernel + .. autoattribute:: provisioner - .. attribute:: kernel + The kernel provisioner with which this :class:`KernelManager` is communicating. This will generally be a :class:`LocalProvisioner` instance unless the kernelspec indicates otherwise. - Once the kernel has been started, this is the :class:`subprocess.Popen` - class for the kernel process. + .. automethod:: start_kernel .. automethod:: is_alive diff --git a/docs/api/provisioners.rst b/docs/api/provisioners.rst new file mode 100644 index 000000000..d468960e9 --- /dev/null +++ b/docs/api/provisioners.rst @@ -0,0 +1,71 @@ +kernel provisioner apis +======================= + +.. seealso:: + + :doc:`/provisioning` + +.. module:: jupyter_client.provisioning.provisioner_base + +.. autoclass:: KernelProvisionerBase + + .. attribute:: kernel_spec + + The kernel specification associated with the provisioned kernel (see :ref:`kernelspecs`). + + .. attribute:: kernel_id + + The provisioned kernel's ID. + + .. attribute:: connection_info + + The provisioned kernel's connection information. + + + .. autoproperty:: has_process + + .. automethod:: poll + + .. automethod:: wait + + .. automethod:: send_signal + + .. automethod:: kill + + .. automethod:: terminate + + .. automethod:: launch_kernel + + .. automethod:: cleanup + + .. automethod:: shutdown_requested + + .. automethod:: pre_launch + + .. automethod:: post_launch + + .. automethod:: get_provisioner_info + + .. automethod:: load_provisioner_info + + .. automethod:: get_shutdown_wait_time + + .. automethod:: _finalize_env + + .. automethod:: __apply_env_substitutions + +.. module:: jupyter_client.provisioning.local_provisioner + +.. autoclass:: LocalProvisioner + +.. module:: jupyter_client.provisioning.factory + +.. autoclass:: KernelProvisionerFactory + + .. attribute:: default_provisioner_name + + Indicates the name of the provisioner to use when no kernel_provisioner entry is present in the kernel specification. This value can also be specified via the environment variable ``JUPYTER_DEFAULT_PROVISIONER_NAME``. + + .. automethod:: is_provisioner_available + + .. automethod:: create_provisioner_instance diff --git a/docs/index.rst b/docs/index.rst index a238aba40..33667e070 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -23,6 +23,7 @@ with Jupyter kernels. kernels wrapperkernels + provisioning .. toctree:: :maxdepth: 2 diff --git a/docs/provisioning.rst b/docs/provisioning.rst new file mode 100644 index 000000000..aba9fbe81 --- /dev/null +++ b/docs/provisioning.rst @@ -0,0 +1,355 @@ +.. 
_provisioning + +Customizing the kernel's runtime environment +============================================ + +Kernel Provisioning +~~~~~~~~~~~~~~~~~~~ + +Introduced in the 7.0 release, Kernel Provisioning enables the ability +for third parties to manage the lifecycle of a kernel's runtime +environment. By implementing and configuring a *kernel provisioner*, +third parties now have the ability to provision kernels for different +environments, typically managed by resource managers like Kubernetes, +Hadoop YARN, Slurm, etc. For example, a *Kubernetes Provisioner* would +be responsible for launching a kernel within its own Kubernetes pod, +communicating the kernel's connection information back to the +application (residing in a separate pod), and terminating the pod upon +the kernel's termination. In essence, a kernel provisioner is an +*abstraction layer* between the ``KernelManager`` and today's kernel +*process* (i.e., ``Popen``). + +The kernel manager and kernel provisioner relationship +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Prior to this enhancement, the only extension point for customizing a +kernel's behavior could occur by subclassing ``KernelManager``. This +proved to be a limitation because the Jupyter framework allows for a +single ``KernelManager`` class at any time. While applications could +introduce a ``KernelManager`` subclass of their own, that +``KernelManager`` was then tied directly to *that* application and +thereby not usable as a ``KernelManager`` in another application. As a +result, we consider the ``KernelManager`` class to be an +*application-owned entity* upon which application-specific behaviors can +be implemented. + +Kernel provisioners, on the other hand, are contained within the +``KernelManager`` (i.e., a *has-a* relationship) and applications are +agnostic as to what *kind* of provisioner is in use other than what is +conveyed via the kernel's specification (kernelspec). All kernel +interactions still occur via the ``KernelManager`` and ``KernelClient`` +classes within ``jupyter_client`` and potentially subclassed by the +application. + +Kernel provisioners are not related in any way to the ``KernelManager`` +instance that controls their lifecycle, nor do they have any affinity to +the application within which they are used. They merely provide a +vehicle by which authors can extend the landscape in which a kernel can +reside, while not side-effecting the application. That said, some kernel +provisioners may introduce requirements on the application. For example +(and completely hypothetically speaking), a ``SlurmProvisioner`` may +impose the constraint that the server (``jupyter_client``) resides on an +edge node of the Slurm cluster. These kinds of requirements can be +mitigated by leveraging applications like `Jupyter Kernel Gateway `_ or +`Jupyter Enterprise Gateway `_ +where the gateway server resides on the edge +node of (or within) the cluster, etc. + +Discovery +~~~~~~~~~ + +Kernel provisioning does not alter today's kernel discovery mechanism +that utilizes well-known directories of ``kernel.json`` files. Instead, +it optionally extends the current ``metadata`` stanza within the +``kernel.json`` to include the specification of the kernel provisioner +name, along with an optional ``config`` stanza, consisting of +provisioner-specific configuration items. For example, a container-based +provisioner will likely need to specify the image name in this section. 
+The important point is that the content of this section is +provisioner-specific. + +.. code:: JSON + + "metadata": { + "kernel_provisioner": { + "provisioner_name": "k8s-provisioner", + "config": { + "image_name": "my_docker_org/kernel:2.1.5", + "max_cpus": 4 + } + } + }, + +Kernel provisioner authors implement their provisioners by deriving from +:class:`KernelProvisionerBase` and expose their provisioner for consumption +via entry-points: + +.. code:: python + + 'jupyter_client.kernel_provisioners': [ + 'k8s-provisioner = my_package:K8sProvisioner', + ], + +Backwards Compatibility +~~~~~~~~~~~~~~~~~~~~~~~ + +Prior to this release, no ``kernel.json`` (kernelspec) will contain a +provisioner entry, yet the framework is now based on using provisioners. +As a result, when a ``kernel_provisioner`` stanza is **not** present in +a selected kernelspec, jupyter client will, by default, use the built-in +``LocalProvisioner`` implementation as its provisioner. This provisioner +retains today's local kernel functionality. It can also be subclassed +for those provisioner authors wanting to extend the functionality of +local kernels. The result of launching a kernel in this manner is +equivalent to the following stanza existing in the ``kernel.json`` file: + +.. code:: JSON + + "metadata": { + "kernel_provisioner": { + "provisioner_name": "local-provisioner", + "config": { + } + } + }, + +Should a given installation wish to use a *different* provisioner as +their "default provisioner" (including subclasses of +``LocalProvisioner``), they can do so by specifying a value for +``KernelProvisionerFactory.default_provisioner_name``. + +Implementing a custom provisioner +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The impact of Kernel Provisioning is that it enables the ability to +implement custom kernel provisioners to manage a kernel's lifecycle +within any runtime environment. There are currently two approaches by +which that can be accomplished, extending the ``KernelProvisionerBase`` +class or extending the built-in class - ``LocalProvisioner``. As more +provisioners are introduced, some may be implemented in an abstract +sense, from which specific implementations can be authored. + +Extending ``LocalProvisioner`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +If you're interested in running kernels locally and yet adjust their +behavior, there's a good chance you can simply extend +``LocalProvisioner`` via subclassing. This amounts to deriving from +``LocalProvisioner`` and overriding appropriate methods to provide your +custom functionality. + +In this example, RBACProvisioner will verify whether the current user is +in the role meant for this kernel by calling a method implemented within *this* provisioner. If the user is not in the role, an exception will be thrown. + +.. code:: python + + class RBACProvisioner(LocalProvisioner): + + role: str = Unicode(config=True) + + async def pre_launch(self, **kwargs: Any) -> Dict[str, Any]: + + if not self.user_in_role(self.role): + raise PermissionError(f"User is not in role {self.role} and " + f"cannot launch this kernel.") + + return super().pre_launch(**kwargs) + +It is important to note *when* it's necessary to call the superclass in +a given method - since the operations it performs may be critical to the +kernel's management. As a result, you'll likely need to become familiar +with how ``LocalProvisioner`` operates. 
+ +Extending ``KernelProvisionerBase`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +If you'd like to launch your kernel in an environment other than the +local server, then you will need to consider subclassing :class:`KernelProvisionerBase` +directly. This will allow you to implement the various kernel process +controls relative to your target environment. For instance, if you +wanted to have your kernel hosted in a Hadoop YARN cluster, you will +need to implement process-control methods like :meth:`poll` and :meth:`wait` +to use the YARN REST API. Or, similarly, a Kubernetes-based provisioner +would need to implement the process-control methods using the Kubernetes client +API, etc. + +By modeling the :class:`KernelProvisionerBase` methods after :class:`subprocess.Popen` +a natural mapping between today's kernel lifecycle management takes place. This, +coupled with the ability to add configuration directly into the ``config:`` stanza +of the ``kernel_provisioner`` metadata, allows for things like endpoint address, +image names, namespaces, hosts lists, etc. to be specified relative to your +kernel provisioner implementation. + +The ``kernel_id`` corresponding to the launched kernel and used by the +kernel manager is now available *prior* to the kernel's launch. This +enables provisioners with a unique *key* they can use to discover and +control their kernel when launched into resource-managed clusters such +as Hadoop YARN or Kubernetes. + +.. tip:: + Use ``kernel_id`` as a discovery mechanism from your provisioner! + +Here's a prototyped implementation of a couple of the abstract methods +of :class:`KernelProvisionerBase` for use in an Hadoop YARN cluster to +help illustrate a provisioner's implementation. Note that the built-in +implementation of :class:`LocalProvisioner` can also be used as a reference. + +Notice the internal method ``_get_application_id()``. This method is +what the provisioner uses to determine if the YARN application (i.e., +the kernel) is still running within te cluster. Although the provisioner +doesn't dictate the application id, the application id is +discovered via the application *name* which is a function of ``kernel_id``. + +.. code:: python + + async def poll(self) -> Optional[int]: + """Submitting a new kernel/app to YARN will take a while to be ACCEPTED. + Thus application ID will probably not be available immediately for poll. + So will regard the application as RUNNING when application ID still in + ACCEPTED or SUBMITTED state. + + :return: None if the application's ID is available and state is + ACCEPTED/SUBMITTED/RUNNING. Otherwise 0. + """ + result = 0 + if self._get_application_id(): + state = self._query_app_state_by_id(self.application_id) + if state in YarnProvisioner.initial_states: + result = None + + return result + + + async def send_signal(self, signum): + """Currently only support 0 as poll and other as kill. + + :param signum + :return: + """ + if signum == 0: + return await self.poll() + elif signum == signal.SIGKILL: + return await self.kill() + else: + return await super().send_signal(signum) + +Notice how in some cases we can compose provisioner methods to implement others. For +example, since sending a signal number of 0 is tantamount to polling the process, we +go ahead and call :meth:`poll` to handle `signum` of 0 and :meth:`kill` to handle +`SIGKILL` requests. + +Here we see how ``_get_application_id`` uses the ``kernel_id`` to acquire the application +id - which is the *primary id* for controlling YARN application lifecycles. 
Since startup +in resource-managed clusters can tend to take much longer than local kernels, you'll typically +need a polling or notification mechanism within your provisioner. In addition, your +provisioner will be asked by the ``KernelManager`` what is an acceptable startup time. +This answer is implemented in the provisioner via the :meth:`get_shutdown_wait_time` method. + +.. code:: python + + def _get_application_id(self, ignore_final_states: bool = False) -> str: + + if not self.application_id: + app = self._query_app_by_name(self.kernel_id) + state_condition = True + if type(app) is dict: + state = app.get('state') + self.last_known_state = state + + if ignore_final_states: + state_condition = state not in YarnProvisioner.final_states + + if len(app.get('id', '')) > 0 and state_condition: + self.application_id = app['id'] + self.log.info(f"ApplicationID: '{app['id']}' assigned for " + f"KernelID: '{self.kernel_id}', state: {state}.") + if not self.application_id: + self.log.debug(f"ApplicationID not yet assigned for KernelID: " + f"'{self.kernel_id}' - retrying...") + return self.application_id + + + def get_shutdown_wait_time(self, recommended: Optional[float] = 5.0) -> float: + + if recommended < yarn_shutdown_wait_time: + recommended = yarn_shutdown_wait_time + self.log.debug(f"{type(self).__name__} shutdown wait time adjusted to " + f"{recommended} seconds.") + + return recommended + +Registering your custom provisioner +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Once your custom provisioner has been authored, it needs to be exposed +as an +`entry point `_. +To do this add the following to your ``setup.py`` (or equivalent) in its +``entry_points`` stanza using the group name +``jupyter_client.kernel_provisioners``: + +:: + + 'jupyter_client.kernel_provisioners': [ + 'rbac-provisioner = acme.rbac.provisioner:RBACProvisioner', + ], + +where: + +- ``rbac-provisioner`` is the *name* of your provisioner and what will + be referenced within the ``kernel.json`` file +- ``acme.rbac.provisioner`` identifies the provisioner module name, and +- ``RBACProvisioner`` is custom provisioner object name + (implementation) that (directly or indirectly) derives from + ``KernelProvisionerBase`` + +Deploying your custom provisioner +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The final step in getting your custom provisioner deployed is to add a +``kernel_provisioner`` stanza to the appropriate ``kernel.json`` files. +This can be accomplished manually or programmatically (in which some +tooling is implemented to create the appropriate ``kernel.json`` file). +In either case, the end result is the same - a ``kernel.json`` file with +the appropriate stanza within ``metadata``. The *vision* is that kernel +provisioner packages will include an application that creates kernel +specifications (i.e., ``kernel.json`` et. al.) pertaining to that +provisioner. + +Following on the previous example of ``RBACProvisioner``, one would find +the following ``kernel.json`` file in directory +``/usr/local/share/jupyter/kernels/rbac_kernel``: + +.. 
code:: JSON + + { + "argv": ["python", "-m", "ipykernel_launcher", "-f", "{connection_file}"], + "env": {}, + "display_name": "RBAC Kernel", + "language": "python", + "interrupt_mode": "signal", + "metadata": { + "kernel_provisioner": { + "provisioner_name": "rbac-provisioner", + "config": { + "role": "data_scientist" + } + } + } + } + +Listing available kernel provisioners +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +To confirm that your custom provisioner is available for use, +the ``jupyter kernelspec`` command has been extended to include +a `provisioners` sub-command. As a result, running ``jupyter kernelspec provisioners`` +will list the available provisioners by name followed by their module and object +names (colon-separated): + +.. code:: bash + + $ jupyter kernelspec provisioners + + Available kernel provisioners: + local-provisioner jupyter_client.provisioning:LocalProvisioner + rbac-provisioner acme.rbac.provisioner:RBACProvisioner diff --git a/jupyter_client/__init__.py b/jupyter_client/__init__.py index 400bdea63..1aa6feadb 100644 --- a/jupyter_client/__init__.py +++ b/jupyter_client/__init__.py @@ -13,3 +13,5 @@ from .manager import run_kernel # noqa from .multikernelmanager import AsyncMultiKernelManager # noqa from .multikernelmanager import MultiKernelManager # noqa +from .provisioning import KernelProvisionerBase # noqa +from .provisioning import LocalProvisioner # noqa diff --git a/jupyter_client/connect.py b/jupyter_client/connect.py index 418d73207..29ce180e4 100644 --- a/jupyter_client/connect.py +++ b/jupyter_client/connect.py @@ -19,6 +19,7 @@ from typing import Dict from typing import List from typing import Optional +from typing import Set from typing import Tuple from typing import Union @@ -29,29 +30,34 @@ from traitlets import Bool # type: ignore from traitlets import CaselessStrEnum from traitlets import Instance +from traitlets import Int from traitlets import Integer from traitlets import observe from traitlets import Type from traitlets import Unicode from traitlets.config import LoggingConfigurable # type: ignore +from traitlets.config import SingletonConfigurable from .localinterfaces import localhost from .utils import _filefind +# Define custom type for kernel connection info +KernelConnectionInfo = Dict[str, Union[int, str, bytes]] + def write_connection_file( fname: Optional[str] = None, - shell_port: int = 0, - iopub_port: int = 0, - stdin_port: int = 0, - hb_port: int = 0, - control_port: int = 0, + shell_port: Union[Integer, Int, int] = 0, + iopub_port: Union[Integer, Int, int] = 0, + stdin_port: Union[Integer, Int, int] = 0, + hb_port: Union[Integer, Int, int] = 0, + control_port: Union[Integer, Int, int] = 0, ip: str = "", key: bytes = b"", transport: str = "tcp", signature_scheme: str = "hmac-sha256", kernel_name: str = "", -) -> Tuple[str, Dict[str, Union[int, str]]]: +) -> Tuple[str, KernelConnectionInfo]: """Generates a JSON config file, including the selection of random ports. 
Parameters @@ -139,7 +145,7 @@ def write_connection_file( if hb_port <= 0: hb_port = ports.pop(0) - cfg: Dict[str, Union[int, str]] = dict( + cfg: KernelConnectionInfo = dict( shell_port=shell_port, iopub_port=iopub_port, stdin_port=stdin_port, @@ -250,7 +256,7 @@ def find_connection_file( def tunnel_to_kernel( - connection_info: Union[str, Dict[str, Any]], + connection_info: Union[str, KernelConnectionInfo], sshserver: str, sshkey: Optional[str] = None, ) -> Tuple[Any, ...]: @@ -398,7 +404,7 @@ def _session_default(self): # Connection and ipc file management # -------------------------------------------------------------------------- - def get_connection_info(self, session: bool = False) -> Dict[str, Any]: + def get_connection_info(self, session: bool = False) -> KernelConnectionInfo: """Return the connection info as a dict Parameters @@ -543,7 +549,7 @@ def load_connection_file(self, connection_file: Optional[str] = None) -> None: info = json.load(f) self.load_connection_info(info) - def load_connection_info(self, info: Dict[str, int]) -> None: + def load_connection_info(self, info: KernelConnectionInfo) -> None: """Load connection info from a dict containing connection info. Typically this data comes from a connection file @@ -574,6 +580,19 @@ def load_connection_info(self, info: Dict[str, int]) -> None: if "signature_scheme" in info: self.session.signature_scheme = info["signature_scheme"] + def _force_connection_info(self, info: KernelConnectionInfo) -> None: + """Unconditionally loads connection info from a dict containing connection info. + + Overwrites connection info-based attributes, regardless of their current values + and writes this information to the connection file. + """ + # Reset current ports to 0 and indicate file has not been written to enable override + self._connection_file_written = False + for name in port_names: + setattr(self, name, 0) + self.load_connection_info(info) + self.write_connection_file() + # -------------------------------------------------------------------------- # Creating connected sockets # -------------------------------------------------------------------------- @@ -627,8 +646,44 @@ def connect_control(self, identity: Optional[bytes] = None) -> zmq.sugar.socket. return self._create_connected_socket("control", identity=identity) +class LocalPortCache(SingletonConfigurable): + """ + Used to keep track of local ports in order to prevent race conditions that + can occur between port acquisition and usage by the kernel. All locally- + provisioned kernels should use this mechanism to limit the possibility of + race conditions. Note that this does not preclude other applications from + acquiring a cached but unused port, thereby re-introducing the issue this + class is attempting to resolve (minimize). + See: https://github.com/jupyter/jupyter_client/issues/487 + """ + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self.currently_used_ports: Set[int] = set() + + def find_available_port(self, ip: str) -> int: + while True: + tmp_sock = socket.socket() + tmp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, b"\0" * 8) + tmp_sock.bind((ip, 0)) + port = tmp_sock.getsockname()[1] + tmp_sock.close() + + # This is a workaround for https://github.com/jupyter/jupyter_client/issues/487 + # We prevent two kernels to have the same ports. 
+ if port not in self.currently_used_ports: + self.currently_used_ports.add(port) + return port + + def return_port(self, port: int) -> None: + if port in self.currently_used_ports: # Tolerate uncached ports + self.currently_used_ports.remove(port) + + __all__ = [ "write_connection_file", "find_connection_file", "tunnel_to_kernel", + "KernelConnectionInfo", + "LocalPortCache", ] diff --git a/jupyter_client/kernelspec.py b/jupyter_client/kernelspec.py index 7ecc765dd..99b35447d 100644 --- a/jupyter_client/kernelspec.py +++ b/jupyter_client/kernelspec.py @@ -21,6 +21,8 @@ from traitlets import Unicode from traitlets.config import LoggingConfigurable # type: ignore +from .provisioning import KernelProvisionerFactory as KPF + pjoin = os.path.join NATIVE_KERNEL_NAME = "python3" @@ -204,6 +206,7 @@ def _get_kernel_spec_by_name(self, kernel_name, resource_dir): """Returns a :class:`KernelSpec` instance for a given kernel_name and resource_dir. """ + kspec = None if kernel_name == NATIVE_KERNEL_NAME: try: from ipykernel.kernelspec import RESOURCES, get_kernel_dict @@ -212,9 +215,14 @@ def _get_kernel_spec_by_name(self, kernel_name, resource_dir): pass else: if resource_dir == RESOURCES: - return self.kernel_spec_class(resource_dir=resource_dir, **get_kernel_dict()) + kspec = self.kernel_spec_class(resource_dir=resource_dir, **get_kernel_dict()) + if not kspec: + kspec = self.kernel_spec_class.from_resource_dir(resource_dir) + + if not KPF.instance(parent=self.parent).is_provisioner_available(kspec): + raise NoSuchKernel(kernel_name) - return self.kernel_spec_class.from_resource_dir(resource_dir) + return kspec def _find_spec_directory(self, kernel_name): """Find the resource directory of a named kernel spec""" @@ -240,13 +248,12 @@ def get_kernel_spec(self, kernel_name): """ if not _is_valid_kernel_name(kernel_name): self.log.warning( - "Kernelspec name %r is invalid: %s", - kernel_name, - _kernel_name_description, + f"Kernelspec name {kernel_name} is invalid: {_kernel_name_description}" ) resource_dir = self._find_spec_directory(kernel_name.lower()) if resource_dir is None: + self.log.warning(f"Kernelspec name {kernel_name} cannot be found!") raise NoSuchKernel(kernel_name) return self._get_kernel_spec_by_name(kernel_name, resource_dir) @@ -277,6 +284,8 @@ def get_all_specs(self): spec = self.get_kernel_spec(kname) res[kname] = {"resource_dir": resource_dir, "spec": spec.to_dict()} + except NoSuchKernel: + pass # The appropriate warning has already been logged except Exception: self.log.warning("Error loading kernelspec %r", kname, exc_info=True) return res diff --git a/jupyter_client/kernelspecapp.py b/jupyter_client/kernelspecapp.py index da310d3be..0690840e2 100644 --- a/jupyter_client/kernelspecapp.py +++ b/jupyter_client/kernelspecapp.py @@ -17,6 +17,7 @@ from . 
import __version__ from .kernelspec import KernelSpecManager +from .provisioning.factory import KernelProvisionerFactory class ListKernelSpecs(JupyterApp): @@ -270,6 +271,22 @@ def start(self): self.exit(e) +class ListProvisioners(JupyterApp): + version = __version__ + description = """List available provisioners for use in kernel specifications.""" + + def start(self): + kfp = KernelProvisionerFactory.instance(parent=self) + print("Available kernel provisioners:") + provisioners = kfp.get_provisioner_entries() + + # pad to width of longest kernel name + name_len = len(sorted(provisioners, key=lambda name: len(name))[-1]) + + for name in sorted(provisioners): + print(f" {name.ljust(name_len)} {provisioners[name]}") + + class KernelSpecApp(Application): version = __version__ name = "jupyter kernelspec" @@ -288,6 +305,7 @@ class KernelSpecApp(Application): InstallNativeKernelSpec, InstallNativeKernelSpec.description.splitlines()[0], ), + "provisioners": (ListProvisioners, ListProvisioners.description.splitlines()[0]), } ) diff --git a/jupyter_client/launcher.py b/jupyter_client/launcher.py index eec1354d5..818b4c81a 100644 --- a/jupyter_client/launcher.py +++ b/jupyter_client/launcher.py @@ -155,14 +155,18 @@ def launch_kernel( # Allow to use ~/ in the command or its arguments cmd = [os.path.expanduser(s) for s in cmd] proc = Popen(cmd, **kwargs) - except Exception: - msg = "Failed to run command:\n{}\n" " PATH={!r}\n" " with kwargs:\n{!r}\n" - # exclude environment variables, - # which may contain access tokens and the like. - without_env = {key: value for key, value in kwargs.items() if key != "env"} - msg = msg.format(cmd, env.get("PATH", os.defpath), without_env) - get_logger().error(msg) - raise + except Exception as ex: + try: + msg = "Failed to run command:\n{}\n" " PATH={!r}\n" " with kwargs:\n{!r}\n" + # exclude environment variables, + # which may contain access tokens and the like. + without_env = {key: value for key, value in kwargs.items() if key != "env"} + msg = msg.format(cmd, env.get("PATH", os.defpath), without_env) + get_logger().error(msg) + except Exception as ex2: # Don't let a formatting/logger issue lead to the wrong exception + print(f"Failed to run command: '{cmd}' due to exception: {ex}") + print(f"The following exception occurred handling the previous failure: {ex2}") + raise ex if sys.platform == "win32": # Attach the interrupt event to the Popen objet so it can be used later. 
diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py index 07d1bfa67..fe23d3acd 100644 --- a/jupyter_client/manager.py +++ b/jupyter_client/manager.py @@ -7,9 +7,9 @@ import signal import sys import typing as t +import uuid from contextlib import contextmanager from enum import Enum -from subprocess import Popen import zmq from traitlets import Any # type: ignore @@ -25,14 +25,13 @@ from traitlets.utils.importstring import import_item # type: ignore from .connect import ConnectionFileMixin -from .localinterfaces import is_local_ip -from .localinterfaces import local_ips from .managerabc import KernelManagerABC +from .provisioning import KernelProvisionerBase +from .provisioning import KernelProvisionerFactory as KPF from .utils import ensure_async from .utils import run_sync from jupyter_client import KernelClient from jupyter_client import kernelspec -from jupyter_client import launch_kernel class _ShutdownStatus(Enum): @@ -57,7 +56,7 @@ class KernelManager(ConnectionFileMixin): """ def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + super().__init__(**kwargs) self._shutdown_status = _ShutdownStatus.Unset _created_context: Bool = Bool(False) @@ -84,9 +83,12 @@ def _client_factory_default(self) -> Type: def _client_class_changed(self, change: t.Dict[str, DottedObjectName]) -> None: self.client_factory = import_item(str(change["new"])) - # The kernel process with which the KernelManager is communicating. - # generally a Popen instance - kernel: Any = Any() + kernel_id: str = Unicode(None, allow_none=True) + + # The kernel provisioner with which this KernelManager is communicating. + # This will generally be a LocalProvisioner instance unless the kernelspec + # indicates otherwise. + provisioner: t.Optional[KernelProvisionerBase] = None kernel_spec_manager: Instance = Instance(kernelspec.KernelSpecManager) @@ -104,11 +106,13 @@ def _kernel_spec_manager_changed(self, change: t.Dict[str, Instance]) -> None: config=True, help="Time to wait for a kernel to terminate before killing it, " "in seconds. When a shutdown request is initiated, the kernel " - "will be immediately send and interrupt (SIGINT), followed" + "will be immediately sent an interrupt (SIGINT), followed" "by a shutdown_request message, after 1/2 of `shutdown_wait_time`" "it will be sent a terminate (SIGTERM) request, and finally at " "the end of `shutdown_wait_time` will be killed (SIGKILL). terminate " - "and kill may be equivalent on windows.", + "and kill may be equivalent on windows. Note that this value can be" + "overridden by the in-use kernel provisioner since shutdown times may" + "vary by provisioned environment.", ) kernel_name: Unicode = Unicode(kernelspec.NATIVE_KERNEL_NAME) @@ -242,12 +246,18 @@ def from_ns(match): return [pat.sub(from_ns, arg) for arg in cmd] - async def _async_launch_kernel(self, kernel_cmd: t.List[str], **kw) -> Popen: + async def _async_launch_kernel(self, kernel_cmd: t.List[str], **kw) -> None: """actually launch the kernel override in a subclass to launch kernel subprocesses differently + Note that provisioners can now be used to customize kernel environments + and """ - return launch_kernel(kernel_cmd, **kw) + assert self.provisioner is not None + connection_info = await self.provisioner.launch_kernel(kernel_cmd, **kw) + assert self.provisioner.has_process + # Provisioner provides the connection information. Load into kernel manager and write file. 
+ self._force_connection_info(connection_info) _launch_kernel = run_sync(_async_launch_kernel) @@ -264,7 +274,7 @@ def _close_control_socket(self) -> None: self._control_socket.close() self._control_socket = None - def pre_start_kernel(self, **kw) -> t.Tuple[t.List[str], t.Dict[str, t.Any]]: + async def pre_start_kernel(self, **kw) -> t.Tuple[t.List[str], t.Dict[str, t.Any]]: """Prepares a kernel for startup in a separate process. If random ports (port=0) are being used, this method must be called @@ -276,59 +286,31 @@ def pre_start_kernel(self, **kw) -> t.Tuple[t.List[str], t.Dict[str, t.Any]]: keyword arguments that are passed down to build the kernel_cmd and launching the kernel (e.g. Popen kwargs). """ - self.shutting_down = False - if self.transport == "tcp" and not is_local_ip(self.ip): - raise RuntimeError( - "Can only launch a kernel on a local interface. " - "This one is not: %s." - "Make sure that the '*_address' attributes are " - "configured properly. " - "Currently valid addresses are: %s" % (self.ip, local_ips()) - ) - - # write connection file / get default ports - self.write_connection_file() - + self.kernel_id = self.kernel_id or kw.pop('kernel_id', str(uuid.uuid4())) # save kwargs for use in restart self._launch_args = kw.copy() - # build the Popen cmd - extra_arguments = kw.pop("extra_arguments", []) - kernel_cmd = self.format_kernel_cmd(extra_arguments=extra_arguments) - env = kw.pop("env", os.environ).copy() - # Don't allow PYTHONEXECUTABLE to be passed to kernel process. - # If set, it can bork all the things. - env.pop("PYTHONEXECUTABLE", None) - - # Environment variables from kernel spec are added to os.environ. - assert self.kernel_spec is not None - env.update(self._get_env_substitutions(self.kernel_spec.env, env)) - - kw["env"] = env + if self.provisioner is None: # will not be None on restarts + self.provisioner = KPF.instance(parent=self.parent).create_provisioner_instance( + self.kernel_id, + self.kernel_spec, + parent=self, + ) + kw = await self.provisioner.pre_launch(**kw) + kernel_cmd = kw.pop('cmd') return kernel_cmd, kw - def _get_env_substitutions( - self, - templated_env: t.Optional[t.Dict[str, str]], - substitution_values: t.Dict[str, str], - ) -> t.Optional[t.Dict[str, str]]: - """Walks env entries in templated_env and applies possible substitutions from current env - (represented by substitution_values). - Returns the substituted list of env entries. + async def post_start_kernel(self, **kw) -> None: + """Performs any post startup tasks relative to the kernel. + + Parameters + ---------- + `**kw` : optional + keyword arguments that were used in the kernel process's launch. """ - substituted_env = {} - if templated_env: - from string import Template - - # For each templated env entry, fill any templated references - # matching names of env variables with those values and build - # new dict with substitutions. - for k, v in templated_env.items(): - substituted_env.update({k: Template(v).safe_substitute(substitution_values)}) - return substituted_env - - def post_start_kernel(self, **kw) -> None: self.start_restarter() self._connect_control_socket() + assert self.provisioner is not None + await self.provisioner.post_launch(**kw) async def _async_start_kernel(self, **kw): """Starts a kernel on this host in a separate process. @@ -342,25 +324,31 @@ async def _async_start_kernel(self, **kw): keyword arguments that are passed down to build the kernel_cmd and launching the kernel (e.g. Popen kwargs). 
""" - kernel_cmd, kw = self.pre_start_kernel(**kw) + kernel_cmd, kw = await self.pre_start_kernel(**kw) # launch the kernel subprocess self.log.debug("Starting kernel: %s", kernel_cmd) - self.kernel = await ensure_async(self._launch_kernel(kernel_cmd, **kw)) - self.post_start_kernel(**kw) + await ensure_async(self._launch_kernel(kernel_cmd, **kw)) + await self.post_start_kernel(**kw) start_kernel = run_sync(_async_start_kernel) - def request_shutdown(self, restart: bool = False) -> None: + async def request_shutdown(self, restart: bool = False) -> None: """Send a shutdown request via control channel""" content = dict(restart=restart) msg = self.session.msg("shutdown_request", content=content) # ensure control socket is connected self._connect_control_socket() self.session.send(self._control_socket, msg) + assert self.provisioner is not None + await self.provisioner.shutdown_requested(restart=restart) + self._shutdown_status = _ShutdownStatus.ShutdownRequest async def _async_finish_shutdown( - self, waittime: t.Optional[float] = None, pollinterval: float = 0.1 + self, + waittime: t.Optional[float] = None, + pollinterval: float = 0.1, + restart: t.Optional[bool] = False, ) -> None: """Wait for kernel shutdown, then kill process if it doesn't shutdown. @@ -369,7 +357,9 @@ async def _async_finish_shutdown( """ if waittime is None: waittime = max(self.shutdown_wait_time, 0) - self._shutdown_status = _ShutdownStatus.ShutdownRequest + if self.provisioner: # Allow provisioner to override + waittime = self.provisioner.get_shutdown_wait_time(recommended=waittime) + try: await asyncio.wait_for( self._async_wait(pollinterval=pollinterval), timeout=waittime / 2 @@ -386,17 +376,16 @@ async def _async_finish_shutdown( except asyncio.TimeoutError: self.log.debug("Kernel is taking too long to finish, killing") self._shutdown_status = _ShutdownStatus.SigkillRequest - await ensure_async(self._kill_kernel()) + await ensure_async(self._kill_kernel(restart=restart)) else: # Process is no longer alive, wait and clear - if self.kernel is not None: - while self.kernel.poll() is None: - await asyncio.sleep(pollinterval) - self.kernel = None + if self.has_kernel: + assert self.provisioner is not None + await self.provisioner.wait() finish_shutdown = run_sync(_async_finish_shutdown) - def cleanup_resources(self, restart: bool = False) -> None: + async def _async_cleanup_resources(self, restart: bool = False) -> None: """Clean up resources when the kernel is shut down""" if not restart: self.cleanup_connection_file() @@ -408,6 +397,11 @@ def cleanup_resources(self, restart: bool = False) -> None: if self._created_context and not restart: self.context.destroy(linger=100) + if self.provisioner: + await self.provisioner.cleanup(restart=restart) + + cleanup_resources = run_sync(_async_cleanup_resources) + async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False): """Attempts to stop the kernel process cleanly. @@ -435,13 +429,13 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) if now: await ensure_async(self._kill_kernel()) else: - self.request_shutdown(restart=restart) + await ensure_async(self.request_shutdown(restart=restart)) # Don't send any additional kernel kill messages immediately, to give # the kernel a chance to properly execute shutdown actions. Wait for at # most 1s, checking every 0.1s. 
- await ensure_async(self.finish_shutdown()) + await ensure_async(self.finish_shutdown(restart=restart)) - self.cleanup_resources(restart=restart) + await ensure_async(self.cleanup_resources(restart=restart)) shutdown_kernel = run_sync(_async_shutdown_kernel) @@ -487,67 +481,25 @@ async def _async_restart_kernel(self, now: bool = False, newports: bool = False, @property def has_kernel(self) -> bool: - """Has a kernel been started that we are managing.""" - return self.kernel is not None + """Has a kernel process been started that we are actively managing.""" + return self.provisioner is not None and self.provisioner.has_process - async def _async_send_kernel_sigterm(self) -> None: + async def _async_send_kernel_sigterm(self, restart: bool = False) -> None: """similar to _kill_kernel, but with sigterm (not sigkill), but do not block""" if self.has_kernel: - # Signal the kernel to terminate (sends SIGTERM on Unix and - # if the kernel is a subprocess and we are on windows; this is - # equivalent to kill - try: - if hasattr(self.kernel, "terminate"): - self.kernel.terminate() - elif hasattr(signal, "SIGTERM"): - await self._async_signal_kernel(signal.SIGTERM) - else: - self.log.debug( - "Cannot set term signal to kernel, no" - " `.terminate()` method and no values for SIGTERM" - ) - except OSError as e: - # In Windows, we will get an Access Denied error if the process - # has already terminated. Ignore it. - if sys.platform == "win32": - if e.winerror != 5: # type: ignore - raise - # On Unix, we may get an ESRCH error if the process has already - # terminated. Ignore it. - else: - from errno import ESRCH - - if e.errno != ESRCH: - raise + assert self.provisioner is not None + await self.provisioner.terminate(restart=restart) _send_kernel_sigterm = run_sync(_async_send_kernel_sigterm) - async def _async_kill_kernel(self) -> None: + async def _async_kill_kernel(self, restart: bool = False) -> None: """Kill the running kernel. This is a private method, callers should use shutdown_kernel(now=True). """ if self.has_kernel: - # Signal the kernel to terminate (sends SIGKILL on Unix and calls - # TerminateProcess() on Win32). - try: - if hasattr(signal, "SIGKILL"): - await self._async_signal_kernel(signal.SIGKILL) # type: ignore - else: - self.kernel.kill() - except OSError as e: - # In Windows, we will get an Access Denied error if the process - # has already terminated. Ignore it. - if sys.platform == "win32": - if e.winerror != 5: # type: ignore - raise - # On Unix, we may get an ESRCH error if the process has already - # terminated. Ignore it. - else: - from errno import ESRCH - - if e.errno != ESRCH: - raise + assert self.provisioner is not None + await self.provisioner.kill(restart=restart) # Wait until the kernel terminates. 
try: @@ -558,10 +510,8 @@ async def _async_kill_kernel(self) -> None: pass else: # Process is no longer alive, wait and clear - if self.kernel is not None: - while self.kernel.poll() is None: - await asyncio.sleep(0.1) - self.kernel = None + if self.has_kernel: + await self.provisioner.wait() _kill_kernel = run_sync(_async_kill_kernel) @@ -575,12 +525,7 @@ async def _async_interrupt_kernel(self) -> None: assert self.kernel_spec is not None interrupt_mode = self.kernel_spec.interrupt_mode if interrupt_mode == "signal": - if sys.platform == "win32": - from .win_interrupt import send_interrupt - - send_interrupt(self.kernel.win32_interrupt_event) - else: - await self._async_signal_kernel(signal.SIGINT) + await self._async_signal_kernel(signal.SIGINT) elif interrupt_mode == "message": msg = self.session.msg("interrupt_request", content={}) @@ -600,14 +545,8 @@ async def _async_signal_kernel(self, signum: int) -> None: only useful on Unix systems. """ if self.has_kernel: - if hasattr(os, "getpgid") and hasattr(os, "killpg"): - try: - pgid = os.getpgid(self.kernel.pid) # type: ignore - os.killpg(pgid, signum) # type: ignore - return - except OSError: - pass - self.kernel.send_signal(signum) + assert self.provisioner is not None + await self.provisioner.send_signal(signum) else: raise RuntimeError("Cannot signal kernel. No kernel is running!") @@ -616,13 +555,11 @@ async def _async_signal_kernel(self, signum: int) -> None: async def _async_is_alive(self) -> bool: """Is the kernel process still running?""" if self.has_kernel: - if self.kernel.poll() is None: + assert self.provisioner is not None + ret = await self.provisioner.poll() + if ret is None: return True - else: - return False - else: - # we don't have a kernel - return False + return False is_alive = run_sync(_async_is_alive) @@ -645,6 +582,7 @@ class AsyncKernelManager(KernelManager): _launch_kernel = KernelManager._async_launch_kernel start_kernel = KernelManager._async_start_kernel finish_shutdown = KernelManager._async_finish_shutdown + cleanup_resources = KernelManager._async_cleanup_resources shutdown_kernel = KernelManager._async_shutdown_kernel restart_kernel = KernelManager._async_restart_kernel _send_kernel_sigterm = KernelManager._async_send_kernel_sigterm diff --git a/jupyter_client/multikernelmanager.py b/jupyter_client/multikernelmanager.py index 05d944dc8..f9e25d809 100644 --- a/jupyter_client/multikernelmanager.py +++ b/jupyter_client/multikernelmanager.py @@ -65,12 +65,6 @@ class MultiKernelManager(LoggingConfigurable): """, ) - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - # Cache all the currently used ports - self.currently_used_ports = set() - @observe("kernel_manager_class") def _kernel_manager_class_changed(self, change): self.kernel_manager_factory = self._create_kernel_manager_factory() @@ -91,33 +85,10 @@ def create_kernel_manager(*args, **kwargs) -> KernelManager: self.context = self._context_default() kwargs.setdefault("context", self.context) km = kernel_manager_ctor(*args, **kwargs) - - if km.cache_ports: - km.shell_port = self._find_available_port(km.ip) - km.iopub_port = self._find_available_port(km.ip) - km.stdin_port = self._find_available_port(km.ip) - km.hb_port = self._find_available_port(km.ip) - km.control_port = self._find_available_port(km.ip) - return km return create_kernel_manager - def _find_available_port(self, ip: str) -> int: - while True: - tmp_sock = socket.socket() - tmp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, b"\0" * 8) - tmp_sock.bind((ip, 0)) 
- port = tmp_sock.getsockname()[1] - tmp_sock.close() - - # This is a workaround for https://github.com/jupyter/jupyter_client/issues/487 - # We prevent two kernels to have the same ports. - if port not in self.currently_used_ports: - self.currently_used_ports.add(port) - - return port - shared_context = Bool( True, config=True, @@ -210,6 +181,7 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg km_class=self.kernel_manager_class.__class__ ) ) + kwargs['kernel_id'] = kernel_id # Make kernel_id available to manager and provisioner fut = asyncio.ensure_future( self._add_kernel_when_ready(kernel_id, km, ensure_async(km.start_kernel(**kwargs))) ) @@ -241,21 +213,9 @@ async def _async_shutdown_kernel( km = self.get_kernel(kernel_id) - ports = ( - km.shell_port, - km.iopub_port, - km.stdin_port, - km.hb_port, - km.control_port, - ) - await ensure_async(km.shutdown_kernel(now, restart)) self.remove_kernel(kernel_id) - if km.cache_ports and not restart: - for port in ports: - self.currently_used_ports.remove(port) - shutdown_kernel = run_sync(_async_shutdown_kernel) @kernel_method @@ -324,6 +284,8 @@ def signal_kernel(self, kernel_id: str, signum: int) -> None: ========== kernel_id : uuid The id of the kernel to signal. + signum : int + Signal number to send kernel. """ self.log.info("Signaled Kernel %s with %s" % (kernel_id, signum)) @@ -335,6 +297,13 @@ def restart_kernel(self, kernel_id: str, now: bool = False) -> None: ========== kernel_id : uuid The id of the kernel to interrupt. + now : bool, optional + If True, the kernel is forcefully restarted *immediately*, without + having a chance to do any cleanup action. Otherwise the kernel is + given 1s to clean up before a forceful restart is issued. + + In all cases the kernel is restarted, the only difference is whether + it is given a chance to perform a clean shutdown or not. """ self.log.info("Kernel restarted: %s" % kernel_id) diff --git a/jupyter_client/provisioning/__init__.py b/jupyter_client/provisioning/__init__.py new file mode 100644 index 000000000..2d6c47aee --- /dev/null +++ b/jupyter_client/provisioning/__init__.py @@ -0,0 +1,3 @@ +from .factory import KernelProvisionerFactory # noqa +from .local_provisioner import LocalProvisioner # noqa +from .provisioner_base import KernelProvisionerBase # noqa diff --git a/jupyter_client/provisioning/factory.py b/jupyter_client/provisioning/factory.py new file mode 100644 index 000000000..212e03625 --- /dev/null +++ b/jupyter_client/provisioning/factory.py @@ -0,0 +1,173 @@ +"""Kernel Provisioner Classes""" +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. +import os +from typing import Any +from typing import Dict +from typing import List + +from entrypoints import EntryPoint # type: ignore +from entrypoints import get_group_all +from entrypoints import get_single +from entrypoints import NoSuchEntryPoint +from traitlets.config import default # type: ignore +from traitlets.config import SingletonConfigurable +from traitlets.config import Unicode + +from .provisioner_base import KernelProvisionerBase + + +class KernelProvisionerFactory(SingletonConfigurable): + """ + :class:`KernelProvisionerFactory` is responsible for creating provisioner instances. 
+ + A singleton instance, `KernelProvisionerFactory` is also used by the :class:`KernelSpecManager` + to validate `kernel_provisioner` references found in kernel specifications to confirm their + availability (in cases where the kernel specification references a kernel provisioner that has + not been installed into the current Python environment). + + It's `default_provisioner_name` attribute can be used to specify the default provisioner + to use when a kernel_spec is found to not reference a provisioner. It's value defaults to + `"local-provisioner"` which identifies the local provisioner implemented by + :class:`LocalProvisioner`. + """ + + GROUP_NAME = 'jupyter_client.kernel_provisioners' + provisioners: Dict[str, EntryPoint] = {} + + default_provisioner_name_env = "JUPYTER_DEFAULT_PROVISIONER_NAME" + default_provisioner_name = Unicode( + config=True, + help="""Indicates the name of the provisioner to use when no kernel_provisioner + entry is present in the kernelspec.""", + ) + + @default('default_provisioner_name') + def default_provisioner_name_default(self): + return os.getenv(self.default_provisioner_name_env, "local-provisioner") + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + + for ep in KernelProvisionerFactory._get_all_provisioners(): + self.provisioners[ep.name] = ep + + def is_provisioner_available(self, kernel_spec: Any) -> bool: + """ + Reads the associated ``kernel_spec`` to determine the provisioner and returns whether it + exists as an entry_point (True) or not (False). If the referenced provisioner is not + in the current cache or cannot be loaded via entry_points, a warning message is issued + indicating it is not available. + """ + is_available: bool = True + provisioner_cfg = self._get_provisioner_config(kernel_spec) + provisioner_name = str(provisioner_cfg.get('provisioner_name')) + if not self._check_availability(provisioner_name): + is_available = False + self.log.warning( + f"Kernel '{kernel_spec.display_name}' is referencing a kernel " + f"provisioner ('{provisioner_name}') that is not available. " + f"Ensure the appropriate package has been installed and retry." + ) + return is_available + + def create_provisioner_instance( + self, kernel_id: str, kernel_spec: Any, parent: Any + ) -> KernelProvisionerBase: + """ + Reads the associated ``kernel_spec`` to see if it has a `kernel_provisioner` stanza. + If one exists, it instantiates an instance. If a kernel provisioner is not + specified in the kernel specification, a default provisioner stanza is fabricated + and instantiated corresponding to the current value of `default_provisioner_name` trait. + The instantiated instance is returned. + + If the provisioner is found to not exist (not registered via entry_points), + `ModuleNotFoundError` is raised. + """ + provisioner_cfg = self._get_provisioner_config(kernel_spec) + provisioner_name = str(provisioner_cfg.get('provisioner_name')) + if not self._check_availability(provisioner_name): + raise ModuleNotFoundError( + f"Kernel provisioner '{provisioner_name}' has not been registered." 
+ ) + + self.log.debug( + f"Instantiating kernel '{kernel_spec.display_name}' with " + f"kernel provisioner: {provisioner_name}" + ) + provisioner_class = self.provisioners[provisioner_name].load() + provisioner_config = provisioner_cfg.get('config') + provisioner: KernelProvisionerBase = provisioner_class( + kernel_id=kernel_id, kernel_spec=kernel_spec, parent=parent, **provisioner_config + ) + return provisioner + + def _check_availability(self, provisioner_name: str) -> bool: + """ + Checks that the given provisioner is available. + + If the given provisioner is not in the current set of loaded provisioners an attempt + is made to fetch the named entry point and, if successful, loads it into the cache. + + :param provisioner_name: + :return: + """ + is_available = True + if provisioner_name not in self.provisioners: + try: + ep = KernelProvisionerFactory._get_provisioner(provisioner_name) + self.provisioners[provisioner_name] = ep # Update cache + except NoSuchEntryPoint: + is_available = False + return is_available + + def _get_provisioner_config(self, kernel_spec: Any) -> Dict[str, Any]: + """ + Return the kernel_provisioner stanza from the kernel_spec. + + Checks the kernel_spec's metadata dictionary for a kernel_provisioner entry. + If found, it is returned, else one is created relative to the DEFAULT_PROVISIONER + and returned. + + Parameters + ---------- + kernel_spec : Any - this is a KernelSpec type but listed as Any to avoid circular import + The kernel specification object from which the provisioner dictionary is derived. + + Returns + ------- + dict + The provisioner portion of the kernel_spec. If one does not exist, it will contain + the default information. If no `config` sub-dictionary exists, an empty `config` + dictionary will be added. + """ + env_provisioner = kernel_spec.metadata.get('kernel_provisioner', {}) + if 'provisioner_name' in env_provisioner: # If no provisioner_name, return default + if ( + 'config' not in env_provisioner + ): # if provisioner_name, but no config stanza, add one + env_provisioner.update({"config": {}}) + return env_provisioner # Return what we found (plus config stanza if necessary) + return {"provisioner_name": self.default_provisioner_name, "config": {}} + + def get_provisioner_entries(self) -> Dict[str, str]: + """ + Returns a dictionary of provisioner entries. + + The key is the provisioner name for its entry point. The value is the colon-separated + string of the entry point's module name and object name. + """ + entries = {} + for name, ep in self.provisioners.items(): + entries[name] = f"{ep.module_name}:{ep.object_name}" + return entries + + @staticmethod + def _get_all_provisioners() -> List[EntryPoint]: + """Wrapper around entrypoints.get_group_all() - primarily to facilitate testing.""" + return get_group_all(KernelProvisionerFactory.GROUP_NAME) + + @staticmethod + def _get_provisioner(name: str) -> EntryPoint: + """Wrapper around entrypoints.get_single() - primarily to facilitate testing.""" + return get_single(KernelProvisionerFactory.GROUP_NAME, name) diff --git a/jupyter_client/provisioning/local_provisioner.py b/jupyter_client/provisioning/local_provisioner.py new file mode 100644 index 000000000..2b2264f9c --- /dev/null +++ b/jupyter_client/provisioning/local_provisioner.py @@ -0,0 +1,211 @@ +"""Kernel Provisioner Classes""" +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. 
+import asyncio +import os +import signal +import sys +from typing import Any +from typing import Dict +from typing import List +from typing import Optional + +from ..connect import KernelConnectionInfo +from ..connect import LocalPortCache +from ..launcher import launch_kernel +from ..localinterfaces import is_local_ip +from ..localinterfaces import local_ips +from .provisioner_base import KernelProvisionerBase + + +class LocalProvisioner(KernelProvisionerBase): + """ + :class:`LocalProvisioner` is a concrete class of ABC :py:class:`KernelProvisionerBase` + and is the out-of-box default implementation used when no kernel provisioner is + specified in the kernel specification (``kernel.json``). It provides functional + parity to existing applications by launching the kernel locally and using + :class:`subprocess.Popen` to manage its lifecycle. + + This class is intended to be subclassed for customizing local kernel environments + and serve as a reference implementation for other custom provisioners. + """ + + process = None + _exit_future = None + pid = None + pgid = None + ip = None + ports_cached = False + + @property + def has_process(self) -> bool: + return self.process is not None + + async def poll(self) -> Optional[int]: + + ret = 0 + if self.process: + ret = self.process.poll() + return ret + + async def wait(self) -> Optional[int]: + ret = 0 + if self.process: + # Use busy loop at 100ms intervals, polling until the process is + # not alive. If we find the process is no longer alive, complete + # its cleanup via the blocking wait(). Callers are responsible for + # issuing calls to wait() using a timeout (see kill()). + while await self.poll() is None: + await asyncio.sleep(0.1) + + # Process is no longer alive, wait and clear + ret = self.process.wait() + self.process = None # allow has_process to now return False + return ret + + async def send_signal(self, signum: int) -> None: + """Sends a signal to the process group of the kernel (this + usually includes the kernel and any subprocesses spawned by + the kernel). + + Note that since only SIGTERM is supported on Windows, we will + check if the desired signal is for interrupt and apply the + applicable code on Windows in that case. + """ + if self.process: + if signum == signal.SIGINT and sys.platform == 'win32': + from ..win_interrupt import send_interrupt + + send_interrupt(self.process.win32_interrupt_event) + return + + # Prefer process-group over process + if self.pgid and hasattr(os, "killpg"): + try: + os.killpg(self.pgid, signum) + return + except OSError: + pass + try: + self.process.send_signal(signum) + except OSError: + pass + return + + async def kill(self, restart: bool = False) -> None: + if self.process: + try: + self.process.kill() + except OSError as e: + # In Windows, we will get an Access Denied error if the process + # has already terminated. Ignore it. + if sys.platform == 'win32': + if e.winerror != 5: + raise + # On Unix, we may get an ESRCH error if the process has already + # terminated. Ignore it. 
+                else:
+                    from errno import ESRCH
+
+                    if e.errno != ESRCH:
+                        raise
+
+    async def terminate(self, restart: bool = False) -> None:
+        if self.process:
+            return self.process.terminate()
+
+    async def cleanup(self, restart: bool = False) -> None:
+        if self.ports_cached and not restart:
+            # provisioner is about to be destroyed, return cached ports
+            lpc = LocalPortCache.instance()
+            ports = (
+                self.connection_info['shell_port'],
+                self.connection_info['iopub_port'],
+                self.connection_info['stdin_port'],
+                self.connection_info['hb_port'],
+                self.connection_info['control_port'],
+            )
+            for port in ports:
+                lpc.return_port(port)
+
+    async def pre_launch(self, **kwargs: Any) -> Dict[str, Any]:
+        """Perform any steps in preparation for kernel process launch.
+
+        This includes applying additional substitutions to the kernel launch command and env.
+        It also includes preparation of launch parameters.
+
+        Returns the updated kwargs.
+        """
+
+        # This should be considered temporary until a better division of labor can be defined.
+        km = self.parent
+        if km:
+            if km.transport == 'tcp' and not is_local_ip(km.ip):
+                raise RuntimeError(
+                    "Can only launch a kernel on a local interface. "
+                    "This one is not: %s. "
+                    "Make sure that the '*_address' attributes are "
+                    "configured properly. "
+                    "Currently valid addresses are: %s" % (km.ip, local_ips())
+                )
+            # build the Popen cmd
+            extra_arguments = kwargs.pop('extra_arguments', [])
+
+            # write connection file / get default ports
+            # TODO - change when handshake pattern is adopted
+            if km.cache_ports and not self.ports_cached:
+                lpc = LocalPortCache.instance()
+                km.shell_port = lpc.find_available_port(km.ip)
+                km.iopub_port = lpc.find_available_port(km.ip)
+                km.stdin_port = lpc.find_available_port(km.ip)
+                km.hb_port = lpc.find_available_port(km.ip)
+                km.control_port = lpc.find_available_port(km.ip)
+                self.ports_cached = True
+
+            km.write_connection_file()
+            self.connection_info = km.get_connection_info()
+
+            kernel_cmd = km.format_kernel_cmd(
+                extra_arguments=extra_arguments
+            )  # This needs to remain here for b/c
+        else:
+            extra_arguments = kwargs.pop('extra_arguments', [])
+            kernel_cmd = self.kernel_spec.argv + extra_arguments
+
+        return await super().pre_launch(cmd=kernel_cmd, **kwargs)
+
+    async def launch_kernel(self, cmd: List[str], **kwargs: Any) -> KernelConnectionInfo:
+        scrubbed_kwargs = LocalProvisioner._scrub_kwargs(kwargs)
+        self.process = launch_kernel(cmd, **scrubbed_kwargs)
+        pgid = None
+        if hasattr(os, "getpgid"):
+            try:
+                pgid = os.getpgid(self.process.pid)  # type: ignore
+            except OSError:
+                pass
+
+        self.pid = self.process.pid
+        self.pgid = pgid
+        return self.connection_info
+
+    @staticmethod
+    def _scrub_kwargs(kwargs: Dict[str, Any]) -> Dict[str, Any]:
+        """Remove any keyword arguments that Popen does not tolerate."""
+        keywords_to_scrub: List[str] = ['extra_arguments', 'kernel_id']
+        scrubbed_kwargs = kwargs.copy()
+        for kw in keywords_to_scrub:
+            scrubbed_kwargs.pop(kw, None)
+        return scrubbed_kwargs
+
+    async def get_provisioner_info(self) -> Dict:
+        """Captures the base information necessary for persistence relative to this instance."""
+        provisioner_info = await super().get_provisioner_info()
+        provisioner_info.update({'pid': self.pid, 'pgid': self.pgid, 'ip': self.ip})
+        return provisioner_info
+
+    async def load_provisioner_info(self, provisioner_info: Dict) -> None:
+        """Loads the base information necessary for persistence relative to this instance."""
+        await super().load_provisioner_info(provisioner_info)
+        self.pid = provisioner_info['pid']
+        self.pgid = provisioner_info['pgid']
+        self.ip = provisioner_info['ip']
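As a usage sketch (assuming a standard ``python3`` kernelspec is installed), an application only ever sees the provisioner through the kernel manager's ``provisioner`` attribute; the ``Popen`` instance is now an implementation detail of ``LocalProvisioner``::

    # Sketch: default provisioning via AsyncKernelManager.  Lifecycle calls
    # (poll/wait/kill/terminate) are delegated to the provisioner internally.
    import asyncio

    from jupyter_client import AsyncKernelManager


    async def main() -> None:
        km = AsyncKernelManager(kernel_name='python3')
        await km.start_kernel()
        print(type(km.provisioner).__name__)  # LocalProvisioner, by default
        print(km.provisioner.has_process)     # True while the kernel is running
        await km.shutdown_kernel()


    asyncio.run(main())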
diff --git a/jupyter_client/provisioning/provisioner_base.py b/jupyter_client/provisioning/provisioner_base.py
new file mode 100644
index 000000000..2ff350419
--- /dev/null
+++ b/jupyter_client/provisioning/provisioner_base.py
@@ -0,0 +1,253 @@
+"""Kernel Provisioner Classes"""
+# Copyright (c) Jupyter Development Team.
+# Distributed under the terms of the Modified BSD License.
+import os
+from abc import ABC
+from abc import ABCMeta
+from abc import abstractmethod
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+
+from traitlets.config import Instance  # type: ignore
+from traitlets.config import LoggingConfigurable
+from traitlets.config import Unicode
+
+from ..connect import KernelConnectionInfo
+
+
+class KernelProvisionerMeta(ABCMeta, type(LoggingConfigurable)):  # type: ignore
+    pass
+
+
+class KernelProvisionerBase(ABC, LoggingConfigurable, metaclass=KernelProvisionerMeta):
+    """
+    Abstract base class defining methods for KernelProvisioner classes.
+
+    A majority of methods are abstract (requiring implementations via a subclass) while
+    some are optional and others provide implementations common to all instances.
+    Subclasses should be aware of which methods require a call to the superclass.
+
+    Many of these methods model those of :class:`subprocess.Popen` for parity with
+    previous versions where the kernel process was managed directly.
+    """
+
+    # The kernel specification associated with this provisioner
+    kernel_spec: Any = Instance('jupyter_client.kernelspec.KernelSpec', allow_none=True)
+    kernel_id: str = Unicode(None, allow_none=True)
+    connection_info: KernelConnectionInfo = {}
+
+    @property
+    @abstractmethod
+    def has_process(self) -> bool:
+        """
+        Returns True if this provisioner is currently managing a process.
+
+        This property is asserted to be True immediately following a call to
+        the provisioner's :meth:`launch_kernel` method.
+        """
+        pass
+
+    @abstractmethod
+    async def poll(self) -> Optional[int]:
+        """
+        Checks if kernel process is still running.
+
+        If running, None is returned, otherwise the process's integer-valued exit code is returned.
+        This method is called from :meth:`KernelManager.is_alive`.
+        """
+        pass
+
+    @abstractmethod
+    async def wait(self) -> Optional[int]:
+        """
+        Waits for kernel process to terminate.
+
+        This method is called from `KernelManager.finish_shutdown()` and
+        `KernelManager.kill_kernel()` when terminating a kernel gracefully or
+        immediately, respectively.
+        """
+        pass
+
+    @abstractmethod
+    async def send_signal(self, signum: int) -> None:
+        """
+        Sends signal identified by signum to the kernel process.
+
+        This method is called from `KernelManager.signal_kernel()` to send the
+        kernel process a signal.
+        """
+        pass
+
+    @abstractmethod
+    async def kill(self, restart: bool = False) -> None:
+        """
+        Kill the kernel process.
+
+        This is typically accomplished via a SIGKILL signal, which cannot be caught.
+        This method is called from `KernelManager.kill_kernel()` when terminating
+        a kernel immediately.
+
+        restart is True if this operation will precede a subsequent launch_kernel request.
+        """
+        pass
+
+    @abstractmethod
+    async def terminate(self, restart: bool = False) -> None:
+        """
+        Terminates the kernel process.
+
+        This is typically accomplished via a SIGTERM signal, which can be caught, allowing
+        the kernel provisioner to perform possible cleanup of resources.  This method is
+        called indirectly from `KernelManager.finish_shutdown()` during a kernel's
+        graceful termination.
+
+        restart is True if this operation will precede a subsequent launch_kernel request.
+        """
+        pass
+
+    @abstractmethod
+    async def launch_kernel(self, cmd: List[str], **kwargs: Any) -> KernelConnectionInfo:
+        """
+        Launch the kernel process and return its connection information.
+
+        This method is called from `KernelManager.launch_kernel()` during the
+        kernel manager's start kernel sequence.
+        """
+        pass
+
+    @abstractmethod
+    async def cleanup(self, restart: bool = False) -> None:
+        """
+        Cleanup any resources allocated on behalf of the kernel provisioner.
+
+        This method is called from `KernelManager.cleanup_resources()` as part of
+        its shutdown kernel sequence.
+
+        restart is True if this operation will precede a subsequent launch_kernel request.
+        """
+        pass
+
+    async def shutdown_requested(self, restart: bool = False) -> None:
+        """
+        Allows the provisioner to determine if the kernel's shutdown has been requested.
+
+        This method is called from `KernelManager.request_shutdown()` as part of
+        its shutdown sequence.
+
+        This method is optional and is primarily used in scenarios where the provisioner
+        may need to perform other operations in preparation for a kernel's shutdown.
+        """
+        pass
+
+    async def pre_launch(self, **kwargs: Any) -> Dict[str, Any]:
+        """
+        Perform any steps in preparation for kernel process launch.
+
+        This includes applying additional substitutions to the kernel launch command
+        and environment.  It also includes preparation of launch parameters.
+
+        NOTE: Subclass implementations are advised to call this method as it applies
+        environment variable substitutions from the local environment and calls the
+        provisioner's :meth:`_finalize_env()` method to allow each provisioner the
+        ability to clean up the environment variables that will be used by the kernel.
+
+        This method is called from `KernelManager.pre_start_kernel()` as part of its
+        start kernel sequence.
+
+        Returns the (potentially updated) keyword arguments that are passed to
+        :meth:`launch_kernel()`.
+        """
+        env = kwargs.pop('env', os.environ).copy()
+        env.update(self.__apply_env_substitutions(env))
+        self._finalize_env(env)
+        kwargs['env'] = env
+
+        return kwargs
+
+    async def post_launch(self, **kwargs: Any) -> None:
+        """
+        Perform any steps following the kernel process launch.
+
+        This method is called from `KernelManager.post_start_kernel()` as part of its
+        start kernel sequence.
+        """
+        pass
+
+    async def get_provisioner_info(self) -> Dict[str, Any]:
+        """
+        Captures the base information necessary for persistence relative to this instance.
+
+        This enables applications that subclass `KernelManager` to persist a kernel provisioner's
+        relevant information to accomplish functionality like disaster recovery or high availability
+        by calling this method via the kernel manager's `provisioner` attribute.
+
+        NOTE: The superclass method must always be called first to ensure proper serialization.
+        """
+        provisioner_info: Dict[str, Any] = dict()
+        provisioner_info['kernel_id'] = self.kernel_id
+        provisioner_info['connection_info'] = self.connection_info
+        return provisioner_info
+
+    async def load_provisioner_info(self, provisioner_info: Dict) -> None:
+        """
+        Loads the base information necessary for persistence relative to this instance.
+
+        The inverse of `get_provisioner_info()`, this enables applications that subclass
+        `KernelManager` to re-establish communication with a provisioner that is managing
+        a (presumably) remote kernel from an entirely different process than the one in
+        which the original provisioner was created.
+
+        NOTE: The superclass method must always be called first to ensure proper deserialization.
+        """
+        self.kernel_id = provisioner_info['kernel_id']
+        self.connection_info = provisioner_info['connection_info']
+
+    def get_shutdown_wait_time(self, recommended: float = 5.0) -> float:
+        """
+        Returns the time allowed for a complete shutdown.  This may vary by provisioner.
+
+        This method is called from `KernelManager.finish_shutdown()` during the graceful
+        phase of its kernel shutdown sequence.
+
+        The recommended value will typically be what is configured in the kernel manager.
+        """
+        return recommended
+
+    def _finalize_env(self, env: Dict[str, str]) -> None:
+        """
+        Ensures env is appropriate prior to launch.
+
+        This method is called from `KernelProvisionerBase.pre_launch()` during the kernel's
+        start sequence.
+
+        NOTE: Subclasses should be sure to call super()._finalize_env(env)
+        """
+        if self.kernel_spec.language and self.kernel_spec.language.lower().startswith("python"):
+            # Don't allow PYTHONEXECUTABLE to be passed to kernel process.
+            # If set, it can bork all the things.
+            env.pop('PYTHONEXECUTABLE', None)
+
+    def __apply_env_substitutions(self, substitution_values: Dict[str, str]):
+        """
+        Walks entries in the kernelspec's env stanza and applies substitutions from current env.
+
+        This method is called from `KernelProvisionerBase.pre_launch()` during the kernel's
+        start sequence.
+
+        Returns the substituted dictionary of env entries.
+
+        NOTE: This method is private and is not intended to be overridden by provisioners.
+        """
+        substituted_env = {}
+        if self.kernel_spec:
+            from string import Template
+
+            # For each templated env entry, fill any templated references
+            # matching names of env variables with those values and build
+            # new dict with substitutions.
+            templated_env = self.kernel_spec.env
+            for k, v in templated_env.items():
+                substituted_env.update({k: Template(v).safe_substitute(substitution_values)})
+        return substituted_env
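To make the substitution above concrete, here is a standalone sketch of what ``__apply_env_substitutions()`` does with a templated env entry (variable names are illustrative; the same ``${TEST_VARS}`` pattern appears in the test kernelspecs below)::

    # Sketch: resolving a kernelspec env template against the current env.
    from string import Template

    current_env = {'TEST_VARS': 'test_var_1'}
    templated_value = '${TEST_VARS}:test_var_2'  # as it might appear in kernel.json's env stanza

    resolved = Template(templated_value).safe_substitute(current_env)
    print(resolved)  # -> 'test_var_1:test_var_2'

``safe_substitute`` leaves unmatched placeholders intact rather than raising, so a kernelspec can reference variables that may not be set in every environment.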
diff --git a/jupyter_client/tests/conftest.py b/jupyter_client/tests/conftest.py
index 19f481dbd..8f9ad7378 100644
--- a/jupyter_client/tests/conftest.py
+++ b/jupyter_client/tests/conftest.py
@@ -3,6 +3,12 @@
 import sys
 
 import pytest
+from jupyter_core import paths
+
+from .utils import test_env
+
+pjoin = os.path.join
+
 
 if os.name == "nt" and sys.version_info >= (3, 7):
     asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
@@ -20,3 +26,16 @@ def event_loop():
         yield loop
     finally:
         loop.close()
+
+
+@pytest.fixture(autouse=True)
+def env():
+    env_patch = test_env()
+    env_patch.start()
+    yield
+    env_patch.stop()
+
+
+@pytest.fixture()
+def kernel_dir():
+    return pjoin(paths.jupyter_data_dir(), 'kernels')
diff --git a/jupyter_client/tests/test_kernelmanager.py b/jupyter_client/tests/test_kernelmanager.py
index 44e136cfd..f90fbc6fd 100644
--- a/jupyter_client/tests/test_kernelmanager.py
+++ b/jupyter_client/tests/test_kernelmanager.py
@@ -19,7 +19,6 @@
 from ..manager import start_new_kernel
 from .utils import AsyncKMSubclass
 from .utils import SyncKMSubclass
-from .utils import test_env
 from jupyter_client import AsyncKernelManager
 from jupyter_client import KernelManager
 
@@ -28,14 +27,6 @@
 
 TIMEOUT = 30
 
-@pytest.fixture(autouse=True)
-def env():
-    env_patch = test_env()
-    env_patch.start()
-    yield
-    env_patch.stop()
-
-
 @pytest.fixture(params=["tcp", "ipc"])
 def transport(request):
     if sys.platform == "win32" and request.param == "ipc":  #
@@ -353,7 +344,7 @@ def test_start_sequence_kernels(self, config, install_kernel):
         self._run_signaltest_lifecycle(config)
         self._run_signaltest_lifecycle(config)
 
-    @pytest.mark.timeout(TIMEOUT)
+    @pytest.mark.timeout(TIMEOUT + 10)
     def test_start_parallel_thread_kernels(self, config, install_kernel):
         if config.KernelManager.transport == "ipc":  # FIXME
             pytest.skip("IPC transport is currently not working for this test!")
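As a quick sketch of the discovery surface exercised by the tests that follow (assumes an environment with this branch installed; output varies by what is registered)::

    # Sketch: list the provisioners registered under the entry point group.
    from jupyter_client.provisioning import KernelProvisionerFactory

    kpf = KernelProvisionerFactory.instance()
    print(kpf.get_provisioner_entries())
    # e.g. {'local-provisioner': 'jupyter_client.provisioning:LocalProvisioner'}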
diff --git a/jupyter_client/tests/test_provisioning.py b/jupyter_client/tests/test_provisioning.py
new file mode 100644
index 000000000..3aa3db0e2
--- /dev/null
+++ b/jupyter_client/tests/test_provisioning.py
@@ -0,0 +1,345 @@
+"""Test Provisioning"""
+# Copyright (c) Jupyter Development Team.
+# Distributed under the terms of the Modified BSD License.
+import asyncio
+import json
+import os
+import signal
+import sys
+from subprocess import PIPE
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+
+import pytest
+from entrypoints import EntryPoint
+from entrypoints import NoSuchEntryPoint
+from jupyter_core import paths
+from traitlets import Int
+from traitlets import Unicode
+
+from ..connect import KernelConnectionInfo
+from ..kernelspec import KernelSpecManager
+from ..kernelspec import NoSuchKernel
+from ..launcher import launch_kernel
+from ..manager import AsyncKernelManager
+from ..provisioning import KernelProvisionerBase
+from ..provisioning import KernelProvisionerFactory
+from ..provisioning import LocalProvisioner
+
+pjoin = os.path.join
+
+
+class SubclassedTestProvisioner(LocalProvisioner):
+
+    config_var_1: int = Int(config=True)
+    config_var_2: str = Unicode(config=True)
+
+
+class CustomTestProvisioner(KernelProvisionerBase):
+
+    process = None
+    pid = None
+    pgid = None
+
+    config_var_1: int = Int(config=True)
+    config_var_2: str = Unicode(config=True)
+
+    @property
+    def has_process(self) -> bool:
+        return self.process is not None
+
+    async def poll(self) -> Optional[int]:
+        ret = 0
+        if self.process:
+            ret = self.process.poll()
+        return ret
+
+    async def wait(self) -> Optional[int]:
+        ret = 0
+        if self.process:
+            while await self.poll() is None:
+                await asyncio.sleep(0.1)
+
+            # Process is no longer alive, wait and clear
+            ret = self.process.wait()
+            self.process = None
+        return ret
+
+    async def send_signal(self, signum: int) -> None:
+        if self.process:
+            if signum == signal.SIGINT and sys.platform == 'win32':
+                from ..win_interrupt import send_interrupt
+
+                send_interrupt(self.process.win32_interrupt_event)
+                return
+
+            # Prefer process-group over process
+            if self.pgid and hasattr(os, "killpg"):
+                try:
+                    os.killpg(self.pgid, signum)
+                    return
+                except OSError:
+                    pass
+            return self.process.send_signal(signum)
+
+    async def kill(self, restart=False) -> None:
+        if self.process:
+            self.process.kill()
+
+    async def terminate(self, restart=False) -> None:
+        if self.process:
+            self.process.terminate()
+
+    async def pre_launch(self, **kwargs: Any) -> Dict[str, Any]:
+        km = self.parent
+        if km:
+            # save kwargs for use in restart
+            km._launch_args = kwargs.copy()
+            # build the Popen cmd
+            extra_arguments = kwargs.pop('extra_arguments', [])
+
+            # write connection file / get default ports
+            km.write_connection_file()
+            self.connection_info = km.get_connection_info()
+
+            kernel_cmd = km.format_kernel_cmd(
+                extra_arguments=extra_arguments
+            )  # This needs to remain here for b/c
+
+        return await super().pre_launch(cmd=kernel_cmd, **kwargs)
+
+    async def launch_kernel(self, cmd: List[str], **kwargs: Any) -> KernelConnectionInfo:
+        scrubbed_kwargs = kwargs
+        self.process = launch_kernel(cmd, **scrubbed_kwargs)
+        pgid = None
+        if hasattr(os, "getpgid"):
+            try:
+                pgid = os.getpgid(self.process.pid)
+            except OSError:
+                pass
+
+        self.pid = self.process.pid
+        self.pgid = pgid
+        return self.connection_info
+
+    async def cleanup(self, restart=False) -> None:
+        pass
+
+
+class NewTestProvisioner(CustomTestProvisioner):
+    pass
+
+
+def build_kernelspec(name: str, provisioner: Optional[str] = None) -> None:
+    spec = {
+        'argv': [
+            sys.executable,
+            '-m',
+            'jupyter_client.tests.signalkernel',
+            '-f',
+            '{connection_file}',
+        ],
+        'display_name': f"Signal Test Kernel w {provisioner}",
+        'env': {'TEST_VARS': '${TEST_VARS}:test_var_2'},
+        'metadata': {},
+    }
+
+    if provisioner:
+        kernel_provisioner = {'kernel_provisioner': {'provisioner_name': provisioner}}
+        spec['metadata'].update(kernel_provisioner)
+        if provisioner != 'local-provisioner':
+            spec['metadata']['kernel_provisioner']['config'] = {
+                'config_var_1': 42,
+                'config_var_2': name,
+            }
+
+    kernel_dir = pjoin(paths.jupyter_data_dir(), 'kernels', name)
+    os.makedirs(kernel_dir)
+    with open(pjoin(kernel_dir, 'kernel.json'), 'w') as f:
+        f.write(json.dumps(spec))
+
+
+def new_provisioner():
+    build_kernelspec('new_provisioner', 'new-test-provisioner')
+
+
+def custom_provisioner():
+    build_kernelspec('custom_provisioner', 'custom-test-provisioner')
+
+
+@pytest.fixture
+def all_provisioners():
+    build_kernelspec('no_provisioner')
+    build_kernelspec('missing_provisioner', 'missing-provisioner')
+    build_kernelspec('default_provisioner', 'local-provisioner')
+    build_kernelspec('subclassed_provisioner', 'subclassed-test-provisioner')
+    custom_provisioner()
+
+
+@pytest.fixture(
+    params=[
+        'no_provisioner',
+        'default_provisioner',
+        'missing_provisioner',
+        'custom_provisioner',
+        'subclassed_provisioner',
+    ]
+)
+def akm(request, all_provisioners):
+    return AsyncKernelManager(kernel_name=request.param)
+
+
+initial_provisioner_map = {
+    'local-provisioner': ('jupyter_client.provisioning', 'LocalProvisioner'),
+    'subclassed-test-provisioner': (
+        'jupyter_client.tests.test_provisioning',
+        'SubclassedTestProvisioner',
+    ),
+    'custom-test-provisioner': ('jupyter_client.tests.test_provisioning', 'CustomTestProvisioner'),
+}
+
+
+def mock_get_all_provisioners() -> List[EntryPoint]:
+    result = []
+    for name, epstr in initial_provisioner_map.items():
+        result.append(EntryPoint(name, epstr[0], epstr[1]))
+    return result
+
+
+def mock_get_provisioner(name) -> EntryPoint:
+    if name == 'new-test-provisioner':
+        return EntryPoint(
+            'new-test-provisioner', 'jupyter_client.tests.test_provisioning', 'NewTestProvisioner'
+        )
+
+    if name in initial_provisioner_map:
+        return EntryPoint(name, initial_provisioner_map[name][0], initial_provisioner_map[name][1])
+
+    raise NoSuchEntryPoint(KernelProvisionerFactory.GROUP_NAME, name)
+
+
+@pytest.fixture
+def kpf(monkeypatch):
+    """Set up the Kernel Provisioner Factory, mocking the entrypoint fetch calls."""
+    monkeypatch.setattr(
+        KernelProvisionerFactory, '_get_all_provisioners', mock_get_all_provisioners
+    )
+    monkeypatch.setattr(KernelProvisionerFactory, '_get_provisioner', mock_get_provisioner)
+    factory = KernelProvisionerFactory.instance()
+    return factory
+
+
+class TestDiscovery:
+    def test_find_all_specs(self, kpf, all_provisioners):
+        ksm = KernelSpecManager()
+        kernels = ksm.get_all_specs()
+
+        # Ensure specs for initial provisioners exist,
+        # and missing_provisioner & new_provisioner don't
+        assert 'no_provisioner' in kernels
+        assert 'default_provisioner' in kernels
+        assert 'subclassed_provisioner' in kernels
+        assert 'custom_provisioner' in kernels
+        assert 'missing_provisioner' not in kernels
+        assert 'new_provisioner' not in kernels
+
+    def test_get_missing(self, all_provisioners):
+        ksm = KernelSpecManager()
+        with pytest.raises(NoSuchKernel):
+            ksm.get_kernel_spec('missing_provisioner')
+
+    def test_get_new(self, kpf):
+        new_provisioner()  # Introduce provisioner after initialization of KPF
+        ksm = KernelSpecManager()
+        kernel = ksm.get_kernel_spec('new_provisioner')
+        assert 'new-test-provisioner' == kernel.metadata['kernel_provisioner']['provisioner_name']
+
+
+class TestRuntime:
+    async def akm_test(self, kernel_mgr):
+        """Starts a kernel, validates the associated provisioner's config, then shuts down the kernel."""
+        assert kernel_mgr.provisioner is None
+        if kernel_mgr.kernel_name == 'missing_provisioner':
+            with pytest.raises(NoSuchKernel):
+                await kernel_mgr.start_kernel()
+        else:
+            await kernel_mgr.start_kernel()
+
+            TestRuntime.validate_provisioner(kernel_mgr)
+
+            await kernel_mgr.shutdown_kernel()
+            assert kernel_mgr.provisioner.has_process is False
+
+    @pytest.mark.asyncio
+    async def test_existing(self, kpf, akm):
+        await self.akm_test(akm)
+
+    @pytest.mark.asyncio
+    async def test_new(self, kpf):
+        new_provisioner()  # Introduce provisioner after initialization of KPF
+        new_km = AsyncKernelManager(kernel_name='new_provisioner')
+        await self.akm_test(new_km)
+
+    @pytest.mark.asyncio
+    async def test_custom_lifecycle(self, kpf):
+        custom_provisioner()
+        async_km = AsyncKernelManager(kernel_name='custom_provisioner')
+        await async_km.start_kernel(stdout=PIPE, stderr=PIPE)
+        is_alive = await async_km.is_alive()
+        assert is_alive
+        await async_km.restart_kernel(now=True)
+        is_alive = await async_km.is_alive()
+        assert is_alive
+        await async_km.interrupt_kernel()
+        assert isinstance(async_km, AsyncKernelManager)
+        await async_km.shutdown_kernel(now=True)
+        is_alive = await async_km.is_alive()
+        assert is_alive is False
+        assert async_km.context.closed
+
+    @pytest.mark.asyncio
+    async def test_default_provisioner_config(self, kpf, all_provisioners):
+        kpf.default_provisioner_name = 'custom-test-provisioner'
+        async_km = AsyncKernelManager(kernel_name='no_provisioner')
+        await async_km.start_kernel(stdout=PIPE, stderr=PIPE)
+        is_alive = await async_km.is_alive()
+        assert is_alive
+
+        assert isinstance(async_km.provisioner, CustomTestProvisioner)
+        assert async_km.provisioner.config_var_1 == 0  # Not in kernelspec, so default of 0 exists
+
+        await async_km.shutdown_kernel(now=True)
+        is_alive = await async_km.is_alive()
+        assert is_alive is False
+        assert async_km.context.closed
+
+    @staticmethod
+    def validate_provisioner(akm: AsyncKernelManager):
+        # Ensure the provisioner is managing a process at this point
+        assert akm.provisioner is not None and akm.provisioner.has_process
+
+        # Validate provisioner config
+        if akm.kernel_name in ['no_provisioner', 'default_provisioner']:
+            assert not hasattr(akm.provisioner, 'config_var_1')
+            assert not hasattr(akm.provisioner, 'config_var_2')
+        else:
+            assert akm.provisioner.config_var_1 == 42
+            assert akm.provisioner.config_var_2 == akm.kernel_name
+
+        # Validate provisioner class
+        if akm.kernel_name in ['no_provisioner', 'default_provisioner', 'subclassed_provisioner']:
+            assert isinstance(akm.provisioner, LocalProvisioner)
+            if akm.kernel_name == 'subclassed_provisioner':
+                assert isinstance(akm.provisioner, SubclassedTestProvisioner)
+            else:
+                assert not isinstance(akm.provisioner, SubclassedTestProvisioner)
+        else:
+            assert isinstance(akm.provisioner, CustomTestProvisioner)
+            assert not isinstance(akm.provisioner, LocalProvisioner)
+            if akm.kernel_name == 'new_provisioner':
+                assert isinstance(akm.provisioner, NewTestProvisioner)
superclass """ @subclass_recorder @@ -191,7 +191,7 @@ def request_shutdown(self, kernel_id, restart=False): """ Record call and defer to superclass """ @subclass_recorder - def finish_shutdown(self, kernel_id, waittime=None, pollinterval=0.1): + def finish_shutdown(self, kernel_id, waittime=None, pollinterval=0.1, restart=False): """ Record call and defer to superclass """ @subclass_recorder diff --git a/requirements.txt b/requirements.txt index aa3a832e4..9219c73b6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +entrypoints jupyter_core>=4.6.0 nest-asyncio>=1.5 python-dateutil>=2.1 diff --git a/setup.py b/setup.py index ca9ec46cf..4a2167932 100644 --- a/setup.py +++ b/setup.py @@ -70,5 +70,8 @@ 'jupyter-run = jupyter_client.runapp:RunApp.launch_instance', 'jupyter-kernel = jupyter_client.kernelapp:main', ], + 'jupyter_client.kernel_provisioners': [ + 'local-provisioner = jupyter_client.provisioning:LocalProvisioner', + ], }, )