Skip to content

Commit

Permalink
Merge pull request #869 from Carreau/kill-sp
Browse files Browse the repository at this point in the history
  • Loading branch information
blink1073 authored Feb 18, 2022
2 parents 8d87867 + aaad575 commit 78c83ad
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 25 deletions.
131 changes: 106 additions & 25 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,49 @@

import asyncio
import concurrent.futures
from datetime import datetime
from functools import partial
import inspect
import itertools
import logging
import inspect
import os
from signal import signal, default_int_handler, SIGINT
import sys
import socket
import sys
import time
import uuid
import warnings
from datetime import datetime
from functools import partial
from signal import SIGINT, SIGTERM, Signals, default_int_handler, signal

if sys.platform != "win32":
from signal import SIGKILL
else:
SIGKILL = "windown-SIGKILL-sentinel"


try:
import psutil
except ImportError:
psutil = None


try:
# jupyter_client >= 5, use tz-aware now
from jupyter_client.session import utcnow as now
except ImportError:
# jupyter_client < 5, use local now()
now = datetime.now

import zmq
from IPython.core.error import StdinNotImplementedError
from jupyter_client.session import Session
from tornado import ioloop
from tornado.queues import Queue, QueueEmpty
import zmq
from traitlets import (Any, Bool, Dict, Float, Instance, Integer, List, Set,
Unicode, default, observe)
from traitlets.config.configurable import SingletonConfigurable
from zmq.eventloop.zmqstream import ZMQStream

from traitlets.config.configurable import SingletonConfigurable
from IPython.core.error import StdinNotImplementedError
from ipykernel.jsonutil import json_clean
from traitlets import (
Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool,
observe, default
)

from jupyter_client.session import Session

from ._version import kernel_protocol_version

Expand Down Expand Up @@ -796,14 +801,12 @@ async def comm_info_request(self, stream, ident, parent):
reply_content, parent, ident)
self.log.debug("%s", msg)

async def interrupt_request(self, stream, ident, parent):
pid = os.getpid()
pgid = os.getpgid(pid)

def _send_interupt_children(self):
if os.name == "nt":
self.log.error("Interrupt message not supported on Windows")

else:
pid = os.getpid()
pgid = os.getpgid(pid)
# Prefer process-group over process
if pgid and hasattr(os, "killpg"):
try:
Expand All @@ -816,6 +819,8 @@ async def interrupt_request(self, stream, ident, parent):
except OSError:
pass

async def interrupt_request(self, stream, ident, parent):
self._send_interupt_children()
content = parent['content']
self.session.send(stream, 'interrupt_reply', content, parent, ident=ident)
return
Expand All @@ -830,7 +835,7 @@ async def shutdown_request(self, stream, ident, parent):
content, parent
)

self._at_shutdown()
await self._at_shutdown()

self.log.debug('Stopping control ioloop')
control_io_loop = self.control_stream.io_loop
Expand Down Expand Up @@ -1131,10 +1136,86 @@ def _input_request(self, prompt, ident, parent, password=False):
raise EOFError
return value

def _at_shutdown(self):
def _killpg(self, signal):
"""
similar to killpg but use psutil if it can on windows
or if pgid is none
"""
pgid = os.getpgid(os.getpid())
if pgid and hasattr(os, "killpg"):
try:
os.killpg(pgid, signal)
except (OSError) as e:
self.log.exception(f"OSError running killpg, not killing children.")
return
elif psutil is not None:
children = parent.children(recursive=True)
for p in children:
try:
if signal == SIGTERM:
p.terminate()
elif signal == SIGKILL:
p.kill()
except psutil.NoSuchProcess:
pass

async def _progressively_terminate_all_children(self):

pgid = os.getpgid(os.getpid())
if psutil is None:
# blindly send quickly sigterm/sigkill to processes if psutil not there.
self.log.info("Please install psutil for a cleaner subprocess shutdown.")
self._send_interupt_children()
await asyncio.sleep(0.05)
self.log.debug("Sending SIGTERM to {pgid}")
self._killpg(SIGTERM)
await asyncio.sleep(0.05)
self.log.debug("Sending SIGKILL to {pgid}")
self._killpg(pgid, SIGKILL)

sleeps = (0.01, 0.03, 0.1, 0.3, 1, 3, 10)
children = psutil.Process().children(recursive=True)
if not children:
self.log.debug("Kernel has no children.")
return
self.log.debug(f"Trying to interrupt then kill subprocesses : {children}")
self._send_interupt_children()

for signum in (SIGTERM, SIGKILL):
self.log.debug(
f"Will try to send {signum} ({Signals(signum)!r}) to subprocesses :{children}"
)
for delay in sleeps:
children = psutil.Process().children(recursive=True)
try:
if not children:
self.log.warning(
"No more children, continuing shutdown routine."
)
return
except psutil.NoSuchProcess:
pass
self._killpg(15)
self.log.debug(
f"Will sleep {delay}s before checking for children and retrying. {children}"
)
await asyncio.sleep(delay)

async def _at_shutdown(self):
"""Actions taken at shutdown by the kernel, called by python's atexit.
"""
if self._shutdown_message is not None:
self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
self.log.debug("%s", self._shutdown_message)
self.control_stream.flush(zmq.POLLOUT)
try:
await self._progressively_terminate_all_children()
except Exception as e:
self.log.exception("Exception during subprocesses termination %s", e)

finally:
if self._shutdown_message is not None:
self.session.send(
self.iopub_socket,
self._shutdown_message,
ident=self._topic("shutdown"),
)
self.log.debug("%s", self._shutdown_message)
self.control_stream.flush(zmq.POLLOUT)
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def run(self):
'tornado>=4.2,<7.0',
'matplotlib-inline>=0.1.0,<0.2.0',
'appnope;platform_system=="Darwin"',
'psutil;platform_system=="Windows"',
'nest_asyncio',
],
extras_require={
Expand Down

0 comments on commit 78c83ad

Please sign in to comment.