Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pantsd] Robustify client connection logic. #5952

Merged
merged 2 commits into from
Jun 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/python/pants/bin/daemon_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,13 @@ def post_fork_child(self):
# Set context in the process title.
set_process_title('pantsd-runner [{}]'.format(' '.join(self._args)))

# Setup a SIGINT signal handler.
self._setup_sigint_handler()

# Broadcast our process group ID (in PID form - i.e. negated) to the remote client so
# they can send signals (e.g. SIGINT) to all processes in the runners process group.
NailgunProtocol.send_pid(self._socket, bytes(os.getpgrp() * -1))

# Setup a SIGINT signal handler.
self._setup_sigint_handler()

# Invoke a Pants run with stdio redirected and a proxied environment.
with self._nailgunned_stdio(self._socket) as finalizer, hermetic_environment_as(**self._env):
try:
Expand Down
2 changes: 1 addition & 1 deletion src/python/pants/bin/pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def run(self):
try:
return RemotePantsRunner(self._exiter, self._args, self._env, bootstrap_options).run()
except RemotePantsRunner.Fallback as e:
logger.debug('caught client exception: {!r}, falling back to non-daemon mode'.format(e))
logger.warn('caught client exception: {!r}, falling back to non-daemon mode'.format(e))

# N.B. Inlining this import speeds up the python thin client run by about 100ms.
from pants.bin.local_pants_runner import LocalPantsRunner
Expand Down
60 changes: 50 additions & 10 deletions src/python/pants/bin/remote_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ class RemotePantsRunner(object):
class Fallback(Exception):
"""Raised when fallback to an alternate execution mode is requested."""

class PortNotFound(Exception):
"""Raised when the pailgun port can't be found."""
class Terminated(Exception):
"""Raised when an active run is terminated mid-flight."""

PANTS_COMMAND = 'pants'
RECOVERABLE_EXCEPTIONS = (PortNotFound, NailgunClient.NailgunConnectionError)
RECOVERABLE_EXCEPTIONS = (
NailgunClient.NailgunConnectionError,
NailgunClient.NailgunExecutionError
)

def __init__(self, exiter, args, env, bootstrap_options, stdin=None, stdout=None, stderr=None):
"""
Expand Down Expand Up @@ -86,6 +89,44 @@ def _setup_logging(self):
root.setLevel(log_level)
root.addHandler(handler)

@staticmethod
def _backoff(attempt):
"""Minimal backoff strategy for daemon restarts."""
time.sleep(attempt + (attempt - 1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering if there is any particular reason to use attempt + (attempt - 1)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just trying to keep the retry overhead to <10 seconds and this staggers nicely as 1, 3, 5 seconds for 3 retries. the backoff curve here needn't be as sharp as e.g. a distributed system where things like thundering herd are a thing.


def _run_pants_with_retry(self, port, retries=3):
"""Runs pants remotely with retry and recovery for nascent executions."""
attempt = 1
while 1:
logger.debug(
'connecting to pantsd on port {} (attempt {}/{})'.format(port, attempt, retries)
)
try:
return self._connect_and_execute(port)
except self.RECOVERABLE_EXCEPTIONS as e:
if attempt > retries:
raise self.Fallback(e)

self._backoff(attempt)
logger.warn(
'pantsd was unresponsive on port {}, retrying ({}/{})'
.format(port, attempt, retries)
)

# One possible cause of the daemon being non-responsive during an attempt might be if a
# another lifecycle operation is happening concurrently (incl teardown). To account for
# this, we won't begin attempting restarts until at least 1 second has passed (1 attempt).
if attempt > 1:
port = self._restart_pantsd()
Copy link
Contributor

@wisechengyi wisechengyi Jun 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible there are dangling pantd processes that need to be killed before restarting one?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PantsDaemon.launch() already calls ProcessManager.terminate() to handle that case here: https://github.com/pantsbuild/pants/blob/master/src/python/pants/pantsd/pants_daemon.py#L393

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok. Thank you!

attempt += 1
except NailgunClient.NailgunError as e:
# Ensure a newline.
logger.fatal('')
logger.fatal('lost active connection to pantsd!')
raise self.Terminated, (
'abruptly lost active connection to pantsd runner: {!r}'.format(e)
), e.traceback

def _connect_and_execute(self, port):
# Merge the nailgun TTY capability environment variables with the passed environment dict.
ng_env = NailgunProtocol.isatty_to_env(self._stdin, self._stdout, self._stderr)
Expand All @@ -99,7 +140,8 @@ def _connect_and_execute(self, port):
ins=self._stdin,
out=self._stdout,
err=self._stderr,
exit_on_broken_pipe=True)
exit_on_broken_pipe=True,
expects_pid=True)

with self._trapped_signals(client), STTYSettings.preserved():
# Execute the command on the pailgun.
Expand All @@ -108,15 +150,13 @@ def _connect_and_execute(self, port):
# Exit.
self._exiter.exit(result)

def _restart_pantsd(self):
return PantsDaemon.Factory.restart(bootstrap_options=self._bootstrap_options)

def _maybe_launch_pantsd(self):
return PantsDaemon.Factory.maybe_launch(bootstrap_options=self._bootstrap_options)

def run(self, args=None):
self._setup_logging()
port = self._maybe_launch_pantsd()

logger.debug('connecting to pailgun on port {}'.format(port))
try:
self._connect_and_execute(port)
except self.RECOVERABLE_EXCEPTIONS as e:
raise self.Fallback(e)
self._run_pants_with_retry(port)
4 changes: 3 additions & 1 deletion src/python/pants/core_tasks/pantsd_kill.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class PantsDaemonKill(Task):

def execute(self):
try:
PantsDaemon.Factory.create(self.context.options, full_init=False).terminate()
pantsd = PantsDaemon.Factory.create(self.context.options, full_init=False)
with pantsd.lifecycle_lock:
pantsd.terminate()
except ProcessManager.NonResponsiveProcess as e:
raise TaskError('failure while terminating pantsd: {}'.format(e))
68 changes: 60 additions & 8 deletions src/python/pants/java/nailgun_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,38 @@ class NailgunClient(object):
class NailgunError(Exception):
"""Indicates an error interacting with a nailgun server."""

DESCRIPTION = 'Problem talking to nailgun server'

def __init__(self, address, pid, wrapped_exc, traceback):
self.address = address
self.pid = pid
self.wrapped_exc = wrapped_exc
self.traceback = traceback
super(NailgunClient.NailgunError, self).__init__(
'{} (address: {}{}): {!r}'
.format(
self.DESCRIPTION,
address,
', remote_pid: {}'.format(pid) if pid is not None else '',
self.wrapped_exc
)
)

class NailgunConnectionError(NailgunError):
"""Indicates an error upon initial connect to the nailgun server."""
DESCRIPTION = 'Problem connecting to nailgun server'

class NailgunExecutionError(NailgunError):
"""Indicates an error upon initial command execution on the nailgun server."""
DESCRIPTION = 'Problem executing command on nailgun server'

# For backwards compatibility with nails expecting the ng c client special env vars.
ENV_DEFAULTS = dict(NAILGUN_FILESEPARATOR=os.sep, NAILGUN_PATHSEPARATOR=os.pathsep)
DEFAULT_NG_HOST = '127.0.0.1'
DEFAULT_NG_PORT = 2113

def __init__(self, host=DEFAULT_NG_HOST, port=DEFAULT_NG_PORT, ins=sys.stdin, out=None, err=None,
workdir=None, exit_on_broken_pipe=False):
workdir=None, exit_on_broken_pipe=False, expects_pid=False):
"""Creates a nailgun client that can be used to issue zero or more nailgun commands.

:param string host: the nailgun server to contact (defaults to '127.0.0.1')
Expand All @@ -117,17 +139,30 @@ def __init__(self, host=DEFAULT_NG_HOST, port=DEFAULT_NG_PORT, ins=sys.stdin, ou
:param file out: a stream to write command standard output to (defaults to stdout)
:param file err: a stream to write command standard error to (defaults to stderr)
:param string workdir: the default working directory for all nailgun commands (defaults to CWD)
:param bool exit_on_broken_pipe: whether or not to exit when `Broken Pipe` errors are encountered.
:param bool exit_on_broken_pipe: whether or not to exit when `Broken Pipe` errors are encountered
:param bool expect_pid: Whether or not to expect a PID from the server (only true for pantsd)
"""
self._host = host
self._port = port
self._address = (host, port)
self._address_string = ':'.join(str(i) for i in self._address)
self._stdin = ins
self._stdout = out or sys.stdout
self._stderr = err or sys.stderr
self._workdir = workdir or os.path.abspath(os.path.curdir)
self._exit_on_broken_pipe = exit_on_broken_pipe
self._expects_pid = expects_pid
self._session = None

@property
def pid(self):
if not self._expects_pid:
return None
try:
return self._session.remote_pid
except AttributeError:
return None

def try_connect(self):
"""Creates a socket, connects it to the nailgun and returns the connected socket.

Expand All @@ -136,12 +171,16 @@ def try_connect(self):
"""
sock = RecvBufferedSocket(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
try:
sock.connect((self._host, self._port))
sock.connect(self._address)
except (socket.error, socket.gaierror) as e:
logger.debug('Encountered socket exception {!r} when attempting connect to nailgun'.format(e))
sock.close()
raise self.NailgunConnectionError(
'Problem connecting to nailgun server at {}:{}: {!r}'.format(self._host, self._port, e))
address=self._address_string,
pid=self.pid,
wrapped_exc=e,
traceback=sys.exc_info()[2]
)
else:
return sock

Expand Down Expand Up @@ -178,11 +217,24 @@ def execute(self, main_class, cwd=None, *args, **environment):
try:
return self._session.execute(cwd, main_class, *args, **environment)
except socket.error as e:
raise self.NailgunError('Problem communicating with nailgun server at {}:{}: {!r}'
.format(self._host, self._port, e))
raise self.NailgunError(
address=self._address_string,
pid=self.pid,
wrapped_exc=e,
traceback=sys.exc_info()[2]
)
except NailgunProtocol.ProtocolError as e:
raise self.NailgunError('Problem in nailgun protocol with nailgun server at {}:{}: {!r}'
.format(self._host, self._port, e))
# If we get to a `ProtocolError` and we don't yet have a pid, then
# the daemon has not yet achieved a successful fork - so we can
# treat that as a separate, retryable error (usually indicating that
# the daemon is in a bad state).
exc_type = self.NailgunExecutionError if self.pid is None else self.NailgunError
raise exc_type(
address=self._address_string,
pid=self.pid,
wrapped_exc=e,
traceback=sys.exc_info()[2]
)
finally:
sock.close()
self._session = None
Expand Down
25 changes: 20 additions & 5 deletions src/python/pants/pantsd/pants_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,19 @@ def maybe_launch(cls, bootstrap_options=None):
else:
return stub_pantsd.read_named_socket('pailgun', int)

@classmethod
def restart(cls, bootstrap_options=None):
"""Restarts a running daemon instance.

:param Options bootstrap_options: The bootstrap options, if available.
:returns: The pailgun port number of the new pantsd instance.
:rtype: int
"""
pantsd = cls.create(bootstrap_options)
with pantsd.lifecycle_lock:
# N.B. This will call `pantsd.terminate()` before starting.
return pantsd.launch()

@classmethod
def create(cls, bootstrap_options=None, full_init=True):
"""
Expand Down Expand Up @@ -402,11 +415,13 @@ def launch(self):
return listening_port

def terminate(self, include_watchman=True):
"""Terminates pantsd and watchman."""
with self.lifecycle_lock:
super(PantsDaemon, self).terminate()
if include_watchman:
self.watchman_launcher.terminate()
"""Terminates pantsd and watchman.

N.B. This should always be called under care of `self.lifecycle_lock`.
"""
super(PantsDaemon, self).terminate()
if include_watchman:
self.watchman_launcher.terminate()


def launch():
Expand Down
7 changes: 6 additions & 1 deletion tests/python/pants_test/java/test_nailgun_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,12 @@ def test_execute(self, mock_session, mock_try_connect):
@mock.patch.object(NailgunClient, 'try_connect', **PATCH_OPTS)
@mock.patch('pants.java.nailgun_client.NailgunClientSession', **PATCH_OPTS)
def test_execute_propagates_connection_error_on_connect(self, mock_session, mock_try_connect):
mock_try_connect.side_effect = NailgunClient.NailgunConnectionError('oops')
mock_try_connect.side_effect = NailgunClient.NailgunConnectionError(
'127.0.0.1:31337',
31337,
Exception('oops'),
None
)

with self.assertRaises(NailgunClient.NailgunConnectionError):
self.nailgun_client.execute('test')
Expand Down