Skip to content

Commit

Permalink
feat(pd): report bind errors on startup
Browse files Browse the repository at this point in the history
A common problem for node operators is sorting out which ports are
required to be available on the system prior to running. We've recently
improved the documentation to be more explicit about this [0], but here
we add explicit error messages when binds fail. This addition should
make reasoning about network surface more straightforward, and eliminate
guessing games to figure out which port needs to be overridden or freed.

[0] c0445f3
  • Loading branch information
conorsch authored and hdevalence committed Jan 4, 2024
1 parent 9933baa commit 8680e83
Showing 1 changed file with 72 additions and 28 deletions.
100 changes: 72 additions & 28 deletions crates/bin/pd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{net::SocketAddr, path::PathBuf};
use console_subscriber::ConsoleLayer;
use metrics_tracing_context::{MetricsLayer, TracingContextLayer};
use metrics_util::layers::Stack;
use std::error::Error;

use anyhow::Context;
use clap::{Parser, Subcommand};
Expand Down Expand Up @@ -245,26 +246,28 @@ enum TestnetCommand {

#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Validate options immediately.
let opt = Opt::parse();

// Instantiate tracing layers.
// The MetricsLayer handles enriching metrics output with labels from tracing spans.
let metrics_layer = MetricsLayer::new();
// The ConsoleLayer enables collection of data for `tokio-console`.
let console_layer = ConsoleLayer::builder().with_default_env().spawn();
// The `FmtLayer` is used to print to the console.
let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(atty::is(atty::Stream::Stdout))
.with_target(true);
// The `EnvFilter` layer is used to filter events based on `RUST_LOG`.
let filter_layer = EnvFilter::try_from_default_env().or_else(|_| EnvFilter::try_new("info"))?;

let opt = Opt::parse();

// Register the tracing subscribers, conditionally enabling tokio console support
let registry = tracing_subscriber::registry()
.with(filter_layer)
.with(fmt_layer)
.with(metrics_layer);
if opt.tokio_console {
// The ConsoleLayer enables collection of data for `tokio-console`.
// The `spawn` call will panic if AddrInUse, so we only spawn if enabled.
let console_layer = ConsoleLayer::builder().with_default_env().spawn();
registry.with(console_layer).init();
} else {
registry.init();
Expand All @@ -280,15 +283,22 @@ async fn main() -> anyhow::Result<()> {
cometbft_addr,
enable_expensive_rpc,
} => {
tracing::info!(
?abci_bind,
?grpc_bind,
?grpc_auto_https,
?metrics_bind,
%cometbft_addr,
?enable_expensive_rpc,
"starting pd"
);
// Unpack grpc bind option, defaulting to localhost, but setting 0.0.0.0:443
// if auto https is enabled. We unpack the option outside of the conditional
// below, in order to report the bind address in error handling.
let grpc_bind = if grpc_bind.is_some() {
grpc_bind.unwrap_or(
"0.0.0.0:443"
.parse()
.context("failed to parse grpc_bind address")?,
)
} else {
grpc_bind.unwrap_or(
"127.0.0.1:8080"
.parse()
.context("failed to parse grpc_bind address")?,
)
};

// Ensure we have all necessary parts in the URL
if !url_has_necessary_parts(&cometbft_addr) {
Expand All @@ -310,6 +320,16 @@ async fn main() -> anyhow::Result<()> {
.await
.context("Unable to initialize RocksDB storage")?;

tracing::info!(
?abci_bind,
?grpc_bind,
?grpc_auto_https,
?metrics_bind,
%cometbft_addr,
?enable_expensive_rpc,
"starting pd"
);

use penumbra_tower_trace::trace::request_span;

let consensus = tower::ServiceBuilder::new()
Expand Down Expand Up @@ -467,11 +487,6 @@ async fn main() -> anyhow::Result<()> {
let mut acme_cache = pd_home.clone();
acme_cache.push("rustls_acme_cache");

let grpc_bind = grpc_bind.unwrap_or(
"0.0.0.0:443"
.parse()
.context("failed to parse grpc_bind address")?,
);
let bound_listener = TcpListener::bind(grpc_bind)
.await
.context(format!("Failed to bind HTTPS listener on {}", grpc_bind))?;
Expand All @@ -492,11 +507,6 @@ async fn main() -> anyhow::Result<()> {
.spawn(grpc_server.serve_with_incoming(tls_incoming))
.expect("failed to spawn grpc server")
} else {
let grpc_bind = grpc_bind.unwrap_or(
"127.0.0.1:8080"
.parse()
.context("failed to parse grpc_bind address")?,
);
tokio::task::Builder::new()
.name("grpc_server")
.spawn(grpc_server.serve(grpc_bind))
Expand All @@ -513,7 +523,14 @@ async fn main() -> anyhow::Result<()> {
penumbra_dex::component::metrics::DEX_BUCKETS,
)?
.build()
.expect("failed to build prometheus recorder");
.map_err(|_| {
let msg = format!(
"failed to build prometheus recorder; make sure {} is available",
&metrics_bind
);
tracing::error!("{}", msg);
anyhow::anyhow!(msg)
})?;

Stack::new(recorder)
// Adding the `TracingContextLayer` will add labels from the tracing span to metrics.
Expand All @@ -528,11 +545,38 @@ async fn main() -> anyhow::Result<()> {

pd::register_metrics();

// TODO: better error reporting
// We error out if a service errors, rather than keep running
// We error out if a service errors, rather than keep running.
// A special attempt is made to detect whether binding to target socket failed;
// if so, we report that error explicitly, otherwise we fall back to reporting
// whatever the error was.
tokio::select! {
x = abci_server => x?.map_err(|e| anyhow::anyhow!(e))?,
x = grpc_server => x?.map_err(|e| anyhow::anyhow!(e))?,
x = abci_server => x?.map_err(|e| {
// The display impl on the ABCI error is sufficiently informative,
// so we don't need special handling of the failed-to-bind case.
let msg = format!("abci server on {} failed: {}", abci_bind, e);
tracing::error!("{}", msg);
anyhow::anyhow!(msg)
}
)?,

x = grpc_server => x?.map_err(|e| {
let mut msg = format!("grpc server on {} failed: {}", grpc_bind, e);
// Detect if we have a bind error. We need to unpack nested errors, from
// tonic -> hyper -> std. Otherwise, only "transport error" is reported,
// which isn't informative enough to take action.
if let Some(e) = e.source() {
if let Some(e) = e.source() {
if let Some(e) = e.downcast_ref::<std::io::Error>() {
if e.kind().to_string().contains("address in use") {
msg = format!("grpc bind socket already in use: {}", grpc_bind);
}
}
}
}
tracing::error!("{}", msg);
anyhow::anyhow!(msg)
}
)?,
};
}

Expand Down

0 comments on commit 8680e83

Please sign in to comment.