Skip to content

Commit

Permalink
Let MultiplexedInvokerStatusReader share state
Browse files Browse the repository at this point in the history
The MultiplexedInvokerStatusReader now shares the ChannelStatusReader via
an Arc<Mutex<>>.

This fixes #1961.
  • Loading branch information
tillrohrmann committed Sep 16, 2024
1 parent 1ad1a0d commit 942a045
Showing 1 changed file with 18 additions and 6 deletions.
24 changes: 18 additions & 6 deletions crates/worker/src/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use std::collections::BTreeMap;
use std::ops::RangeInclusive;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use anyhow::Context;
Expand Down Expand Up @@ -242,14 +243,16 @@ impl PartitionProcessorHandle {
}
}

type ChannelStatusReaderList = Vec<(RangeInclusive<PartitionKey>, ChannelStatusReader)>;

#[derive(Debug, Clone, Default)]
pub struct MultiplexedInvokerStatusReader {
readers: Vec<(RangeInclusive<PartitionKey>, ChannelStatusReader)>,
readers: Arc<Mutex<ChannelStatusReaderList>>,
}

impl MultiplexedInvokerStatusReader {
fn push(&mut self, key_range: RangeInclusive<PartitionKey>, reader: ChannelStatusReader) {
self.readers.push((key_range, reader));
self.readers.lock().unwrap().push((key_range, reader));
}
}

Expand All @@ -258,15 +261,24 @@ impl StatusHandle for MultiplexedInvokerStatusReader {
std::iter::Flatten<std::vec::IntoIter<<ChannelStatusReader as StatusHandle>::Iterator>>;

async fn read_status(&self, keys: RangeInclusive<PartitionKey>) -> Self::Iterator {
let mut iterators = vec![];
let mut overlapping_partitions = Vec::new();

for (range, reader) in self.readers.iter() {
// first clone the readers while holding the lock, then release the lock before reading the
// status to avoid holding the lock across await points
for (range, reader) in self.readers.lock().unwrap().iter() {
if keys.start() <= range.end() && keys.end() >= range.start() {
// if this partition is actually overlapping with the search range
iterators.push(reader.read_status(keys.clone()).await)
overlapping_partitions.push(reader.clone())
}
}
iterators.into_iter().flatten()

let mut result = Vec::with_capacity(overlapping_partitions.len());

for reader in overlapping_partitions {
result.push(reader.read_status(keys.clone()).await);
}

result.into_iter().flatten()
}
}

Expand Down

0 comments on commit 942a045

Please sign in to comment.