Skip to content

Commit

Permalink
WIP Port nailgun client to rust
Browse files Browse the repository at this point in the history
  • Loading branch information
gshuflin committed Oct 1, 2020
1 parent 09da7e5 commit 398d719
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 29 deletions.
12 changes: 10 additions & 2 deletions src/python/pants/base/exception_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,28 @@ def __init__(self, *, pantsd_instance: bool):

def _handle_sigint_if_enabled(self, signum: int, _frame):
with self._ignore_sigint_lock:
logger.warning(f"Handling signint with handler {self}")
logger.warning(f"ignoring sigint? {self._ignoring_sigint}")
if not self._ignoring_sigint:
logger.warning("Yup we made it down past this check")
self.handle_sigint(signum, _frame)

def _toggle_ignoring_sigint(self, toggle: bool) -> None:
if not self._pantsd_instance:
with self._ignore_sigint_lock:
self._ignoring_sigint = toggle

def handle_sigint(self, signum: int, _frame):
def _send_signal_to_children(self, _received_signal: int) -> None:
self_process = psutil.Process()
children = self_process.children()
logger.debug(f"Sending SIGINT to child processes: {children}")
logger.warning(f"Sending SIGINT to child processes: {children}")
for child_process in children:
child_process.send_signal(signal.SIGINT)

def handle_sigint(self, signum: int, _frame):
logger.warning("Calling handle_sigint in pantsd")
ExceptionSink._signal_sent = signum
self._send_signal_to_children(signum)
raise KeyboardInterrupt("User interrupted execution with control-c!")

# TODO(#7406): figure out how to let sys.exit work in a signal handler instead of having to raise
Expand All @@ -93,10 +99,12 @@ def __init__(self, signum, signame):

def handle_sigquit(self, signum, _frame):
ExceptionSink._signal_sent = signum
self._send_signal_to_children(signum)
raise self.SignalHandledNonLocalExit(signum, "SIGQUIT")

def handle_sigterm(self, signum, _frame):
ExceptionSink._signal_sent = signum
self._send_signal_to_children(signum)
raise self.SignalHandledNonLocalExit(signum, "SIGTERM")


Expand Down
40 changes: 23 additions & 17 deletions src/python/pants/bin/remote_pants_runner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

import psutil
import logging
import sys
import termios
Expand All @@ -10,6 +11,7 @@

from pants.base.exception_sink import ExceptionSink, SignalHandler
from pants.base.exiter import ExitCode
from pants.engine.internals.native import Native
from pants.nailgun.nailgun_client import NailgunClient
from pants.nailgun.nailgun_protocol import NailgunProtocol
from pants.option.options_bootstrapper import OptionsBootstrapper
Expand Down Expand Up @@ -53,24 +55,15 @@ def restore_tty_flags(self):


class PailgunClientSignalHandler(SignalHandler):
def __init__(self, pailgun_client: NailgunClient, pid: int, timeout: float = 1):
self._pailgun_client = pailgun_client
self._timeout = timeout
def __init__(self, pid: int):
self.pid = pid
super().__init__(pantsd_instance=False)

def _forward_signal_with_timeout(self, signum, signame):
# TODO Consider not accessing the private function _maybe_last_pid here, or making it public.
logger.info(
"Sending {} to pantsd with pid {}, waiting up to {} seconds before sending SIGKILL...".format(
signame, self.pid, self._timeout
)
)
self._pailgun_client.set_exit_timeout(
timeout=self._timeout,
reason=KeyboardInterrupt("Sending user interrupt to pantsd"),
)
self._pailgun_client.maybe_send_signal(signum)
ExceptionSink._signal_sent = signum
logger.info(f"Sending {signame} to pantsd with pid {self.pid}")
pantsd_process = psutil.Process(pid=self.pid)
pantsd_process.send_signal(signum)

def handle_sigint(self, signum, _frame):
self._forward_signal_with_timeout(signum, "SIGINT")
Expand Down Expand Up @@ -159,6 +152,9 @@ def run(self) -> ExitCode:
raise self._extract_remote_exception(pantsd_handle.pid, e).with_traceback(traceback)

def _connect_and_execute(self, pantsd_handle: PantsDaemonClient.Handle) -> ExitCode:

native = Native()

port = pantsd_handle.port
pid = pantsd_handle.pid

Expand Down Expand Up @@ -186,11 +182,21 @@ def _connect_and_execute(self, pantsd_handle: PantsDaemonClient.Handle) -> ExitC
metadata_base_dir=pantsd_handle.metadata_base_dir,
)

command = self._args[0]
args = self._args[1:]

def signal_fn() -> bool:
signal_sent = ExceptionSink.signal_sent()
return ExceptionSink.signal_sent() is not None

rust_client = native.new_nailgun_client()
logger.info(f"rust client: {rust_client}")

timeout = global_options.pantsd_pailgun_quit_timeout
pantsd_signal_handler = PailgunClientSignalHandler(client, pid=pid, timeout=timeout)
pantsd_signal_handler = PailgunClientSignalHandler(pid=pid)

with ExceptionSink.trapped_signals(pantsd_signal_handler), STTYSettings.preserved():
# Execute the command on the pailgun.
return client.execute(self._args[0], self._args[1:], modified_env)
return rust_client.execute(signal_fn, port, command, args, modified_env)

def _extract_remote_exception(self, pantsd_pid, nailgun_error):
"""Given a NailgunError, returns a Terminated exception with additional info (where
Expand Down
6 changes: 5 additions & 1 deletion src/python/pants/engine/internals/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import logging
import os
from typing import Dict, Iterable, List, Mapping, Optional, Tuple, Union, cast
from typing import Callable, Dict, Iterable, List, Mapping, Optional, Tuple, Union, cast

from typing_extensions import Protocol

Expand All @@ -17,6 +17,7 @@
PyGeneratorResponseBreak,
PyGeneratorResponseGet,
PyGeneratorResponseGetMulti,
PyNailgunClient,
PyNailgunServer,
PyRemotingOptions,
PyScheduler,
Expand Down Expand Up @@ -196,6 +197,9 @@ def new_nailgun_server(self, port: int, runner: RawFdRunner) -> PyNailgunServer:
"""
return cast(PyNailgunServer, self.lib.nailgun_server_create(self._executor, port, runner))

def new_nailgun_client(self) -> PyNailgunClient:
return cast(PyNailgunClient, self.lib.nailgun_client_create(self._executor))

def new_tasks(self) -> PyTasks:
return PyTasks()

Expand Down
109 changes: 106 additions & 3 deletions src/rust/engine/nailgun/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
// Arc<Mutex> can be more clear than needing to grok Orderings:
#![allow(clippy::mutex_atomic)]
#![type_length_limit = "2058438"]
#![allow(dead_code)]
#![allow(unused_variables)]

#[cfg(test)]
mod tests;
Expand All @@ -37,20 +39,121 @@ use std::net::Ipv4Addr;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;

use bytes::Bytes;
use futures::channel::{mpsc, oneshot};
use futures::{future, sink, stream, FutureExt, SinkExt, StreamExt, TryStreamExt};
use log::{debug, error, info};
use futures::{future, sink, stream, FutureExt, SinkExt, Stream, StreamExt, TryStreamExt};
use log::{debug, error, info, warn};
pub use nails::execution::ExitCode;
use nails::execution::{self, sink_for, stream_for, ChildInput, ChildOutput};
use nails::{Child, Nail};
use nails::{Child, Config, Nail};
use tokio::fs::File;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::sync::{Notify, RwLock};

use task_executor::Executor;

async fn handle_client_output(
mut stdio_read: impl Stream<Item = ChildOutput> + Unpin,
) -> Result<(), io::Error> {
let mut stdout = tokio::io::stdout();
let mut stderr = tokio::io::stderr();
while let Some(output) = stdio_read.next().await {
match output {
ChildOutput::Stdout(bytes) => stdout.write_all(&bytes).await?,
ChildOutput::Stderr(bytes) => stderr.write_all(&bytes).await?,
ChildOutput::Exit(_) => {
// NB: We ignore exit here and allow the main thread to handle exiting.
break;
}
}
}
Ok(())
}

async fn handle_client_input(mut stdin_write: mpsc::Sender<ChildInput>) -> Result<(), io::Error> {
use nails::execution::send_to_io;
let mut stdin = stream_for(tokio::io::stdin());
while let Some(input_bytes) = stdin.next().await {
stdin_write
.send(ChildInput::Stdin(input_bytes?))
.await
.map_err(send_to_io)?;
}
stdin_write
.send(ChildInput::StdinEOF)
.await
.map_err(send_to_io)?;
Ok(())
}

async fn client_execute_helper(
port: u16,
command: String,
args: Vec<String>,
env: Vec<(String, String)>,
) -> Result<i32, ()> {
use nails::execution::{child_channel, Command};

let config = Config::default();
let command = Command {
command,
args,
env,
working_dir: std::path::PathBuf::from("/dev/null"),
};

let (stdio_write, stdio_read) = child_channel::<ChildOutput>();
let (stdin_write, stdin_read) = child_channel::<ChildInput>();

let output_handler = tokio::spawn(handle_client_output(stdio_read));
let _input_handler = tokio::spawn(handle_client_input(stdin_write));

let localhost = std::net::Ipv4Addr::new(127, 0, 0, 1);
let addr = (localhost, port);

let socket = TcpStream::connect(addr).await.unwrap();
let exit_code: ExitCode =
nails::client_handle_connection(config, socket, command, stdio_write, stdin_read)
.await
.unwrap();

warn!("Exit code from internal future is: {:?}", exit_code);

output_handler.await.unwrap().unwrap();
Ok(exit_code.0)
}

pub async fn client_execute(
port: u16,
command: String,
args: Vec<String>,
env: Vec<(String, String)>,
exit_receiver: oneshot::Receiver<()>,
) -> Result<i32, ()> {
use future::Either;

let execution_future = client_execute_helper(port, command, args, env).boxed();

match future::select(execution_future, exit_receiver).await {
Either::Left((execution_result, _exit_receiver_fut)) => {
warn!(
"Exiting the future because it's done w/ result: {:?}",
execution_result
);
execution_result
}
Either::Right((exited, execution_result_future)) => {
warn!("Exiting the future because told to quit");
std::mem::drop(execution_result_future);
warn!("Exited: {:?} ", exited);
Err(())
}
}
}

pub struct Server {
exit_sender: oneshot::Sender<()>,
exited_receiver: oneshot::Receiver<Result<(), String>>,
Expand Down
Loading

0 comments on commit 398d719

Please sign in to comment.