Skip to content

Commit

Permalink
Fix dogstatsd aggregator (#654)
Browse files Browse the repository at this point in the history
* fix: id public and use optional for tags

* change: utility to easily create values
  • Loading branch information
alexgallotta authored Oct 3, 2024
1 parent 6e7d1e6 commit d01cf11
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 30 deletions.
44 changes: 26 additions & 18 deletions dogstatsd/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use ustr::Ustr;
impl MetricValue {
fn aggregate(&mut self, metric: Metric) {
// safe because we know there's at least one value when we parse
// TODO aggregating different types should return error
match self {
MetricValue::Count(count) => *count += metric.value.get_value().unwrap_or_default(),
MetricValue::Gauge(gauge) => *gauge = metric.value.get_value().unwrap_or_default(),
Expand Down Expand Up @@ -128,7 +129,7 @@ impl Aggregator {
.map
.iter()
.filter_map(|entry| match entry.value {
MetricValue::Distribution(_) => build_sketch(now, entry, &self.tags),
MetricValue::Distribution(_) => build_sketch(now, entry, self.tags.clone()),
_ => None,
})
.collect();
Expand All @@ -154,7 +155,7 @@ impl Aggregator {
}
false
})
.filter_map(|entry| build_sketch(now, &entry, &self.tags))
.filter_map(|entry| build_sketch(now, &entry, self.tags.clone()))
{
let next_chunk_size = sketch.compute_size();

Expand Down Expand Up @@ -188,7 +189,7 @@ impl Aggregator {
.iter()
.filter_map(|entry| match entry.value {
MetricValue::Distribution(_) => None,
_ => build_metric(entry, &self.tags),
_ => build_metric(entry, self.tags.clone()),
})
.for_each(|metric| series_payload.series.push(metric));
series_payload
Expand All @@ -209,7 +210,7 @@ impl Aggregator {
}
true
})
.filter_map(|entry| build_metric(&entry, &self.tags))
.filter_map(|entry| build_metric(&entry, self.tags.clone()))
{
// TODO serialization is made twice for each point. If we return a Vec<u8> we can avoid
// that
Expand Down Expand Up @@ -247,13 +248,13 @@ impl Aggregator {
batched_payloads
}

pub fn get_entry_by_id(&self, name: Ustr, tags: &SortedTags) -> Option<&Metric> {
pub fn get_entry_by_id(&self, name: Ustr, tags: &Option<SortedTags>) -> Option<&Metric> {
let id = metric::id(name, tags);
self.map.find(id, |m| m.id == id)
}
}

fn build_sketch(now: i64, entry: &Metric, base_tag_vec: &SortedTags) -> Option<Sketch> {
fn build_sketch(now: i64, entry: &Metric, mut base_tag_vec: SortedTags) -> Option<Sketch> {
let sketch = entry.value.get_sketch()?;
let mut dogsketch = Dogsketch::default();
sketch.merge_to_dogsketch(&mut dogsketch);
Expand All @@ -263,14 +264,20 @@ fn build_sketch(now: i64, entry: &Metric, base_tag_vec: &SortedTags) -> Option<S
sketch.set_dogsketches(vec![dogsketch]);
let name = entry.name.to_string();
sketch.set_metric(name.clone().into());
let mut tags = entry.tags.clone();
tags.extend(base_tag_vec);
sketch.set_tags(tags.to_chars());
if let Some(tags) = entry.tags.clone() {
base_tag_vec.extend(&tags);
}
sketch.set_tags(base_tag_vec.to_chars());
Some(sketch)
}

fn build_metric(entry: &Metric, base_tag_vec: &SortedTags) -> Option<MetricToShip> {
let resources = entry.tags.to_resources();
fn build_metric(entry: &Metric, mut base_tag_vec: SortedTags) -> Option<MetricToShip> {
let resources;
if let Some(tags) = entry.tags.clone() {
resources = tags.to_resources();
} else {
resources = Vec::new();
}
let kind = match entry.value {
MetricValue::Count(_) => datadog::DdMetricKind::Count,
MetricValue::Gauge(_) => datadog::DdMetricKind::Gauge,
Expand All @@ -285,15 +292,16 @@ fn build_metric(entry: &Metric, base_tag_vec: &SortedTags) -> Option<MetricToShi
.as_secs(),
};

let mut tags = entry.tags.clone();
tags.extend(base_tag_vec);
if let Some(tags) = entry.tags.clone() {
base_tag_vec.extend(&tags);
}

Some(MetricToShip {
metric: entry.name.as_str(),
resources,
kind,
points: [point; 1],
tags: tags.to_strings(),
tags: base_tag_vec.to_strings(),
})
}

Expand Down Expand Up @@ -323,7 +331,7 @@ pub mod tests {
) {
let aggregator = aggregator_mutex.lock().unwrap();
if let Some(e) =
aggregator.get_entry_by_id(metric_id.into(), &SortedTags::parse(tags).unwrap())
aggregator.get_entry_by_id(metric_id.into(), &Some(SortedTags::parse(tags).unwrap()))
{
let metric = e.value.get_value().unwrap();
assert!((metric - value).abs() < PRECISION);
Expand All @@ -334,7 +342,7 @@ pub mod tests {

pub fn assert_sketch(aggregator_mutex: &Mutex<Aggregator>, metric_id: &str, value: f64) {
let aggregator = aggregator_mutex.lock().unwrap();
if let Some(e) = aggregator.get_entry_by_id(metric_id.into(), &EMPTY_TAGS) {
if let Some(e) = aggregator.get_entry_by_id(metric_id.into(), &None) {
let metric = e.value.get_sketch().unwrap();
assert!((metric.max().unwrap() - value).abs() < PRECISION);
assert!((metric.min().unwrap() - value).abs() < PRECISION);
Expand Down Expand Up @@ -418,15 +426,15 @@ pub mod tests {

assert_eq!(aggregator.map.len(), 2);
if let Some(v) =
aggregator.get_entry_by_id("foo".into(), &SortedTags::parse("k2:v2").unwrap())
aggregator.get_entry_by_id("foo".into(), &Some(SortedTags::parse("k2:v2").unwrap()))
{
assert_eq!(v.value.get_value().unwrap(), 5f64);
} else {
panic!("failed to get value by id");
}

if let Some(v) =
aggregator.get_entry_by_id("test".into(), &SortedTags::parse("k1:v1").unwrap())
aggregator.get_entry_by_id("test".into(), &Some(SortedTags::parse("k1:v1").unwrap()))
{
assert_eq!(v.value.get_value().unwrap(), 3f64);
} else {
Expand Down
54 changes: 42 additions & 12 deletions dogstatsd/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,22 @@ pub enum MetricValue {
Distribution(DDSketch),
}

impl MetricValue {
pub fn count(v: f64) -> MetricValue {
MetricValue::Count(v)
}

pub fn gauge(v: f64) -> MetricValue {
MetricValue::Gauge(v)
}

pub fn distribution(v: f64) -> MetricValue {
let sketch = &mut DDSketch::default();
sketch.insert(v);
MetricValue::Distribution(sketch.to_owned())
}
}

#[derive(Clone, Debug)]
pub struct SortedTags {
// We sort tags. This is in feature parity with DogStatsD and also means
Expand Down Expand Up @@ -121,10 +137,22 @@ pub struct Metric {
/// the parser. We assume here that tags are not sent in random order by the
/// clien or that, if they are, the API will tidy that up. That is `a:1,b:2`
/// is a different tagset from `b:2,a:1`.
pub tags: SortedTags,
pub tags: Option<SortedTags>,

/// ID given a name and tagset.
pub(crate) id: u64,
pub id: u64,
}

impl Metric {
pub fn new(name: Ustr, value: MetricValue, tags: Option<SortedTags>) -> Metric {
let id = id(name, &tags);
Metric {
name,
value,
tags,
id,
}
}
}

/// Parse a metric from given input.
Expand All @@ -148,9 +176,9 @@ pub fn parse(input: &str) -> Result<Metric, ParseError> {

let tags;
if let Some(tags_section) = caps.name("tags") {
tags = SortedTags::parse(tags_section.as_str())?;
tags = Some(SortedTags::parse(tags_section.as_str())?);
} else {
tags = EMPTY_TAGS;
tags = None;
}
let val = first_value(caps.name("values").unwrap().as_str())?;
let metric_value = match caps.name("type").unwrap().as_str() {
Expand Down Expand Up @@ -202,13 +230,15 @@ fn first_value(values: &str) -> Result<f64, ParseError> {
/// from the point of view of this function.
#[inline]
#[must_use]
pub fn id(name: Ustr, tags: &SortedTags) -> u64 {
pub fn id(name: Ustr, tags: &Option<SortedTags>) -> u64 {
let mut hasher = FnvHasher::default();

name.hash(&mut hasher);
for kv in tags.values.iter() {
kv.0.as_bytes().hash(&mut hasher);
kv.1.as_bytes().hash(&mut hasher);
if let Some(tags_present) = tags {
for kv in tags_present.values.iter() {
kv.0.as_bytes().hash(&mut hasher);
kv.1.as_bytes().hash(&mut hasher);
}
}
hasher.finish()
}
Expand Down Expand Up @@ -274,7 +304,7 @@ mod tests {
assert_eq!(name, metric.name.as_str());

if let Some(tags) = tagset {
let parsed_metric_tags : SortedTags= metric.tags.clone();
let parsed_metric_tags : SortedTags = metric.tags.unwrap();
assert_eq!(tags.split(',').count(), parsed_metric_tags.values.len());
tags.split(',').for_each(|kv| {
let (original_key, original_value) = kv.split_once(':').unwrap();
Expand All @@ -288,7 +318,7 @@ mod tests {
assert!(found);
});
} else {
assert!(metric.tags.is_empty());
assert!(metric.tags.is_none());
}

match mtype.as_str() {
Expand Down Expand Up @@ -409,8 +439,8 @@ mod tests {
tagset2.pop();
}

let id1 = id(Ustr::from(&name), &SortedTags::parse(&tagset1).unwrap());
let id2 = id(Ustr::from(&name), &SortedTags::parse(&tagset2).unwrap());
let id1 = id(Ustr::from(&name), &Some(SortedTags::parse(&tagset1).unwrap()));
let id2 = id(Ustr::from(&name), &Some(SortedTags::parse(&tagset2).unwrap()));

assert_eq!(id1, id2);
}
Expand Down

0 comments on commit d01cf11

Please sign in to comment.