diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index ad3b5ee3b..09937383f 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -107,7 +107,7 @@ use std::{ time::{Duration as StdDuration, Instant}, }; use tokio::{sync::mpsc, try_join}; -use tracing::{debug, info, trace_span, warn, Level}; +use tracing::{debug, info, info_span, trace_span, warn, Level, Span}; use url::Url; #[cfg(test)] @@ -1931,6 +1931,7 @@ impl VdafOps { let (sender, mut receiver) = mpsc::unbounded_channel(); let producer_task = tokio::task::spawn_blocking({ + let parent_span = Span::current(); let global_hpke_keypairs = global_hpke_keypairs.view(); let vdaf = Arc::clone(&vdaf); let task = Arc::clone(&task); @@ -1941,6 +1942,9 @@ impl VdafOps { let agg_param = Arc::clone(&agg_param); move || { + let span = info_span!(parent: parent_span, "handle_aggregate_init_generic threadpool task"); + let _entered = span.enter(); + req .prepare_inits() .par_iter() @@ -2247,7 +2251,7 @@ impl VdafOps { }); let mut report_share_data = Vec::with_capacity(req.prepare_inits().len()); - while receiver.recv_many(&mut report_share_data, usize::MAX).await > 0 {} + while receiver.recv_many(&mut report_share_data, 10).await > 0 {} // Await the producer task to resume any panics that may have occurred, and to ensure we can // unwrap the aggregation parameter's Arc in a few lines. The only other errors that can diff --git a/aggregator/src/aggregator/aggregation_job_continue.rs b/aggregator/src/aggregator/aggregation_job_continue.rs index 02c20c88b..201444009 100644 --- a/aggregator/src/aggregator/aggregation_job_continue.rs +++ b/aggregator/src/aggregator/aggregation_job_continue.rs @@ -27,7 +27,7 @@ use prio::{ use rayon::iter::{IntoParallelIterator as _, ParallelIterator as _}; use std::{panic, sync::Arc}; use tokio::sync::mpsc; -use tracing::trace_span; +use tracing::{info_span, trace_span, Span}; impl VdafOps { /// Step the helper's aggregation job to the next step using the step `n` ping pong state in @@ -142,13 +142,17 @@ impl VdafOps { // Compute the next aggregation step. let (sender, mut receiver) = mpsc::unbounded_channel(); let aggregation_job = Arc::new(aggregation_job); - let producer_fut = tokio::task::spawn_blocking({ + let producer_task = tokio::task::spawn_blocking({ + let parent_span = Span::current(); let metrics = metrics.clone(); let task = Arc::clone(&task); let vdaf = Arc::clone(&vdaf); let aggregation_job = Arc::clone(&aggregation_job); move || { + let span = info_span!(parent: parent_span, "step_aggregation_job threadpool task"); + let _entered = span.enter(); + prep_steps_and_ras.into_par_iter().try_for_each_with( sender, |sender, (prep_step, report_aggregation, prep_state)| { @@ -240,7 +244,7 @@ impl VdafOps { }); while receiver - .recv_many(&mut report_aggregations_to_write, usize::MAX) + .recv_many(&mut report_aggregations_to_write, 10) .await > 0 {} @@ -250,7 +254,7 @@ impl VdafOps { // are: a `JoinError` indicating cancellation, which is impossible because we do not cancel // the task; and a `SendError`, which can only happen if this future is cancelled (in which // case we will not run this code at all). - let _ = producer_fut.await.map_err(|join_error| { + let _ = producer_task.await.map_err(|join_error| { if let Ok(reason) = join_error.try_into_panic() { panic::resume_unwind(reason); }