Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BUG: Kill subprocesses on shutdown. #869

Merged
merged 7 commits into from
Feb 18, 2022
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 79 additions & 21 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,49 @@

import asyncio
import concurrent.futures
from datetime import datetime
from functools import partial
import inspect
import itertools
import logging
import inspect
import os
from signal import signal, default_int_handler, SIGINT
import sys
import socket
import sys
import time
import uuid
import warnings
from datetime import datetime
from functools import partial
from signal import SIGINT, SIGTERM, Signals, default_int_handler, signal

if sys.platform != "win32":
from signal import SIGKILL
else:
SIGKILL = None


try:
import psutil
except ImportError:
psutil = None


try:
# jupyter_client >= 5, use tz-aware now
from jupyter_client.session import utcnow as now
except ImportError:
# jupyter_client < 5, use local now()
now = datetime.now

import zmq
from IPython.core.error import StdinNotImplementedError
from jupyter_client.session import Session
from tornado import ioloop
from tornado.queues import Queue, QueueEmpty
import zmq
from traitlets import (Any, Bool, Dict, Float, Instance, Integer, List, Set,
Unicode, default, observe)
from traitlets.config.configurable import SingletonConfigurable
from zmq.eventloop.zmqstream import ZMQStream

from traitlets.config.configurable import SingletonConfigurable
from IPython.core.error import StdinNotImplementedError
from ipykernel.jsonutil import json_clean
from traitlets import (
Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool,
observe, default
)

from jupyter_client.session import Session

from ._version import kernel_protocol_version

Expand Down Expand Up @@ -796,14 +801,12 @@ async def comm_info_request(self, stream, ident, parent):
reply_content, parent, ident)
self.log.debug("%s", msg)

async def interrupt_request(self, stream, ident, parent):
pid = os.getpid()
pgid = os.getpgid(pid)

def _send_interupt_children(self):
if os.name == "nt":
self.log.error("Interrupt message not supported on Windows")

else:
pid = os.getpid()
pgid = os.getpgid(pid)
# Prefer process-group over process
if pgid and hasattr(os, "killpg"):
try:
Expand All @@ -816,6 +819,8 @@ async def interrupt_request(self, stream, ident, parent):
except OSError:
pass

async def interrupt_request(self, stream, ident, parent):
self._send_interupt_children()
content = parent['content']
self.session.send(stream, 'interrupt_reply', content, parent, ident=ident)
return
Expand All @@ -830,7 +835,7 @@ async def shutdown_request(self, stream, ident, parent):
content, parent
)

self._at_shutdown()
await self._at_shutdown()

self.log.debug('Stopping control ioloop')
control_io_loop = self.control_stream.io_loop
Expand Down Expand Up @@ -1131,9 +1136,62 @@ def _input_request(self, prompt, ident, parent, password=False):
raise EOFError
return value

def _at_shutdown(self):
async def _progressively_terminate_all_children(self):
if sys.platform == "win32":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method should work on both Windows and Posix: https://stackoverflow.com/a/25134985

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll give it a try. I know that the "loop and kill children" was switched back to killpg at some point in jupyter_client as the first one had issues.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(that was in jupyter/jupyter_client#743, i'll try to get the same logic from there)

self.log.info(f"Terminating subprocesses not yet supported on windows.")
return

pgid = os.getpgid(os.getpid())
if not pgid:
self.log.warning(f"No Pgid ({pgid}), not trying to stop subprocesses.")
return
if psutil is None:
# blindly send quickly sigterm/sigkill to processes if psutil not there.
self.log.debug("Please install psutil for a cleaner subprocess shutdown.")
self._send_interupt_children()
try:
await asyncio.sleep(0.05)
self.log.debug("Sending SIGTERM to {pgid}")
os.killpg(pgid, SIGTERM)
await asyncio.sleep(0.05)
self.log.debug("Sending SIGKILL to {pgid}")
os.killpg(pgid, SIGKILL)
except Exception:
self.log.exception("Exception during subprocesses termination")
return

sleeps = (0.01, 0.03, 0.1, 0.3, 1)
children = psutil.Process().children(recursive=True)
if not children:
self.log.debug("Kernel has no children.")
return
self.log.debug(f"Trying to interrupt then kill subprocesses : {children}")
self._send_interupt_children()

for signum in (SIGTERM, SIGKILL):
self.log.debug(
f"Will try to send {signum} ({Signals(signum)}) to subprocesses :{children}"
)
for delay in sleeps:
children = psutil.Process().children(recursive=True)
if not children:
self.log.debug("No more children, continuing shutdown routine.")
return
if pgid and hasattr(os, "killpg"):
try:
os.killpg(pgid, signum)
except OSError:
self.log.warning("OSError running killpg, not killing children")
return
self.log.debug(
f"Will sleep {delay}s before checking for children and retrying."
)
await ascynio.sleep(delay)

async def _at_shutdown(self):
"""Actions taken at shutdown by the kernel, called by python's atexit.
"""
await self._progressively_terminate_all_children()
if self._shutdown_message is not None:
self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
self.log.debug("%s", self._shutdown_message)
Expand Down