Skip to content

Commit

Permalink
Bugfix: truncating the ingest v1 source on initialization. (#4127)
Browse files Browse the repository at this point in the history
* Bugfix: truncating the ingest v1 source on initialization.

* Update quickwit/quickwit-ingest/src/ingest_service.proto

Co-authored-by: François Massot <[email protected]>

---------

Co-authored-by: François Massot <[email protected]>
  • Loading branch information
fulmicoton and fmassot committed Nov 14, 2023
1 parent 41f4bd5 commit 443be6e
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 22 deletions.
141 changes: 120 additions & 21 deletions quickwit/quickwit-indexing/src/source/ingest_api_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub struct IngestApiSourceCounters {
}

pub struct IngestApiSource {
ctx: Arc<SourceRuntimeArgs>,
runtime_args: Arc<SourceRuntimeArgs>,
source_id: String,
partition_id: PartitionId,
ingest_api_service: Mailbox<IngestApiService>,
Expand All @@ -71,26 +71,26 @@ impl fmt::Debug for IngestApiSource {

impl IngestApiSource {
pub async fn try_new(
ctx: Arc<SourceRuntimeArgs>,
runtime_args: Arc<SourceRuntimeArgs>,
checkpoint: SourceCheckpoint,
) -> anyhow::Result<Self> {
let source_id = ctx.source_id().to_string();
let queues_dir_path = ctx.queues_dir_path.as_path();
let source_id = runtime_args.source_id().to_string();
let queues_dir_path = runtime_args.queues_dir_path.as_path();
let ingest_api_service = get_ingest_api_service(queues_dir_path).await?;
let partition_id: PartitionId = ingest_api_service.ask(GetPartitionId).await?.into();

// Ensure a queue for this index exists.
let create_queue_req = CreateQueueIfNotExistsRequest {
queue_id: ctx.index_id().to_string(),
queue_id: runtime_args.index_id().to_string(),
};
ingest_api_service.ask_for_res(create_queue_req).await?;

let previous_offset = checkpoint
let previous_offset: Option<u64> = checkpoint
.position_for_partition(&partition_id)
.map(|position| position.as_u64().expect("offset should be stored as u64"));
let current_offset = previous_offset;
let ingest_api_source = IngestApiSource {
ctx,
runtime_args,
source_id,
partition_id,
ingest_api_service,
Expand All @@ -103,6 +103,20 @@ impl IngestApiSource {
Ok(ingest_api_source)
}

/// Sends a `SuggestTruncateRequest` for this source's index to the ingest API service.
///
/// The request carries the index id taken from `runtime_args` and the given
/// `up_to_position_included` value, unchanged. It is sent through `ctx` to the
/// `ingest_api_service` mailbox held by this source.
///
/// # Errors
///
/// Propagates any error returned by `ctx.ask_for_res`.
async fn send_suggest_truncate_to_ingest_service(
    &self,
    up_to_position_included: u64,
    ctx: &ActorContext<SourceActor>,
) -> anyhow::Result<()> {
    let index_id = self.runtime_args.index_id().to_string();
    let truncate_request = SuggestTruncateRequest {
        index_id,
        up_to_position_included,
    };
    // The reply payload is not needed; only the success or failure matters.
    ctx.ask_for_res(&self.ingest_api_service, truncate_request)
        .await?;
    Ok(())
}

fn update_counters(&mut self, current_offset: u64, num_docs: u64) {
self.counters.num_docs_processed += num_docs;
self.counters.current_offset = Some(current_offset);
Expand All @@ -112,13 +126,25 @@ impl IngestApiSource {

#[async_trait]
impl Source for IngestApiSource {
/// Suggests a truncation of the ingest queue when the source is initialized.
///
/// When `counters.previous_offset` is set, a suggest-truncate request is sent with that
/// offset as the inclusive upper bound. When it is `None`, nothing is sent.
///
/// # Errors
///
/// Any error from sending the request is converted into an `ActorExitStatus`.
async fn initialize(
    &mut self,
    _: &Mailbox<DocProcessor>,
    ctx: &SourceContext,
) -> Result<(), ActorExitStatus> {
    match self.counters.previous_offset {
        Some(previous_offset) => {
            self.send_suggest_truncate_to_ingest_service(previous_offset, ctx)
                .await?;
            Ok(())
        }
        // No recorded offset: there is nothing to truncate.
        None => Ok(()),
    }
}

async fn emit_batches(
&mut self,
batch_sink: &Mailbox<DocProcessor>,
ctx: &SourceContext,
) -> Result<Duration, ActorExitStatus> {
let fetch_req = FetchRequest {
index_id: self.ctx.index_id().to_string(),
index_id: self.runtime_args.index_id().to_string(),
start_after: self.counters.current_offset,
num_bytes_limit: None,
};
Expand Down Expand Up @@ -176,13 +202,8 @@ impl Source for IngestApiSource {
checkpoint.position_for_partition(&self.partition_id)
{
let up_to_position_included = offset.as_u64().expect("offset should be stored as u64");
let suggest_truncate_req = SuggestTruncateRequest {
index_id: self.ctx.index_id().to_string(),
up_to_position_included,
};
ctx.ask_for_res(&self.ingest_api_service, suggest_truncate_req)
.await
.map_err(anyhow::Error::from)?;
self.send_suggest_truncate_to_ingest_service(up_to_position_included, ctx)
.await?;
}
Ok(())
}
Expand Down Expand Up @@ -217,6 +238,7 @@ mod tests {
use std::num::NonZeroUsize;
use std::time::Duration;

use quickwit_actors::Command::Nudge;
use quickwit_actors::Universe;
use quickwit_common::rand::append_random_suffix;
use quickwit_config::{
Expand Down Expand Up @@ -364,12 +386,26 @@ mod tests {
let index_uid = IndexUid::new_with_random_ulid(&index_id);
let temp_dir = tempfile::tempdir()?;
let queues_dir_path = temp_dir.path();

let ingest_api_service =
init_ingest_api(&universe, queues_dir_path, &IngestApiConfig::default()).await?;
let partition_id: PartitionId = ingest_api_service.ask(GetPartitionId).await?.into();
let create_queue_req = CreateQueueIfNotExistsRequest {
queue_id: index_id.clone(),
};
ingest_api_service
.ask_for_res(create_queue_req)
.await
.unwrap();

let ingest_req = make_ingest_request(index_id.clone(), 4, 1000, CommitType::Auto);
ingest_api_service
.ask_for_res(ingest_req)
.await
.map_err(|err| anyhow::anyhow!(err.to_string()))?;

let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox();
let mut checkpoint = SourceCheckpoint::default();
let partition_id: PartitionId = ingest_api_service.ask(GetPartitionId).await?.into();
let checkpoint_delta = SourceCheckpointDelta::from_partition_delta(
partition_id.clone(),
Position::from(0u64),
Expand All @@ -393,11 +429,6 @@ mod tests {
let (_ingest_api_source_mailbox, ingest_api_source_handle) =
universe.spawn_builder().spawn(ingest_api_source_actor);

let ingest_req = make_ingest_request(index_id.clone(), 4, 1000, CommitType::Auto);
ingest_api_service
.ask_for_res(ingest_req)
.await
.map_err(|err| anyhow::anyhow!(err.to_string()))?;
universe.sleep(Duration::from_secs(2)).await;
let counters = ingest_api_source_handle
.process_pending_and_observe()
Expand Down Expand Up @@ -609,4 +640,72 @@ mod tests {
universe.assert_quit().await;
Ok(())
}

// Checks that an `IngestApiSource` built from a checkpoint at position 10 triggers a
// truncation of the ingest queue as soon as the source actor has been spawned and nudged.
#[tokio::test]
async fn test_ingest_api_source_truncate_on_initialize() -> anyhow::Result<()> {
let universe = Universe::with_accelerated_time();
let metastore = metastore_for_test();
let index_id = append_random_suffix("test-ingest-api-source");
let index_uid = IndexUid::new_with_random_ulid(&index_id);
let temp_dir = tempfile::tempdir()?;
let queues_dir_path = temp_dir.path();

let ingest_api_service =
init_ingest_api(&universe, queues_dir_path, &IngestApiConfig::default()).await?;
let (doc_processor_mailbox, _doc_processor_inbox) = universe.create_test_mailbox();
let source_config = make_source_config();
let ctx = SourceRuntimeArgs::for_test(
index_uid,
source_config,
metastore,
queues_dir_path.to_path_buf(),
);

// Create the queue and fill it before the source exists.
let create_queue_req = CreateQueueIfNotExistsRequest {
queue_id: index_id.clone(),
};
ingest_api_service
.ask_for_res(create_queue_req)
.await
.unwrap();

// NOTE(review): `make_ingest_request` is defined elsewhere; its numeric arguments
// presumably control the batch and document counts. Confirm against the helper.
let ingest_req = make_ingest_request(index_id.clone(), 2, 20_000, CommitType::Auto);
ingest_api_service.ask(ingest_req).await.unwrap().unwrap();

// Before the source starts, the queue still begins at position 0.
let fetch_request = FetchRequest {
index_id: index_id.clone(),
start_after: None,
num_bytes_limit: None,
};
let FetchResponse { first_position, .. } = ingest_api_service
.ask(fetch_request.clone())
.await
.unwrap()
.unwrap();
assert_eq!(first_position, Some(0));

// Build a checkpoint at position 10 for this partition and create the source from it.
let partition_id = ingest_api_service.ask(GetPartitionId).await?.into();
let mut source_checkpoint = SourceCheckpoint::default();
source_checkpoint.add_partition(partition_id, Position::from(10u64));
let ingest_api_source = IngestApiSource::try_new(ctx, source_checkpoint).await?;
let ingest_api_source_actor = SourceActor {
source: Box::new(ingest_api_source),
doc_processor_mailbox,
};
let (ingest_api_source_mailbox, ingest_api_source_handle) =
universe.spawn_builder().spawn(ingest_api_source_actor);

// NOTE(review): the `Nudge` round-trip presumably ensures the actor has finished
// initializing before the queue is fetched again. Confirm.
ingest_api_source_mailbox.ask(Nudge).await.unwrap();
let FetchResponse { first_position, .. } = ingest_api_service
.ask(fetch_request.clone())
.await
.unwrap()
.unwrap();
// The queue should have been truncated so that only the messages strictly after the
// source checkpoint remain.
assert_eq!(first_position, Some(11u64));

ingest_api_source_handle.quit().await;
universe.assert_quit().await;
Ok(())
}
}
1 change: 1 addition & 0 deletions quickwit/quickwit-ingest/src/codegen/ingest_service.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/quickwit-ingest/src/ingest_api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub struct IngestApiService {
}

impl fmt::Debug for IngestApiService {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("IngestApiService")
.field("partition_id", &self.partition_id)
.field("memory_limit", &self.memory_limit)
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-ingest/src/ingest_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ message IngestResponse {
uint64 num_docs_for_processing = 1;
}

// Fetch messages with position strictly after `start_after`.
message FetchRequest {
string index_id = 1;
optional uint64 start_after = 2;
Expand Down

0 comments on commit 443be6e

Please sign in to comment.