diff --git a/quickwit/quickwit-indexing/src/source/ingest_api_source.rs b/quickwit/quickwit-indexing/src/source/ingest_api_source.rs index 54d7ce7813b..ab1d14144d8 100644 --- a/quickwit/quickwit-indexing/src/source/ingest_api_source.rs +++ b/quickwit/quickwit-indexing/src/source/ingest_api_source.rs @@ -56,7 +56,7 @@ pub struct IngestApiSourceCounters { } pub struct IngestApiSource { - ctx: Arc, + runtime_args: Arc, source_id: String, partition_id: PartitionId, ingest_api_service: Mailbox, @@ -71,26 +71,26 @@ impl fmt::Debug for IngestApiSource { impl IngestApiSource { pub async fn try_new( - ctx: Arc, + runtime_args: Arc, checkpoint: SourceCheckpoint, ) -> anyhow::Result { - let source_id = ctx.source_id().to_string(); - let queues_dir_path = ctx.queues_dir_path.as_path(); + let source_id = runtime_args.source_id().to_string(); + let queues_dir_path = runtime_args.queues_dir_path.as_path(); let ingest_api_service = get_ingest_api_service(queues_dir_path).await?; let partition_id: PartitionId = ingest_api_service.ask(GetPartitionId).await?.into(); // Ensure a queue for this index exists. 
let create_queue_req = CreateQueueIfNotExistsRequest { - queue_id: ctx.index_id().to_string(), + queue_id: runtime_args.index_id().to_string(), }; ingest_api_service.ask_for_res(create_queue_req).await?; - let previous_offset = checkpoint + let previous_offset: Option = checkpoint .position_for_partition(&partition_id) .map(|position| position.as_u64().expect("offset should be stored as u64")); let current_offset = previous_offset; let ingest_api_source = IngestApiSource { - ctx, + runtime_args, source_id, partition_id, ingest_api_service, @@ -103,6 +103,20 @@ impl IngestApiSource { Ok(ingest_api_source) } + async fn send_suggest_truncate_to_ingest_service( + &self, + up_to_position_included: u64, + ctx: &ActorContext, + ) -> anyhow::Result<()> { + let suggest_truncate_req = SuggestTruncateRequest { + index_id: self.runtime_args.index_id().to_string(), + up_to_position_included, + }; + ctx.ask_for_res(&self.ingest_api_service, suggest_truncate_req) + .await?; + Ok(()) + } + fn update_counters(&mut self, current_offset: u64, num_docs: u64) { self.counters.num_docs_processed += num_docs; self.counters.current_offset = Some(current_offset); @@ -112,13 +126,25 @@ impl IngestApiSource { #[async_trait] impl Source for IngestApiSource { + async fn initialize( + &mut self, + _: &Mailbox, + ctx: &SourceContext, + ) -> Result<(), ActorExitStatus> { + if let Some(position) = self.counters.previous_offset { + self.send_suggest_truncate_to_ingest_service(position, ctx) + .await?; + } + Ok(()) + } + async fn emit_batches( &mut self, batch_sink: &Mailbox, ctx: &SourceContext, ) -> Result { let fetch_req = FetchRequest { - index_id: self.ctx.index_id().to_string(), + index_id: self.runtime_args.index_id().to_string(), start_after: self.counters.current_offset, num_bytes_limit: None, }; @@ -176,13 +202,8 @@ impl Source for IngestApiSource { checkpoint.position_for_partition(&self.partition_id) { let up_to_position_included = offset.as_u64().expect("offset should be stored as u64"); 
- let suggest_truncate_req = SuggestTruncateRequest { - index_id: self.ctx.index_id().to_string(), - up_to_position_included, - }; - ctx.ask_for_res(&self.ingest_api_service, suggest_truncate_req) - .await - .map_err(anyhow::Error::from)?; + self.send_suggest_truncate_to_ingest_service(up_to_position_included, ctx) + .await?; } Ok(()) } @@ -217,6 +238,7 @@ mod tests { use std::num::NonZeroUsize; use std::time::Duration; + use quickwit_actors::Command::Nudge; use quickwit_actors::Universe; use quickwit_common::rand::append_random_suffix; use quickwit_config::{ @@ -364,12 +386,26 @@ mod tests { let index_uid = IndexUid::new_with_random_ulid(&index_id); let temp_dir = tempfile::tempdir()?; let queues_dir_path = temp_dir.path(); + let ingest_api_service = init_ingest_api(&universe, queues_dir_path, &IngestApiConfig::default()).await?; - let partition_id: PartitionId = ingest_api_service.ask(GetPartitionId).await?.into(); + let create_queue_req = CreateQueueIfNotExistsRequest { + queue_id: index_id.clone(), + }; + ingest_api_service + .ask_for_res(create_queue_req) + .await + .unwrap(); + + let ingest_req = make_ingest_request(index_id.clone(), 4, 1000, CommitType::Auto); + ingest_api_service + .ask_for_res(ingest_req) + .await + .map_err(|err| anyhow::anyhow!(err.to_string()))?; let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox(); let mut checkpoint = SourceCheckpoint::default(); + let partition_id: PartitionId = ingest_api_service.ask(GetPartitionId).await?.into(); let checkpoint_delta = SourceCheckpointDelta::from_partition_delta( partition_id.clone(), Position::from(0u64), @@ -393,11 +429,6 @@ mod tests { let (_ingest_api_source_mailbox, ingest_api_source_handle) = universe.spawn_builder().spawn(ingest_api_source_actor); - let ingest_req = make_ingest_request(index_id.clone(), 4, 1000, CommitType::Auto); - ingest_api_service - .ask_for_res(ingest_req) - .await - .map_err(|err| anyhow::anyhow!(err.to_string()))?; 
universe.sleep(Duration::from_secs(2)).await; let counters = ingest_api_source_handle .process_pending_and_observe() @@ -609,4 +640,72 @@ mod tests { universe.assert_quit().await; Ok(()) } + + #[tokio::test] + async fn test_ingest_api_source_truncate_on_initialize() -> anyhow::Result<()> { + let universe = Universe::with_accelerated_time(); + let metastore = metastore_for_test(); + let index_id = append_random_suffix("test-ingest-api-source"); + let index_uid = IndexUid::new_with_random_ulid(&index_id); + let temp_dir = tempfile::tempdir()?; + let queues_dir_path = temp_dir.path(); + + let ingest_api_service = + init_ingest_api(&universe, queues_dir_path, &IngestApiConfig::default()).await?; + let (doc_processor_mailbox, _doc_processor_inbox) = universe.create_test_mailbox(); + let source_config = make_source_config(); + let ctx = SourceRuntimeArgs::for_test( + index_uid, + source_config, + metastore, + queues_dir_path.to_path_buf(), + ); + + let create_queue_req = CreateQueueIfNotExistsRequest { + queue_id: index_id.clone(), + }; + ingest_api_service + .ask_for_res(create_queue_req) + .await + .unwrap(); + + let ingest_req = make_ingest_request(index_id.clone(), 2, 20_000, CommitType::Auto); + ingest_api_service.ask(ingest_req).await.unwrap().unwrap(); + + let fetch_request = FetchRequest { + index_id: index_id.clone(), + start_after: None, + num_bytes_limit: None, + }; + let FetchResponse { first_position, .. 
} = ingest_api_service + .ask(fetch_request.clone()) + .await + .unwrap() + .unwrap(); + assert_eq!(first_position, Some(0)); + + let partition_id = ingest_api_service.ask(GetPartitionId).await?.into(); + let mut source_checkpoint = SourceCheckpoint::default(); + source_checkpoint.add_partition(partition_id, Position::from(10u64)); + let ingest_api_source = IngestApiSource::try_new(ctx, source_checkpoint).await?; + let ingest_api_source_actor = SourceActor { + source: Box::new(ingest_api_source), + doc_processor_mailbox, + }; + let (ingest_api_source_mailbox, ingest_api_source_handle) = + universe.spawn_builder().spawn(ingest_api_source_actor); + + ingest_api_source_mailbox.ask(Nudge).await.unwrap(); + let FetchResponse { first_position, .. } = ingest_api_service + .ask(fetch_request.clone()) + .await + .unwrap() + .unwrap(); + // We should have truncated to keep only messages strictly after the source checkpoint. + assert_eq!(first_position, Some(11u64)); + + ingest_api_source_handle.quit().await; + universe.assert_quit().await; + Ok(()) + } } diff --git a/quickwit/quickwit-ingest/src/codegen/ingest_service.rs b/quickwit/quickwit-ingest/src/codegen/ingest_service.rs index 7b51c460739..693c9e46785 100644 --- a/quickwit/quickwit-ingest/src/codegen/ingest_service.rs +++ b/quickwit/quickwit-ingest/src/codegen/ingest_service.rs @@ -42,6 +42,7 @@ pub struct IngestResponse { #[prost(uint64, tag = "1")] pub num_docs_for_processing: u64, } +/// Fetch messages that have position strictly after `start_after`. 
#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/quickwit/quickwit-ingest/src/ingest_api_service.rs b/quickwit/quickwit-ingest/src/ingest_api_service.rs index f69f265a14b..73946f17bee 100644 --- a/quickwit/quickwit-ingest/src/ingest_api_service.rs +++ b/quickwit/quickwit-ingest/src/ingest_api_service.rs @@ -58,7 +58,7 @@ pub struct IngestApiService { } impl fmt::Debug for IngestApiService { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("IngestApiService") .field("partition_id", &self.partition_id) .field("memory_limit", &self.memory_limit) diff --git a/quickwit/quickwit-ingest/src/ingest_service.proto b/quickwit/quickwit-ingest/src/ingest_service.proto index 645ee421a26..5b22203fda4 100644 --- a/quickwit/quickwit-ingest/src/ingest_service.proto +++ b/quickwit/quickwit-ingest/src/ingest_service.proto @@ -88,6 +88,7 @@ message IngestResponse { uint64 num_docs_for_processing = 1; } +// Fetch messages that have position strictly after `start_after`. message FetchRequest { string index_id = 1; optional uint64 start_after = 2;