diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index c1494ffbb..7a7fcde45 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -5,23 +5,31 @@ import asyncio import concurrent.futures -from datetime import datetime -from functools import partial +import inspect import itertools import logging -import inspect import os -from signal import signal, default_int_handler, SIGINT -import sys import socket +import sys import time import uuid import warnings +from datetime import datetime +from functools import partial +from signal import SIGINT, SIGTERM, Signals, default_int_handler, signal + +if sys.platform != "win32": + from signal import SIGKILL +else: + SIGKILL = "windown-SIGKILL-sentinel" + + try: import psutil except ImportError: psutil = None + try: # jupyter_client >= 5, use tz-aware now from jupyter_client.session import utcnow as now @@ -29,20 +37,17 @@ # jupyter_client < 5, use local now() now = datetime.now +import zmq +from IPython.core.error import StdinNotImplementedError +from jupyter_client.session import Session from tornado import ioloop from tornado.queues import Queue, QueueEmpty -import zmq +from traitlets import (Any, Bool, Dict, Float, Instance, Integer, List, Set, + Unicode, default, observe) +from traitlets.config.configurable import SingletonConfigurable from zmq.eventloop.zmqstream import ZMQStream -from traitlets.config.configurable import SingletonConfigurable -from IPython.core.error import StdinNotImplementedError from ipykernel.jsonutil import json_clean -from traitlets import ( - Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool, - observe, default -) - -from jupyter_client.session import Session from ._version import kernel_protocol_version @@ -796,14 +801,12 @@ async def comm_info_request(self, stream, ident, parent): reply_content, parent, ident) self.log.debug("%s", msg) - async def interrupt_request(self, stream, ident, parent): - pid = os.getpid() - pgid = os.getpgid(pid) - + def _send_interupt_children(self): if os.name == "nt": self.log.error("Interrupt message not supported on Windows") - else: + pid = os.getpid() + pgid = os.getpgid(pid) # Prefer process-group over process if pgid and hasattr(os, "killpg"): try: @@ -816,6 +819,8 @@ async def interrupt_request(self, stream, ident, parent): except OSError: pass + async def interrupt_request(self, stream, ident, parent): + self._send_interupt_children() content = parent['content'] self.session.send(stream, 'interrupt_reply', content, parent, ident=ident) return @@ -830,7 +835,7 @@ async def shutdown_request(self, stream, ident, parent): content, parent ) - self._at_shutdown() + await self._at_shutdown() self.log.debug('Stopping control ioloop') control_io_loop = self.control_stream.io_loop @@ -1131,10 +1136,86 @@ def _input_request(self, prompt, ident, parent, password=False): raise EOFError return value - def _at_shutdown(self): + def _killpg(self, signal): + """ + similar to killpg but use psutil if it can on windows + or if pgid is none + + """ + pgid = os.getpgid(os.getpid()) + if pgid and hasattr(os, "killpg"): + try: + os.killpg(pgid, signal) + except (OSError) as e: + self.log.exception(f"OSError running killpg, not killing children.") + return + elif psutil is not None: + children = parent.children(recursive=True) + for p in children: + try: + if signal == SIGTERM: + p.terminate() + elif signal == SIGKILL: + p.kill() + except psutil.NoSuchProcess: + pass + + async def _progressively_terminate_all_children(self): + + pgid = os.getpgid(os.getpid()) + if psutil is None: + # blindly send quickly sigterm/sigkill to processes if psutil not there. + self.log.info("Please install psutil for a cleaner subprocess shutdown.") + self._send_interupt_children() + await asyncio.sleep(0.05) + self.log.debug("Sending SIGTERM to {pgid}") + self._killpg(SIGTERM) + await asyncio.sleep(0.05) + self.log.debug("Sending SIGKILL to {pgid}") + self._killpg(pgid, SIGKILL) + + sleeps = (0.01, 0.03, 0.1, 0.3, 1, 3, 10) + children = psutil.Process().children(recursive=True) + if not children: + self.log.debug("Kernel has no children.") + return + self.log.debug(f"Trying to interrupt then kill subprocesses : {children}") + self._send_interupt_children() + + for signum in (SIGTERM, SIGKILL): + self.log.debug( + f"Will try to send {signum} ({Signals(signum)!r}) to subprocesses :{children}" + ) + for delay in sleeps: + children = psutil.Process().children(recursive=True) + try: + if not children: + self.log.warning( + "No more children, continuing shutdown routine." + ) + return + except psutil.NoSuchProcess: + pass + self._killpg(15) + self.log.debug( + f"Will sleep {delay}s before checking for children and retrying. {children}" + ) + await asyncio.sleep(delay) + + async def _at_shutdown(self): """Actions taken at shutdown by the kernel, called by python's atexit. """ - if self._shutdown_message is not None: - self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown')) - self.log.debug("%s", self._shutdown_message) - self.control_stream.flush(zmq.POLLOUT) + try: + await self._progressively_terminate_all_children() + except Exception as e: + self.log.exception("Exception during subprocesses termination %s", e) + + finally: + if self._shutdown_message is not None: + self.session.send( + self.iopub_socket, + self._shutdown_message, + ident=self._topic("shutdown"), + ) + self.log.debug("%s", self._shutdown_message) + self.control_stream.flush(zmq.POLLOUT) diff --git a/setup.py b/setup.py index 95dffbc66..e7af2a86c 100644 --- a/setup.py +++ b/setup.py @@ -68,6 +68,7 @@ def run(self): 'tornado>=4.2,<7.0', 'matplotlib-inline>=0.1.0,<0.2.0', 'appnope;platform_system=="Darwin"', + 'psutil;platform_system=="Windows"', 'nest_asyncio', ], extras_require={