Skip to content

Commit

Permalink
Fix leak of channel sender (#3406)
Browse files Browse the repository at this point in the history
  • Loading branch information
divergentdave authored Sep 12, 2024
1 parent 2687103 commit dcef7eb
Showing 1 changed file with 28 additions and 9 deletions.
37 changes: 28 additions & 9 deletions aggregator/src/aggregator/queue.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{
collections::BTreeMap,
mem::forget,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Expand Down Expand Up @@ -182,21 +181,41 @@ impl LIFORequestQueue {
.map_err(|_| Error::Internal("dispatcher task died".to_string()))?;

/// Sends a cancellation message over the given channel when the guard is dropped, unless
/// it's forgotten with [`forget`].
struct CancelDropGuard(u64, mpsc::UnboundedSender<DispatcherMessage>);
/// [`Self::disarm`] is called.
struct CancelDropGuard {
id: u64,
sender: mpsc::UnboundedSender<DispatcherMessage>,
armed: bool,
}

impl CancelDropGuard {
fn new(id: u64, sender: mpsc::UnboundedSender<DispatcherMessage>) -> Self {
Self {
id,
sender,
armed: true,
}
}

fn disarm(&mut self) {
self.armed = false;
}
}

impl Drop for CancelDropGuard {
fn drop(&mut self) {
let _ = self
.1
.send(DispatcherMessage::Cancel(self.0))
.map_err(|err| warn!("failed to send cancellation message: {:?}", err));
if self.armed {
let _ = self
.sender
.send(DispatcherMessage::Cancel(self.id))
.map_err(|err| warn!("failed to send cancellation message: {:?}", err));
}
}
}

let drop_guard = CancelDropGuard(id, self.dispatcher_tx.clone());
let mut drop_guard = CancelDropGuard::new(id, self.dispatcher_tx.clone());
let permit = permit_rx.await;
forget(drop_guard);
drop_guard.disarm();

// If the rx channel is prematurely dropped, we'll reach this error, indicating that
// something has gone wrong with the dispatcher task or it has shutdown. If the drop guard
Expand Down

0 comments on commit dcef7eb

Please sign in to comment.