diff --git a/kubert/Cargo.toml b/kubert/Cargo.toml index bb759b7..6e2dfbe 100644 --- a/kubert/Cargo.toml +++ b/kubert/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kubert" -version = "0.2.2" +version = "0.2.3" edition = "2021" license = "Apache-2.0" description = "Kubernetes runtime helpers. Based on kube-rs." @@ -10,6 +10,7 @@ rust-version = "1.56.1" keywords = ["kubernetes", "client", "runtime", "server"] [features] +admin = ["futures-util", "hyper/http1", "hyper/runtime", "hyper/server", "tokio/sync"] client = ["kube-client", "thiserror"] # TODO controller = ["client", "kube/runtime"] log = ["thiserror", "tracing-subscriber"] @@ -17,7 +18,6 @@ requeue = ["futures-core", "tokio/macros", "tokio/sync", "tokio-util/time"] # TODO runtime = ["clap", "drain", "log", "tokio/signal"] server = [ "drain", - "hyper", "hyper/http1", "hyper/http2", "hyper/runtime", @@ -46,6 +46,7 @@ features = [ [dependencies] drain = { version = "0.1.0", optional = true, default-features = false } futures-core = { version = "0.3", optional = true, default-features = false } +futures-util = { version = "0.3", optional = true, default-features = false } hyper = { version = "0.14.17", optional = true, default-features = false } rustls-pemfile = { version = "0.3.0", optional = true } thiserror = { version = "1.0.30", optional = true } diff --git a/kubert/src/admin.rs b/kubert/src/admin.rs new file mode 100644 index 0000000..d453466 --- /dev/null +++ b/kubert/src/admin.rs @@ -0,0 +1,202 @@ +//! Admin server utilities. +use futures_util::future; +use hyper::{Body, Request, Response}; +use std::{ + net::SocketAddr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; +use tracing::{debug, info_span, Instrument}; + +/// Command-line arguments used to configure an admin server +#[cfg(feature = "clap")] +#[derive(Clone, Debug, clap::Parser)] +#[cfg_attr(docsrs, doc(cfg(all(feature = "admin", feature = "clap"))))] +pub struct AdminArgs { + /// The admin server's address + #[cfg_attr(feature = "clap", clap(long, default_value = "0.0.0.0:8080"))] + pub admin_addr: SocketAddr, +} + +/// Supports configuring and running an admin server +#[cfg_attr(docsrs, doc(cfg(feature = "admin")))] +#[derive(Debug)] +pub struct Builder { + addr: SocketAddr, + ready: Readiness, +} + +/// Controls how the admin server advertises readiness +#[cfg_attr(docsrs, doc(cfg(feature = "admin")))] +#[derive(Clone, Debug)] +pub struct Readiness(Arc); + +/// A handle to a running admin server +#[derive(Debug)] +#[cfg_attr(docsrs, doc(cfg(feature = "admin")))] +pub struct Server { + addr: SocketAddr, + ready: Readiness, + task: tokio::task::JoinHandle>, +} + +// === impl AdminArgs === + +#[cfg(feature = "clap")] +impl AdminArgs { + /// Creates a new [`Builder`] frm the command-line arguments + pub fn into_builder(self) -> Builder { + Builder::new(self.admin_addr) + } + + /// Binds and runs the server on a background task, returning a handle + /// + /// The server starts unready by default and it's up to the caller to mark it as ready. + pub fn spawn(self) -> Server { + self.into_builder().spawn() + } +} + +// === impl Builder === + +impl Builder { + /// Creates a new [`Builder`] with the given server address + /// + /// The server starts unready by default and it's up to the caller to mark it as ready. + pub fn new(addr: SocketAddr) -> Self { + Self { + addr, + ready: Readiness(Arc::new(false.into())), + } + } + + /// Returns a readiness handle + pub fn readiness(&self) -> Readiness { + self.ready.clone() + } + + /// Sets the initial readiness state to ready + pub fn set_ready(&self) { + self.ready.set(true); + } + + /// Binds and runs the server on a background task, returning a handle + pub fn spawn(self) -> Server { + let Self { addr, ready } = self; + + let http = hyper::server::Server::bind(&addr) + // Allow weird clients (like netcat). + .http1_half_close(true) + // Prevent port scanners, etc, from holding connections ope.n + .http1_header_read_timeout(Duration::from_secs(2)) + // Use a small buffer, since we don't really transfer much data. + .http1_max_buf_size(8 * 1024); + + let server = { + let ready = ready.clone(); + http.serve(hyper::service::make_service_fn(move |_conn| { + let ready = ready.clone(); + future::ok::<_, hyper::Error>(hyper::service::service_fn( + move |req: hyper::Request| match req.uri().path() { + "/live" => future::ok(handle_live(req)), + "/ready" => future::ok(handle_ready(&ready, req)), + _ => future::ok::<_, hyper::Error>( + hyper::Response::builder() + .status(hyper::StatusCode::NOT_FOUND) + .body(hyper::Body::default()) + .unwrap(), + ), + }, + )) + })) + }; + + let addr = server.local_addr(); + + let task = tokio::spawn( + async move { + debug!("Serving"); + server.await + } + .instrument(info_span!("admin", port = %addr.port())), + ); + + Server { addr, ready, task } + } +} + +// === impl Readiness === + +impl Readiness { + /// Gets the current readiness state + pub fn get(&self) -> bool { + self.0.load(Ordering::Acquire) + } + + /// Sets the readiness state + pub fn set(&self, ready: bool) { + self.0.store(ready, Ordering::Release); + } +} + +// === impl Server === + +impl Server { + /// Returns the bound local address of the server + pub fn local_addr(&self) -> SocketAddr { + self.addr + } + + /// Returns a readiness handle + pub fn readiness(&self) -> Readiness { + self.ready.clone() + } + + /// Returns the server tasks's join handle + pub fn into_join_handle(self) -> tokio::task::JoinHandle> { + self.task + } +} + +// === handlers === + +fn handle_live(req: Request) -> Response { + match *req.method() { + hyper::Method::GET | hyper::Method::HEAD => Response::builder() + .status(hyper::StatusCode::OK) + .header(hyper::header::CONTENT_TYPE, "text/plain") + .body("alive\n".into()) + .unwrap(), + _ => Response::builder() + .status(hyper::StatusCode::METHOD_NOT_ALLOWED) + .body(Body::default()) + .unwrap(), + } +} + +fn handle_ready(Readiness(ready): &Readiness, req: Request) -> Response { + match *req.method() { + hyper::Method::GET | hyper::Method::HEAD => { + if ready.load(Ordering::Acquire) { + return Response::builder() + .status(hyper::StatusCode::OK) + .header(hyper::header::CONTENT_TYPE, "text/plain") + .body("ready\n".into()) + .unwrap(); + } + + Response::builder() + .status(hyper::StatusCode::INTERNAL_SERVER_ERROR) + .header(hyper::header::CONTENT_TYPE, "text/plain") + .body("not ready\n".into()) + .unwrap() + } + _ => Response::builder() + .status(hyper::StatusCode::METHOD_NOT_ALLOWED) + .body(Body::default()) + .unwrap(), + } +} diff --git a/kubert/src/client.rs b/kubert/src/client.rs index 0901dd7..4a89b72 100644 --- a/kubert/src/client.rs +++ b/kubert/src/client.rs @@ -6,7 +6,7 @@ use thiserror::Error; /// Configures a Kubernetes client // TODO configure a --kubeconfig #[derive(Clone, Debug)] -#[cfg_attr(docsrs, doc(cfg(any(feature = "client"))))] +#[cfg_attr(docsrs, doc(cfg(feature = "client")))] #[cfg_attr(feature = "clap", derive(clap::Parser))] pub struct ClientArgs { /// The name of the kubeconfig cluster to use @@ -24,7 +24,7 @@ pub struct ClientArgs { /// Indicates an error occurred while configuring the Kubernetes client #[derive(Debug, Error)] -#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))] +#[cfg_attr(docsrs, doc(cfg(feature = "client")))] pub enum ConfigError { /// Indicates that the kubeconfig file could not be read #[error(transparent)] diff --git a/kubert/src/lib.rs b/kubert/src/lib.rs index 8bed2c5..48cd7e9 100644 --- a/kubert/src/lib.rs +++ b/kubert/src/lib.rs @@ -6,26 +6,32 @@ #![forbid(unsafe_code)] #![cfg_attr(docsrs, feature(doc_cfg))] +#[cfg(feature = "admin")] +#[cfg_attr(docsrs, doc(cfg(feature = "admin")))] +pub mod admin; + #[cfg(feature = "client")] -#[cfg_attr(docsrs, doc(cfg(any(feature = "client"))))] +#[cfg_attr(docsrs, doc(cfg(feature = "client")))] pub mod client; -#[cfg(all(feature = "client"))] -#[cfg_attr(docsrs, doc(cfg(any(feature = "client"))))] -pub use self::client::ClientArgs; - #[cfg(feature = "log")] -#[cfg_attr(docsrs, doc(cfg(any(feature = "log"))))] +#[cfg_attr(docsrs, doc(cfg(feature = "log")))] pub mod log; -#[cfg(feature = "shutdown")] -#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))] -pub mod shutdown; - #[cfg(feature = "requeue")] -#[cfg_attr(docsrs, doc(cfg(any(feature = "requeue"))))] +#[cfg_attr(docsrs, doc(cfg(feature = "requeue")))] pub mod requeue; #[cfg(feature = "server")] -#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))] +#[cfg_attr(docsrs, doc(cfg(feature = "server")))] pub mod server; + +#[cfg(feature = "shutdown")] +#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))] +pub mod shutdown; + +#[cfg(all(feature = "admin", feature = "clap"))] +pub use self::admin::AdminArgs; + +#[cfg(all(feature = "client"))] +pub use self::client::ClientArgs; diff --git a/kubert/src/log.rs b/kubert/src/log.rs index 24f7f32..689e5b1 100644 --- a/kubert/src/log.rs +++ b/kubert/src/log.rs @@ -8,7 +8,7 @@ pub use tracing_subscriber::EnvFilter; /// Configures whether logs should be emitted in plaintext (the default) or as JSON-encoded /// messages #[derive(Clone, Debug)] -#[cfg_attr(docsrs, doc(cfg(any(feature = "log"))))] +#[cfg_attr(docsrs, doc(cfg(feature = "log")))] pub enum LogFormat { /// The default plaintext format Plain, @@ -20,7 +20,7 @@ pub enum LogFormat { /// Indicates that an invalid log format was specified #[derive(Debug, Error)] #[error("invalid log level: {0} must be 'plain' or 'json'")] -#[cfg_attr(docsrs, doc(cfg(any(feature = "log"))))] +#[cfg_attr(docsrs, doc(cfg(feature = "log")))] pub struct InvalidLogFormat(String); // === impl LogFormat === diff --git a/kubert/src/server.rs b/kubert/src/server.rs index cb56475..97be143 100644 --- a/kubert/src/server.rs +++ b/kubert/src/server.rs @@ -14,7 +14,7 @@ use tracing::{debug, error, info, info_span, Instrument}; /// Command-line arguments used to configure a server #[derive(Clone, Debug)] #[cfg_attr(feature = "clap", derive(clap::Parser))] -#[cfg_attr(docsrs, doc(cfg(any(feature = "server"))))] +#[cfg_attr(docsrs, doc(cfg(feature = "server")))] pub struct ServerArgs { /// The server's address #[cfg_attr(feature = "clap", clap(long, default_value = "0.0.0.0:443"))] @@ -31,7 +31,7 @@ pub struct ServerArgs { /// A running server #[derive(Debug)] -#[cfg_attr(docsrs, doc(cfg(any(feature = "server"))))] +#[cfg_attr(docsrs, doc(cfg(feature = "server")))] pub struct SpawnedServer { local_addr: SocketAddr, task: tokio::task::JoinHandle<()>, @@ -39,17 +39,17 @@ pub struct SpawnedServer { /// The path to the server's TLS private key #[derive(Clone, Debug)] -#[cfg_attr(docsrs, doc(cfg(any(feature = "server"))))] +#[cfg_attr(docsrs, doc(cfg(feature = "server")))] pub struct TlsKeyPath(PathBuf); /// The path to the server's TLS certificate bundle #[derive(Clone, Debug)] -#[cfg_attr(docsrs, doc(cfg(any(feature = "server"))))] +#[cfg_attr(docsrs, doc(cfg(feature = "server")))] pub struct TlsCertPath(PathBuf); /// Describes an error that occurred while initializing a server #[derive(Debug, Error)] -#[cfg_attr(docsrs, doc(cfg(any(feature = "server"))))] +#[cfg_attr(docsrs, doc(cfg(feature = "server")))] pub enum Error { /// No TLS key path was configured #[error("--server-tls-key must be set")] diff --git a/kubert/src/shutdown.rs b/kubert/src/shutdown.rs index d576824..d72374e 100644 --- a/kubert/src/shutdown.rs +++ b/kubert/src/shutdown.rs @@ -3,18 +3,18 @@ use tokio::signal::unix::{signal, SignalKind}; use tracing::debug; -#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))] +#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))] pub use drain::Watch; /// Drives shutdown by watching signals #[derive(Debug)] #[must_use = "call `Shutdown::on_signal` to await a signal"] -#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))] +#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))] pub struct Shutdown(drain::Signal); /// Indicates whether shutdown completed gracefully or was forced by a second signal #[derive(Copy, Clone, Debug, Eq, PartialEq)] -#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))] +#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))] pub enum Completion { /// Indicates that shutdown completed gracefully Graceful, @@ -30,7 +30,7 @@ pub enum Completion { /// signal is received while waiting for watches to be dropped, the shutdown is aborted. /// /// If a second signal is received while waiting for shutdown to complete, the process -#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))] +#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))] pub fn channel() -> (Shutdown, Watch) { let (drain_tx, drain_rx) = drain::channel(); (Shutdown(drain_tx), drain_rx)