diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs
index c74e71bcd5..13e9d35cbc 100644
--- a/opentelemetry-sdk/src/metrics/mod.rs
+++ b/opentelemetry-sdk/src/metrics/mod.rs
@@ -57,3 +57,103 @@ pub use meter_provider::*;
 pub use periodic_reader::*;
 pub use pipeline::Pipeline;
 pub use view::*;
+
+#[cfg(all(test, feature = "testing"))]
+mod tests {
+    use super::*;
+    use crate::{runtime, testing::metrics::InMemoryMetricsExporter};
+    use opentelemetry::{
+        metrics::{MeterProvider as _, Unit},
+        KeyValue,
+    };
+
+    // "multi_thread" tokio flavor must be used else flush won't
+    // be able to make progress!
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn counter_aggregation() {
+        // Run this test with stdout enabled to see output.
+        // cargo test counter --features=metrics,testing -- --nocapture
+
+        // Arrange
+        let exporter = InMemoryMetricsExporter::default();
+        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
+        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
+
+        // Act
+        let meter = meter_provider.meter("test");
+        let counter = meter
+            .u64_counter("my_counter")
+            .with_unit(Unit::new("my_unit"))
+            .init();
+        counter.add(1, &[KeyValue::new("key1", "value1")]);
+        counter.add(1, &[KeyValue::new("key1", "value1")]);
+        counter.add(1, &[KeyValue::new("key1", "value1")]);
+        counter.add(1, &[KeyValue::new("key1", "value1")]);
+        counter.add(1, &[KeyValue::new("key1", "value1")]);
+
+        counter.add(1, &[KeyValue::new("key1", "value2")]);
+        counter.add(1, &[KeyValue::new("key1", "value2")]);
+        counter.add(1, &[KeyValue::new("key1", "value2")]);
+
+        meter_provider.force_flush().unwrap();
+
+        // Assert
+        let resource_metrics = exporter
+            .get_finished_metrics()
+            .expect("metrics are expected to be exported.");
+        assert!(!resource_metrics.is_empty());
+        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
+        assert_eq!(metric.name, "my_counter");
+        assert_eq!(metric.unit.as_str(), "my_unit");
+        let sum = metric
+            .data
+            .as_any()
+            .downcast_ref::<data::Sum<u64>>()
+            .expect("Sum aggregation expected for Counter instruments by default");
+
+        // Expecting 2 time-series.
+        assert_eq!(sum.data_points.len(), 2);
+        assert!(sum.is_monotonic, "Counter should produce monotonic.");
+        assert_eq!(
+            sum.temporality,
+            data::Temporality::Cumulative,
+            "Should produce cumulative by default."
+        );
+
+        // find and validate key1=value1 datapoint
+        let mut data_point1 = None;
+        for datapoint in &sum.data_points {
+            if datapoint
+                .attributes
+                .iter()
+                .any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value1")
+            {
+                data_point1 = Some(datapoint);
+            }
+        }
+        assert_eq!(
+            data_point1
+                .expect("datapoint with key1=value1 expected")
+                .value,
+            5
+        );
+
+        // find and validate key1=value2 datapoint
+        let mut data_point1 = None;
+        for datapoint in &sum.data_points {
+            if datapoint
+                .attributes
+                .iter()
+                .any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value2")
+            {
+                data_point1 = Some(datapoint);
+            }
+        }
+        assert_eq!(
+            data_point1
+                .expect("datapoint with key1=value2 expected")
+                .value,
+            3
+        );
+    }
+}