Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pd): report bind errors on startup #3555

Merged
merged 1 commit into from
Jan 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 72 additions & 28 deletions crates/bin/pd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{net::SocketAddr, path::PathBuf};
use console_subscriber::ConsoleLayer;
use metrics_tracing_context::{MetricsLayer, TracingContextLayer};
use metrics_util::layers::Stack;
use std::error::Error;

use anyhow::Context;
use clap::{Parser, Subcommand};
Expand Down Expand Up @@ -245,26 +246,28 @@ enum TestnetCommand {

#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Validate options immediately.
let opt = Opt::parse();

// Instantiate tracing layers.
// The MetricsLayer handles enriching metrics output with labels from tracing spans.
let metrics_layer = MetricsLayer::new();
// The ConsoleLayer enables collection of data for `tokio-console`.
let console_layer = ConsoleLayer::builder().with_default_env().spawn();
// The `FmtLayer` is used to print to the console.
let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(atty::is(atty::Stream::Stdout))
.with_target(true);
// The `EnvFilter` layer is used to filter events based on `RUST_LOG`.
let filter_layer = EnvFilter::try_from_default_env().or_else(|_| EnvFilter::try_new("info"))?;

let opt = Opt::parse();

// Register the tracing subscribers, conditionally enabling tokio console support
let registry = tracing_subscriber::registry()
.with(filter_layer)
.with(fmt_layer)
.with(metrics_layer);
if opt.tokio_console {
// The ConsoleLayer enables collection of data for `tokio-console`.
// The `spawn` call will panic if AddrInUse, so we only spawn if enabled.
let console_layer = ConsoleLayer::builder().with_default_env().spawn();
registry.with(console_layer).init();
} else {
registry.init();
Expand All @@ -280,15 +283,22 @@ async fn main() -> anyhow::Result<()> {
cometbft_addr,
enable_expensive_rpc,
} => {
tracing::info!(
?abci_bind,
?grpc_bind,
?grpc_auto_https,
?metrics_bind,
%cometbft_addr,
?enable_expensive_rpc,
"starting pd"
);
// Unpack grpc bind option, defaulting to localhost, but setting 0.0.0.0:443
// if auto https is enabled. We unpack the option outside of the conditional
// below, in order to report the bind address in error handling.
let grpc_bind = if grpc_bind.is_some() {
grpc_bind.unwrap_or(
"0.0.0.0:443"
.parse()
.context("failed to parse grpc_bind address")?,
)
} else {
grpc_bind.unwrap_or(
"127.0.0.1:8080"
.parse()
.context("failed to parse grpc_bind address")?,
)
};

// Ensure we have all necessary parts in the URL
if !url_has_necessary_parts(&cometbft_addr) {
Expand All @@ -310,6 +320,16 @@ async fn main() -> anyhow::Result<()> {
.await
.context("Unable to initialize RocksDB storage")?;

tracing::info!(
?abci_bind,
?grpc_bind,
?grpc_auto_https,
?metrics_bind,
%cometbft_addr,
?enable_expensive_rpc,
"starting pd"
);

use penumbra_tower_trace::trace::request_span;

let consensus = tower::ServiceBuilder::new()
Expand Down Expand Up @@ -467,11 +487,6 @@ async fn main() -> anyhow::Result<()> {
let mut acme_cache = pd_home.clone();
acme_cache.push("rustls_acme_cache");

let grpc_bind = grpc_bind.unwrap_or(
"0.0.0.0:443"
.parse()
.context("failed to parse grpc_bind address")?,
);
let bound_listener = TcpListener::bind(grpc_bind)
.await
.context(format!("Failed to bind HTTPS listener on {}", grpc_bind))?;
Expand All @@ -492,11 +507,6 @@ async fn main() -> anyhow::Result<()> {
.spawn(grpc_server.serve_with_incoming(tls_incoming))
.expect("failed to spawn grpc server")
} else {
let grpc_bind = grpc_bind.unwrap_or(
"127.0.0.1:8080"
.parse()
.context("failed to parse grpc_bind address")?,
);
tokio::task::Builder::new()
.name("grpc_server")
.spawn(grpc_server.serve(grpc_bind))
Expand All @@ -513,7 +523,14 @@ async fn main() -> anyhow::Result<()> {
penumbra_dex::component::metrics::DEX_BUCKETS,
)?
.build()
.expect("failed to build prometheus recorder");
.map_err(|_| {
let msg = format!(
"failed to build prometheus recorder; make sure {} is available",
&metrics_bind
);
tracing::error!("{}", msg);
anyhow::anyhow!(msg)
})?;

Stack::new(recorder)
// Adding the `TracingContextLayer` will add labels from the tracing span to metrics.
Expand All @@ -528,11 +545,38 @@ async fn main() -> anyhow::Result<()> {

pd::register_metrics();

// TODO: better error reporting
// We error out if a service errors, rather than keep running
// We error out if a service errors, rather than keep running.
// A special attempt is made to detect whether binding to target socket failed;
// if so, we report that error explicitly, otherwise we fall back to reporting
// whatever the error was.
tokio::select! {
x = abci_server => x?.map_err(|e| anyhow::anyhow!(e))?,
x = grpc_server => x?.map_err(|e| anyhow::anyhow!(e))?,
x = abci_server => x?.map_err(|e| {
// The display impl on the ABCI error is sufficiently informative,
// so we don't need special handling of the failed-to-bind case.
let msg = format!("abci server on {} failed: {}", abci_bind, e);
tracing::error!("{}", msg);
anyhow::anyhow!(msg)
}
)?,

x = grpc_server => x?.map_err(|e| {
let mut msg = format!("grpc server on {} failed: {}", grpc_bind, e);
// Detect if we have a bind error. We need to unpack nested errors, from
// tonic -> hyper -> std. Otherwise, only "transport error" is reported,
// which isn't informative enough to take action.
if let Some(e) = e.source() {
if let Some(e) = e.source() {
if let Some(e) = e.downcast_ref::<std::io::Error>() {
if e.kind().to_string().contains("address in use") {
msg = format!("grpc bind socket already in use: {}", grpc_bind);
}
}
}
}
tracing::error!("{}", msg);
anyhow::anyhow!(msg)
}
)?,
};
}

Expand Down
Loading