Skip to content

Commit

Permalink
Merge branch 'main' into fix-dogstatsd-aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
alexgallotta authored Oct 3, 2024
2 parents ae78feb + 6e7d1e6 commit fb23ded
Show file tree
Hide file tree
Showing 23 changed files with 561 additions and 211 deletions.
50 changes: 25 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ resolver = "2"
[workspace.package]
rust-version = "1.76.0"
edition = "2021"
version = "13.0.0"
version = "13.1.0"
license = "Apache-2.0"

[profile.dev]
Expand Down
3 changes: 1 addition & 2 deletions data-pipeline/benches/span_concentrator_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
let concentrator = SpanConcentrator::new(
Duration::from_secs(10),
now,
true,
true,
vec![],
vec!["db_name".to_string(), "bucket_s3".to_string()],
);
let mut spans = vec![];
Expand Down
1 change: 1 addition & 0 deletions data-pipeline/src/agent_info/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub struct AgentInfoStruct {
pub config: Option<Config>,
/// List of keys mapped to peer tags
pub peer_tags: Option<Vec<String>>,
/// List of span kinds eligible for stats computation
pub span_kinds_stats_computed: Option<Vec<String>>,
}

Expand Down
16 changes: 6 additions & 10 deletions data-pipeline/src/span_concentrator/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,15 @@ pub(super) struct AggregationKey {
impl AggregationKey {
/// Return an AggregationKey matching the given span.
///
/// If `peer_tags_aggregation` is true then the peer tags of the span will be included in the
/// key. The peer tags are computed based on the list of `peer_tag_keys`.
pub(super) fn from_span(
span: &pb::Span,
peer_tags_aggregation: bool,
peer_tag_keys: &[String],
) -> Self {
/// If `peer_tag_keys` is not empty then the peer tags of the span will be included in the
/// key.
pub(super) fn from_span(span: &pb::Span, peer_tag_keys: &[String]) -> Self {
let span_kind = span
.meta
.get(TAG_SPANKIND)
.map(|s| s.to_string())
.unwrap_or_default();
let peer_tags = if peer_tags_aggregation && client_or_producer(&span_kind) {
let peer_tags = if client_or_producer(&span_kind) {
get_peer_tags(span, peer_tag_keys)
} else {
vec![]
Expand Down Expand Up @@ -504,12 +500,12 @@ mod tests {
];

for (span, expected_key) in test_cases {
assert_eq!(AggregationKey::from_span(&span, false, &[]), expected_key);
assert_eq!(AggregationKey::from_span(&span, &[]), expected_key);
}

for (span, expected_key) in test_cases_with_peer_tags {
assert_eq!(
AggregationKey::from_span(&span, true, &test_peer_tags),
AggregationKey::from_span(&span, &test_peer_tags),
expected_key
);
}
Expand Down
55 changes: 31 additions & 24 deletions data-pipeline/src/span_concentrator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,19 @@ fn align_timestamp(t: u64, bucket_size: u64) -> u64 {
}

/// Return true if the span has a span.kind that is eligible for stats computation
fn compute_stats_for_span_kind(span: &pb::Span) -> bool {
span.meta.get("span.kind").is_some_and(|span_kind| {
matches!(
span_kind.to_lowercase().as_str(),
"server" | "consumer" | "client" | "producer"
)
})
fn compute_stats_for_span_kind(span: &pb::Span, span_kinds_stats_computed: &[String]) -> bool {
!span_kinds_stats_computed.is_empty()
&& span
.meta
.get("span.kind")
.is_some_and(|span_kind| span_kinds_stats_computed.contains(&span_kind.to_lowercase()))
}

/// Return true if the span should be ignored for stats computation
fn should_ignore_span(span: &pb::Span, compute_stats_by_span_kind: bool) -> bool {
fn should_ignore_span(span: &pb::Span, span_kinds_stats_computed: &[String]) -> bool {
!(trace_utils::has_top_level(span)
|| trace_utils::is_measured(span)
|| (compute_stats_by_span_kind && compute_stats_for_span_kind(span)))
|| compute_stats_for_span_kind(span, span_kinds_stats_computed))
|| trace_utils::is_partial_snapshot(span)
}

Expand Down Expand Up @@ -69,10 +68,8 @@ pub struct SpanConcentrator {
oldest_timestamp: u64,
/// bufferLen is the number stats bucket we keep when flushing.
buffer_len: usize,
/// flag to enable aggregation of peer tags
peer_tags_aggregation: bool,
/// flag to enable computation of stats through checking the span.kind field
compute_stats_by_span_kind: bool,
/// span.kind fields eligible for stats computation
span_kinds_stats_computed: Vec<String>,
/// keys for supplementary tags that describe peer.service entities
peer_tag_keys: Vec<String>,
}
Expand All @@ -81,15 +78,12 @@ impl SpanConcentrator {
/// Return a new concentrator with the given parameters
/// - `bucket_size` is the size of the time buckets
/// - `now` the current system time, used to define the oldest bucket
/// - `peer_tags_aggregation` enables aggregation based on peer_tags
/// - `compute_stats_by_span_kind` use span_kind to determine span eligibility to stats
/// computation
/// - `peer_tags_keys` the list of keys considered as peer tags for aggregation
/// - `span_kinds_stats_computed` list of span kinds eligible for stats computation
/// - `peer_tag_keys` list of keys considered as peer tags for aggregation
pub fn new(
bucket_size: Duration,
now: SystemTime,
peer_tags_aggregation: bool,
compute_stats_by_span_kind: bool,
span_kinds_stats_computed: Vec<String>,
peer_tag_keys: Vec<String>,
) -> SpanConcentrator {
SpanConcentrator {
Expand All @@ -100,17 +94,31 @@ impl SpanConcentrator {
bucket_size.as_nanos() as u64,
),
buffer_len: 2,
peer_tags_aggregation,
compute_stats_by_span_kind,
span_kinds_stats_computed,
peer_tag_keys,
}
}

/// Set the list of span kinds eligible for stats computation.
///
/// Replaces the previously configured list with `span_kinds`. When checking a span, its
/// `span.kind` meta value is lowercased before being looked up in this list, so entries
/// that are not lowercase will never match. An empty list means no span is made eligible
/// through its span kind.
pub fn set_span_kinds(&mut self, span_kinds: Vec<String>) {
self.span_kinds_stats_computed = span_kinds;
}

/// Set the list of keys considered as peer_tags for aggregation.
///
/// Replaces the previously configured keys with `peer_tags`. The stored list is the one
/// handed to `AggregationKey::from_span` when a span is added to the concentrator.
pub fn set_peer_tags(&mut self, peer_tags: Vec<String>) {
self.peer_tag_keys = peer_tags;
}

/// Return the bucket size used for aggregation.
///
/// The size is kept internally as a nanosecond count and converted back into a
/// `Duration` here.
pub fn get_bucket_size(&self) -> Duration {
Duration::from_nanos(self.bucket_size)
}

/// Add a span into the concentrator, by computing stats if the span is eligible for stats
/// computation.
pub fn add_span(&mut self, span: &pb::Span) {
// If the span is eligible for stats computation
if !should_ignore_span(span, self.compute_stats_by_span_kind) {
if !should_ignore_span(span, self.span_kinds_stats_computed.as_slice()) {
let mut bucket_timestamp =
align_timestamp((span.start + span.duration) as u64, self.bucket_size);
// If the span is too old we aggregate it in the latest bucket instead of
Expand All @@ -119,8 +127,7 @@ impl SpanConcentrator {
bucket_timestamp = self.oldest_timestamp;
}

let agg_key =
AggregationKey::from_span(span, self.peer_tags_aggregation, &self.peer_tag_keys);
let agg_key = AggregationKey::from_span(span, &self.peer_tag_keys);

self.buckets
.entry(bucket_timestamp)
Expand Down
Loading

0 comments on commit fb23ded

Please sign in to comment.