From f89adf93d46065f55fd023c79e33b125c7412e41 Mon Sep 17 00:00:00 2001
From: Douglas Raillard
Date: Tue, 9 Nov 2021 18:28:43 +0000
Subject: [PATCH] devlib: Protect against too many BackgroundCommand

Since some servers (SSH) limit the number of active commands, ensure
that we will not go over the limit, both for user calls to
Target.background() and for the internal use made by
Target.execute.asyn().

Achieve that by partitioning the slots into two bins:

* User calls to Target.background(). Going over the limit will raise a
  TooManyBackgroundCommandsError exception.

* Internal uses in Target.execute.asyn(). The number of background
  commands is tracked using a semaphore, and it will fall back to the
  blocking path if necessary.
---
 devlib/connection.py | 113 ++++++++++++++++++++++++++++++++++-
 devlib/exception.py  |   7 +++
 devlib/target.py     | 138 +++++++++++++++++++++++++++----------------
 3 files changed, 205 insertions(+), 53 deletions(-)

diff --git a/devlib/connection.py b/devlib/connection.py
index 4772bd48b..ce6c05dda 100644
--- a/devlib/connection.py
+++ b/devlib/connection.py
@@ -29,8 +29,9 @@
 import logging
 import select
 import fcntl
+import asyncio
 
-from devlib.utils.misc import InitCheckpoint
+from devlib.utils.misc import InitCheckpoint, memoized
 
 _KILL_TIMEOUT = 3
 
@@ -61,17 +62,127 @@ class ConnectionBase(InitCheckpoint):
     """
     Base class for all connections.
     """
+
+    _MAX_BACKGROUND_CMD = 50
+    """
+    Check for up to this number of available background commands.
+    """
+
     def __init__(self):
         self._current_bg_cmds = WeakSet()
         self._closed = False
         self._close_lock = threading.Lock()
         self.busybox = None
+        self._bg_spawn_lock = threading.RLock()
 
     def cancel_running_command(self):
         bg_cmds = set(self._current_bg_cmds)
         for bg_cmd in bg_cmds:
             bg_cmd.cancel()
 
+    @property
+    def _bg_async_sem(self):
+        # If we have not installed busybox yet, we won't be able to poll for
+        # the max number of background commands, so we return a dummy semaphore.
+        if self.busybox is None:
+            return asyncio.BoundedSemaphore(1)
+        else:
+            return self._get_bg_async_sem()
+
+    # Memoization ensures we will get the same semaphore back all the time.
+    @memoized
+    def _get_bg_async_sem(self):
+        # Ensure we have a running asyncio loop, otherwise the semaphore will
+        # be useless
+        asyncio.get_running_loop()
+        return asyncio.BoundedSemaphore(self._max_async_bg)
+
+    @property
+    @memoized
+    def _max_bg(self):
+        """
+        Find the maximum number of background commands that can be spawned at
+        once on this connection.
+
+        .. note:: This is done as a cached property so that it will only be
+            done on first use, leaving the caller the opportunity to check
+            that the connection is in working order before we get here.
+        """
+        # We need busybox in order to kill the commands we spawn
+        assert self.busybox is not None
+
+        kill_list = []
+
+        # Keep a command alive for the whole duration of the probing so that we
+        # don't need to spawn a new command to kill the sleeps. Otherwise, we
+        # would not be able to once we have reached the server's limit.
+        killer_bg = self.background(
+            '{} xargs kill -9'.format(self.busybox),
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            as_root=False,
+        )
+
+        try:
+            with killer_bg:
+                for i in range(self._MAX_BACKGROUND_CMD - 1):
+                    try:
+                        # Do not pollute the log with misleading connection
+                        # failures.
+                        logging.disable()
+                        bg = self.background(
+                            # Sleep for a year
+                            '{} sleep 31536000'.format(self.busybox),
+                            stdout=subprocess.PIPE,
+                            stderr=subprocess.PIPE,
+                            as_root=False,
+                        )
+                    # We reached the limit of commands to spawn
+                    except Exception:
+                        break
+                    else:
+                        kill_list.append(bg)
+                    finally:
+                        logging.disable(logging.NOTSET)
+
+                to_kill = ' '.join(
+                    # Prefix with "-" to kill the process group
+                    '-{}'.format(bg.pid)
+                    for bg in kill_list
+                )
+                killer_bg.communicate(input=to_kill.encode())
+
+        finally:
+            for bg in kill_list:
+                bg.__exit__(None, None, None)
+
+        # Avoid off-by-one since range() starts from 0
+        max_ = i + 1
+
+        # Add the killer_bg command
+        max_ += 1
+
+        # Ensure we can always do side things requiring a channel on some
+        # connections, like connecting over SFTP or running a command
+        # synchronously.
+        max_ -= 1
+        return max_
+
+    @property
+    @memoized
+    def _max_user_bg(self):
+        # Allocate half of the available background commands to the user, half
+        # for the exclusive use of _execute_async(). Partitioning is necessary
+        # to avoid deadlocks where the user would spawn a number of
+        # BackgroundCommands manually and then try to use an execute() which
+        # would require another command as well.
+        return int(self._max_bg / 2)
+
+    @property
+    @memoized
+    def _max_async_bg(self):
+        return self._max_bg - self._max_user_bg
+
     @abstractmethod
     def _close(self):
         """
diff --git a/devlib/exception.py b/devlib/exception.py
index a7884c800..c1472d5c5 100644
--- a/devlib/exception.py
+++ b/devlib/exception.py
@@ -164,6 +164,13 @@ class KernelConfigKeyError(KeyError, IndexError, DevlibError):
     pass
 
 
+class TooManyBackgroundCommandsError(DevlibError):
+    """
+    Exception raised when too many background commands are started at once.
+    """
+    pass
+
+
 def get_traceback(exc=None):
     """
     Returns the string with the traceback for the specifiec exc
diff --git a/devlib/target.py b/devlib/target.py
index bfdeafa37..e0117cc9d 100644
--- a/devlib/target.py
+++ b/devlib/target.py
@@ -51,7 +51,8 @@
 from devlib.exception import (DevlibTransientError, TargetStableError,
                               TargetNotRespondingError, TimeoutError,
                               TargetTransientError, KernelConfigKeyError,
-                              TargetError, HostError, TargetCalledProcessError) # pylint: disable=redefined-builtin
+                              TargetError, HostError, TargetCalledProcessError,
+                              TooManyBackgroundCommandsError) # pylint: disable=redefined-builtin
 from devlib.utils.ssh import SshConnection
 from devlib.utils.android import AdbConnection, AndroidProperties, LogcatMonitor, adb_command, adb_disconnect, INTENT_FLAGS
 from devlib.utils.misc import memoized, isiterable, convert_new_lines, groupby_value
@@ -370,6 +371,9 @@ def connect(self, timeout=None, check_boot_completed=True):
         self.execute('mkdir -p {}'.format(quote(self.executables_directory)))
         self.busybox = self.install(os.path.join(PACKAGE_BIN_DIRECTORY, self.abi, 'busybox'), timeout=30)
         self.conn.busybox = self.busybox
+        # Hit the cached property early, but after having checked that the
+        # connection works and after having set self.busybox
+        self.conn._max_bg
         self.platform.update_from_target(self)
         self._update_modules('connected')
         if self.platform.big_core and self.load_default_modules:
@@ -755,53 +759,70 @@ def _prepare_cmd(self, command, force_locale):
     @call_conn
     @asyn.asyncf
     async def _execute_async(self, command, timeout=None, as_root=False, strip_colors=True, will_succeed=False, check_exit_code=True, force_locale='C'):
-        bg = self.background(
-            command=command,
-            as_root=as_root,
-            force_locale=force_locale,
-        )
-
-        def process(streams):
-            # Make sure we don't accidentally end up with "\n" if both streams
-            # are empty
-            res = b'\n'.join(x for x in streams if x).decode()
-            if strip_colors:
-                res = strip_bash_colors(res)
-            return res
-
-        def thread_f():
-            streams = (None, None)
-            excep = None
-            try:
-                with bg as _bg:
-                    streams = _bg.communicate(timeout=timeout)
-            except BaseException as e:
-                excep = e
-
-            if isinstance(excep, subprocess.CalledProcessError):
-                if check_exit_code:
-                    excep = TargetStableError(excep)
-                else:
-                    streams = (excep.output, excep.stderr)
-                    excep = None
+        sem = self.conn._bg_async_sem
+        # If there is no BackgroundCommand slot available, fall back on the
+        # blocking path. This ensures complete separation between the internal
+        # use of background() to provide the async API and the external uses of
+        # background().
+        if sem.locked():
+            return self._execute(
+                command,
+                timeout=timeout,
+                as_root=as_root,
+                strip_colors=strip_colors,
+                will_succeed=will_succeed,
+                check_exit_code=check_exit_code,
+                force_locale=force_locale,
+            )
+        else:
+            async with sem:
+                bg = self.background(
+                    command=command,
+                    as_root=as_root,
+                    force_locale=force_locale,
+                )
 
-            if will_succeed and isinstance(excep, TargetStableError):
-                excep = TargetTransientError(excep)
+                def process(streams):
+                    # Make sure we don't accidentally end up with "\n" if both streams
                    # are empty
+                    res = b'\n'.join(x for x in streams if x).decode()
+                    if strip_colors:
+                        res = strip_bash_colors(res)
+                    return res
 
-            if excep is None:
-                res = process(streams)
-                loop.call_soon_threadsafe(future.set_result, res)
-            else:
-                loop.call_soon_threadsafe(future.set_exception, excep)
+                def thread_f(loop, future):
+                    streams = (None, None)
+                    excep = None
+                    try:
+                        with bg as _bg:
+                            streams = _bg.communicate(timeout=timeout)
+                    except BaseException as e:
+                        excep = e
+
+                    if isinstance(excep, subprocess.CalledProcessError):
+                        if check_exit_code:
+                            excep = TargetStableError(excep)
+                        else:
+                            streams = (excep.output, excep.stderr)
+                            excep = None
+
+                    if will_succeed and isinstance(excep, TargetStableError):
+                        excep = TargetTransientError(excep)
+
+                    if excep is None:
+                        res = process(streams)
+                        loop.call_soon_threadsafe(future.set_result, res)
+                    else:
+                        loop.call_soon_threadsafe(future.set_exception, excep)
 
-        loop = asyncio.get_running_loop()
-        future = asyncio.Future()
-        thread = threading.Thread(
-            target=thread_f,
-            daemon=True,
-        )
-        thread.start()
-        return await future
+                future = asyncio.Future()
+                thread = threading.Thread(
+                    target=thread_f,
+                    args=(asyncio.get_running_loop(), future),
+                    daemon=True,
+                )
+                thread.start()
+                return await future
 
     @call_conn
     def _execute(self, command, timeout=None, check_exit_code=True,
@@ -821,13 +842,26 @@ def _execute(self, command, timeout=None, check_exit_code=True,
 
     @call_conn
     def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as_root=False, force_locale='C', timeout=None):
-        command = self._prepare_cmd(command, force_locale)
-        bg_cmd = self.conn.background(command, stdout, stderr, as_root)
-        if timeout is not None:
-            timer = threading.Timer(timeout, function=bg_cmd.cancel)
-            timer.daemon = True
-            timer.start()
-        return bg_cmd
+        conn = self.conn
+        # Make sure only one thread tries to spawn a background command at the
+        # same time, so the count of _current_bg_cmds is accurate.
+        with conn._bg_spawn_lock:
+            alive = list(conn._current_bg_cmds)
+            alive = [bg for bg in alive if bg.poll() is None]
+            # Since the async path self-regulates using an
+            # asyncio.BoundedSemaphore(), going over the combined max means the
+            # culprit is the user spawning too many BackgroundCommands.
+            if len(alive) >= conn._max_bg:
+                raise TooManyBackgroundCommandsError(
+                    '{} sessions are allowed on one connection to this server. Modify the MaxSessions parameter of OpenSSH to allow more.'.format(conn._max_bg))
+
+            command = self._prepare_cmd(command, force_locale)
+            bg_cmd = self.conn.background(command, stdout, stderr, as_root)
+            if timeout is not None:
+                timer = threading.Timer(timeout, function=bg_cmd.cancel)
+                timer.daemon = True
+                timer.start()
+            return bg_cmd
 
     def invoke(self, binary, args=None, in_directory=None, on_cpus=None,
                redirect_stderr=False, as_root=False, timeout=30):
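
Illustrative usage note (not part of the patch): with this change,
Target.background() raises TooManyBackgroundCommandsError once the
per-connection limit is reached. A minimal sketch of how a caller might
handle it, assuming an already-connected Target instance named target and
using only APIs that appear in this patch (background() and communicate());
the helper name spawn_all and the retry strategy are hypothetical:

    from devlib.exception import TooManyBackgroundCommandsError

    def spawn_all(target, commands):
        # Start as many background commands as the connection allows,
        # draining the oldest one whenever the limit is hit.
        running = []
        for cmd in commands:
            try:
                running.append(target.background(cmd))
            except TooManyBackgroundCommandsError:
                # The user share of background slots is exhausted: wait
                # for the oldest command to finish, then retry.
                oldest = running.pop(0)
                oldest.communicate()
                running.append(target.background(cmd))
        return running

The only guarantee added by the patch itself is that the exception is
raised instead of silently exhausting the server's session slots; how to
recover is up to the caller.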
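
The fallback used by _execute_async() is easier to see in isolation. The
following self-contained sketch uses plain asyncio with illustrative names
(run_blocking, run_background, execute and main are not devlib APIs): a
BoundedSemaphore caps the number of concurrent "background" slots, and when
every slot is taken the caller degrades to a blocking path instead of
queueing behind the semaphore:

    import asyncio

    def run_blocking(cmd):
        # Stand-in for the synchronous Target._execute() path.
        return 'blocking: ' + cmd

    async def run_background(cmd):
        # Stand-in for Target.background() driven from a worker thread.
        await asyncio.sleep(0.1)
        return 'background: ' + cmd

    async def execute(sem, cmd):
        if sem.locked():
            # No slot left: degrade to the blocking path rather than
            # waiting on the semaphore.
            return run_blocking(cmd)
        async with sem:
            return await run_background(cmd)

    async def main():
        # Two background slots, mirroring the _max_async_bg share.
        sem = asyncio.BoundedSemaphore(2)
        results = await asyncio.gather(
            *(execute(sem, 'cmd{}'.format(i)) for i in range(5))
        )
        print(results)

    asyncio.run(main())

With five concurrent calls and two slots, the first two take the
"background" path and the remaining three take the blocking path, which is
the behaviour the patch aims for in Target.execute.asyn() when the async
bin of slots is full.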