Skip to content

Commit

Permalink
Add an admin server (#10)
Browse files Browse the repository at this point in the history
Controllers need an HTTP admin server for Kubernetes liveness and
readiness probes.

This change adds an `admin` module that configures a server with
`/ready` and `/live` endpoints.
  • Loading branch information
olix0r committed Feb 22, 2022
1 parent 752f3b9 commit 7a192b9
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 27 deletions.
5 changes: 3 additions & 2 deletions kubert/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kubert"
version = "0.2.2"
version = "0.2.3"
edition = "2021"
license = "Apache-2.0"
description = "Kubernetes runtime helpers. Based on kube-rs."
Expand All @@ -10,14 +10,14 @@ rust-version = "1.56.1"
keywords = ["kubernetes", "client", "runtime", "server"]

[features]
admin = ["futures-util", "hyper/http1", "hyper/runtime", "hyper/server", "tokio/sync"]
client = ["kube-client", "thiserror"]
# TODO controller = ["client", "kube/runtime"]
log = ["thiserror", "tracing-subscriber"]
requeue = ["futures-core", "tokio/macros", "tokio/sync", "tokio-util/time"]
# TODO runtime = ["clap", "drain", "log", "tokio/signal"]
server = [
"drain",
"hyper",
"hyper/http1",
"hyper/http2",
"hyper/runtime",
Expand Down Expand Up @@ -46,6 +46,7 @@ features = [
[dependencies]
drain = { version = "0.1.0", optional = true, default-features = false }
futures-core = { version = "0.3", optional = true, default-features = false }
futures-util = { version = "0.3", optional = true, default-features = false }
hyper = { version = "0.14.17", optional = true, default-features = false }
rustls-pemfile = { version = "0.3.0", optional = true }
thiserror = { version = "1.0.30", optional = true }
Expand Down
202 changes: 202 additions & 0 deletions kubert/src/admin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
//! Admin server utilities.
use futures_util::future;
use hyper::{Body, Request, Response};
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use tracing::{debug, info_span, Instrument};

/// Command-line arguments used to configure an admin server
#[cfg(feature = "clap")]
#[derive(Clone, Debug, clap::Parser)]
#[cfg_attr(docsrs, doc(cfg(all(feature = "admin", feature = "clap"))))]
pub struct AdminArgs {
/// The admin server's address
#[cfg_attr(feature = "clap", clap(long, default_value = "0.0.0.0:8080"))]
pub admin_addr: SocketAddr,
}

/// Supports configuring and running an admin server
#[cfg_attr(docsrs, doc(cfg(feature = "admin")))]
#[derive(Debug)]
pub struct Builder {
addr: SocketAddr,
ready: Readiness,
}

/// Controls how the admin server advertises readiness
#[cfg_attr(docsrs, doc(cfg(feature = "admin")))]
#[derive(Clone, Debug)]
pub struct Readiness(Arc<AtomicBool>);

/// A handle to a running admin server
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "admin")))]
pub struct Server {
addr: SocketAddr,
ready: Readiness,
task: tokio::task::JoinHandle<hyper::Result<()>>,
}

// === impl AdminArgs ===

#[cfg(feature = "clap")]
impl AdminArgs {
/// Creates a new [`Builder`] frm the command-line arguments
pub fn into_builder(self) -> Builder {
Builder::new(self.admin_addr)
}

/// Binds and runs the server on a background task, returning a handle
///
/// The server starts unready by default and it's up to the caller to mark it as ready.
pub fn spawn(self) -> Server {
self.into_builder().spawn()
}
}

// === impl Builder ===

impl Builder {
/// Creates a new [`Builder`] with the given server address
///
/// The server starts unready by default and it's up to the caller to mark it as ready.
pub fn new(addr: SocketAddr) -> Self {
Self {
addr,
ready: Readiness(Arc::new(false.into())),
}
}

/// Returns a readiness handle
pub fn readiness(&self) -> Readiness {
self.ready.clone()
}

/// Sets the initial readiness state to ready
pub fn set_ready(&self) {
self.ready.set(true);
}

/// Binds and runs the server on a background task, returning a handle
pub fn spawn(self) -> Server {
let Self { addr, ready } = self;

let http = hyper::server::Server::bind(&addr)
// Allow weird clients (like netcat).
.http1_half_close(true)
// Prevent port scanners, etc, from holding connections ope.n
.http1_header_read_timeout(Duration::from_secs(2))
// Use a small buffer, since we don't really transfer much data.
.http1_max_buf_size(8 * 1024);

let server = {
let ready = ready.clone();
http.serve(hyper::service::make_service_fn(move |_conn| {
let ready = ready.clone();
future::ok::<_, hyper::Error>(hyper::service::service_fn(
move |req: hyper::Request<hyper::Body>| match req.uri().path() {
"/live" => future::ok(handle_live(req)),
"/ready" => future::ok(handle_ready(&ready, req)),
_ => future::ok::<_, hyper::Error>(
hyper::Response::builder()
.status(hyper::StatusCode::NOT_FOUND)
.body(hyper::Body::default())
.unwrap(),
),
},
))
}))
};

let addr = server.local_addr();

let task = tokio::spawn(
async move {
debug!("Serving");
server.await
}
.instrument(info_span!("admin", port = %addr.port())),
);

Server { addr, ready, task }
}
}

// === impl Readiness ===

impl Readiness {
/// Gets the current readiness state
pub fn get(&self) -> bool {
self.0.load(Ordering::Acquire)
}

/// Sets the readiness state
pub fn set(&self, ready: bool) {
self.0.store(ready, Ordering::Release);
}
}

// === impl Server ===

impl Server {
/// Returns the bound local address of the server
pub fn local_addr(&self) -> SocketAddr {
self.addr
}

/// Returns a readiness handle
pub fn readiness(&self) -> Readiness {
self.ready.clone()
}

/// Returns the server tasks's join handle
pub fn into_join_handle(self) -> tokio::task::JoinHandle<hyper::Result<()>> {
self.task
}
}

// === handlers ===

fn handle_live(req: Request<Body>) -> Response<Body> {
match *req.method() {
hyper::Method::GET | hyper::Method::HEAD => Response::builder()
.status(hyper::StatusCode::OK)
.header(hyper::header::CONTENT_TYPE, "text/plain")
.body("alive\n".into())
.unwrap(),
_ => Response::builder()
.status(hyper::StatusCode::METHOD_NOT_ALLOWED)
.body(Body::default())
.unwrap(),
}
}

fn handle_ready(Readiness(ready): &Readiness, req: Request<Body>) -> Response<Body> {
match *req.method() {
hyper::Method::GET | hyper::Method::HEAD => {
if ready.load(Ordering::Acquire) {
return Response::builder()
.status(hyper::StatusCode::OK)
.header(hyper::header::CONTENT_TYPE, "text/plain")
.body("ready\n".into())
.unwrap();
}

Response::builder()
.status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
.header(hyper::header::CONTENT_TYPE, "text/plain")
.body("not ready\n".into())
.unwrap()
}
_ => Response::builder()
.status(hyper::StatusCode::METHOD_NOT_ALLOWED)
.body(Body::default())
.unwrap(),
}
}
4 changes: 2 additions & 2 deletions kubert/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use thiserror::Error;
/// Configures a Kubernetes client
// TODO configure a --kubeconfig
#[derive(Clone, Debug)]
#[cfg_attr(docsrs, doc(cfg(any(feature = "client"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
#[cfg_attr(feature = "clap", derive(clap::Parser))]
pub struct ClientArgs {
/// The name of the kubeconfig cluster to use
Expand All @@ -24,7 +24,7 @@ pub struct ClientArgs {

/// Indicates an error occurred while configuring the Kubernetes client
#[derive(Debug, Error)]
#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
pub enum ConfigError {
/// Indicates that the kubeconfig file could not be read
#[error(transparent)]
Expand Down
30 changes: 18 additions & 12 deletions kubert/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,32 @@
#![forbid(unsafe_code)]
#![cfg_attr(docsrs, feature(doc_cfg))]

#[cfg(feature = "admin")]
#[cfg_attr(docsrs, doc(cfg(feature = "admin")))]
pub mod admin;

#[cfg(feature = "client")]
#[cfg_attr(docsrs, doc(cfg(any(feature = "client"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
pub mod client;

#[cfg(all(feature = "client"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "client"))))]
pub use self::client::ClientArgs;

#[cfg(feature = "log")]
#[cfg_attr(docsrs, doc(cfg(any(feature = "log"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "log")))]
pub mod log;

#[cfg(feature = "shutdown")]
#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))]
pub mod shutdown;

#[cfg(feature = "requeue")]
#[cfg_attr(docsrs, doc(cfg(any(feature = "requeue"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "requeue")))]
pub mod requeue;

#[cfg(feature = "server")]
#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
pub mod server;

#[cfg(feature = "shutdown")]
#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))]
pub mod shutdown;

#[cfg(all(feature = "admin", feature = "clap"))]
pub use self::admin::AdminArgs;

#[cfg(all(feature = "client"))]
pub use self::client::ClientArgs;
4 changes: 2 additions & 2 deletions kubert/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub use tracing_subscriber::EnvFilter;
/// Configures whether logs should be emitted in plaintext (the default) or as JSON-encoded
/// messages
#[derive(Clone, Debug)]
#[cfg_attr(docsrs, doc(cfg(any(feature = "log"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "log")))]
pub enum LogFormat {
/// The default plaintext format
Plain,
Expand All @@ -20,7 +20,7 @@ pub enum LogFormat {
/// Indicates that an invalid log format was specified
#[derive(Debug, Error)]
#[error("invalid log level: {0} must be 'plain' or 'json'")]
#[cfg_attr(docsrs, doc(cfg(any(feature = "log"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "log")))]
pub struct InvalidLogFormat(String);

// === impl LogFormat ===
Expand Down
10 changes: 5 additions & 5 deletions kubert/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tracing::{debug, error, info, info_span, Instrument};
/// Command-line arguments used to configure a server
#[derive(Clone, Debug)]
#[cfg_attr(feature = "clap", derive(clap::Parser))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "server"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
pub struct ServerArgs {
/// The server's address
#[cfg_attr(feature = "clap", clap(long, default_value = "0.0.0.0:443"))]
Expand All @@ -31,25 +31,25 @@ pub struct ServerArgs {

/// A running server
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(any(feature = "server"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
pub struct SpawnedServer {
local_addr: SocketAddr,
task: tokio::task::JoinHandle<()>,
}

/// The path to the server's TLS private key
#[derive(Clone, Debug)]
#[cfg_attr(docsrs, doc(cfg(any(feature = "server"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
pub struct TlsKeyPath(PathBuf);

/// The path to the server's TLS certificate bundle
#[derive(Clone, Debug)]
#[cfg_attr(docsrs, doc(cfg(any(feature = "server"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
pub struct TlsCertPath(PathBuf);

/// Describes an error that occurred while initializing a server
#[derive(Debug, Error)]
#[cfg_attr(docsrs, doc(cfg(any(feature = "server"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
pub enum Error {
/// No TLS key path was configured
#[error("--server-tls-key must be set")]
Expand Down
8 changes: 4 additions & 4 deletions kubert/src/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
use tokio::signal::unix::{signal, SignalKind};
use tracing::debug;

#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))]
pub use drain::Watch;

/// Drives shutdown by watching signals
#[derive(Debug)]
#[must_use = "call `Shutdown::on_signal` to await a signal"]
#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))]
pub struct Shutdown(drain::Signal);

/// Indicates whether shutdown completed gracefully or was forced by a second signal
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))]
pub enum Completion {
/// Indicates that shutdown completed gracefully
Graceful,
Expand All @@ -30,7 +30,7 @@ pub enum Completion {
/// signal is received while waiting for watches to be dropped, the shutdown is aborted.
///
/// If a second signal is received while waiting for shutdown to complete, the process
#[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "shutdown")))]
pub fn channel() -> (Shutdown, Watch) {
let (drain_tx, drain_rx) = drain::channel();
(Shutdown(drain_tx), drain_rx)
Expand Down

0 comments on commit 7a192b9

Please sign in to comment.