Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kill dangling subprocesses #632

Open
wants to merge 17 commits into
base: rolling
Choose a base branch
from
106 changes: 89 additions & 17 deletions launch/launch/actions/execute_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
from osrf_pycommon.process_utils import async_execute_process
from osrf_pycommon.process_utils import AsyncSubprocessProtocol

import psutil

from .emit_event import EmitEvent
from .opaque_function import OpaqueFunction
from .timer_action import TimerAction
Expand Down Expand Up @@ -64,9 +66,7 @@
from ..launch_description_entity import LaunchDescriptionEntity
from ..some_actions_type import SomeActionsType
from ..some_substitutions_type import SomeSubstitutionsType
from ..substitution import Substitution # noqa: F401
from ..substitutions import LaunchConfiguration
from ..substitutions import PythonExpression
from ..utilities import create_future
from ..utilities import is_a_subclass
from ..utilities import normalize_to_list_of_substitutions
Expand All @@ -87,6 +87,8 @@ def __init__(
'sigterm_timeout', default=5),
sigkill_timeout: SomeSubstitutionsType = LaunchConfiguration(
'sigkill_timeout', default=5),
signal_lingering_subprocesses: SomeSubstitutionsType = LaunchConfiguration(
'signal_lingering_subprocesses', default=True),
emulate_tty: bool = False,
output: SomeSubstitutionsType = 'log',
output_format: Text = '[{this.process_description.final_name}] {line}',
Expand Down Expand Up @@ -158,6 +160,11 @@ def __init__(
as a string or a list of strings and Substitutions to be resolved
at runtime, defaults to the LaunchConfiguration called
'sigkill_timeout'
:param: signal_lingering_subprocesses if `True`, all subprocesses spawned by the process
will be signaled to make sure they finish.
The sequence of signals used is the same SIGINT/SIGTERM/SIGKILL sequence
used to kill the main process.
Subprocesses start being signaled when the main process completes.
:param: emulate_tty emulate a tty (terminal), defaults to False, but can
be overridden with the LaunchConfiguration called 'emulate_tty',
the value of which is evaluated as true or false according to
Expand Down Expand Up @@ -188,6 +195,8 @@ def __init__(
self.__shell = shell
self.__sigterm_timeout = normalize_to_list_of_substitutions(sigterm_timeout)
self.__sigkill_timeout = normalize_to_list_of_substitutions(sigkill_timeout)
self.__signal_lingering_subprocesses = normalize_to_list_of_substitutions(
signal_lingering_subprocesses)
self.__emulate_tty = emulate_tty
self.__output = os.environ.get('OVERRIDE_LAUNCH_PROCESS_OUTPUT', output)
if not isinstance(self.__output, dict):
Expand All @@ -207,6 +216,7 @@ def __init__(
self.__shutdown_future = None # type: Optional[asyncio.Future]
self.__sigterm_timer = None # type: Optional[TimerAction]
self.__sigkill_timer = None # type: Optional[TimerAction]
self.__children: List[psutil.Process] = []
self.__stdout_buffer = io.StringIO()
self.__stderr_buffer = io.StringIO()

Expand Down Expand Up @@ -279,7 +289,11 @@ def _shutdown_process(self, context, *, send_sigint):
self.__shutdown_future.set_result(None)

# Otherwise process is still running, start the shutdown procedures.
context.extend_locals({'process_name': self.process_details['name']})
context.extend_locals(
{
'process_name': self.process_details['name'],
'process_pid': self.process_details['pid'],
})
actions_to_return = self.__get_shutdown_timer_actions()
if send_sigint:
actions_to_return.append(self.__get_sigint_event())
Expand Down Expand Up @@ -356,7 +370,7 @@ def __on_process_output(
if buffer.closed:
# buffer was probably closed by __flush_buffers on shutdown. Output without
# buffering.
buffer.info(
logger.info(
wjwwood marked this conversation as resolved.
Show resolved Hide resolved
self.__output_format.format(line=to_write, this=self)
)
else:
Expand Down Expand Up @@ -440,23 +454,17 @@ def __get_shutdown_timer_actions(self) -> List[Action]:
base_msg = \
"process[{}] failed to terminate '{}' seconds after receiving '{}', escalating to '{}'"

def printer(context, msg, timeout_substitutions):
self.__logger.error(msg.format(
context.locals.process_name,
perform_substitutions(context, timeout_substitutions),
))
def printer(context, msg):
self.__logger.error(msg.format(context.locals.process_name))

sigterm_timeout = self.__sigterm_timeout
sigkill_timeout = [PythonExpression(
('float(', *self.__sigterm_timeout, ') + float(', *self.__sigkill_timeout, ')')
)]
# Setup a timer to send us a SIGTERM if we don't shutdown quickly.
sigterm_timeout = self.__sigterm_timeout_value
self.__sigterm_timer = TimerAction(
period=sigterm_timeout,
actions=[
OpaqueFunction(
function=printer,
args=(base_msg.format('{}', '{}', 'SIGINT', 'SIGTERM'), sigterm_timeout)
args=(base_msg.format('{}', sigterm_timeout, 'SIGINT', 'SIGTERM'), )
),
EmitEvent(event=SignalProcess(
signal_number=signal.SIGTERM,
Expand All @@ -465,13 +473,14 @@ def printer(context, msg, timeout_substitutions):
],
cancel_on_shutdown=False,
)
sigkill_timeout = self.__sigterm_timeout_value + self.__sigkill_timeout_value
# Setup a timer to send us a SIGKILL if we don't shutdown after SIGTERM.
self.__sigkill_timer = TimerAction(
period=sigkill_timeout,
actions=[
OpaqueFunction(
function=printer,
args=(base_msg.format('{}', '{}', 'SIGTERM', 'SIGKILL'), sigkill_timeout)
args=(base_msg.format('{}', sigkill_timeout, 'SIGTERM', 'SIGKILL'), )
),
EmitEvent(event=SignalProcess(
signal_number='SIGKILL',
Expand All @@ -480,6 +489,13 @@ def printer(context, msg, timeout_substitutions):
],
cancel_on_shutdown=False,
)
self.__children = []
pid = self._subprocess_transport.get_pid()
if pid is not None:
try:
self.__children = psutil.Process(pid).children(recursive=True)
except psutil.NoSuchProcess:
pass
return [
cast(Action, self.__sigterm_timer),
cast(Action, self.__sigkill_timer),
Expand All @@ -491,12 +507,15 @@ def __get_sigint_event(self):
process_matcher=matches_action(self),
))

def __cleanup(self):
# Cancel any pending timers we started.
def __cleanup_timers(self):
if self.__sigterm_timer is not None:
self.__sigterm_timer.cancel()
if self.__sigkill_timer is not None:
self.__sigkill_timer.cancel()

def __cleanup(self):
# Cancel any pending timers we started.
self.__cleanup_timers()
# Close subprocess transport if any.
if self._subprocess_transport is not None:
self._subprocess_transport.close()
Expand Down Expand Up @@ -529,6 +548,48 @@ def on_stdout_received(self, data: bytes) -> None:
def on_stderr_received(self, data: bytes) -> None:
self.__context.emit_event_sync(ProcessStderr(text=data, **self.__process_event_args))

async def _signal_subprocesses(self, context):
to_signal = self.__children
signaled = []
sig = signal.SIGINT
start_time = context.asyncio_loop.time()
sigterm_timeout = self.__sigterm_timeout_value
sigkill_timeout = self.__sigterm_timeout_value + self.__sigkill_timeout_value
process_pid = self.process_details['pid']
process_name = self.process_details['name']
log_prefix_format = (
'subprocess[pid={}] of process['
f'{process_name}, pid={process_pid}]: ')
next_signals = iter(((signal.SIGTERM, sigterm_timeout), (signal.SIGKILL, sigkill_timeout)))
while True:
for p in to_signal:
try:
p.send_signal(sig)
except psutil.NoSuchProcess:
continue
log_prefix = log_prefix_format.format(p.pid)
self.__logger.info(
f'{log_prefix}sending {sig.name} to subprocess directly.'
)
signaled.append(p)
try:
sig, timeout = next(next_signals)
except StopIteration:
return
current_time = context.asyncio_loop.time()
while current_time < start_time + timeout:
await asyncio.sleep(min(0.5, start_time + timeout - current_time))
for p in list(signaled):
if not p.is_running():
log_prefix = log_prefix_format.format(p.pid)
self.__logger.info(f'{log_prefix}exited')
signaled.remove(p)
if not signaled:
return
current_time = context.asyncio_loop.time()
to_signal = signaled
signaled = []

async def __execute_process(self, context: LaunchContext) -> None:
process_event_args = self.__process_event_args
if process_event_args is None:
Expand Down Expand Up @@ -596,8 +657,13 @@ async def __execute_process(self, context: LaunchContext) -> None:
timeout=self.__respawn_delay
)
if not self.__shutdown_future.done():
if self.__signal_lingering_subprocesses_value:
await self._signal_subprocesses(context)
context.asyncio_loop.create_task(self.__execute_process(context))
return
self.__cleanup_timers()
if self.__signal_lingering_subprocesses_value:
await self._signal_subprocesses(context)
self.__cleanup()

def prepare(self, context: LaunchContext):
Expand Down Expand Up @@ -678,6 +744,12 @@ def execute(self, context: LaunchContext) -> Optional[List[LaunchDescriptionEnti
]
for event_handler in event_handlers:
context.register_event_handler(event_handler)
self.__sigterm_timeout_value = perform_typed_substitution(
context, self.__sigterm_timeout, float)
self.__sigkill_timeout_value = perform_typed_substitution(
context, self.__sigkill_timeout, float)
self.__signal_lingering_subprocesses_value = perform_typed_substitution(
context, self.__signal_lingering_subprocesses, bool)

try:
self.__completed_future = create_future(context.asyncio_loop)
Expand Down
1 change: 1 addition & 0 deletions launch/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
<exec_depend>ament_index_python</exec_depend>
<exec_depend>python3-importlib-metadata</exec_depend>
<exec_depend>python3-lark-parser</exec_depend>
<exec_depend>python3-psutil</exec_depend>
<exec_depend>python3-yaml</exec_depend>

<test_depend>ament_copyright</test_depend>
Expand Down
41 changes: 41 additions & 0 deletions launch/test/launch/test_execute_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

"""Tests for the ExecuteLocal Action."""

import asyncio
import os
import signal
import sys
import time

from launch import LaunchDescription
from launch import LaunchService
Expand All @@ -28,6 +31,8 @@
from launch.actions import TimerAction
from launch.descriptions import Executable

import psutil

import pytest


Expand Down Expand Up @@ -138,3 +143,39 @@ def test_execute_process_with_output_dictionary():
ls = LaunchService()
ls.include_launch_description(ld)
assert 0 == ls.run()


PYTHON_SCRIPT = """\
import time

while 1:
time.sleep(0.5)
"""


def test_kill_subprocesses():
"""Test launching a process with an environment variable."""
executable = ExecuteLocal(
process_description=Executable(
cmd=['python3', '-c', f'"{PYTHON_SCRIPT}"'],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this sufficient to test the feature? Is it the shell=True part that makes this test useful?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shell=True will create a shell process, and that shell will create a subproceses.
So the launch process will have to kill the shell subprocess, because the shell will not trap the signals and resend them to the child.

This shows that the feature works.
It's not super complete though, if you have more test case ideas I can add them.

),
shell=True,
output='screen',
)
ld = LaunchDescription([executable])
ls = LaunchService()
ls.include_launch_description(ld)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
run_async_task = loop.create_task(ls.run_async())

async def wait_for_subprocesses():
start = time.time()
while len(psutil.Process().children(recursive=True)) != 2:
await asyncio.sleep(0.5)
assert time.time() < start + 5., 'timed out waiting for processes to setup'
wait_for_subprocesses_task = loop.create_task(wait_for_subprocesses())
loop.run_until_complete(wait_for_subprocesses_task)
os.kill(executable.process_details['pid'], signal.SIGTERM)
loop.run_until_complete(run_async_task)
assert len(psutil.Process().children(recursive=True)) == 0