diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b204268..2d8adba1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,5 +9,5 @@ ### Added -- Support Redis pubsub and Momento topics. +- Support Momento topics. - Basic HTTP/1.1 and HTTP/2.0 load generation. diff --git a/Cargo.lock b/Cargo.lock index 01299d43..a2e9ed10 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -403,9 +403,9 @@ dependencies = [ "boring", "clocksource", "macros", - "metriken", + "metriken 0.1.4", "net", - "ringlog", + "ringlog 0.1.1", "serde", ] @@ -765,9 +765,9 @@ dependencies = [ [[package]] name = "heatmap" -version = "0.7.5" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdeff4026a4e69d8ae19b51fa661841da47c3448ba67ad44affe893a2c985e31" +checksum = "d8cbf90780d798f8068becac8b922ab9901af29e2406e6327c4fda70353c58fa" dependencies = [ "clocksource", "histogram", @@ -1121,7 +1121,7 @@ source = "git+https://github.com/pelikan-io/pelikan#bb18e0c51712779721fc1d1b1a48 dependencies = [ "common", "config", - "ringlog", + "ringlog 0.1.1", ] [[package]] @@ -1164,9 +1164,24 @@ dependencies = [ "clocksource", "heatmap", "linkme", - "metriken-derive", + "metriken-derive 0.1.0", + "once_cell", + "parking_lot", +] + +[[package]] +name = "metriken" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "283da0f4c6ea37b3d8f932e02d368f1c1f77b3072653a91a5cdef6931b6db889" +dependencies = [ + "heatmap", + "histogram", + "linkme", + "metriken-derive 0.2.0", "once_cell", "parking_lot", + "phf", ] [[package]] @@ -1181,6 +1196,18 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "metriken-derive" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0213a7a12e01c66357d1f3491e2d7d74b2373540ea0a4bfd928a4c0e5e02f432" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "mime" version = "0.3.17" @@ -1333,7 +1360,7 @@ dependencies = [ "boring-sys", "foreign-types-shared", "libc", - "metriken", + "metriken 0.1.4", "mio 0.8.6", ] @@ -1489,6 +1516,48 @@ dependencies = [ "indexmap", ] +[[package]] +name = "phf" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" +dependencies = [ + "phf_macros", + "phf_shared", +] + +[[package]] +name = "phf_generator" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48e4cc64c2ad9ebe670cb8fd69dd50ae301650392e81c05f9bfcb2d5bdbc24b0" +dependencies = [ + "phf_shared", + "rand", +] + +[[package]] +name = "phf_macros" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3444646e286606587e49f3bcf1679b8cef1dc2c5ecc29ddacaffc305180d464b" +dependencies = [ + "phf_generator", + "phf_shared", + "proc-macro2", + "quote", + "syn 2.0.15", +] + +[[package]] +name = "phf_shared" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project" version = "1.0.12" @@ -1629,7 +1698,7 @@ source = "git+https://github.com/pelikan-io/pelikan#bb18e0c51712779721fc1d1b1a48 dependencies = [ "common", "logger", - "metriken", + "metriken 0.1.4", "nom", "protocol-common", ] @@ -1642,7 +1711,7 @@ dependencies = [ "common", "config", "logger", - "metriken", + "metriken 0.1.4", "protocol-common", "storage-types", ] @@ -1793,7 +1862,20 @@ dependencies = [ "ahash", "clocksource", "log", - "metriken", + "metriken 0.1.4", + "mpmc", +] + +[[package]] +name = "ringlog" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c90d3d1e4db43daedfdae26373e7b4cb4f56b26f217fc8f661e4f439827b4a46" +dependencies = [ + "ahash", + "clocksource", + "log", + "metriken 0.2.1", "mpmc", ] @@ -1814,7 +1896,7 @@ dependencies = [ "http-body-util", "humantime", "hyper 1.0.0-rc.3", - "metriken", + "metriken 0.2.1", "mio 0.8.6", "momento", "net", @@ -1826,7 +1908,7 @@ dependencies = [ "rand_xoshiro", "ratelimit", "redis", - "ringlog", + "ringlog 0.2.0", "serde", "serde_json", "session", @@ -2024,7 +2106,7 @@ dependencies = [ "bytes 1.4.0", "common", "log", - "metriken", + "metriken 0.1.4", "net", "protocol-common", ] @@ -2084,6 +2166,12 @@ dependencies = [ "time", ] +[[package]] +name = "siphasher" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" + [[package]] name = "slab" version = "0.4.8" diff --git a/Cargo.toml b/Cargo.toml index 27651662..6a22b863 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ http-body-util = "0.1.0-rc.2" hyper = { version = "1.0.0-rc.3", features = ["http1", "http2", "client"]} humantime = "2.1.0" momento = "0.30.0" -metriken = "0.1.4" +metriken = "0.2.1" mio = "0.8.5" net = { git = "https://github.com/pelikan-io/pelikan" } paste = "1.0.12" @@ -34,7 +34,7 @@ rand_distr = "0.4.3" rand_xoshiro = "0.6.0" ratelimit = "0.7.0" redis = { version = "0.22.3", features = ["tokio-comp"] } -ringlog = "0.1.1" +ringlog = "0.2.0" serde = "1.0.144" serde_json = "1.0.94" session = { git = "https://github.com/pelikan-io/pelikan" } diff --git a/src/admin/mod.rs b/src/admin/mod.rs index 35363c8d..c9d3177e 100644 --- a/src/admin/mod.rs +++ b/src/admin/mod.rs @@ -138,8 +138,10 @@ impl<'a> TryFrom<&'a metriken::MetricEntry> for Metric<'a> { let percentiles = PERCENTILES .iter() .map(|(label, percentile)| { - let value = heatmap.percentile(*percentile).map(|b| b.high()).ok(); - + let value = match heatmap.percentile(*percentile) { + Some(Ok(bucket)) => Some(bucket.high()), + _ => None, + }; (*label, *percentile, value) }) .collect(); diff --git a/src/clients/http1.rs b/src/clients/http1.rs index ff24e65f..d222daf9 100644 --- a/src/clients/http1.rs +++ b/src/clients/http1.rs @@ -37,7 +37,7 @@ async fn task(work_receiver: Receiver, endpoint: String, config: Confi if session_requests != 0 { let stop = Instant::now(); let lifecycle_ns = (stop - session_start).as_nanos(); - let _ = SESSION_LIFECYCLE_REQUESTS.increment(stop, lifecycle_ns, 1); + let _ = SESSION_LIFECYCLE_REQUESTS.increment(stop, lifecycle_ns); } CONNECT.increment(); let stream = match timeout( @@ -168,8 +168,8 @@ async fn task(work_receiver: Receiver, endpoint: String, config: Confi let latency = stop.duration_since(start).as_nanos(); - let _ = REQUEST_LATENCY.increment(start, latency, 1); - let _ = RESPONSE_LATENCY.increment(stop, latency, 1); + let _ = REQUEST_LATENCY.increment(start, latency); + let _ = RESPONSE_LATENCY.increment(stop, latency); if let Some(header) = response .headers() diff --git a/src/clients/http2.rs b/src/clients/http2.rs index 530ae7d2..740e2987 100644 --- a/src/clients/http2.rs +++ b/src/clients/http2.rs @@ -232,8 +232,8 @@ async fn task( let latency = stop.duration_since(start).as_nanos(); - let _ = REQUEST_LATENCY.increment(start, latency, 1); - let _ = RESPONSE_LATENCY.increment(stop, latency, 1); + let _ = REQUEST_LATENCY.increment(start, latency); + let _ = RESPONSE_LATENCY.increment(stop, latency); if let Some(header) = response .headers() diff --git a/src/clients/memcache/mod.rs b/src/clients/memcache/mod.rs index b0cb13c7..d243412e 100644 --- a/src/clients/memcache/mod.rs +++ b/src/clients/memcache/mod.rs @@ -165,8 +165,8 @@ async fn task(work_receiver: Receiver, endpoint: String, config: Confi // increment success stats and latency RESPONSE_OK.increment(); - let _ = REQUEST_LATENCY.increment(start, latency_ns, 1); - let _ = RESPONSE_LATENCY.increment(stop, latency_ns, 1); + let _ = REQUEST_LATENCY.increment(start, latency_ns); + let _ = RESPONSE_LATENCY.increment(stop, latency_ns); // preserve the connection for the next request stream = Some(s); diff --git a/src/clients/momento/mod.rs b/src/clients/momento/mod.rs index 639cc723..6c21c120 100644 --- a/src/clients/momento/mod.rs +++ b/src/clients/momento/mod.rs @@ -172,8 +172,8 @@ async fn task( let latency = stop.duration_since(start).as_nanos(); - let _ = REQUEST_LATENCY.increment(start, latency, 1); - let _ = RESPONSE_LATENCY.increment(stop, latency, 1); + let _ = REQUEST_LATENCY.increment(start, latency); + let _ = RESPONSE_LATENCY.increment(stop, latency); } Err(ResponseError::Exception) => { RESPONSE_EX.increment(); diff --git a/src/clients/ping.rs b/src/clients/ping.rs index 1da06e6c..3e52ddcd 100644 --- a/src/clients/ping.rs +++ b/src/clients/ping.rs @@ -167,8 +167,8 @@ async fn task(work_receiver: Receiver, endpoint: String, config: Confi let latency = stop.duration_since(start).as_nanos(); - let _ = REQUEST_LATENCY.increment(start, latency, 1); - let _ = RESPONSE_LATENCY.increment(stop, latency, 1); + let _ = REQUEST_LATENCY.increment(start, latency); + let _ = RESPONSE_LATENCY.increment(stop, latency); } Err(ResponseError::Exception) => { // record execption diff --git a/src/clients/redis/mod.rs b/src/clients/redis/mod.rs index 2859f2c6..af96cc4d 100644 --- a/src/clients/redis/mod.rs +++ b/src/clients/redis/mod.rs @@ -167,8 +167,8 @@ async fn task(work_receiver: Receiver, endpoint: String, config: Confi connection = Some(con); RESPONSE_OK.increment(); - let _ = REQUEST_LATENCY.increment(start, latency_ns, 1); - let _ = RESPONSE_LATENCY.increment(stop, latency_ns, 1); + let _ = REQUEST_LATENCY.increment(start, latency_ns); + let _ = RESPONSE_LATENCY.increment(stop, latency_ns); } Err(ResponseError::Exception) => { CONNECT_CURR.decrement(); diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index 8553b1e9..a0bd42bd 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -6,8 +6,6 @@ use metriken::Lazy; use paste::paste; use std::concat; -type Duration = clocksource::Duration>; - pub static PERCENTILES: &[(&str, f64)] = &[ ("p25", 25.0), ("p50", 50.0), @@ -91,19 +89,17 @@ macro_rules! counter { name = $name, crate = metriken )] - pub static $ident: Lazy = metriken::Lazy::new(|| { - metriken::Counter::new() - }); + pub static $ident: Lazy = + metriken::Lazy::new(|| metriken::Counter::new()); }; ($ident:ident, $name:tt, $description:tt) => { #[metriken::metric( - name = $name, - description = $description, - crate = metriken - )] - pub static $ident: Lazy = metriken::Lazy::new(|| { - metriken::Counter::new() - }); + name = $name, + description = $description, + crate = metriken + )] + pub static $ident: Lazy = + metriken::Lazy::new(|| metriken::Counter::new()); }; } @@ -115,9 +111,7 @@ macro_rules! gauge { name = $name, crate = metriken )] - pub static $ident: Lazy = metriken::Lazy::new(|| { - metriken::Gauge::new() - }); + pub static $ident: Lazy = metriken::Lazy::new(|| metriken::Gauge::new()); }; ($ident:ident, $name:tt, $description:tt) => { #[metriken::metric( @@ -125,9 +119,7 @@ macro_rules! gauge { description = $description, crate = metriken )] - pub static $ident: Lazy = metriken::Lazy::new(|| { - metriken::Gauge::new() - }); + pub static $ident: Lazy = metriken::Lazy::new(|| metriken::Gauge::new()); }; } @@ -139,9 +131,13 @@ macro_rules! heatmap { name = $name, crate = metriken )] - pub static $ident: Lazy = metriken::Lazy::new(|| { - metriken::Heatmap::new(0, 8, 64, Duration::from_secs(60), Duration::from_secs(1), None, None).unwrap() - }); + pub static $ident: metriken::Heatmap = metriken::Heatmap::new( + 0, + 8, + 64, + core::time::Duration::from_secs(60), + core::time::Duration::from_secs(1), + ); }; ($ident:ident, $name:tt, $description:tt) => { #[metriken::metric( @@ -149,9 +145,13 @@ macro_rules! heatmap { description = $description, crate = metriken )] - pub static $ident: Lazy = metriken::Lazy::new(|| { - metriken::Heatmap::new(0, 8, 64, Duration::from_secs(60), Duration::from_secs(1), None, None).unwrap() - }); + pub static $ident: metriken::Heatmap = metriken::Heatmap::new( + 0, + 8, + 64, + core::time::Duration::from_secs(60), + core::time::Duration::from_secs(1), + ); }; } diff --git a/src/output/mod.rs b/src/output/mod.rs index 94bd6712..814477dd 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -130,10 +130,10 @@ fn client_stats(snapshot: &mut Snapshot, elapsed: f64) -> u64 { let mut latencies = "Client Response Latency (us):".to_owned(); for (label, percentile) in PERCENTILES { - let value = RESPONSE_LATENCY - .percentile(*percentile) - .map(|b| format!("{}", b.high() / 1000)) - .unwrap_or_else(|_| "ERR".to_string()); + let value = match RESPONSE_LATENCY.percentile(*percentile) { + Some(Ok(b)) => format!("{}", b.high() / 1000), + _ => "ERR".to_string(), + }; latencies.push_str(&format!(" {label}: {value}")) } @@ -194,10 +194,10 @@ fn pubsub_stats(snapshot: &mut Snapshot, elapsed: f64) -> u64 { let mut latencies = "Pubsub Publish Latency (us):".to_owned(); for (label, percentile) in PERCENTILES { - let value = PUBSUB_PUBLISH_LATENCY - .percentile(*percentile) - .map(|b| format!("{}", b.high() / 1000)) - .unwrap_or_else(|_| "ERR".to_string()); + let value = match RESPONSE_LATENCY.percentile(*percentile) { + Some(Ok(b)) => format!("{}", b.high() / 1000), + _ => "ERR".to_string(), + }; latencies.push_str(&format!(" {label}: {value}")) } @@ -205,10 +205,10 @@ fn pubsub_stats(snapshot: &mut Snapshot, elapsed: f64) -> u64 { let mut latencies = "Pubsub End-to-End Latency (us):".to_owned(); for (label, percentile) in PERCENTILES { - let value = PUBSUB_LATENCY - .percentile(*percentile) - .map(|b| format!("{}", b.high() / 1000)) - .unwrap_or_else(|_| "ERR".to_string()); + let value = match RESPONSE_LATENCY.percentile(*percentile) { + Some(Ok(b)) => format!("{}", b.high() / 1000), + _ => "ERR".to_string(), + }; latencies.push_str(&format!(" {label}: {value}")) } @@ -315,7 +315,7 @@ fn heatmap_to_buckets(heatmap: &Heatmap) -> RequestLatencies { // histograms. However, this only kicks in after the entire histogram // has been populated, so for the first minute, no histograms // are returned (the histogram at offset 59 is still invalid). - if let Some(histogram) = heatmap.iter().nth(59) { + if let Some(Some(histogram)) = heatmap.iter().map(|mut i| i.nth(59)) { let p = histogram.parameters(); let mut index = Vec::new(); let mut count = Vec::new(); diff --git a/src/pubsub/momento.rs b/src/pubsub/momento.rs index 44806260..b9057127 100644 --- a/src/pubsub/momento.rs +++ b/src/pubsub/momento.rs @@ -123,7 +123,7 @@ async fn subscriber_task(client: Arc, cache_name: String, topic: St let latency = now_unix - UnixInstant::from_nanos(ts); let then = now - latency; - let _ = PUBSUB_LATENCY.increment(then, latency.as_nanos(), 1); + let _ = PUBSUB_LATENCY.increment(then, latency.as_nanos()); PUBSUB_RECEIVE.increment(); PUBSUB_RECEIVE_OK.increment(); @@ -286,7 +286,7 @@ async fn publisher_task( let latency = stop.duration_since(start).as_nanos(); PUBSUB_PUBLISH_OK.increment(); - let _ = PUBSUB_PUBLISH_LATENCY.increment(start, latency, 1); + let _ = PUBSUB_PUBLISH_LATENCY.increment(start, latency); } Err(ResponseError::Exception) => { PUBSUB_PUBLISH_EX.increment();