Skip to content

Commit

Permalink
Use futures select for stopping gRPC nodes (#985)
Browse files Browse the repository at this point in the history
This change:
- Adds futures select for Tokio Runtime
- Temporary disables gRPC client tests in `abitest` since they now require TLS certificates (will be re-enabled after examples start using Rust Loader)

Fixes #982
  • Loading branch information
ipetr0v authored May 14, 2020
1 parent e2a0fdd commit 6312906
Show file tree
Hide file tree
Showing 11 changed files with 332 additions and 239 deletions.
4 changes: 2 additions & 2 deletions cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ steps:
entrypoint: 'bash'
args: ['./scripts/run_tests']

# TODO(942): Reenable `run_tests_tsan`.
# TODO(#942): Reenable `run_tests_tsan`.
# - name: 'gcr.io/oak-ci/oak:latest'
# id: run_tests_tsan
# waitFor: ['run_tests']
Expand All @@ -88,7 +88,7 @@ steps:
timeout: 60m
entrypoint: 'bash'
args: ['./scripts/run_examples', '-s', 'base']
# TODO(942): Reenable `run_examples` with `asan` and `tsan`.
# TODO(#942): Reenable `run_examples` with `asan` and `tsan`.
# - name: 'gcr.io/oak-ci/oak:latest'
# id: run_examples_asan
# waitFor: ['run_examples']
Expand Down
326 changes: 167 additions & 159 deletions examples/abitest/module_0/rust/src/lib.rs

Large diffs are not rendered by default.

62 changes: 29 additions & 33 deletions oak/server/rust/oak_runtime/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

use crate::{
node,
node::load_wasm,
node::{check_uri, load_certificate, load_wasm},
proto::oak::application::{
node_configuration::ConfigType, ApplicationConfiguration, GrpcServerConfiguration,
LogConfiguration, NodeConfiguration, WebAssemblyConfiguration,
node_configuration::ConfigType, ApplicationConfiguration, GrpcClientConfiguration,
GrpcServerConfiguration, LogConfiguration, NodeConfiguration, WebAssemblyConfiguration,
},
runtime, RuntimeProxy,
};
Expand Down Expand Up @@ -99,36 +99,32 @@ pub fn from_protobuf(
})?,
tls_identity: Identity::from_pem(grpc_tls_certificate, grpc_tls_private_key),
},
// TODO(#982): Currently Runtime uses the C++ version of gRPC client.
// Uncomment after refactoring thread creation and ensuring that threads that use
// Tokio Runtime can be successfully stopped.
//
// Some(ConfigType::GrpcClientConfig(GrpcClientConfiguration {
// uri,
// root_tls_certificate,
// address,
// })) => node::Configuration::GrpcClientNode {
// uri: uri
// .parse()
// .map_err(|error| {
// error!("Error parsing URI {}: {:?}", uri, error);
// OakStatus::ErrInvalidArgs
// })
// .and_then(|uri| match check_uri(&uri) {
// Ok(_) => Ok(uri),
// Err(error) => {
// error!("Incorrect URI {}: {:?}", uri, error);
// Err(OakStatus::ErrInvalidArgs)
// }
// })?,
// root_tls_certificate: load_certificate(root_tls_certificate).map_err(
// |error| {
// error!("Error loading root certificate: {:?}", error);
// OakStatus::ErrInvalidArgs
// },
// )?,
// address: address.to_string(),
// },
Some(ConfigType::GrpcClientConfig(GrpcClientConfiguration {
uri,
root_tls_certificate,
address,
})) => node::Configuration::GrpcClientNode {
uri: uri
.parse()
.map_err(|error| {
error!("Error parsing URI {}: {:?}", uri, error);
OakStatus::ErrInvalidArgs
})
.and_then(|uri| match check_uri(&uri) {
Ok(_) => Ok(uri),
Err(error) => {
error!("Incorrect URI {}: {:?}", uri, error);
Err(OakStatus::ErrInvalidArgs)
}
})?,
root_tls_certificate: load_certificate(root_tls_certificate).map_err(
|error| {
error!("Error loading root certificate: {:?}", error);
OakStatus::ErrInvalidArgs
},
)?,
address: address.to_string(),
},
Some(ConfigType::WasmConfig(WebAssemblyConfiguration { module_bytes, .. })) => {
load_wasm(&module_bytes).map_err(|error| {
error!("Error loading Wasm module: {}", error);
Expand Down
8 changes: 7 additions & 1 deletion oak/server/rust/oak_runtime/src/node/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{NodeId, RuntimeProxy};
use lazy_static::lazy_static;
use log::info;
use std::{sync::RwLock, thread};
use tokio::sync::oneshot;

/// Function pointer type for callbacks from Rust code into C/C++ code for
/// pseudo-Node creation.
Expand Down Expand Up @@ -54,7 +55,12 @@ impl PseudoNode {
}

impl super::Node for PseudoNode {
fn run(self: Box<Self>, runtime: RuntimeProxy, handle: oak_abi::Handle) {
fn run(
self: Box<Self>,
runtime: RuntimeProxy,
handle: oak_abi::Handle,
_notify_receiver: oneshot::Receiver<()>,
) {
let factory_fn: NodeFactory = FACTORY
.read()
.expect("unlock failed")
Expand Down
18 changes: 12 additions & 6 deletions oak/server/rust/oak_runtime/src/node/grpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use oak_abi::{
proto::oak::encap::{GrpcRequest, GrpcResponse},
Handle, OakStatus,
};
use tokio::sync::oneshot;
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Uri};

/// Struct that represents a gRPC client pseudo-Node.
Expand Down Expand Up @@ -100,7 +101,12 @@ impl GrpcClientNode {

/// Oak Node implementation for the gRPC client pseudo-Node.
impl Node for GrpcClientNode {
fn run(self: Box<Self>, runtime: RuntimeProxy, handle: Handle) {
fn run(
self: Box<Self>,
runtime: RuntimeProxy,
handle: Handle,
notify_receiver: oneshot::Receiver<()>,
) {
// Create an Async runtime for executing futures.
// https://docs.rs/tokio/
let mut async_runtime = tokio::runtime::Builder::new()
Expand All @@ -121,11 +127,11 @@ impl Node for GrpcClientNode {
"{}: Starting gRPC client pseudo-Node thread",
self.node_name
);
let result = async_runtime.block_on(self.handle_loop(runtime, handle));
info!(
"{}: Exiting gRPC client pseudo-Node thread {:?}",
self.node_name, result
);
async_runtime.block_on(futures::future::select(
Box::pin(self.handle_loop(runtime, handle)),
notify_receiver,
));
info!("{}: Exiting gRPC client pseudo-Node thread", self.node_name);
}
}

Expand Down
14 changes: 12 additions & 2 deletions oak/server/rust/oak_runtime/src/node/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use std::{
net::SocketAddr,
task::{Context, Poll},
};
use tokio::sync::oneshot;
use tonic::{
codegen::BoxFuture,
server::{Grpc, UnaryService},
Expand Down Expand Up @@ -106,7 +107,12 @@ impl GrpcServerNode {

/// Oak Node implementation for the gRPC server.
impl Node for GrpcServerNode {
fn run(self: Box<Self>, runtime: RuntimeProxy, handle: oak_abi::Handle) {
fn run(
self: Box<Self>,
runtime: RuntimeProxy,
handle: oak_abi::Handle,
notify_receiver: oneshot::Receiver<()>,
) {
// Receive a `channel_writer` handle used to pass handles for temporary channels.
info!("{}: Waiting for a channel writer", self.node_name);
let channel_writer = GrpcServerNode::get_channel_writer(&runtime, handle)
Expand All @@ -122,7 +128,10 @@ impl Node for GrpcServerNode {
let server = tonic::transport::Server::builder()
.tls_config(tonic::transport::ServerTlsConfig::new().identity(self.tls_identity))
.add_service(handler)
.serve(self.address);
.serve_with_shutdown(self.address, async {
// Treat notification failure the same as a notification.
let _ = notify_receiver.await;
});

// Create an Async runtime for executing futures.
// https://docs.rs/tokio/
Expand All @@ -149,6 +158,7 @@ impl Node for GrpcServerNode {
"{}: Exiting gRPC server pseudo-Node thread {:?}",
self.node_name, result
);
info!("{}: Exiting gRPC server pseudo-Node thread", self.node_name);
}
}

Expand Down
8 changes: 7 additions & 1 deletion oak/server/rust/oak_runtime/src/node/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use log::{error, info, log};
use oak_abi::proto::oak::log::{Level, LogMessage};
use prost::Message;
use std::string::String;
use tokio::sync::oneshot;

/// Logging pseudo-Node.
pub struct LogNode {
Expand All @@ -37,7 +38,12 @@ impl LogNode {
impl super::Node for LogNode {
/// Main execution loop for the logging pseudo-Node just waits for incoming
/// `LogMessage`s and outputs them.
fn run(self: Box<Self>, runtime: RuntimeProxy, handle: oak_abi::Handle) {
fn run(
self: Box<Self>,
runtime: RuntimeProxy,
handle: oak_abi::Handle,
_notify_receiver: oneshot::Receiver<()>,
) {
loop {
// An error indicates the Runtime is terminating. We ignore it here and keep trying to
// read in case a Wasm Node wants to emit remaining messages. We will return
Expand Down
13 changes: 11 additions & 2 deletions oak/server/rust/oak_runtime/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::{
string::String,
sync::Arc,
};
use tokio::sync::oneshot;
use tonic::transport::{Certificate, Identity, Uri};

pub mod external;
Expand All @@ -30,9 +31,17 @@ mod wasm;

/// Trait encapsulating execution of a Node or pseudo-Node.
pub trait Node {
/// Execute the Node, using the provided `Runtime` reference and initial handle. The method
/// Execute the Node, using the provided `Runtime` reference and initial handle. The method
/// should continue execution until the Node terminates.
fn run(self: Box<Self>, runtime: RuntimeProxy, handle: oak_abi::Handle);
///
/// `notify_receiver` receives a notification from the Runtime upon termination. This
/// notification can be used by the Node to gracefully shut down.
fn run(
self: Box<Self>,
runtime: RuntimeProxy,
handle: oak_abi::Handle,
notify_receiver: oneshot::Receiver<()>,
);
}

/// A [`Configuration`] corresponds to a [`NodeConfiguration`] protobuf message, with parsed
Expand Down
8 changes: 7 additions & 1 deletion oak/server/rust/oak_runtime/src/node/wasm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use log::{debug, error, info, warn};
use oak_abi::{label::Label, ChannelReadStatus, OakStatus};
use rand::RngCore;
use std::{string::String, sync::Arc};
use tokio::sync::oneshot;
use wasmi::ValueType;

#[cfg(test)]
Expand Down Expand Up @@ -815,7 +816,12 @@ impl WasmNode {

impl super::Node for WasmNode {
/// Runs this instance of a Wasm Node.
fn run(self: Box<Self>, runtime: RuntimeProxy, handle: oak_abi::Handle) {
fn run(
self: Box<Self>,
runtime: RuntimeProxy,
handle: oak_abi::Handle,
_notify_receiver: oneshot::Receiver<()>,
) {
debug!(
"{}: running entrypoint '{}'",
self.node_name, self.entrypoint
Expand Down
Loading

0 comments on commit 6312906

Please sign in to comment.