Skip to content

Commit

Permalink
Decommission node gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Nov 10, 2023
1 parent 9e95ec9 commit 0e1a943
Show file tree
Hide file tree
Showing 19 changed files with 1,247 additions and 579 deletions.
23 changes: 21 additions & 2 deletions quickwit/quickwit-common/src/stream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use std::fmt;
use std::pin::Pin;

use futures::{stream, Stream, TryStreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use tokio::sync::{mpsc, watch};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream, WatchStream};
use tracing::warn;

pub type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Unpin + 'static>>;
Expand Down Expand Up @@ -57,6 +57,15 @@ where T: Send + 'static
}
}

impl<T> ServiceStream<T>
where T: Clone + Send + Sync + 'static
{
    /// Creates a `tokio::sync::watch` channel seeded with `init` and returns the
    /// sender half alongside a `ServiceStream` that consumes the receiver half.
    pub fn new_watch(init: T) -> (watch::Sender<T>, Self) {
        let (sender, receiver) = watch::channel(init);
        let stream = Self::from(receiver);
        (sender, stream)
    }
}

impl<T, E> ServiceStream<Result<T, E>>
where
T: Send + 'static,
Expand Down Expand Up @@ -104,6 +113,16 @@ where T: Send + 'static
}
}

impl<T> From<watch::Receiver<T>> for ServiceStream<T>
where T: Clone + Send + Sync + 'static
{
    /// Adapts a `watch::Receiver` into a `ServiceStream` by wrapping it in a
    /// [`WatchStream`] and pinning it as the stream's inner implementation.
    fn from(receiver: watch::Receiver<T>) -> Self {
        let inner = Box::pin(WatchStream::new(receiver));
        Self { inner }
    }
}

/// Adapts a server-side tonic::Streaming into a ServiceStream of `Result<T, tonic::Status>`. Once
/// an error is encountered, the stream will be closed and subsequent calls to `poll_next` will
/// return `None`.
Expand Down
10 changes: 3 additions & 7 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use quickwit_proto::control_plane::{
GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSuccess,
};
use quickwit_proto::ingest::ingester::{IngesterService, PingRequest};
use quickwit_proto::ingest::{ClosedShards, IngestV2Error, ShardState};
use quickwit_proto::ingest::{IngestV2Error, ShardIds, ShardState};
use quickwit_proto::metastore;
use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient};
use quickwit_proto::types::{IndexUid, NodeId};
Expand Down Expand Up @@ -172,11 +172,7 @@ impl IngestController {
None
}

fn handle_closed_shards(
&self,
closed_shards: Vec<ClosedShards>,
model: &mut ControlPlaneModel,
) {
fn handle_closed_shards(&self, closed_shards: Vec<ShardIds>, model: &mut ControlPlaneModel) {
for closed_shard in closed_shards {
let index_uid: IndexUid = closed_shard.index_uid.into();
let source_id = closed_shard.source_id;
Expand Down Expand Up @@ -764,7 +760,7 @@ mod tests {

let request = GetOrCreateOpenShardsRequest {
subrequests: Vec::new(),
closed_shards: vec![ClosedShards {
closed_shards: vec![ShardIds {
index_uid: index_uid.clone().into(),
source_id: source_id.clone(),
shard_ids: vec![1, 2],
Expand Down
16 changes: 15 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,9 @@ mod tests {

use bytes::Bytes;
use mrecordlog::MultiRecordLog;
use quickwit_proto::ingest::ingester::IngesterServiceClient;
use quickwit_proto::ingest::ingester::{
IngesterServiceClient, IngesterStatus, ObservationMessage,
};
use quickwit_proto::types::queue_id;
use tokio::time::timeout;

Expand Down Expand Up @@ -625,12 +627,15 @@ mod tests {
to_position_inclusive: None,
};
let (new_records_tx, new_records_rx) = watch::channel(());
let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default()));
let state = Arc::new(RwLock::new(IngesterState {
mrecordlog,
shards: HashMap::new(),
rate_limiters: HashMap::new(),
replication_streams: HashMap::new(),
replication_tasks: HashMap::new(),
status: IngesterStatus::Ready,
observation_tx,
}));
let (mut fetch_stream, fetch_task_handle) = FetchTask::spawn(
open_fetch_stream_request,
Expand Down Expand Up @@ -795,12 +800,15 @@ mod tests {
to_position_inclusive: None,
};
let (_new_records_tx, new_records_rx) = watch::channel(());
let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default()));
let state = Arc::new(RwLock::new(IngesterState {
mrecordlog,
shards: HashMap::new(),
rate_limiters: HashMap::new(),
replication_streams: HashMap::new(),
replication_tasks: HashMap::new(),
status: IngesterStatus::Ready,
observation_tx,
}));
let (mut fetch_stream, fetch_task_handle) = FetchTask::spawn(
open_fetch_stream_request,
Expand Down Expand Up @@ -835,12 +843,15 @@ mod tests {
from_position_exclusive: None,
to_position_inclusive: Some(Position::from(0u64)),
};
let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default()));
let state = Arc::new(RwLock::new(IngesterState {
mrecordlog,
shards: HashMap::new(),
rate_limiters: HashMap::new(),
replication_streams: HashMap::new(),
replication_tasks: HashMap::new(),
status: IngesterStatus::Ready,
observation_tx,
}));
let (new_records_tx, new_records_rx) = watch::channel(());
let (mut fetch_stream, fetch_task_handle) = FetchTask::spawn(
Expand Down Expand Up @@ -902,12 +913,15 @@ mod tests {
from_position_exclusive: None,
to_position_inclusive: Some(Position::from(2u64)),
};
let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default()));
let state = Arc::new(RwLock::new(IngesterState {
mrecordlog,
shards: HashMap::new(),
rate_limiters: HashMap::new(),
replication_streams: HashMap::new(),
replication_tasks: HashMap::new(),
status: IngesterStatus::Ready,
observation_tx,
}));
let (new_records_tx, new_records_rx) = watch::channel(());
let (mut fetch_stream, _fetch_task_handle) =
Expand Down
Loading

0 comments on commit 0e1a943

Please sign in to comment.