From 80da994964ffd0027661e394435431c7f6822deb Mon Sep 17 00:00:00 2001 From: Matthias Bussonnier Date: Wed, 16 Feb 2022 17:53:21 +0100 Subject: [PATCH] BUG: Kill subprocesses on shutdown. Fixes #jupyter/jupyter_client#104 This should make sure we properly cull all subprocesses at shutdown. It does change one of the private methods from sync to async in order not to use time.sleep or threads, so this may affect subclasses, though I doubt it. It's also not completely clear to me whether this works on Windows, as I believe SIGINT is not a thing there. Regardless, as this affects things like dask and others that are mostly on Unix, it should be an improvement. It does the following, stopping as soon as it does not find any more children of the current process. - Send SIGINT to everything - Immediately send SIGTERM in a loop with an exponential backoff from 0.01 to 1 second, roughly multiplying the delay until the next send by 3 each time. - Switch to sending SIGKILL with the same backoff. There is no delay after SIGINT, as this is just a courtesy. The backoff delays are not configurable. 
I can imagine that on slow systems it may make sense --- ipykernel/kernelbase.py | 73 +++++++++++++++++++++++++++++++---------- 1 file changed, 55 insertions(+), 18 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index c1494ffbb..e3e36a8a1 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -5,23 +5,26 @@ import asyncio import concurrent.futures -from datetime import datetime -from functools import partial +import inspect import itertools import logging -import inspect import os -from signal import signal, default_int_handler, SIGINT -import sys import socket +import sys import time import uuid import warnings +from datetime import datetime +from functools import partial +from signal import (SIGINT, SIGKILL, SIGTERM, Signals, default_int_handler, + signal) + try: import psutil except ImportError: psutil = None + try: # jupyter_client >= 5, use tz-aware now from jupyter_client.session import utcnow as now @@ -29,20 +32,17 @@ # jupyter_client < 5, use local now() now = datetime.now +import zmq +from IPython.core.error import StdinNotImplementedError +from jupyter_client.session import Session from tornado import ioloop from tornado.queues import Queue, QueueEmpty -import zmq +from traitlets import (Any, Bool, Dict, Float, Instance, Integer, List, Set, + Unicode, default, observe) +from traitlets.config.configurable import SingletonConfigurable from zmq.eventloop.zmqstream import ZMQStream -from traitlets.config.configurable import SingletonConfigurable -from IPython.core.error import StdinNotImplementedError from ipykernel.jsonutil import json_clean -from traitlets import ( - Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool, - observe, default -) - -from jupyter_client.session import Session from ._version import kernel_protocol_version @@ -796,13 +796,13 @@ async def comm_info_request(self, stream, ident, parent): reply_content, parent, ident) self.log.debug("%s", msg) - async def interrupt_request(self, 
stream, ident, parent): + def _send_interupt_children(self): + pid = os.getpid() pgid = os.getpgid(pid) if os.name == "nt": self.log.error("Interrupt message not supported on Windows") - else: # Prefer process-group over process if pgid and hasattr(os, "killpg"): @@ -816,6 +816,8 @@ async def interrupt_request(self, stream, ident, parent): except OSError: pass + async def interrupt_request(self, stream, ident, parent): + self._send_interupt_children() content = parent['content'] self.session.send(stream, 'interrupt_reply', content, parent, ident=ident) return @@ -830,7 +832,7 @@ async def shutdown_request(self, stream, ident, parent): content, parent ) - self._at_shutdown() + await self._at_shutdown() self.log.debug('Stopping control ioloop') control_io_loop = self.control_stream.io_loop @@ -1131,9 +1133,44 @@ def _input_request(self, prompt, ident, parent, password=False): raise EOFError return value - def _at_shutdown(self): + async def _progressively_terminate_all_children(self): + + pgid = os.getpgid(os.getpid()) + if not pgid: + self.log.warning(f"No Pgid ({pgid=}), not trying to stop subprocesses.") + return + + sleeps = (0.01, 0.03, 0.1, 0.3, 1) + children = psutil.Process().children(recursive=True) + if not children: + self.log.debug("Kernel has no children.") + return + self.log.debug(f"Trying to interrupt then kill subprocesses : {children=}") + self._send_interupt_children() + for signum in (SIGTERM, SIGKILL): + self.log.debug( + f"Will try to send {signum} ({Signals(signum)}) to subprocesses :{children}" + ) + for delay in sleeps: + children = psutil.Process().children(recursive=True) + if not children: + self.log.debug("No more children, continuing shutdown routine.") + return + if pgid and hasattr(os, "killpg"): + try: + os.killpg(pgid, signum) + except OSError: + self.log.warning("OSError running killpg, not killing children") + return + self.log.debug( + f"Will sleep {delay}s before checking for children and retrying." 
+ ) + await asyncio.sleep(delay) + + async def _at_shutdown(self): """Actions taken at shutdown by the kernel, called by python's atexit. """ + await self._progressively_terminate_all_children() if self._shutdown_message is not None: self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown')) self.log.debug("%s", self._shutdown_message)