Skip to content

Commit

Permalink
Replace stream attribute filter with allowed keys (#1161)
Browse files Browse the repository at this point in the history
  • Loading branch information
jtescher authored Jul 21, 2023
1 parent b28e41a commit 25cce7a
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 33 deletions.
2 changes: 1 addition & 1 deletion opentelemetry-sdk/benches/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ fn counters(c: &mut Criterion) {
Some(
new_view(
Instrument::new().name("*"),
Stream::new().attribute_filter(|kv| kv.key == Key::new("K")),
Stream::new().allowed_attribute_keys([Key::new("K")]),
)
.unwrap(),
),
Expand Down
41 changes: 16 additions & 25 deletions opentelemetry-sdk/src/metrics/instrument.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::{any::Any, borrow::Cow, fmt, hash::Hash, marker, sync::Arc};
use std::{any::Any, borrow::Cow, collections::HashSet, hash::Hash, marker, sync::Arc};

use opentelemetry_api::{
metrics::{
AsyncInstrument, MetricsError, Result, SyncCounter, SyncHistogram, SyncUpDownCounter, Unit,
},
KeyValue,
Key, KeyValue,
};

use crate::{
Expand Down Expand Up @@ -158,7 +158,7 @@ impl Instrument {
/// let view = new_view(criteria, mask);
/// # drop(view);
/// ```
#[derive(Default)]
#[derive(Default, Debug)]
#[non_exhaustive]
pub struct Stream {
/// The human-readable identifier of the stream.
Expand All @@ -169,12 +169,14 @@ pub struct Stream {
pub unit: Unit,
/// Aggregation the stream uses for an instrument.
pub aggregation: Option<Aggregation>,
/// applied to all attributes recorded for an instrument.
pub attribute_filter: Option<Filter>,
/// An allow-list of attribute keys that will be preserved for the stream.
///
/// Any attribute recorded for the stream with a key not in this set will be
/// dropped. If the set is empty, all attributes will be dropped, if `None` all
/// attributes will be kept.
pub allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
}

type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;

impl Stream {
/// Create a new stream with empty values.
pub fn new() -> Self {
Expand Down Expand Up @@ -205,25 +207,14 @@ impl Stream {
self
}

/// Set the stream attribute filter.
pub fn attribute_filter(
mut self,
filter: impl Fn(&KeyValue) -> bool + Send + Sync + 'static,
) -> Self {
self.attribute_filter = Some(Arc::new(filter));
self
}
}
/// Set the stream allowed attribute keys.
///
/// Any attribute recorded for the stream with a key not in this set will be
/// dropped. If this set is empty all attributes will be dropped.
pub fn allowed_attribute_keys(mut self, attribute_keys: impl IntoIterator<Item = Key>) -> Self {
self.allowed_attribute_keys = Some(Arc::new(attribute_keys.into_iter().collect()));

impl fmt::Debug for Stream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Stream")
.field("name", &self.name)
.field("description", &self.description)
.field("unit", &self.unit)
.field("aggregation", &self.aggregation)
.field("attribute_filter", &self.attribute_filter.is_some())
.finish()
self
}
}

Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-sdk/src/metrics/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ where
description: inst.description,
unit: inst.unit,
aggregation: None,
attribute_filter: None,
allowed_attribute_keys: None,
};

match self.cached_aggregator(&inst.scope, kind, stream) {
Expand Down Expand Up @@ -369,8 +369,8 @@ where
other => return other, // Drop aggregator or error
};

if let Some(filter) = &stream.attribute_filter {
agg = internal::new_filter(agg, Arc::clone(filter));
if let Some(allowed) = stream.allowed_attribute_keys.as_ref().map(Arc::clone) {
agg = internal::new_filter(agg, Arc::new(move |kv| allowed.contains(&kv.key)));
}

self.pipeline.add_sync(
Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/src/metrics/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ impl View for Box<dyn View> {
/// The [Stream] mask only applies updates for non-empty fields. By default, the
/// [Instrument] the [View] matches against will be use for the name,
/// description, and unit of the returned [Stream] and no `aggregation` or
/// `attribute_filter` are set. All non-empty fields of mask are used instead of
/// the default. If you need to set a an empty value in the returned stream,
/// create a custom [View] directly.
/// `allowed_attribute_keys` are set. All non-empty fields of mask are used
/// instead of the default. If you need to set a an empty value in the returned
/// stream, create a custom [View] directly.
///
/// # Example
///
Expand Down Expand Up @@ -167,7 +167,7 @@ pub fn new_view(criteria: Instrument, mask: Stream) -> Result<Box<dyn View>> {
i.unit.clone()
},
aggregation: agg.clone(),
attribute_filter: mask.attribute_filter.clone(),
allowed_attribute_keys: mask.allowed_attribute_keys.clone(),
})
} else {
None
Expand Down

0 comments on commit 25cce7a

Please sign in to comment.