Skip to content

Commit

Permalink
Bring the metrics-observer protobufs up to date (metrics-rs#345)
Browse files Browse the repository at this point in the history
  • Loading branch information
ceejbot authored Apr 5, 2023
1 parent 5e595b3 commit 6d65723
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 112 deletions.
2 changes: 1 addition & 1 deletion metrics-exporter-tcp/build.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
fn main() {
println!("cargo:rerun-if-changed=proto/event.proto");
let mut prost_build = prost_build::Config::new();
prost_build.btree_map(&["."]);
prost_build.btree_map(["."]);
prost_build.compile_protos(&["proto/event.proto"], &["proto/"]).unwrap();
}
2 changes: 1 addition & 1 deletion metrics-observer/build.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
fn main() {
println!("cargo:rerun-if-changed=proto/event.proto");
let mut prost_build = prost_build::Config::new();
prost_build.btree_map(&["."]);
prost_build.btree_map(["."]);
prost_build.compile_protos(&["proto/event.proto"], &["proto/"]).unwrap();
}
27 changes: 7 additions & 20 deletions metrics-observer/proto/event.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,16 @@ message Metric {
string name = 1;
google.protobuf.Timestamp timestamp = 2;
map<string, string> labels = 3;
oneof value {
Counter counter = 4;
Gauge gauge = 5;
Histogram histogram = 6;
oneof operation {
uint64 increment_counter = 4;
uint64 set_counter = 5;
double increment_gauge = 6;
double decrement_gauge = 7;
double set_gauge = 8;
double record_histogram = 9;
}
}

message Counter {
uint64 value = 1;
}

message Gauge {
oneof value {
double absolute = 1;
double increment = 2;
double decrement = 3;
}
}

message Histogram {
double value = 1;
}

message Event {
oneof event {
Metadata metadata = 1;
Expand Down
197 changes: 107 additions & 90 deletions metrics-observer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ mod proto {
include!(concat!(env!("OUT_DIR"), "/event.proto.rs"));
}

use self::proto::{
event::Event,
metadata::{Description as DescriptionMetadata, MetricType, Unit as UnitMetadata},
Event as EventWrapper,
};
use proto::{event::Event, metadata::MetricType, metric::Operation, Event as ProstMessage};

type MetadataKey = (MetricKind, String);
type MetadataValue = (Option<Unit>, Option<String>);

#[derive(Clone)]
pub enum ClientState {
Expand All @@ -38,7 +37,7 @@ pub enum MetricData {
pub struct Client {
state: Arc<Mutex<ClientState>>,
metrics: Arc<RwLock<BTreeMap<CompositeKey, MetricData>>>,
metadata: Arc<RwLock<HashMap<(MetricKind, String), (Option<Unit>, Option<String>)>>>,
metadata: Arc<RwLock<HashMap<MetadataKey, MetadataValue>>>,
}

impl Client {
Expand Down Expand Up @@ -93,15 +92,15 @@ struct Runner {
addr: String,
client_state: Arc<Mutex<ClientState>>,
metrics: Arc<RwLock<BTreeMap<CompositeKey, MetricData>>>,
metadata: Arc<RwLock<HashMap<(MetricKind, String), (Option<Unit>, Option<String>)>>>,
metadata: Arc<RwLock<HashMap<MetadataKey, MetadataValue>>>,
}

impl Runner {
pub fn new(
addr: String,
state: Arc<Mutex<ClientState>>,
metrics: Arc<RwLock<BTreeMap<CompositeKey, MetricData>>>,
metadata: Arc<RwLock<HashMap<(MetricKind, String), (Option<Unit>, Option<String>)>>>,
metadata: Arc<RwLock<HashMap<MetadataKey, MetadataValue>>>,
) -> Runner {
Runner { state: RunnerState::Disconnected, addr, client_state: state, metrics, metadata }
}
Expand Down Expand Up @@ -172,98 +171,116 @@ impl Runner {
Err(e) => eprintln!("read error: {:?}", e),
};

let event = match EventWrapper::decode_length_delimited(&mut buf) {
let message = match ProstMessage::decode_length_delimited(&mut buf) {
Err(e) => {
eprintln!("decode error: {:?}", e);
continue;
}
Ok(event) => event,
Ok(v) => v,
};

if let Some(event) = event.event {
match event {
Event::Metadata(metadata) => {
let metric_type = MetricType::from_i32(metadata.metric_type)
.expect("unknown metric type over wire");
let metric_kind = match metric_type {
MetricType::Counter => MetricKind::Counter,
MetricType::Gauge => MetricKind::Gauge,
MetricType::Histogram => MetricKind::Histogram,
};
let key = (metric_kind, metadata.name);
let mut mmap = self
.metadata
.write()
.expect("failed to get metadata write lock");
let entry = mmap.entry(key).or_insert((None, None));
let (uentry, dentry) = entry;
*uentry = metadata
.unit
.map(|u| match u {
UnitMetadata::UnitValue(us) => us,
})
.and_then(|s| Unit::from_string(s.as_str()));
*dentry = metadata.description.map(|d| match d {
DescriptionMetadata::DescriptionValue(ds) => ds,
});
}
Event::Metric(metric) => {
let mut labels_raw =
metric.labels.into_iter().collect::<Vec<_>>();
labels_raw.sort_by(|a, b| a.0.cmp(&b.0));
let labels = labels_raw
.into_iter()
.map(|(k, v)| Label::new(k, v))
.collect::<Vec<_>>();
let key_data: Key = (metric.name, labels).into();
let event = match message.event {
Some(e) => e,
None => continue,
};

match metric.value.expect("no metric value") {
proto::metric::Value::Counter(value) => {
let key =
CompositeKey::new(MetricKind::Counter, key_data);
let mut metrics = self.metrics.write().unwrap();
let counter = metrics
.entry(key)
.or_insert_with(|| MetricData::Counter(0));
if let MetricData::Counter(inner) = counter {
*inner += value.value;
}
match event {
Event::Metadata(metadata) => {
let metric_type = MetricType::from_i32(metadata.metric_type)
.expect("unknown metric type over wire");
let metric_kind = match metric_type {
MetricType::Counter => MetricKind::Counter,
MetricType::Gauge => MetricKind::Gauge,
MetricType::Histogram => MetricKind::Histogram,
};
let key = (metric_kind, metadata.name);
let mut mmap = self
.metadata
.write()
.expect("failed to get metadata write lock");
let entry = mmap.entry(key).or_insert((None, None));
let (uentry, dentry) = entry;
*uentry = metadata
.unit
.map(|u| match u {
proto::metadata::Unit::UnitValue(u) => u,
})
.and_then(|s| Unit::from_string(s.as_str()));
*dentry = metadata.description.map(|d| match d {
proto::metadata::Description::DescriptionValue(ds) => ds,
});
}
Event::Metric(metric) => {
let mut labels_raw = metric.labels.into_iter().collect::<Vec<_>>();
labels_raw.sort_by(|a, b| a.0.cmp(&b.0));
let labels = labels_raw
.into_iter()
.map(|(k, v)| Label::new(k, v))
.collect::<Vec<_>>();
let key_data: Key = (metric.name, labels).into();

match metric.operation.expect("no metric operation") {
Operation::IncrementCounter(value) => {
let key = CompositeKey::new(MetricKind::Counter, key_data);
let mut metrics = self.metrics.write().unwrap();
let counter = metrics
.entry(key)
.or_insert_with(|| MetricData::Counter(0));
if let MetricData::Counter(inner) = counter {
*inner += value;
}
proto::metric::Value::Gauge(value) => {
let key =
CompositeKey::new(MetricKind::Gauge, key_data);
let mut metrics = self.metrics.write().unwrap();
let gauge = metrics
.entry(key)
.or_insert_with(|| MetricData::Gauge(0.0));
if let MetricData::Gauge(inner) = gauge {
match value.value {
Some(proto::gauge::Value::Absolute(val)) => {
*inner = val
}
Some(proto::gauge::Value::Increment(val)) => {
*inner += val
}
Some(proto::gauge::Value::Decrement(val)) => {
*inner -= val
}
None => {}
}
}
}
Operation::SetCounter(value) => {
let key = CompositeKey::new(MetricKind::Counter, key_data);
let mut metrics = self.metrics.write().unwrap();
let counter = metrics
.entry(key)
.or_insert_with(|| MetricData::Counter(0));
if let MetricData::Counter(inner) = counter {
*inner = value;
}
proto::metric::Value::Histogram(value) => {
let key =
CompositeKey::new(MetricKind::Histogram, key_data);
let mut metrics = self.metrics.write().unwrap();
let histogram =
metrics.entry(key).or_insert_with(|| {
let summary = Summary::with_defaults();
MetricData::Histogram(summary)
});
}
Operation::IncrementGauge(value) => {
let key = CompositeKey::new(MetricKind::Gauge, key_data);
let mut metrics = self.metrics.write().unwrap();
let gauge = metrics
.entry(key)
.or_insert_with(|| MetricData::Gauge(0.0));
if let MetricData::Gauge(inner) = gauge {
*inner += value;
}
}
Operation::DecrementGauge(value) => {
let key = CompositeKey::new(MetricKind::Gauge, key_data);
let mut metrics = self.metrics.write().unwrap();
let gauge = metrics
.entry(key)
.or_insert_with(|| MetricData::Gauge(0.0));
if let MetricData::Gauge(inner) = gauge {
*inner -= value;
}
}
Operation::SetGauge(value) => {
let key = CompositeKey::new(MetricKind::Gauge, key_data);
let mut metrics = self.metrics.write().unwrap();
let gauge = metrics
.entry(key)
.or_insert_with(|| MetricData::Gauge(0.0));
if let MetricData::Gauge(inner) = gauge {
*inner = value;
}
}
Operation::RecordHistogram(value) => {
let key =
CompositeKey::new(MetricKind::Histogram, key_data);
let mut metrics = self.metrics.write().unwrap();
let histogram = metrics.entry(key).or_insert_with(|| {
let summary = Summary::with_defaults();
MetricData::Histogram(summary)
});

if let MetricData::Histogram(inner) = histogram {
inner.add(value.value);
}
if let MetricData::Histogram(inner) = histogram {
inner.add(value);
}
}
}
Expand Down

0 comments on commit 6d65723

Please sign in to comment.