From 094f91f8a4ca63ded42ff3575d50e40493ff7885 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 30 Nov 2023 18:06:09 +0900 Subject: [PATCH] Bugfix: IndexPipeline remembers list of shards and applies it after respawn Bugfix Assign shard logic. If no shard is removed, we just kill the pipeline. If shards are added we reinitiate the shards. The pipeline supervisor keeps track of the shard, and reassigns them if the pipeline is respawned. Closes #4184 Closes #4174 --- quickwit/quickwit-cluster/src/cluster.rs | 2 +- .../src/actors/indexing_pipeline.rs | 27 +- .../src/actors/indexing_service.rs | 121 +++--- .../quickwit-indexing/src/actors/publisher.rs | 7 +- .../src/models/indexing_statistics.rs | 5 + .../src/source/ingest/mod.rs | 359 +++++++++++++++--- quickwit/quickwit-indexing/src/source/mod.rs | 5 +- 7 files changed, 412 insertions(+), 114 deletions(-) diff --git a/quickwit/quickwit-cluster/src/cluster.rs b/quickwit/quickwit-cluster/src/cluster.rs index 460575fddc4..3131e70d250 100644 --- a/quickwit/quickwit-cluster/src/cluster.rs +++ b/quickwit/quickwit-cluster/src/cluster.rs @@ -447,7 +447,7 @@ pub fn parse_indexing_tasks(node_state: &NodeState) -> Vec { /// /// If previous indexing tasks were present in the node state but were not in the given tasks, they /// are marked for deletion. -pub fn set_indexing_tasks_in_node_state( +pub(crate) fn set_indexing_tasks_in_node_state( indexing_tasks: &[IndexingTask], node_state: &mut NodeState, ) { diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index f18be528af5..b23727e09a6 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -17,6 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::collections::BTreeSet; use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -37,6 +38,7 @@ use quickwit_proto::indexing::IndexingPipelineId; use quickwit_proto::metastore::{ IndexMetadataRequest, MetastoreError, MetastoreService, MetastoreServiceClient, }; +use quickwit_proto::types::ShardId; use quickwit_storage::{Storage, StorageResolver}; use tokio::sync::Semaphore; use tracing::{debug, error, info, instrument}; @@ -50,7 +52,9 @@ use crate::actors::uploader::UploaderType; use crate::actors::{Indexer, Packager, Publisher, Uploader}; use crate::merge_policy::MergePolicy; use crate::models::IndexingStatistics; -use crate::source::{quickwit_supported_sources, AssignShards, SourceActor, SourceRuntimeArgs}; +use crate::source::{ + quickwit_supported_sources, AssignShards, Assignment, SourceActor, SourceRuntimeArgs, +}; use crate::split_store::IndexingSplitStore; use crate::SplitsUpdateMailbox; @@ -119,6 +123,10 @@ pub struct IndexingPipeline { handles_opt: Option, // Killswitch used for the actors in the pipeline. This is not the supervisor killswitch. kill_switch: KillSwitch, + // The set of shard is something that can change dynamically without necessarily + // requiring a respawn of the pipeline. + // We keep the list of shards here however, to reassign them after a respawn. + shard_ids: BTreeSet, } #[async_trait] @@ -153,12 +161,13 @@ impl Actor for IndexingPipeline { impl IndexingPipeline { pub fn new(params: IndexingPipelineParams) -> Self { - Self { + IndexingPipeline { params, previous_generations_statistics: Default::default(), handles_opt: None, kill_switch: KillSwitch::default(), statistics: IndexingStatistics::default(), + shard_ids: Default::default(), } } @@ -258,6 +267,7 @@ impl IndexingPipeline { .set_num_spawn_attempts(self.statistics.num_spawn_attempts); let pipeline_metrics_opt = handles.indexer.last_observation().pipeline_metrics_opt; self.statistics.pipeline_metrics_opt = pipeline_metrics_opt; + self.statistics.shard_ids = self.shard_ids.clone(); ctx.observe(self); } @@ -453,6 +463,10 @@ impl IndexingPipeline { .set_mailboxes(source_mailbox, source_inbox) .set_kill_switch(self.kill_switch.clone()) .spawn(actor_source); + let assign_shard_message = AssignShards(Assignment { + shard_ids: self.shard_ids.clone(), + }); + source_mailbox.send_message(assign_shard_message).await?; // Increment generation once we are sure there will be no spawning error. self.previous_generations_statistics = self.statistics.clone(); @@ -543,18 +557,23 @@ impl Handler for IndexingPipeline { async fn handle( &mut self, assign_shards_message: AssignShards, - _ctx: &ActorContext, + ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { + self.shard_ids = assign_shards_message.0.shard_ids.clone(); + // If the pipeline is running, we forward the message to its source. + // If it is not, then it will soon be respawn, and the shard will be assign after that. if let Some(handles) = &mut self.handles_opt { info!( shard_ids=?assign_shards_message.0.shard_ids, - "assigning shards to indexing pipeline." + "assigning shards to indexing pipeline" ); handles .source_mailbox .send_message(assign_shards_message) .await?; } + // We perform observe to make sure the set of shard ids is up to date. + self.perform_observe(ctx); Ok(()) } } diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 57970df36a6..0a0a54748ee 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -466,38 +466,34 @@ impl IndexingService { Ok(merge_planner_mailbox) } - /// Applies the indexing plan by: - /// - Stopping the running pipelines not present in the provided plan. - /// - Starting the pipelines that are not running. - /// Note: the indexing is a list of `IndexingTask` and has no ordinal - /// like a pipeline. We assign an ordinal for each `IndexingTask` from - /// [0, n) with n the number of indexing tasks given a (index_id, source_id). - async fn apply_indexing_plan( - &mut self, - ctx: &ActorContext, - physical_indexing_plan_request: ApplyIndexingPlanRequest, - ) -> Result { - let pipelines_uid_in_plan: FnvHashSet = physical_indexing_plan_request - .indexing_tasks + async fn find_and_shutdown_decommissioned_pipelines(&mut self, tasks: &[IndexingTask]) { + let pipeline_uids_in_plan: FnvHashSet = tasks .iter() .map(|indexing_task| indexing_task.pipeline_uid()) .collect(); - let pipeline_to_add: FnvHashSet<&IndexingTask> = physical_indexing_plan_request - .indexing_tasks - .iter() - .filter(|indexing_task| { - let pipeline_uid = indexing_task.pipeline_uid(); - !self.indexing_pipelines.contains_key(&pipeline_uid) - }) - .collect::>(); - let pipeline_uid_to_remove: Vec = self + + let pipeline_uids_to_remove: Vec = self .indexing_pipelines .keys() .cloned() - .filter(|pipeline_uid| !pipelines_uid_in_plan.contains(pipeline_uid)) + .filter(|pipeline_uid| !pipeline_uids_in_plan.contains(pipeline_uid)) .collect::>(); - let indexing_pipeline_ids_to_add: Vec = pipeline_to_add + + // Shut down currently running pipelines that are missing in the new plan. + self.shutdown_pipelines(&pipeline_uids_to_remove).await; + } + + async fn find_and_spawn_new_pipelines( + &mut self, + tasks: &[IndexingTask], + ctx: &ActorContext, + ) -> Result, IndexingError> { + let pipeline_ids_to_add: Vec = tasks .iter() + .filter(|indexing_task| { + let pipeline_uid = indexing_task.pipeline_uid(); + !self.indexing_pipelines.contains_key(&pipeline_uid) + }) .flat_map(|indexing_task| { let pipeline_uid = indexing_task.pipeline_uid(); let index_uid = IndexUid::parse(indexing_task.index_uid.clone()).ok()?; @@ -509,23 +505,25 @@ impl IndexingService { }) }) .collect(); + self.spawn_pipelines(&pipeline_ids_to_add[..], ctx).await + } - // Spawn new pipeline in the new plan that are not currently running - let failed_spawning_pipeline_ids = self - .spawn_pipelines(ctx, &indexing_pipeline_ids_to_add[..]) - .await?; - - // TODO: Temporary hack to assign shards to pipelines. - for indexing_task in &physical_indexing_plan_request.indexing_tasks { - if indexing_task.shard_ids.is_empty() { + /// For all Ingest V2 pipelines, assigns the set of shards they should be working on. + /// This is done regardless of whether there has been a change in their shard list + /// or not. + /// + /// If a pipeline actor has failed, this function just logs an error. + async fn assign_shards_to_pipelines(&mut self, tasks: &[IndexingTask]) { + for task in tasks { + if task.shard_ids.is_empty() { continue; } - let pipeline_uid = indexing_task.pipeline_uid(); + let pipeline_uid = task.pipeline_uid(); let Some(pipeline_handle) = self.indexing_pipelines.get(&pipeline_uid) else { continue; }; let assignment = Assignment { - shard_ids: indexing_task.shard_ids.clone(), + shard_ids: task.shard_ids.iter().copied().collect(), }; let message = AssignShards(assignment); @@ -533,27 +531,37 @@ impl IndexingService { error!(error=%error, "failed to assign shards to indexing pipeline"); } } + } - // Shut down currently running pipelines that are missing in the new plan. - self.shutdown_pipelines(&pipeline_uid_to_remove).await; - + /// Applies the indexing plan by: + /// - Stopping the running pipelines not present in the provided plan. + /// - Starting the pipelines that are not running. + /// Note: the indexing is a list of `IndexingTask` and has no ordinal + /// like a pipeline. We assign an ordinal for each `IndexingTask` from + /// [0, n) with n the number of indexing tasks given a (index_id, source_id). + async fn apply_indexing_plan( + &mut self, + tasks: &[IndexingTask], + ctx: &ActorContext, + ) -> Result<(), IndexingError> { + self.find_and_shutdown_decommissioned_pipelines(tasks).await; + let failed_spawning_pipeline_ids = self.find_and_spawn_new_pipelines(tasks, ctx).await?; + self.assign_shards_to_pipelines(tasks).await; self.update_cluster_running_indexing_tasks_in_chitchat() .await; - if !failed_spawning_pipeline_ids.is_empty() { return Err(IndexingError::SpawnPipelinesError { pipeline_ids: failed_spawning_pipeline_ids, }); } - - Ok(ApplyIndexingPlanResponse {}) + Ok(()) } /// Spawns the pipelines with supplied ids and returns a list of failed pipelines. async fn spawn_pipelines( &mut self, - ctx: &ActorContext, added_pipeline_ids: &[IndexingPipelineId], + ctx: &ActorContext, ) -> Result, IndexingError> { // We fetch the new indexes metadata. let indexes_metadata_futures = added_pipeline_ids @@ -645,21 +653,23 @@ impl IndexingService { } async fn update_cluster_running_indexing_tasks_in_chitchat(&self) { - let indexing_tasks = self + let mut indexing_tasks: Vec = self .indexing_pipelines .values() - .map(|pipeline_handle| &pipeline_handle.indexing_pipeline_id) - .map(|pipeline_id| IndexingTask { - index_uid: pipeline_id.index_uid.to_string(), - source_id: pipeline_id.source_id.clone(), - pipeline_uid: Some(pipeline_id.pipeline_uid), - shard_ids: Vec::new(), - }) - // Sort indexing tasks so it's more readable for debugging purpose. - .sorted_by(|left, right| { - (&left.index_uid, &left.source_id).cmp(&(&right.index_uid, &right.source_id)) + .map(|handle| IndexingTask { + index_uid: handle.indexing_pipeline_id.index_uid.to_string(), + source_id: handle.indexing_pipeline_id.source_id.clone(), + pipeline_uid: Some(handle.indexing_pipeline_id.pipeline_uid), + shard_ids: handle + .handle + .last_observation() + .shard_ids + .iter() + .copied() + .collect(), }) - .collect_vec(); + .collect(); + indexing_tasks.sort_unstable_by_key(|task| task.pipeline_uid); if let Err(error) = self .cluster @@ -820,7 +830,10 @@ impl Handler for IndexingService { plan_request: ApplyIndexingPlanRequest, ctx: &ActorContext, ) -> Result { - Ok(self.apply_indexing_plan(ctx, plan_request).await) + Ok(self + .apply_indexing_plan(&plan_request.indexing_tasks, ctx) + .await + .map(|_| ApplyIndexingPlanResponse {})) } } diff --git a/quickwit/quickwit-indexing/src/actors/publisher.rs b/quickwit/quickwit-indexing/src/actors/publisher.rs index aad20a9b665..35a2184d005 100644 --- a/quickwit/quickwit-indexing/src/actors/publisher.rs +++ b/quickwit/quickwit-indexing/src/actors/publisher.rs @@ -23,7 +23,7 @@ use fail::fail_point; use quickwit_actors::{Actor, ActorContext, Handler, Mailbox, QueueCapacity}; use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient, PublishSplitsRequest}; use serde::Serialize; -use tracing::{info, instrument}; +use tracing::{info, instrument, warn}; use crate::actors::MergePlanner; use crate::models::{NewSplits, SplitsUpdate}; @@ -156,12 +156,15 @@ impl Handler for Publisher { // considered an error. For instance, if the source is a // FileSource, it will terminate upon EOF and drop its // mailbox. - let _ = ctx + let suggest_truncate_res = ctx .send_message( source_mailbox, SuggestTruncate(checkpoint.source_delta.get_source_checkpoint()), ) .await; + if let Err(send_truncate_err) = suggest_truncate_res { + warn!(error=?send_truncate_err, "failed to send truncate message from publisher to source"); + } } } diff --git a/quickwit/quickwit-indexing/src/models/indexing_statistics.rs b/quickwit/quickwit-indexing/src/models/indexing_statistics.rs index bd404021d45..9335bec1cfb 100644 --- a/quickwit/quickwit-indexing/src/models/indexing_statistics.rs +++ b/quickwit/quickwit-indexing/src/models/indexing_statistics.rs @@ -17,9 +17,11 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::collections::BTreeSet; use std::sync::atomic::Ordering; use quickwit_proto::indexing::PipelineMetrics; +use quickwit_proto::types::ShardId; use serde::Serialize; use crate::actors::{DocProcessorCounters, IndexerCounters, PublisherCounters, UploaderCounters}; @@ -51,6 +53,9 @@ pub struct IndexingStatistics { pub num_spawn_attempts: usize, // Pipeline metrics. pub pipeline_metrics_opt: Option, + // List of shard ids. + #[schema(value_type = Vec)] + pub shard_ids: BTreeSet, } impl IndexingStatistics { diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs index 749df2f8bd0..fef78c66bf6 100644 --- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs +++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs @@ -17,6 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::collections::BTreeSet; use std::fmt; use std::sync::Arc; use std::time::Duration; @@ -24,7 +25,6 @@ use std::time::Duration; use anyhow::{bail, Context}; use async_trait::async_trait; use fnv::FnvHashMap; -use itertools::Itertools; use quickwit_actors::{ActorExitStatus, Mailbox}; use quickwit_common::pubsub::EventBroker; use quickwit_common::retry::RetryParams; @@ -116,10 +116,10 @@ impl ClientId { enum IndexingStatus { #[default] Active, - // We have emitted all documents of the pipeline until EOF. - // Disclaimer: a complete status does not mean that all documents have been indexed. - // Some document might be still travelling in the pipeline, and may not have been published - // yet. + // We have received all documents from the stream. Note they + // are not necessarily published yet. + EofReached, + // All documents have been indexed AND published. Complete, Error, } @@ -128,6 +128,7 @@ enum IndexingStatus { struct AssignedShard { leader_id: NodeId, follower_id_opt: Option, + // This is just the shard id converted to a partition id object. partition_id: PartitionId, current_position_inclusive: Position, status: IndexingStatus, @@ -217,7 +218,7 @@ impl IngestSource { batch_builder.force_commit(); } MRecord::Eof => { - assigned_shard.status = IndexingStatus::Complete; + assigned_shard.status = IndexingStatus::EofReached; break; } MRecord::Unknown => { @@ -239,7 +240,9 @@ impl IngestSource { fn process_fetch_stream_error(&mut self, fetch_stream_error: FetchStreamError) { if let Some(shard) = self.assigned_shards.get_mut(&fetch_stream_error.shard_id) { - if shard.status != IndexingStatus::Complete { + if shard.status != IndexingStatus::Complete + || shard.status != IndexingStatus::EofReached + { shard.status = IndexingStatus::Error; } } @@ -250,8 +253,21 @@ impl IngestSource { self.client_id.source_uid.clone(), truncate_positions.clone(), ); + + // Let's record all shards that have reached Eof as complete. + for (shard, truncate_position) in &truncate_positions { + if truncate_position == &Position::Eof { + if let Some(assigned_shard) = self.assigned_shards.get_mut(shard) { + assigned_shard.status = IndexingStatus::Complete; + } + } + } + + // We publish the event to the event broker. self.event_broker.publish(shard_positions_update); + // Finally, we push the information to ingesters in a best effort manner. + // If the request fail, we just log an error. let mut per_ingester_truncate_subrequests: FnvHashMap< &NodeId, Vec, @@ -328,6 +344,64 @@ impl IngestSource { } false } + + /// If the new assignment removes a shard that we were in the middle of indexing (ie they have + /// not reached `IndexingStatus::Complete` status yet), we need to reset the pipeline: + /// + /// Ongoing work and splits travelling through the pipeline will be dropped. + async fn reset_if_needed( + &mut self, + new_assigned_shard_ids: &BTreeSet, + doc_processor_mailbox: &Mailbox, + ctx: &SourceContext, + ) -> anyhow::Result<()> { + // If we removed one shard that has not reach the complete status yet, + // we need to reset the pipeline. + let mut removed_shard_ids = self + .assigned_shards + .keys() + .copied() + .filter(|shard_id| !new_assigned_shard_ids.contains(shard_id)); + + let reset_needed: bool = removed_shard_ids.any(|removed_shard_id| { + let Some(assigned_shard) = self.assigned_shards.get(&removed_shard_id) else { + return false; + }; + assigned_shard.status != IndexingStatus::Complete + }); + + if !reset_needed { + // Not need to reset the fetch streams, we can just remove the shard that have been + // completely indexed. + self.assigned_shards.retain(|shard_id, assignment| { + if new_assigned_shard_ids.contains(shard_id) { + true + } else { + assert_eq!(assignment.status, IndexingStatus::Complete); + false + } + }); + return Ok(()); + } + + warn!("resetting pipeline"); + self.assigned_shards.clear(); + self.fetch_stream.reset(); + self.publish_lock.kill().await; + self.publish_lock = PublishLock::default(); + self.publish_token = self.client_id.new_publish_token(); + ctx.send_message( + doc_processor_mailbox, + NewPublishLock(self.publish_lock.clone()), + ) + .await?; + ctx.send_message( + doc_processor_mailbox, + NewPublishToken(self.publish_token.clone()), + ) + .await?; + Ok(()) + } } #[async_trait] @@ -376,47 +450,37 @@ impl Source for IngestSource { async fn assign_shards( &mut self, - mut new_assigned_shard_ids: Vec, + new_assigned_shard_ids: BTreeSet, doc_processor_mailbox: &Mailbox, ctx: &SourceContext, ) -> anyhow::Result<()> { - // TODO: Remove this check once the control plane stops sending identical assignments. - let current_assigned_shard_ids = self + self.reset_if_needed(&new_assigned_shard_ids, doc_processor_mailbox, ctx) + .await?; + + // As enforced by `reset_if_needed`, at this point, all currently assigned shards should be + // in the new_assigned_shards. + debug_assert!(self .assigned_shards .keys() - .copied() - .sorted() - .collect::>(); - - new_assigned_shard_ids.sort(); + .all(|shard_id| new_assigned_shard_ids.contains(shard_id))); - if current_assigned_shard_ids == new_assigned_shard_ids { + if self.assigned_shards.len() == new_assigned_shard_ids.len() { + // Nothing to do. + // The set shards is unchanged. return Ok(()); } - info!("new shard assignment: `{:?}`", new_assigned_shard_ids); - self.assigned_shards.clear(); - self.fetch_stream.reset(); - self.publish_lock.kill().await; - self.publish_lock = PublishLock::default(); - self.publish_token = self.client_id.new_publish_token(); - - ctx.send_message( - doc_processor_mailbox, - NewPublishLock(self.publish_lock.clone()), - ) - .await?; + let added_shard_ids: Vec = new_assigned_shard_ids + .into_iter() + .filter(|shard_id| !self.assigned_shards.contains_key(shard_id)) + .collect(); - ctx.send_message( - doc_processor_mailbox, - NewPublishToken(self.publish_token.clone()), - ) - .await?; + info!(added_shards=?added_shard_ids, "adding shards assignment"); let acquire_shards_subrequest = AcquireShardsSubrequest { index_uid: self.client_id.source_uid.index_uid.to_string(), source_id: self.client_id.source_uid.source_id.clone(), - shard_ids: new_assigned_shard_ids, + shard_ids: added_shard_ids, publish_token: self.publish_token.clone(), }; let acquire_shards_request = AcquireShardsRequest { @@ -426,7 +490,6 @@ impl Source for IngestSource { .protect_future(self.metastore.acquire_shards(acquire_shards_request)) .await .context("failed to acquire shards")?; - let acquire_shards_subresponse = acquire_shards_response .subresponses .into_iter() @@ -466,7 +529,6 @@ impl Source for IngestSource { IndexingStatus::Active }; truncate_positions.push((shard_id, current_position_inclusive.clone())); - let assigned_shard = AssignedShard { leader_id, follower_id_opt, @@ -477,6 +539,7 @@ impl Source for IngestSource { self.assigned_shards.insert(shard_id, assigned_shard); } self.truncate(truncate_positions).await; + Ok(()) } @@ -491,7 +554,6 @@ impl Source for IngestSource { let shard_id = partition_id.as_u64().expect("shard ID should be a u64"); truncate_positions.push((shard_id, position)); } - self.truncate(truncate_positions).await; Ok(()) } @@ -511,9 +573,11 @@ impl Source for IngestSource { #[cfg(test)] mod tests { + use std::iter::once; use std::path::PathBuf; use bytes::Bytes; + use itertools::Itertools; use quickwit_actors::{ActorContext, Universe}; use quickwit_common::ServiceStream; use quickwit_config::{SourceConfig, SourceParams}; @@ -543,16 +607,48 @@ mod tests { 00000000000000000000000000/00000000000000000000000000"; let mut mock_metastore = MetastoreServiceClient::mock(); + mock_metastore + .expect_acquire_shards() + .withf(|request| { + assert_eq!(request.subrequests.len(), 1); + request.subrequests[0].shard_ids == [0] + }) + .once() + .returning(|request| { + let subrequest = &request.subrequests[0]; + assert_eq!(subrequest.index_uid, "test-index:0"); + assert_eq!(subrequest.source_id, "test-source"); + let response = AcquireShardsResponse { + subresponses: vec![AcquireShardsSubresponse { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + acquired_shards: vec![Shard { + leader_id: "test-ingester-0".to_string(), + follower_id: None, + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 0, + shard_state: ShardState::Open as i32, + publish_position_inclusive: Some(10u64.into()), + publish_token: Some(publish_token.to_string()), + }], + }], + }; + Ok(response) + }); mock_metastore .expect_acquire_shards() .once() + .withf(|request| { + assert_eq!(request.subrequests.len(), 1); + request.subrequests[0].shard_ids == [1] + }) .returning(|request| { assert_eq!(request.subrequests.len(), 1); let subrequest = &request.subrequests[0]; assert_eq!(subrequest.index_uid, "test-index:0"); assert_eq!(subrequest.source_id, "test-source"); - assert_eq!(subrequest.shard_ids, vec![1, 2]); let response = AcquireShardsResponse { subresponses: vec![AcquireShardsSubresponse { @@ -572,27 +668,108 @@ mod tests { }; Ok(response) }); + mock_metastore + .expect_acquire_shards() + .withf(|request| { + assert_eq!(request.subrequests.len(), 1); + request.subrequests[0].shard_ids == [1, 2] + }) + .once() + .returning(|request| { + assert_eq!(request.subrequests.len(), 1); + + let subrequest = &request.subrequests[0]; + assert_eq!(subrequest.index_uid, "test-index:0"); + assert_eq!(subrequest.source_id, "test-source"); + + let response = AcquireShardsResponse { + subresponses: vec![AcquireShardsSubresponse { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + acquired_shards: vec![ + Shard { + leader_id: "test-ingester-0".to_string(), + follower_id: None, + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + shard_state: ShardState::Open as i32, + publish_position_inclusive: Some(11u64.into()), + publish_token: Some(publish_token.to_string()), + }, + Shard { + leader_id: "test-ingester-0".to_string(), + follower_id: None, + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 2, + shard_state: ShardState::Open as i32, + publish_position_inclusive: Some(12u64.into()), + publish_token: Some(publish_token.to_string()), + }, + ], + }], + }; + Ok(response) + }); let ingester_pool = IngesterPool::default(); + // This sequence is used to remove the race condition by waiting for the fetch stream + // request. + let (sequence_tx, mut sequence_rx) = tokio::sync::mpsc::unbounded_channel::(); + let mut ingester_mock_0 = IngesterServiceClient::mock(); + let sequence_tx_clone1 = sequence_tx.clone(); ingester_mock_0 .expect_open_fetch_stream() + .withf(|req| req.from_position_exclusive == Some(10u64.into()) && req.shard_id == 0) .once() - .returning(|request| { + .returning(move |request| { + sequence_tx_clone1.send(1).unwrap(); + assert_eq!( + request.client_id, + "indexer/test-node/test-index:0/test-source/00000000000000000000000000" + ); + assert_eq!(request.index_uid, "test-index:0"); + assert_eq!(request.source_id, "test-source"); + let (_service_stream_tx, service_stream) = ServiceStream::new_bounded(1); + Ok(service_stream) + }); + let sequence_tx_clone2 = sequence_tx.clone(); + ingester_mock_0 + .expect_open_fetch_stream() + .withf(|req| req.from_position_exclusive == Some(11u64.into()) && req.shard_id == 1) + .times(2) + .returning(move |request| { + sequence_tx_clone2.send(2).unwrap(); + assert_eq!( + request.client_id, + "indexer/test-node/test-index:0/test-source/00000000000000000000000000" + ); + assert_eq!(request.index_uid, "test-index:0"); + assert_eq!(request.source_id, "test-source"); + let (_service_stream_tx, service_stream) = ServiceStream::new_bounded(1); + Ok(service_stream) + }); + let sequence_tx_clone3 = sequence_tx.clone(); + ingester_mock_0 + .expect_open_fetch_stream() + .withf(|req| req.from_position_exclusive == Some(12u64.into()) && req.shard_id == 2) + .once() + .returning(move |request| { + sequence_tx_clone3.send(3).unwrap(); assert_eq!( request.client_id, "indexer/test-node/test-index:0/test-source/00000000000000000000000000" ); assert_eq!(request.index_uid, "test-index:0"); assert_eq!(request.source_id, "test-source"); - assert_eq!(request.shard_id, 1); - assert_eq!(request.from_position_exclusive, Some(11u64.into())); - let (_service_stream_tx, service_stream) = ServiceStream::new_bounded(1); Ok(service_stream) }); ingester_mock_0 .expect_truncate_shards() + .withf(|truncate_req| truncate_req.subrequests[0].shard_id == 0) .once() .returning(|request| { assert_eq!(request.ingester_id, "test-ingester-0"); @@ -601,8 +778,45 @@ mod tests { let subrequest = &request.subrequests[0]; assert_eq!(subrequest.index_uid, "test-index:0"); assert_eq!(subrequest.source_id, "test-source"); - assert_eq!(subrequest.shard_id, 1); + assert_eq!(subrequest.to_position_inclusive, Some(10u64.into())); + + let response = TruncateShardsResponse {}; + Ok(response) + }); + + ingester_mock_0 + .expect_truncate_shards() + .withf(|truncate_req| truncate_req.subrequests[0].shard_id == 1) + .once() + .returning(|request| { + assert_eq!(request.ingester_id, "test-ingester-0"); + assert_eq!(request.subrequests.len(), 1); + let subrequest = &request.subrequests[0]; + assert_eq!(subrequest.index_uid, "test-index:0"); + assert_eq!(subrequest.source_id, "test-source"); assert_eq!(subrequest.to_position_inclusive, Some(11u64.into())); + Ok(TruncateShardsResponse {}) + }); + ingester_mock_0 + .expect_truncate_shards() + .withf(|truncate_req| { + truncate_req.subrequests.len() == 2 + && truncate_req.subrequests[0].shard_id == 1 + && truncate_req.subrequests[1].shard_id == 2 + }) + .once() + .returning(|request| { + assert_eq!(request.ingester_id, "test-ingester-0"); + + let subrequest = &request.subrequests[0]; + assert_eq!(subrequest.index_uid, "test-index:0"); + assert_eq!(subrequest.source_id, "test-source"); + assert_eq!(subrequest.to_position_inclusive, Some(11u64.into())); + + let subrequest = &request.subrequests[1]; + assert_eq!(subrequest.index_uid, "test-index:0"); + assert_eq!(subrequest.source_id, "test-source"); + assert_eq!(subrequest.to_position_inclusive, Some(12u64.into())); let response = TruncateShardsResponse {}; Ok(response) @@ -638,16 +852,44 @@ mod tests { let ctx: SourceContext = ActorContext::for_test(&universe, source_mailbox, observable_state_tx); - // In this scenario, the indexer will only be able to acquire shard 1. + // We assign [0] (previously []). + // The stream does not need to be reset. + let shard_ids: BTreeSet = once(0).collect(); let publish_lock = source.publish_lock.clone(); + source + .assign_shards(shard_ids, &doc_processor_mailbox, &ctx) + .await + .unwrap(); + assert_eq!(sequence_rx.recv().await.unwrap(), 1); + assert!(publish_lock.is_alive()); + assert_eq!(publish_lock, source.publish_lock); + // We assign [0,1] (previously [0]). This should just add the shard 1. + // The stream does not need to be reset. + let shard_ids: BTreeSet = (0..2).collect(); + let publish_lock = source.publish_lock.clone(); + source + .assign_shards(shard_ids, &doc_processor_mailbox, &ctx) + .await + .unwrap(); + assert_eq!(sequence_rx.recv().await.unwrap(), 2); + assert!(publish_lock.is_alive()); + assert_eq!(publish_lock, source.publish_lock); + + // We assign [1,2]. (previously [0,1]) This should reset the stream + // because the shard 0 has to be removed. + // The publish lock should be killed and a new one should be created. + let shard_ids: BTreeSet = (1..3).collect(); + let publish_lock = source.publish_lock.clone(); source - .assign_shards(vec![1, 2], &doc_processor_mailbox, &ctx) + .assign_shards(shard_ids, &doc_processor_mailbox, &ctx) .await .unwrap(); - assert!(publish_lock.is_dead()); + assert_eq!(sequence_rx.recv().await.unwrap(), 3); + assert!(!publish_lock.is_alive()); assert!(source.publish_lock.is_alive()); + assert_ne!(publish_lock, source.publish_lock); let NewPublishLock(publish_lock) = doc_processor_inbox .recv_typed_message::() @@ -663,7 +905,7 @@ mod tests { .unwrap(); assert_eq!(source.publish_token, publish_token); - assert_eq!(source.assigned_shards.len(), 1); + assert_eq!(source.assigned_shards.len(), 2); let assigned_shard = source.assigned_shards.get(&1).unwrap(); let expected_assigned_shard = AssignedShard { @@ -675,6 +917,16 @@ mod tests { }; assert_eq!(assigned_shard, &expected_assigned_shard); + let assigned_shard = source.assigned_shards.get(&2).unwrap(); + let expected_assigned_shard = AssignedShard { + leader_id: "test-ingester-0".into(), + follower_id_opt: None, + partition_id: 2u64.into(), + current_position_inclusive: 12u64.into(), + status: IndexingStatus::Active, + }; + assert_eq!(assigned_shard, &expected_assigned_shard); + // Wait for the truncate future to complete. time::sleep(Duration::from_millis(1)).await; } @@ -779,8 +1031,11 @@ mod tests { let ctx: SourceContext = ActorContext::for_test(&universe, source_mailbox, observable_state_tx); + // In this scenario, the indexer will only be able to acquire shard 1. + let shard_ids: BTreeSet = once(1).collect(); + source - .assign_shards(vec![1], &doc_processor_mailbox, &ctx) + .assign_shards(shard_ids, &doc_processor_mailbox, &ctx) .await .unwrap(); @@ -862,7 +1117,7 @@ mod tests { .returning(|request| { assert_eq!( request.client_id, - "indexer/test-node/test-index:0/test-source/0" + "indexer/test-node/test-index:0/test-source/00000000000000000000000000" ); assert_eq!(request.index_uid, "test-index:0"); assert_eq!(request.source_id, "test-source"); @@ -932,6 +1187,8 @@ mod tests { let ctx: SourceContext = ActorContext::for_test(&universe, source_mailbox, observable_state_tx); + // In this scenario, the indexer will only be able to acquire shard 1. + let shard_ids: BTreeSet = (1..3).collect(); assert_eq!( shard_positions_update_rx.try_recv().unwrap_err(), TryRecvError::Empty @@ -939,7 +1196,7 @@ mod tests { // In this scenario, the indexer will only be able to acquire shard 1. source - .assign_shards(vec![1, 2], &doc_processor_mailbox, &ctx) + .assign_shards(shard_ids, &doc_processor_mailbox, &ctx) .await .unwrap(); @@ -1079,7 +1336,7 @@ mod tests { .await .unwrap(); let shard = source.assigned_shards.get(&2).unwrap(); - assert_eq!(shard.status, IndexingStatus::Complete); + assert_eq!(shard.status, IndexingStatus::EofReached); fetch_response_tx .send(Err(FetchStreamError { diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index addda75a6c2..ce308a10767 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -72,6 +72,7 @@ mod source_factory; mod vec_source; mod void_source; +use std::collections::BTreeSet; use std::path::PathBuf; use std::time::Duration; @@ -238,7 +239,7 @@ pub trait Source: Send + 'static { /// plane. async fn assign_shards( &mut self, - _shard_ids: Vec, + _shard_ids: BTreeSet, _doc_processor_mailbox: &Mailbox, _ctx: &SourceContext, ) -> anyhow::Result<()> { @@ -299,7 +300,7 @@ struct Loop; #[derive(Debug)] pub struct Assignment { - pub shard_ids: Vec, + pub shard_ids: BTreeSet, } #[derive(Debug)]