diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2dcf8dd..8e6ae1a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,8 @@
 ## v0.7.0 - unreleased
 
 - Build binaries for Linux Musl libc target. #83
+- Move "Bytes tx" under "Gets" and "Bytes rx" under "Sets". #84
+- Create a `check` subcommand for `mc` to test connections to a server. #86
 
 ## v0.6.9 - 2023-10-11
 
diff --git a/mtop/src/bin/mc.rs b/mtop/src/bin/mc.rs
index 62ca245..cdc4ca9 100644
--- a/mtop/src/bin/mc.rs
+++ b/mtop/src/bin/mc.rs
@@ -1,9 +1,9 @@
 use clap::{Args, Parser, Subcommand, ValueHint};
+use mtop::check::{Checker, MeasurementBundle};
 use mtop_client::{MemcachedPool, Meta, PoolConfig, TLSConfig, Value};
-use std::error;
-use std::io;
 use std::path::PathBuf;
-use std::{env, process};
+use std::time::Duration;
+use std::{env, error, io, process};
 use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
 use tokio::runtime::Handle;
 use tracing::Level;
@@ -59,6 +59,23 @@ enum Action {
     Keys(KeysCommand),
     Set(SetCommand),
     Touch(TouchCommand),
+    Check(CheckCommand),
+}
+
+/// Run health checks against the cache.
+#[derive(Debug, Args)]
+struct CheckCommand {
+    /// How long to run the checks for in seconds.
+    #[arg(long, default_value_t = 60)]
+    time_secs: u64,
+
+    /// Timeout for each portion of the check (DNS, connection, set, get) in seconds.
+    #[arg(long, default_value_t = 5)]
+    timeout_secs: u64,
+
+    /// How long to wait between each health check in milliseconds.
+    #[arg(long, default_value_t = 100)]
+    delay_millis: u64,
 }
 
 /// Delete an item in the cache.
@@ -153,6 +170,17 @@ async fn main() -> Result<(), Box<dyn error::Error + Send + Sync>> {
     });
 
     match opts.mode {
+        Action::Check(c) => {
+            let checker = Checker::new(
+                &pool,
+                Duration::from_millis(c.delay_millis),
+                Duration::from_secs(c.timeout_secs),
+            );
+            let results = checker.run(&opts.host, Duration::from_secs(c.time_secs)).await;
+            if let Err(e) = print_check_results(&results).await {
+                tracing::warn!(message = "error writing output", error = %e);
+            }
+        }
         Action::Delete(c) => {
             if let Err(e) = client.delete(c.key.clone()).await {
                 tracing::error!(message = "unable to delete item", key = c.key, host = opts.host, error = %e);
@@ -220,15 +248,94 @@ async fn print_data(val: &Value) -> io::Result<()> {
 
 async fn print_keys(metas: &[Meta], show_details: bool) -> io::Result<()> {
     let mut output = BufWriter::new(tokio::io::stdout());
-    for meta in metas {
-        let line = if show_details {
-            format!("{}\t{}\t{}\n", meta.key, meta.expires, meta.size)
-        } else {
-            format!("{}\n", meta.key)
-        };
-
-        output.write_all(line.as_bytes()).await?;
+
+    if show_details {
+        for meta in metas {
+            output
+                .write_all(format!("{}\t{}\t{}\n", meta.key, meta.expires, meta.size).as_bytes())
+                .await?;
+        }
+    } else {
+        for meta in metas {
+            output.write_all(format!("{}\n", meta.key).as_bytes()).await?;
+        }
     }
 
     output.flush().await
 }
+
+async fn print_check_results(results: &MeasurementBundle) -> io::Result<()> {
+    let mut output = BufWriter::new(tokio::io::stdout());
+
+    output
+        .write_all(
+            format!(
+                "type=min total={:.9} dns={:.9} connection={:.9} set={:.9} get={:.9}\n",
+                results.total.min.as_secs_f64(),
+                results.dns.min.as_secs_f64(),
+                results.connections.min.as_secs_f64(),
+                results.sets.min.as_secs_f64(),
+                results.gets.min.as_secs_f64()
+            )
+            .as_bytes(),
+        )
+        .await?;
+
+    output
+        .write_all(
+            format!(
+                "type=max total={:.9} dns={:.9} connection={:.9} set={:.9} get={:.9}\n",
+                results.total.max.as_secs_f64(),
+                results.dns.max.as_secs_f64(),
+                results.connections.max.as_secs_f64(),
+                results.sets.max.as_secs_f64(),
+                results.gets.max.as_secs_f64(),
+            )
+            .as_bytes(),
+        )
+        .await?;
+
+    output
+        .write_all(
+            format!(
+                "type=avg total={:.9} dns={:.9} connection={:.9} set={:.9} get={:.9}\n",
+                results.total.avg.as_secs_f64(),
+                results.dns.avg.as_secs_f64(),
+                results.connections.avg.as_secs_f64(),
+                results.sets.avg.as_secs_f64(),
+                results.gets.avg.as_secs_f64()
+            )
+            .as_bytes(),
+        )
+        .await?;
+
+    output
+        .write_all(
+            format!(
+                "type=stddev total={:.9} dns={:.9} connection={:.9} set={:.9} get={:.9}\n",
+                results.total.std_dev.as_secs_f64(),
+                results.dns.std_dev.as_secs_f64(),
+                results.connections.std_dev.as_secs_f64(),
+                results.sets.std_dev.as_secs_f64(),
+                results.gets.std_dev.as_secs_f64()
+            )
+            .as_bytes(),
+        )
+        .await?;
+
+    output
+        .write_all(
+            format!(
+                "type=failures total={} dns={} connection={} set={} get={}\n",
+                results.failures.total,
+                results.failures.dns,
+                results.failures.connections,
+                results.failures.sets,
+                results.failures.gets,
+            )
+            .as_bytes(),
+        )
+        .await?;
+
+    output.flush().await
+}
diff --git a/mtop/src/check.rs b/mtop/src/check.rs
new file mode 100644
index 0000000..d65c446
--- /dev/null
+++ b/mtop/src/check.rs
@@ -0,0 +1,239 @@
+use mtop_client::{MemcachedPool, MtopError};
+use std::time::{Duration, Instant};
+use std::{cmp, fmt};
+use tokio::net::ToSocketAddrs;
+use tokio::time;
+
+const KEY: &str = "mc-check-test";
+const VALUE: &[u8] = "test".as_bytes();
+
+/// Repeatedly make connections to a Memcached server to verify connectivity.
+#[derive(Debug)]
+pub struct Checker<'a> {
+    pool: &'a MemcachedPool,
+    delay: Duration,
+    timeout: Duration,
+}
+
+impl<'a> Checker<'a> {
+    /// Create a new `Checker` that uses connections created from the provided `pool`. `delay`
+    /// is the amount of time to wait between each test. `timeout` is how long each individual
+    /// part of the test may take (DNS resolution, connecting, setting a value, and fetching
+    /// a value).
+    pub fn new(pool: &'a MemcachedPool, delay: Duration, timeout: Duration) -> Self {
+        Self { pool, delay, timeout }
+    }
+
+    /// Perform connection tests for a particular host in a loop and return information
+    /// about the time taken for each step of the test (DNS resolution, connecting, setting
+    /// a value, and fetching a value) and counts of failures or timeouts during each step.
+    ///
+    /// Note that each test run performs a DNS lookup and creates a brand new connection.
+    pub async fn run(&self, host: &str, time: Duration) -> MeasurementBundle {
+        let mut dns_builder = MeasurementBuilder::default();
+        let mut conn_builder = MeasurementBuilder::default();
+        let mut set_builder = MeasurementBuilder::default();
+        let mut get_builder = MeasurementBuilder::default();
+        let mut total_builder = MeasurementBuilder::default();
+        let mut failures = Failures::default();
+        let start = Instant::now();
+
+        // Note that we don't return the connection to the pool each iteration. This ensures
+        // we're creating a new connection each time and thus actually testing the network
+        // when doing the check.
+        loop {
+            if start.elapsed() > time {
+                break;
+            }
+
+            time::sleep(self.delay).await;
+
+            let dns_start = Instant::now();
+            let ip_addr = match time::timeout(self.timeout, resolve_host(host)).await {
+                Ok(Ok(v)) => v,
+                Ok(Err(e)) => {
+                    tracing::warn!(message = "failed to resolve host", host = host, err = %e);
+                    failures.total += 1;
+                    failures.dns += 1;
+                    continue;
+                }
+                Err(_) => {
+                    tracing::warn!(message = "timeout resolving host", host = host);
+                    failures.total += 1;
+                    failures.dns += 1;
+                    continue;
+                }
+            };
+
+            let dns_time = dns_start.elapsed();
+            let conn_start = Instant::now();
+            let mut conn = match time::timeout(self.timeout, self.pool.get(&ip_addr)).await {
+                Ok(Ok(v)) => v,
+                Ok(Err(e)) => {
+                    tracing::warn!(message = "failed to connect to host", host = host, addr = ip_addr, err = %e);
+                    failures.total += 1;
+                    failures.connections += 1;
+                    continue;
+                }
+                Err(_) => {
+                    tracing::warn!(message = "timeout connecting to host", host = host, addr = ip_addr);
+                    failures.total += 1;
+                    failures.connections += 1;
+                    continue;
+                }
+            };
+
+            let conn_time = conn_start.elapsed();
+            let set_start = Instant::now();
+            match time::timeout(self.timeout, conn.set(KEY.to_owned(), 0, 60, VALUE.to_vec())).await {
+                Ok(Ok(_)) => {}
+                Ok(Err(e)) => {
+                    tracing::warn!(message = "failed to set key", host = host, addr = ip_addr, err = %e);
+                    failures.total += 1;
+                    failures.sets += 1;
+                    continue;
+                }
+                Err(_) => {
+                    tracing::warn!(message = "timeout setting key", host = host, addr = ip_addr);
+                    failures.total += 1;
+                    failures.sets += 1;
+                    continue;
+                }
+            }
+
+            let set_time = set_start.elapsed();
+            let get_start = Instant::now();
+            match time::timeout(self.timeout, conn.get(&[KEY.to_owned()])).await {
+                Ok(Ok(_)) => {}
+                Ok(Err(e)) => {
+                    tracing::warn!(message = "failed to get key", host = host, addr = ip_addr, err = %e);
+                    failures.total += 1;
+                    failures.gets += 1;
+                    continue;
+                }
+                Err(_) => {
+                    tracing::warn!(message = "timeout getting key", host = host, addr = ip_addr);
+                    failures.total += 1;
+                    failures.gets += 1;
+                    continue;
+                }
+            }
+
+            let get_time = get_start.elapsed();
+            let total_time = dns_start.elapsed();
+
+            tracing::info!(
+                timeout = ?self.timeout,
+                total = ?total_time,
+                dns = ?dns_time,
+                connection = ?conn_time,
+                set = ?set_time,
+                get = ?get_time,
+            );
+
+            dns_builder.add(dns_time);
+            conn_builder.add(conn_time);
+            set_builder.add(set_time);
+            get_builder.add(get_time);
+            total_builder.add(total_time);
+        }
+
+        let dns = dns_builder.build();
+        let connections = conn_builder.build();
+        let gets = get_builder.build();
+        let sets = set_builder.build();
+        let total = total_builder.build();
+
+        MeasurementBundle {
+            total,
+            dns,
+            connections,
+            sets,
+            gets,
+            failures,
+        }
+    }
+}
+
+async fn resolve_host<A>(host: A) -> Result<String, MtopError>
+where
+    A: ToSocketAddrs + fmt::Display,
+{
+    tokio::net::lookup_host(&host)
+        .await
+        .map_err(|e| MtopError::from((host.to_string(), e)))
+        .and_then(|mut i| {
+            i.next()
+                .map(|a| a.to_string())
+                .ok_or_else(|| MtopError::configuration(format!("hostname returned no addresses: {}", host)))
+        })
+}
+
+/// Accumulate measurements of how long a particular operation takes and compute
+/// the min, max, average, and standard deviation of them.
+#[derive(Debug, Default)]
+pub struct MeasurementBuilder {
+    times: Vec<Duration>,
+}
+
+impl MeasurementBuilder {
+    pub fn add(&mut self, d: Duration) {
+        self.times.push(d);
+    }
+
+    pub fn build(self) -> Measurement {
+        let mut min = Duration::MAX;
+        let mut max = Duration::ZERO;
+        let mut sum = Duration::ZERO;
+        let count = self.times.len();
+
+        for d in self.times.iter().cloned() {
+            min = cmp::min(min, d);
+            max = cmp::max(max, d);
+            sum += d;
+        }
+
+        let avg = sum.checked_div(count as u32).unwrap_or(Duration::ZERO);
+        let avg_f64 = avg.as_secs_f64();
+        let mut deviance = 0f64;
+
+        for d in self.times.iter() {
+            deviance += (d.as_secs_f64() - avg_f64).powi(2)
+        }
+
+        let std_dev = if count != 0 {
+            Duration::from_secs_f64((deviance / count as f64).sqrt())
+        } else {
+            Duration::ZERO
+        };
+
+        Measurement { min, max, avg, std_dev }
+    }
+}
+
+#[derive(Debug, Default, Clone, Eq, PartialEq)]
+pub struct Measurement {
+    pub min: Duration,
+    pub max: Duration,
+    pub avg: Duration,
+    pub std_dev: Duration,
+}
+
+#[derive(Debug, Default, Clone, Eq, PartialEq)]
+pub struct MeasurementBundle {
+    pub total: Measurement,
+    pub dns: Measurement,
+    pub connections: Measurement,
+    pub sets: Measurement,
+    pub gets: Measurement,
+    pub failures: Failures,
+}
+
+#[derive(Debug, Default, Clone, Eq, PartialEq)]
+pub struct Failures {
+    pub total: u64,
+    pub dns: u64,
+    pub connections: u64,
+    pub sets: u64,
+    pub gets: u64,
+}
diff --git a/mtop/src/lib.rs b/mtop/src/lib.rs
index 0c86ab0..fd0b150 100644
--- a/mtop/src/lib.rs
+++ b/mtop/src/lib.rs
@@ -1,5 +1,6 @@
 #![allow(clippy::uninlined_format_args)]
 
+pub mod check;
 pub mod queue;
 pub mod tracing;
 pub mod ui;
diff --git a/tools/lossy-down.sh b/tools/lossy-down.sh
new file mode 100755
index 0000000..d33605b
--- /dev/null
+++ b/tools/lossy-down.sh
@@ -0,0 +1,5 @@
+#!/usr/bin/env bash
+
+set -o errexit
+
+exec tc qdisc delete dev lo root
diff --git a/tools/lossy-up.sh b/tools/lossy-up.sh
new file mode 100755
index 0000000..8ce076a
--- /dev/null
+++ b/tools/lossy-up.sh
@@ -0,0 +1,5 @@
+#!/usr/bin/env bash
+
+set -o errexit
+
+exec tc qdisc add dev lo root netem loss 5% 25%
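
One way to exercise these changes together locally is to add packet loss to the loopback interface with the new netem scripts and then point `mc check` at a local server. This is a sketch, not part of the change itself: it assumes memcached is already listening on localhost:11211 and that `mc` accepts the server address via a `--host` flag (not shown in this diff); the `check` flag names come from the `CheckCommand` definition above.

# Add 5% packet loss (25% correlation) to loopback; `tc` needs root.
sudo ./tools/lossy-up.sh

# Run health checks for 30 seconds with a 2 second timeout per step
# (DNS, connect, set, get) and a 250ms pause between iterations.
# Output is one line each for type=min, max, avg, stddev, and failures.
mc --host localhost:11211 check --time-secs 30 --timeout-secs 2 --delay-millis 250

# Remove the netem qdisc when done.
sudo ./tools/lossy-down.sh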