diff --git a/jupyter_client/__init__.py b/jupyter_client/__init__.py index 0a6eec513..ff0e7d340 100644 --- a/jupyter_client/__init__.py +++ b/jupyter_client/__init__.py @@ -1,9 +1,14 @@ """Client-side implementations of the Jupyter protocol""" +import pathlib + from ._version import __version__ # noqa from ._version import protocol_version # noqa from ._version import protocol_version_info # noqa from ._version import version_info # noqa +JUPYTER_CLIENT_EVENTS_URI = "https://events.jupyter.org/jupyter_client" +DEFAULT_EVENTS_SCHEMA_PATH = pathlib.Path(__file__).parent / "event_schemas" + try: from .asynchronous import AsyncKernelClient # noqa from .blocking import BlockingKernelClient # noqa diff --git a/jupyter_client/event_schemas/kernel_manager/v1.yaml b/jupyter_client/event_schemas/kernel_manager/v1.yaml new file mode 100644 index 000000000..6ef539e63 --- /dev/null +++ b/jupyter_client/event_schemas/kernel_manager/v1.yaml @@ -0,0 +1,36 @@ +"$id": https://events.jupyter.org/jupyter_client/kernel_manager/v1 +version: 1 +title: Kernel Manager Events +description: | + Record actions on kernels by the KernelManager. +type: object +required: + - kernel_id + - action +properties: + kernel_id: + oneOf: + - type: string + - type: "null" + description: The kernel's unique ID. + action: + enum: + - pre_start + - launch + - post_start + - interrupt + - restart + - kill + - request_shutdown + - finish_shutdown + - cleanup_resources + - restart_started + - restart_finished + - shutdown_started + - shutdown_finished + description: | + Action performed by the KernelManager API. + caller: + type: string + enum: + - kernel_manager diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py index 19afd0db7..65a0c22d8 100644 --- a/jupyter_client/manager.py +++ b/jupyter_client/manager.py @@ -15,6 +15,7 @@ from enum import Enum import zmq +from jupyter_events import EventLogger # type: ignore[import] from traitlets import Any from traitlets import Bool from traitlets import default @@ -33,6 +34,8 @@ from .provisioning import KernelProvisionerFactory as KPF from .utils import ensure_async from .utils import run_sync +from jupyter_client import DEFAULT_EVENTS_SCHEMA_PATH +from jupyter_client import JUPYTER_CLIENT_EVENTS_URI from jupyter_client import KernelClient from jupyter_client import kernelspec @@ -91,6 +94,27 @@ class KernelManager(ConnectionFileMixin): This version starts kernels with Popen. """ + event_schema_id = JUPYTER_CLIENT_EVENTS_URI + "/kernel_manager/v1" + event_logger = Instance(EventLogger).tag(config=True) + + @default("event_logger") + def _default_event_logger(self): + if self.parent and hasattr(self.parent, "event_logger"): + return self.parent.event_logger + else: + # If parent does not have an event logger, create one. + logger = EventLogger() + schema_path = DEFAULT_EVENTS_SCHEMA_PATH / "kernel_manager" / "v1.yaml" + logger.register_event_schema(schema_path) + return logger + + def _emit(self, *, action: str) -> None: + """Emit event using the core event schema from Jupyter Server's Contents Manager.""" + self.event_logger.emit( + schema_id=self.event_schema_id, + data={"action": action, "kernel_id": self.kernel_id, "caller": "kernel_manager"}, + ) + _ready: t.Union[Future, CFuture] def __init__(self, *args, **kwargs): @@ -308,6 +332,7 @@ async def _async_launch_kernel(self, kernel_cmd: t.List[str], **kw: t.Any) -> No assert self.provisioner.has_process # Provisioner provides the connection information. Load into kernel manager and write file. self._force_connection_info(connection_info) + self._emit(action="launch") _launch_kernel = run_sync(_async_launch_kernel) @@ -350,6 +375,7 @@ async def _async_pre_start_kernel( ) kw = await self.provisioner.pre_launch(**kw) kernel_cmd = kw.pop('cmd') + self._emit(action="pre_start") return kernel_cmd, kw pre_start_kernel = run_sync(_async_pre_start_kernel) @@ -366,6 +392,7 @@ async def _async_post_start_kernel(self, **kw: t.Any) -> None: self._connect_control_socket() assert self.provisioner is not None await self.provisioner.post_launch(**kw) + self._emit(action="post_start") post_start_kernel = run_sync(_async_post_start_kernel) @@ -401,6 +428,7 @@ async def _async_request_shutdown(self, restart: bool = False) -> None: assert self.provisioner is not None await self.provisioner.shutdown_requested(restart=restart) self._shutdown_status = _ShutdownStatus.ShutdownRequest + self._emit(action="request_shutdown") request_shutdown = run_sync(_async_request_shutdown) @@ -442,6 +470,7 @@ async def _async_finish_shutdown( if self.has_kernel: assert self.provisioner is not None await self.provisioner.wait() + self._emit(action="finish_shutdown") finish_shutdown = run_sync(_async_finish_shutdown) @@ -459,6 +488,7 @@ async def _async_cleanup_resources(self, restart: bool = False) -> None: if self.provisioner: await self.provisioner.cleanup(restart=restart) + self._emit(action="cleanup_resources") cleanup_resources = run_sync(_async_cleanup_resources) @@ -481,6 +511,7 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) Will this kernel be restarted after it is shutdown. When this is True, connection files will not be cleaned up. """ + self._emit(action="shutdown_started") self.shutting_down = True # Used by restarter to prevent race condition # Stop monitoring for restarting while we shutdown. self.stop_restarter() @@ -498,6 +529,7 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) await ensure_async(self.finish_shutdown(restart=restart)) await ensure_async(self.cleanup_resources(restart=restart)) + self._emit(action="shutdown_finished") shutdown_kernel = run_sync(_async_shutdown_kernel) @@ -528,6 +560,7 @@ async def _async_restart_kernel( Any options specified here will overwrite those used to launch the kernel. """ + self._emit(action="restart_started") if self._launch_args is None: raise RuntimeError("Cannot restart the kernel. No previous call to 'start_kernel'.") @@ -540,6 +573,7 @@ async def _async_restart_kernel( # Start new kernel. self._launch_args.update(kw) await ensure_async(self.start_kernel(**self._launch_args)) + self._emit(action="restart_finished") restart_kernel = run_sync(_async_restart_kernel) @@ -576,6 +610,7 @@ async def _async_kill_kernel(self, restart: bool = False) -> None: # Process is no longer alive, wait and clear if self.has_kernel: await self.provisioner.wait() + self._emit(action="kill") _kill_kernel = run_sync(_async_kill_kernel) @@ -597,6 +632,7 @@ async def _async_interrupt_kernel(self) -> None: self.session.send(self._control_socket, msg) else: raise RuntimeError("Cannot interrupt kernel. No kernel is running!") + self._emit(action="interrupt") interrupt_kernel = run_sync(_async_interrupt_kernel) diff --git a/pyproject.toml b/pyproject.toml index 4af1b7af0..685076829 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ dependencies = [ "pyzmq>=23.0", "tornado>=6.2", "traitlets", + "jupyter_events>=0.5.0" ] [[project.authors]] @@ -56,9 +57,10 @@ test = [ "mypy", "pre-commit", "pytest", - "pytest-asyncio>=0.18", + "pytest-asyncio>=0.19", "pytest-cov", "pytest-timeout", + "jupyter_events[test]" ] doc = [ "ipykernel", diff --git a/tests/conftest.py b/tests/conftest.py index b52872a89..f48c703ea 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -16,6 +16,9 @@ pjoin = os.path.join +pytest_plugins = ["jupyter_events.pytest_plugin"] + + # Handle resource limit # Ensure a minimal soft limit of DEFAULT_SOFT if the current hard limit is at least that much. if resource is not None: diff --git a/tests/test_kernelmanager.py b/tests/test_kernelmanager.py index e92ad0bf6..4c560ad9f 100644 --- a/tests/test_kernelmanager.py +++ b/tests/test_kernelmanager.py @@ -18,6 +18,7 @@ from .utils import AsyncKMSubclass from .utils import SyncKMSubclass from jupyter_client import AsyncKernelManager +from jupyter_client import DEFAULT_EVENTS_SCHEMA_PATH from jupyter_client import KernelManager from jupyter_client.manager import _ShutdownStatus from jupyter_client.manager import start_new_async_kernel @@ -92,14 +93,14 @@ def start_kernel(): @pytest.fixture -def km(config): - km = KernelManager(config=config) +def km(config, jp_event_logger): + km = KernelManager(config=config, event_logger=jp_event_logger) return km @pytest.fixture -def km_subclass(config): - km = SyncKMSubclass(config=config) +def km_subclass(config, jp_event_logger): + km = SyncKMSubclass(config=config, event_logger=jp_event_logger) return km @@ -112,15 +113,36 @@ def zmq_context(): ctx.term() +@pytest.fixture +def jp_event_schemas(): + return [DEFAULT_EVENTS_SCHEMA_PATH / "kernel_manager" / "v1.yaml"] + + +@pytest.fixture +def check_emitted_events(jp_read_emitted_events): + """Check the given events where emitted""" + + def _(*expected_list): + read_events = jp_read_emitted_events() + events = [e for e in read_events if e["caller"] == "kernel_manager"] + # Ensure that the number of read events match the expected events. + assert len(events) == len(expected_list) + # Loop through the events and make sure they are in order of expected. + for i, action in enumerate(expected_list): + assert "action" in events[i] and action == events[i]["action"] + + return _ + + @pytest.fixture(params=[AsyncKernelManager, AsyncKMSubclass]) -def async_km(request, config): - km = request.param(config=config) +def async_km(request, config, jp_event_logger): + km = request.param(config=config, event_logger=jp_event_logger) return km @pytest.fixture -def async_km_subclass(config): - km = AsyncKMSubclass(config=config) +def async_km_subclass(config, jp_event_logger): + km = AsyncKMSubclass(config=config, event_logger=jp_event_logger) return km @@ -193,18 +215,35 @@ async def test_async_signal_kernel_subprocesses(self, name, install, expected): class TestKernelManager: - def test_lifecycle(self, km): + def test_lifecycle(self, km, jp_read_emitted_events, check_emitted_events): km.start_kernel(stdout=PIPE, stderr=PIPE) + check_emitted_events("pre_start", "launch", "post_start") kc = km.client() assert km.is_alive() is_done = km.ready.done() assert is_done km.restart_kernel(now=True) + check_emitted_events( + "restart_started", + "shutdown_started", + "interrupt", + "kill", + "cleanup_resources", + "shutdown_finished", + "pre_start", + "launch", + "post_start", + "restart_finished", + ) assert km.is_alive() km.interrupt_kernel() + check_emitted_events("interrupt") assert isinstance(km, KernelManager) kc.stop_channels() km.shutdown_kernel(now=True) + check_emitted_events( + "shutdown_started", "interrupt", "kill", "cleanup_resources", "shutdown_finished" + ) assert km.context.closed def test_get_connect_info(self, km): @@ -448,7 +487,10 @@ def execute(cmd): @pytest.mark.asyncio class TestAsyncKernelManager: - async def test_lifecycle(self, async_km): + async def test_lifecycle( + self, + async_km, + ): await async_km.start_kernel(stdout=PIPE, stderr=PIPE) is_alive = await async_km.is_alive() assert is_alive diff --git a/tests/test_manager.py b/tests/test_manager.py index e3d6ea222..717c46560 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -32,3 +32,13 @@ def test_connection_file_real_path(): km._launch_args = {} cmds = km.format_kernel_cmd() assert cmds[4] == "foobar" + + +def test_kernel_manager_event_logger(jp_event_handler, jp_read_emitted_events): + action = "pre_start" + km = KernelManager() + km.event_logger.register_handler(jp_event_handler) + km._emit(action=action) + output = jp_read_emitted_events()[0] + assert "kernel_id" in output and output["kernel_id"] is None + assert "action" in output and output["action"] == action