From 7c635897c33bf445326a3fe46782725beaeace5f Mon Sep 17 00:00:00 2001 From: Conor Mongey Date: Wed, 20 Nov 2019 13:54:04 +0000 Subject: [PATCH 1/3] Print welcome message, sleep after sending metrics --- src/main.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index a1a4ba7..f531b6c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -98,9 +98,8 @@ fn main() { let mut previous_total_processed: Option = None; + debug!("Welcome to datadog-sidekiq"); loop { - sleep(Duration::new(interval, 0)); - // Redis was down. Try to re-establish a connection. if reconnect { info!("Trying to connect to Redis again."); @@ -151,6 +150,8 @@ fn main() { Ok(_) => {}, Err(err) => { error!("An error occured while sending the series to DataDog: {}", err); } } + + sleep(Duration::new(interval, 0)); } } From 9174b0db72cae16ca46dc03ac26bd1573a0e0f81 Mon Sep 17 00:00:00 2001 From: Conor Mongey Date: Wed, 20 Nov 2019 13:55:13 +0000 Subject: [PATCH 2/3] Push per-queue metrics with queue_name tag --- src/main.rs | 41 +++++++++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/src/main.rs b/src/main.rs index f531b6c..f15fb7d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -118,11 +118,26 @@ fn main() { let mut series = Series::new(); - match get_total_enqueued(&conn, &redis_ns) { - Ok(Some(total_enqueued)) => { - series.push(Metric::gauge("sidekiq.enqueued", total_enqueued, tags.clone())); - }, - Ok(None) => {}, + match get_enqueued(&conn, &redis_ns) { + Ok(ref enqueued_count_by_queue) => { + for (queue_name, count) in enqueued_count_by_queue { + let mut extra = vec!["queue_name:".to_string() + queue_name]; + let mut new_tags = tags.clone(); + new_tags.append(&mut extra); + series.push(Metric::gauge("sidekiq.enqueued", *count, new_tags)); + } + + let mut total_enqueued = 0; + for (_queue_name, count) in enqueued_count_by_queue { + total_enqueued += count; + } + series.push(Metric::gauge( + "sidekiq.total_enqueued", + total_enqueued, + tags.clone(), + )); + } + Err(redis_err) => { error!("A Redis error occured: {}", redis_err); reconnect = true; @@ -159,18 +174,28 @@ fn establish_redis_connection(client: &RedisClient) -> Result Result, RedisError> { +fn get_enqueued( + conn: &Connection, + redis_ns: &Namespace, +) -> Result, RedisError> { + let mut actual_result: HashMap = HashMap::new(); let queues: HashSet = conn.smembers(redis_ns.wrap("queues"))?; let mut pipe = redis::pipe(); - for queue in queues { + + for queue in &queues { let queue_name = ["queue", &queue].join(":"); pipe.add_command(cmd("LLEN").arg(redis_ns.wrap(queue_name))); } let total_per_queue: Vec = pipe.query(conn)?; + let mut i: usize = 0; + for queue in &queues { + actual_result.insert(queue.to_string(), total_per_queue[i]); + i += 1; + } - Ok(Some(total_per_queue.iter().sum())) + Ok(actual_result) } fn get_total_processed(conn: &Connection, redis_ns: &Namespace) -> Result, RedisError> { From 0240ffac0e8d23beba67a952f283a97d8ee24e62 Mon Sep 17 00:00:00 2001 From: Conor Mongey Date: Wed, 20 Nov 2019 13:55:21 +0000 Subject: [PATCH 3/3] Format --- src/main.rs | 125 ++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 88 insertions(+), 37 deletions(-) diff --git a/src/main.rs b/src/main.rs index f15fb7d..52c9dec 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,21 +1,24 @@ -extern crate redis; extern crate curl; +extern crate redis; extern crate serde; -#[macro_use] extern crate serde_derive; +#[macro_use] +extern crate serde_derive; extern crate serde_json; extern crate time; -#[macro_use] extern crate log; +#[macro_use] +extern crate log; extern crate env_logger; +use curl::easy::{Easy, List as HeaderList}; +use redis::{cmd, Client as RedisClient, Commands, Connection, RedisError}; +use std::collections::HashMap; +use std::collections::HashSet; use std::env; -use std::io::Read; +use std::error::Error; use std::fmt::Display; +use std::io::Read; use std::thread::sleep; use std::time::Duration; -use std::collections::HashSet; -use std::error::Error; -use redis::{Client as RedisClient, Commands, cmd, Connection, RedisError}; -use curl::easy::{Easy, List as HeaderList}; const DD_SERIES_URL: &str = "https://app.datadoghq.com/api/v1/series"; const DEFAULT_INTERVAL: u64 = 60; @@ -39,18 +42,28 @@ impl Series { struct Metric { metric: String, points: Vec<(i64, u32)>, - #[serde(rename="type")] + #[serde(rename = "type")] metric_type: String, tags: Vec, } impl Metric { - fn new, MT: Into>(metric: M, value: u32, metric_type: MT, tags: Vec) -> Self { + fn new, MT: Into>( + metric: M, + value: u32, + metric_type: MT, + tags: Vec, + ) -> Self { let metric = metric.into(); let points = vec![(time::get_time().sec, value)]; let metric_type = metric_type.into(); - Metric { metric, points, metric_type, tags } + Metric { + metric, + points, + metric_type, + tags, + } } fn gauge>(metric: M, value: u32, tags: Vec) -> Self { @@ -79,13 +92,19 @@ fn main() { env_logger::init().unwrap(); let interval = env::var("INTERVAL") - .map(|value| value.parse::().expect("INTERVAL is not a valid number.")) + .map(|value| { + value + .parse::() + .expect("INTERVAL is not a valid number.") + }) .unwrap_or(DEFAULT_INTERVAL); let redis_url = env::var("REDIS_URL").expect("REDIS_URL is missing."); let redis_ns = Namespace::new(env::var("REDIS_NAMESPACE").ok()); - let redis = RedisClient::open(&*redis_url).expect("Could not connect to the provided REDIS_URL."); - let mut conn = establish_redis_connection(&redis).expect("Could not establish a connection to Redis."); + let redis = + RedisClient::open(&*redis_url).expect("Could not connect to the provided REDIS_URL."); + let mut conn = + establish_redis_connection(&redis).expect("Could not establish a connection to Redis."); let mut reconnect = false; let dd_api_key = env::var("DD_API_KEY").expect("DD_API_KEY is missing."); @@ -108,9 +127,12 @@ fn main() { Ok(new_connection) => { conn = new_connection; reconnect = false; - }, + } Err(redis_err) => { - error!("An error occured while trying to re-establish the Redis connection: {}", redis_err); + error!( + "An error occured while trying to re-establish the Redis connection: {}", + redis_err + ); continue; } } @@ -148,22 +170,32 @@ fn main() { match get_total_processed(&conn, &redis_ns) { Ok(total_processed) => { if previous_total_processed.is_some() && total_processed.is_some() { - let current_processed = total_processed.unwrap() - previous_total_processed.unwrap(); - series.push(Metric::gauge("sidekiq.processed", current_processed, tags.clone())); + let current_processed = + total_processed.unwrap() - previous_total_processed.unwrap(); + series.push(Metric::gauge( + "sidekiq.processed", + current_processed, + tags.clone(), + )); } previous_total_processed = total_processed; - }, + } Err(redis_err) => { error!("A Redis error occured: {}", redis_err); reconnect = true; continue; - }, + } } match deliver_series(&url, series) { - Ok(_) => {}, - Err(err) => { error!("An error occured while sending the series to DataDog: {}", err); } + Ok(_) => {} + Err(err) => { + error!( + "An error occured while sending the series to DataDog: {}", + err + ); + } } sleep(Duration::new(interval, 0)); @@ -204,38 +236,57 @@ fn get_total_processed(conn: &Connection, redis_ns: &Namespace) -> Result