Skip to content

Commit

Permalink
devlib: Protect against too many BackgroundCommand
Browse files Browse the repository at this point in the history
Since some servers (SSH) limit the number of active commands, ensure
that we do not go over the limit, whether through the user-facing API
Target.background() or through Target.execute.asyn().

Achieve that by partitioning the slots into two bins:
    * User calls to Target.background(). Going over the limit will raise
      a TooManyBackgroundCommandsError exception.
    * Internal uses in Target.execute.asyn(). The number of background
      commands is tracked using a semaphore, and execution falls back to
      the blocking path if necessary.
  • Loading branch information
douglas-raillard-arm committed Nov 11, 2021
1 parent f6fc902 commit f89adf9
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 53 deletions.
113 changes: 112 additions & 1 deletion devlib/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@
import logging
import select
import fcntl
import asyncio

from devlib.utils.misc import InitCheckpoint
from devlib.utils.misc import InitCheckpoint, memoized

_KILL_TIMEOUT = 3

Expand Down Expand Up @@ -61,17 +62,127 @@ class ConnectionBase(InitCheckpoint):
"""
Base class for all connections.
"""

_MAX_BACKGROUND_CMD = 50
"""
Check for up to this amount of available background commands.
"""

def __init__(self):
    """
    Set up the bookkeeping shared by all connection types.
    """
    # Path to busybox on the target; stays None until someone assigns it.
    self.busybox = None

    # Closing state and the lock guarding it.
    self._closed = False
    self._close_lock = threading.Lock()

    # Background commands currently tracked for this connection. Weak
    # references are used so that tracking alone does not keep a command
    # object alive.
    self._current_bg_cmds = WeakSet()
    # Re-entrant lock taken while spawning a background command.
    self._bg_spawn_lock = threading.RLock()

def cancel_running_command(self):
    """
    Cancel every background command currently tracked by this connection.
    """
    # Iterate over a snapshot so the tracked collection can change while
    # commands are being cancelled.
    for running in set(self._current_bg_cmds):
        running.cancel()

@property
def _bg_async_sem(self):
    """
    Semaphore limiting the background commands spawned by the async path.

    Once busybox is set, the semaphore from ``_get_bg_async_sem()`` is
    returned. Before that, the maximum number of background commands cannot
    be polled, so a fresh single-slot dummy semaphore is returned on each
    access.
    """
    if self.busybox is not None:
        return self._get_bg_async_sem()

    # busybox is not installed yet: hand out a dummy semaphore.
    return asyncio.BoundedSemaphore(1)

# Thanks to memoization, every call hands back the very same semaphore.
@memoized
def _get_bg_async_sem(self):
    """
    Build the semaphore sized to ``_max_async_bg``.

    :raises RuntimeError: if no asyncio event loop is currently running.
    """
    # A semaphore is useless without a running loop, so fail early:
    # get_running_loop() raises when there is none.
    asyncio.get_running_loop()
    slots = self._max_async_bg
    return asyncio.BoundedSemaphore(slots)

@property
@memoized
def _max_bg(self):
    """
    Find the maximum number of background commands that can be spawned at
    once on this connection.

    The limit is probed empirically: one long-lived "killer" command is
    started, then ``sleep`` commands are spawned until either spawning
    fails or ``_MAX_BACKGROUND_CMD`` commands (killer included) are alive.
    All the sleeps are then killed through the killer command.

    :returns: the number of commands that could be spawned at once,
        killer included, minus one slot kept in reserve.

    .. note:: This is done as a cached property so that it will only be
        done on first use, leaving the opportunity to the caller to check
        that the connection is in working order before we get here.
    """
    # We need busybox in order to kill the commands we spawn
    assert self.busybox is not None

    kill_list = []

    # Keep a command alive for the whole duration of the probing so that we
    # don't need to spawn a new command to kill the sleeps. Otherwise, we
    # would end up not being able to since we reach the server's limit
    killer_bg = self.background(
        '{} xargs kill -9'.format(self.busybox),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        as_root=False,
    )

    try:
        with killer_bg:
            # Do not pollute the log with misleading connection failures
            # while we deliberately hit the limit.
            logging.disable()
            try:
                for _ in range(self._MAX_BACKGROUND_CMD - 1):
                    try:
                        bg = self.background(
                            # Sleep for a year
                            '{} sleep 31536000'.format(self.busybox),
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            as_root=False,
                        )
                    # We reached the limit of commands to spawn
                    except Exception:
                        break
                    else:
                        kill_list.append(bg)
            finally:
                logging.disable(logging.NOTSET)

            to_kill = ' '.join(
                # Prefix with "-" to kill the process group
                '-{}'.format(bg.pid)
                for bg in kill_list
            )
            killer_bg.communicate(input=to_kill.encode())

    finally:
        for bg in kill_list:
            bg.__exit__(None, None, None)

    # Count what was actually spawned rather than relying on the loop
    # index: the index is one short when the loop runs to completion
    # without hitting the limit, and is unbound if the loop never runs.
    # One slot per successful sleep, plus one for the killer_bg command.
    max_ = len(kill_list) + 1

    # Ensure we can always do side things requiring a channel on some
    # connections like connecting to SFTP or running synchronously a
    # command.
    max_ -= 1
    return max_

@property
@memoized
def _max_user_bg(self):
    """
    Number of background command slots given to the user.

    Half of the available slots go to the user and the rest is kept for
    the exclusive use of _execute_async(). Without this partitioning, a
    user could manually spawn a number of BackgroundCommand and then call
    an execute() that needs one more command, leading to a deadlock.
    """
    total = self._max_bg
    return int(total / 2)

@property
@memoized
def _max_async_bg(self):
    """
    Number of background command slots left once the user share
    (``_max_user_bg``) is taken out of the total (``_max_bg``).
    """
    total = self._max_bg
    user_share = self._max_user_bg
    return total - user_share

@abstractmethod
def _close(self):
"""
Expand Down
7 changes: 7 additions & 0 deletions devlib/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,13 @@ class KernelConfigKeyError(KeyError, IndexError, DevlibError):
pass


class TooManyBackgroundCommandsError(DevlibError):
    """
    Raised when too many background commands are started at the same time.
    """


def get_traceback(exc=None):
"""
Returns the string with the traceback for the specifiec exc
Expand Down
138 changes: 86 additions & 52 deletions devlib/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@
from devlib.exception import (DevlibTransientError, TargetStableError,
TargetNotRespondingError, TimeoutError,
TargetTransientError, KernelConfigKeyError,
TargetError, HostError, TargetCalledProcessError) # pylint: disable=redefined-builtin
TargetError, HostError, TargetCalledProcessError,
TooManyBackgroundCommandsError) # pylint: disable=redefined-builtin
from devlib.utils.ssh import SshConnection
from devlib.utils.android import AdbConnection, AndroidProperties, LogcatMonitor, adb_command, adb_disconnect, INTENT_FLAGS
from devlib.utils.misc import memoized, isiterable, convert_new_lines, groupby_value
Expand Down Expand Up @@ -370,6 +371,9 @@ def connect(self, timeout=None, check_boot_completed=True):
self.execute('mkdir -p {}'.format(quote(self.executables_directory)))
self.busybox = self.install(os.path.join(PACKAGE_BIN_DIRECTORY, self.abi, 'busybox'), timeout=30)
self.conn.busybox = self.busybox
# Hit the cached property early but after having checked the connection
# works, and after having set self.busybox
self.conn._max_bg
self.platform.update_from_target(self)
self._update_modules('connected')
if self.platform.big_core and self.load_default_modules:
Expand Down Expand Up @@ -755,53 +759,70 @@ def _prepare_cmd(self, command, force_locale):
@call_conn
@asyn.asyncf
async def _execute_async(self, command, timeout=None, as_root=False, strip_colors=True, will_succeed=False, check_exit_code=True, force_locale='C'):
    """
    Run ``command`` as a background command and await its output.

    If the connection's async semaphore has no free slot, the command is
    run through the blocking ``_execute()`` instead, with the same
    parameters.

    :returns: the non-empty output streams joined with a newline and
        decoded, with bash colors stripped when ``strip_colors`` is true.
    :raises TargetStableError: when the command fails with
        ``subprocess.CalledProcessError`` and ``check_exit_code`` is true.
    :raises TargetTransientError: in place of ``TargetStableError`` when
        ``will_succeed`` is true.
    """
    sem = self.conn._bg_async_sem
    # If there is no BackgroundCommand slot available, fall back on the
    # blocking path. This ensures complete separation between the internal
    # use of background() to provide the async API and the external uses of
    # background()
    if sem.locked():
        return self._execute(
            command,
            timeout=timeout,
            as_root=as_root,
            strip_colors=strip_colors,
            will_succeed=will_succeed,
            check_exit_code=check_exit_code,
            # Forward the caller's locale rather than hard-coding 'C', so
            # both paths run the command with the same settings.
            force_locale=force_locale,
        )
    else:
        async with sem:
            bg = self.background(
                command=command,
                as_root=as_root,
                force_locale=force_locale,
            )

            def process(streams):
                # Make sure we don't accidentally end up with "\n" if both
                # streams are empty
                res = b'\n'.join(x for x in streams if x).decode()
                if strip_colors:
                    res = strip_bash_colors(res)
                return res

            def thread_f(loop, future):
                streams = (None, None)
                excep = None
                try:
                    with bg as _bg:
                        streams = _bg.communicate(timeout=timeout)
                except BaseException as e:
                    excep = e

                if isinstance(excep, subprocess.CalledProcessError):
                    if check_exit_code:
                        excep = TargetStableError(excep)
                    else:
                        # Exit code is ignored: use the output captured by
                        # the exception as the result.
                        streams = (excep.output, excep.stderr)
                        excep = None

                if will_succeed and isinstance(excep, TargetStableError):
                    excep = TargetTransientError(excep)

                # The future belongs to the event loop, so its outcome is
                # set from this thread via call_soon_threadsafe().
                if excep is None:
                    res = process(streams)
                    loop.call_soon_threadsafe(future.set_result, res)
                else:
                    loop.call_soon_threadsafe(future.set_exception, excep)

            future = asyncio.Future()
            thread = threading.Thread(
                target=thread_f,
                args=(asyncio.get_running_loop(), future),
                daemon=True,
            )
            thread.start()
            return await future

@call_conn
def _execute(self, command, timeout=None, check_exit_code=True,
Expand All @@ -821,13 +842,26 @@ def _execute(self, command, timeout=None, check_exit_code=True,
@call_conn
def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as_root=False,
               force_locale='C', timeout=None):
    """
    Spawn ``command`` as a background command on the connection.

    :param timeout: when not ``None``, a daemon timer cancels the command
        after that many seconds.
    :returns: the background command object created by the connection.
    :raises TooManyBackgroundCommandsError: when the number of background
        commands still running has reached the connection's ``_max_bg``.
    """
    conn = self.conn
    # Only one thread at a time may spawn a background command, so that
    # the count of _current_bg_cmds stays accurate.
    with conn._bg_spawn_lock:
        running = [
            cmd
            for cmd in list(conn._current_bg_cmds)
            if cmd.poll() is None
        ]
        limit = conn._max_bg
        # The async path regulates itself with an
        # asyncio.BoundedSemaphore(), so reaching the combined maximum
        # means the user spawned too many BackgroundCommand.
        if len(running) >= limit:
            raise TooManyBackgroundCommandsError(
                '{} sessions allowed for one connection on this server. Modify MaxSessions parameter for OpenSSH to allow more.'.format(limit))

        prepared = self._prepare_cmd(command, force_locale)
        bg_cmd = self.conn.background(prepared, stdout, stderr, as_root)
        if timeout is not None:
            watchdog = threading.Timer(timeout, function=bg_cmd.cancel)
            watchdog.daemon = True
            watchdog.start()
        return bg_cmd

def invoke(self, binary, args=None, in_directory=None, on_cpus=None,
redirect_stderr=False, as_root=False, timeout=30):
Expand Down

0 comments on commit f89adf9

Please sign in to comment.