Tracing spans.
Also,
 * Receive only 10 (arbitrarily chosen) messages per call to
   `recv_many`. This gives the consuming future more await points at
   which it can be cancelled during VDAF computations. (A sketch of this
   pattern follows the diff below.)
 * Rename a variable.
branlwyd committed May 15, 2024
1 parent 52f4d84 commit d756a2f
Showing 2 changed files with 14 additions and 6 deletions.
8 changes: 6 additions & 2 deletions aggregator/src/aggregator.rs
@@ -107,7 +107,7 @@ use std::{
     time::{Duration as StdDuration, Instant},
 };
 use tokio::{sync::mpsc, try_join};
-use tracing::{debug, info, trace_span, warn, Level};
+use tracing::{debug, info, info_span, trace_span, warn, Level, Span};
 use url::Url;

 #[cfg(test)]
@@ -1931,6 +1931,7 @@ impl VdafOps {

         let (sender, mut receiver) = mpsc::unbounded_channel();
         let producer_task = tokio::task::spawn_blocking({
+            let parent_span = Span::current();
             let global_hpke_keypairs = global_hpke_keypairs.view();
             let vdaf = Arc::clone(&vdaf);
             let task = Arc::clone(&task);
@@ -1941,6 +1942,9 @@
             let agg_param = Arc::clone(&agg_param);

             move || {
+                let span = info_span!(parent: parent_span, "handle_aggregate_init_generic threadpool task");
+                let _entered = span.enter();
+
                 req
                     .prepare_inits()
                     .par_iter()
@@ -2247,7 +2251,7 @@
         });

         let mut report_share_data = Vec::with_capacity(req.prepare_inits().len());
-        while receiver.recv_many(&mut report_share_data, usize::MAX).await > 0 {}
+        while receiver.recv_many(&mut report_share_data, 10).await > 0 {}

         // Await the producer task to resume any panics that may have occurred, and to ensure we can
         // unwrap the aggregation parameter's Arc in a few lines. The only other errors that can
12 changes: 8 additions & 4 deletions aggregator/src/aggregator/aggregation_job_continue.rs
@@ -27,7 +27,7 @@ use prio::{
 use rayon::iter::{IntoParallelIterator as _, ParallelIterator as _};
 use std::{panic, sync::Arc};
 use tokio::sync::mpsc;
-use tracing::trace_span;
+use tracing::{info_span, trace_span, Span};

 impl VdafOps {
     /// Step the helper's aggregation job to the next step using the step `n` ping pong state in
@@ -142,13 +142,17 @@ impl VdafOps {
         // Compute the next aggregation step.
         let (sender, mut receiver) = mpsc::unbounded_channel();
         let aggregation_job = Arc::new(aggregation_job);
-        let producer_fut = tokio::task::spawn_blocking({
+        let producer_task = tokio::task::spawn_blocking({
+            let parent_span = Span::current();
             let metrics = metrics.clone();
             let task = Arc::clone(&task);
             let vdaf = Arc::clone(&vdaf);
             let aggregation_job = Arc::clone(&aggregation_job);

             move || {
+                let span = info_span!(parent: parent_span, "step_aggregation_job threadpool task");
+                let _entered = span.enter();
+
                 prep_steps_and_ras.into_par_iter().try_for_each_with(
                     sender,
                     |sender, (prep_step, report_aggregation, prep_state)| {
@@ -240,7 +244,7 @@
         });

         while receiver
-            .recv_many(&mut report_aggregations_to_write, usize::MAX)
+            .recv_many(&mut report_aggregations_to_write, 10)
             .await
             > 0
         {}
@@ -250,7 +254,7 @@
         // are: a `JoinError` indicating cancellation, which is impossible because we do not cancel
         // the task; and a `SendError`, which can only happen if this future is cancelled (in which
         // case we will not run this code at all).
-        let _ = producer_fut.await.map_err(|join_error| {
+        let _ = producer_task.await.map_err(|join_error| {
             if let Ok(reason) = join_error.try_into_panic() {
                 panic::resume_unwind(reason);
             }
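For reference, below is a minimal, self-contained sketch (not taken from Janus) of the pattern this commit applies: capture the caller's tracing span with `Span::current()` before `spawn_blocking`, make it the explicit parent of a new `info_span!` entered on the worker thread, and drain the producer's output in bounded batches of 10 so the consuming future hits an await point between batches. The span name, the `u64` channel payload, and the squaring loop standing in for per-report VDAF preparation are illustrative only; `recv_many` assumes a recent tokio (1.37+).

```rust
use tokio::{sync::mpsc, task};
use tracing::{info_span, Span};

#[tokio::main]
async fn main() {
    let (sender, mut receiver) = mpsc::unbounded_channel();

    let producer_task = task::spawn_blocking({
        // Capture the caller's span so work done on the blocking thread is
        // attributed to the request that spawned it.
        let parent_span = Span::current();
        move || {
            let span = info_span!(parent: parent_span, "threadpool task");
            let _entered = span.enter();

            // Stand-in for per-report VDAF preparation work.
            for i in 0u64..100 {
                if sender.send(i * i).is_err() {
                    break; // Receiver dropped; stop producing.
                }
            }
        }
    });

    // Drain in batches of 10 rather than usize::MAX so that cancellation of
    // this future can take effect between batches while the producer runs.
    let mut results = Vec::new();
    while receiver.recv_many(&mut results, 10).await > 0 {}

    // Await the producer to resume any panic raised on the blocking thread.
    if let Err(join_error) = producer_task.await {
        if let Ok(reason) = join_error.try_into_panic() {
            std::panic::resume_unwind(reason);
        }
    }

    println!("collected {} results", results.len());
}
```

Capturing `parent_span` explicitly matters because a `spawn_blocking` closure runs on a thread with no span entered, so a span created there without `parent:` would be a root span, disconnected from the request that spawned the work.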
