Skip to content

Commit

Permalink
Add an admin server
Browse files Browse the repository at this point in the history
Controllers need an HTTP admin server for Kubernetes liveness and
readiness probes.

This change adds an `admin` module that configures a server with
`/ready` and `/live` endpoints.
  • Loading branch information
olix0r committed Feb 22, 2022
1 parent 752f3b9 commit ddda0eb
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 27 deletions.
5 changes: 3 additions & 2 deletions kubert/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kubert"
version = "0.2.2"
version = "0.2.3"
edition = "2021"
license = "Apache-2.0"
description = "Kubernetes runtime helpers. Based on kube-rs."
Expand All @@ -10,14 +10,14 @@ rust-version = "1.56.1"
keywords = ["kubernetes", "client", "runtime", "server"]

[features]
admin = ["futures-util", "hyper/http1", "hyper/runtime", "hyper/server", "tokio/sync"]
client = ["kube-client", "thiserror"]
# TODO controller = ["client", "kube/runtime"]
log = ["thiserror", "tracing-subscriber"]
requeue = ["futures-core", "tokio/macros", "tokio/sync", "tokio-util/time"]
# TODO runtime = ["clap", "drain", "log", "tokio/signal"]
server = [
"drain",
"hyper",
"hyper/http1",
"hyper/http2",
"hyper/runtime",
Expand Down Expand Up @@ -46,6 +46,7 @@ features = [
[dependencies]
drain = { version = "0.1.0", optional = true, default-features = false }
futures-core = { version = "0.3", optional = true, default-features = false }
futures-util = { version = "0.3", optional = true, default-features = false }
hyper = { version = "0.14.17", optional = true, default-features = false }
rustls-pemfile = { version = "0.3.0", optional = true }
thiserror = { version = "1.0.30", optional = true }
Expand Down
202 changes: 202 additions & 0 deletions kubert/src/admin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
//! Admin server utilities.
use futures_util::future;
use hyper::{Body, Request, Response};
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use tracing::{debug, info_span, Instrument};

/// Command-line arguments used to configure an admin server
#[cfg(feature = "clap")]
#[derive(Clone, Debug, clap::Parser)]
#[cfg_attr(docsrs, doc(cfg(all(feature = "admin", feature = "clap"))))]
pub struct AdminArgs {
/// The admin server's address
#[cfg_attr(feature = "clap", clap(long, default_value = "0.0.0.0:8080"))]
pub admin_addr: SocketAddr,
}

/// Supports configuring and running an admin server
#[cfg_attr(docsrs, doc(cfg(feature = "admin")))]
#[derive(Debug)]
pub struct Builder {
addr: SocketAddr,
ready: Readiness,
}

/// Controls how the admin server advertises readiness
#[cfg_attr(docsrs, doc(cfg(feature = "admin")))]
#[derive(Clone, Debug)]
pub struct Readiness(Arc<AtomicBool>);

/// A handle to a running admin server
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "admin")))]
pub struct Server {
addr: SocketAddr,
ready: Readiness,
task: tokio::task::JoinHandle<hyper::Result<()>>,
}

// === impl AdminArgs ===

#[cfg(feature = "clap")]
impl AdminArgs {
/// Creates a new [`Builder`] frm the command-line arguments
pub fn into_builder(self) -> Builder {
Builder::new(self.admin_addr)
}

/// Binds and runs the server on a background task, returning a handle
///
/// The server starts unready by default and it's up to the caller to mark it as ready.
pub fn spawn(self) -> Server {
self.into_builder().spawn()
}
}

// === impl Builder ===

impl Builder {
/// Creates a new [`Builder`] with the given server address
///
/// The server starts unready by default and it's up to the caller to mark it as ready.
pub fn new(addr: SocketAddr) -> Self {
Self {
addr,
ready: Readiness(Arc::new(false.into())),
}
}

/// Returns a readiness handle
pub fn readiness(&self) -> Readiness {
self.ready.clone()
}

/// Sets the initial readiness state to ready
pub fn set_ready(&self) {
self.ready.set(true);
}

/// Binds and runs the server on a background task, returning a handle
pub fn spawn(self) -> Server {
let Self { addr, ready } = self;

let http = hyper::server::Server::bind(&addr)
// Allow weird clients (like netcat).
.http1_half_close(true)
// Prevent port scanners, etc, from holding connections ope.n
.http1_header_read_timeout(Duration::from_secs(2))
// Use a small buffer, since we don't really transfer much data.
.http1_max_buf_size(8 * 1024);

let server = {
let ready = ready.clone();
http.serve(hyper::service::make_service_fn(move |_conn| {
let ready = ready.clone();
future::ok::<_, hyper::Error>(hyper::service::service_fn(
move |req: hyper::Request<hyper::Body>| match req.uri().path() {
"/live" => future::ok(handle_live(req)),
"/ready" => future::ok(handle_ready(&ready, req)),
_ => future::ok::<_, hyper::Error>(
hyper::Response::builder()
.status(hyper::StatusCode::NOT_FOUND)
.body(hyper::Body::default())
.unwrap(),
),
},
))
}))
};

let addr = server.local_addr();

let task = tokio::spawn(
async move {
debug!("Serving");
server.await
}
.instrument(info_span!("admin", port = %addr.port())),
);

Server { addr, ready, task }
}
}

// === impl Readiness ===

impl Readiness {
/// Gets the current readiness state
pub fn get(&self) -> bool {
self.0.load(Ordering::Acquire)
}

/// Sets the readiness state
pub fn set(&self, ready: bool) {
self.0.store(ready, Ordering::Release);
}
}

// === impl Server ===

impl Server {
/// Returns the bound local address of the server
pub fn local_addr(&self) -> SocketAddr {
self.addr
}

/// Returns a readiness handle
pub fn readiness(&self) -> Readiness {
self.ready.clone()
}

/// Returns the server tasks's join handle
pub fn into_join_handle(self) -> tokio::task::JoinHandle<hyper::Result<()>> {
self.task
}
}

// === handlers ===

fn handle_live(req: Request<Body>) -> Response<Body> {
match *req.method() {
hyper::Method::GET | hyper::Method::HEAD => Response::builder()
.status(hyper::StatusCode::OK)
.header(hyper::header::CONTENT_TYPE, "text/plain")
.body("alive\n".into())
.unwrap(),
_ => Response::builder()
.status(hyper::StatusCode::METHOD_NOT_ALLOWED)
.body(Body::default())
.unwrap(),
}
}

fn handle_ready(Readiness(ready): &Readiness, req: Request<Body>) -> Response<Body> {
match *req.method() {
hyper::Method::GET | hyper::Method::HEAD => {
if ready.load(Ordering::Acquire) {
return Response::builder()
.status(hyper::StatusCode::OK)
.header(hyper::header::CONTENT_TYPE, "text/plain")
.body("ready\n".into())
.unwrap();
}

Response::builder()
.status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
.header(hyper::header::CONTENT_TYPE, "text/plain")
.body("not ready\n".into())
.unwrap()
}
_ => Response::builder()
.status(hyper::StatusCode::METHOD_NOT_ALLOWED)
.body(Body::default())
.unwrap(),
}
}
4 changes: 2 additions & 2 deletions kubert/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use thiserror::Error;
/// Configures a Kubernetes client
// TODO configure a --kubeconfig
#[derive(Clone, Debug)]
#[cfg_attr(docsrs, doc(cfg(any(feature = "client"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
#[cfg_attr(feature = "clap", derive(clap::Parser))]
pub struct ClientArgs {
/// The name of the kubeconfig cluster to use
Expand All @@ -24,7 +24,7 @@ pub struct ClientArgs {

/// Indicates an error occurred while configuring the Kubernetes client
#[derive(Debug, Error)]
#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
pub enum ConfigError {
/// Indicates that the kubeconfig file could not be read
#[error(transparent)]
Expand Down
30 changes: 18 additions & 12 deletions kubert/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,32 @@
#![forbid(unsafe_code)]
#![cfg_attr(docsrs, feature(doc_cfg))]

#[cfg(feature = "admin")]
#[cfg_attr(docsrs, doc(cfg(feature = "admin")))]
pub mod admin;

#[cfg(feature = "client")]
#[cfg_attr(docsrs, doc(cfg(any(feature = "client"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
pub mod client;

#[cfg(all(feature = "client"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "client"))))]
pub use self::client::ClientArgs;

#[cfg(feature = "log")]
#[cfg_attr(docsrs, doc(cfg(any(feature = "log"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "log")))]
pub mod log;

#[cfg(feature = "shutdown")]
#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))]
pub mod shutdown;

#[cfg(feature = "requeue")]
#[cfg_attr(docsrs, doc(cfg(any(feature = "requeue"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "requeue")))]
pub mod requeue;

#[cfg(feature = "server")]
#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
pub mod server;

#[cfg(feature = "shutdown")]
#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))]
pub mod shutdown;

#[cfg(all(feature = "admin", feature = "clap"))]
pub use self::admin::AdminArgs;

#[cfg(all(feature = "client"))]
pub use self::client::ClientArgs;
4 changes: 2 additions & 2 deletions kubert/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub use tracing_subscriber::EnvFilter;
/// Configures whether logs should be emitted in plaintext (the default) or as JSON-encoded
/// messages
#[derive(Clone, Debug)]
#[cfg_attr(docsrs, doc(cfg(any(feature = "log"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "log")))]
pub enum LogFormat {
/// The default plaintext format
Plain,
Expand All @@ -20,7 +20,7 @@ pub enum LogFormat {
/// Indicates that an invalid log format was specified
#[derive(Debug, Error)]
#[error("invalid log level: {0} must be 'plain' or 'json'")]
#[cfg_attr(docsrs, doc(cfg(any(feature = "log"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "log")))]
pub struct InvalidLogFormat(String);

// === impl LogFormat ===
Expand Down
10 changes: 5 additions & 5 deletions kubert/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tracing::{debug, error, info, info_span, Instrument};
/// Command-line arguments used to configure a server
#[derive(Clone, Debug)]
#[cfg_attr(feature = "clap", derive(clap::Parser))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "server"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
pub struct ServerArgs {
/// The server's address
#[cfg_attr(feature = "clap", clap(long, default_value = "0.0.0.0:443"))]
Expand All @@ -31,25 +31,25 @@ pub struct ServerArgs {

/// A running server
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(any(feature = "server"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
pub struct SpawnedServer {
local_addr: SocketAddr,
task: tokio::task::JoinHandle<()>,
}

/// The path to the server's TLS private key
#[derive(Clone, Debug)]
#[cfg_attr(docsrs, doc(cfg(any(feature = "server"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
pub struct TlsKeyPath(PathBuf);

/// The path to the server's TLS certificate bundle
#[derive(Clone, Debug)]
#[cfg_attr(docsrs, doc(cfg(any(feature = "server"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
pub struct TlsCertPath(PathBuf);

/// Describes an error that occurred while initializing a server
#[derive(Debug, Error)]
#[cfg_attr(docsrs, doc(cfg(any(feature = "server"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
pub enum Error {
/// No TLS key path was configured
#[error("--server-tls-key must be set")]
Expand Down
8 changes: 4 additions & 4 deletions kubert/src/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
use tokio::signal::unix::{signal, SignalKind};
use tracing::debug;

#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))]
pub use drain::Watch;

/// Drives shutdown by watching signals
#[derive(Debug)]
#[must_use = "call `Shutdown::on_signal` to await a signal"]
#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))]
pub struct Shutdown(drain::Signal);

/// Indicates whether shutdown completed gracefully or was forced by a second signal
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))]
pub enum Completion {
/// Indicates that shutdown completed gracefully
Graceful,
Expand All @@ -30,7 +30,7 @@ pub enum Completion {
/// signal is received while waiting for watches to be dropped, the shutdown is aborted.
///
/// If a second signal is received while waiting for shutdown to complete, the process
#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))]
pub fn channel() -> (Shutdown, Watch) {
let (drain_tx, drain_rx) = drain::channel();
(Shutdown(drain_tx), drain_rx)
Expand Down

0 comments on commit ddda0eb

Please sign in to comment.