Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid deserializing QueryAST for every split. #5282

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 52 additions & 30 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use tantivy::directory::FileSlice;
use tantivy::fastfield::FastFieldReaders;
use tantivy::schema::Field;
use tantivy::{DateTime, Index, ReloadPolicy, Searcher, Term};
use tokio::task::JoinError;
use tracing::*;

use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector};
Expand Down Expand Up @@ -342,9 +343,11 @@ fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse {
}

/// Apply a leaf search on a single split.
#[allow(clippy::too_many_arguments)]
async fn leaf_search_single_split(
searcher_context: &SearcherContext,
mut search_request: SearchRequest,
query_ast: Arc<QueryAst>,
storage: Arc<dyn Storage>,
split: SplitIdAndFooterOffsets,
doc_mapper: Arc<dyn DocMapper>,
Expand All @@ -363,9 +366,6 @@ async fn leaf_search_single_split(
return Ok(cached_answer);
}

let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str())
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;

// CanSplitDoBetter or rewrite_request may have changed the request to be a count only request
// This may be the case for AllQuery with a sort by date and time filter, where the current
// split can't have better results.
Expand Down Expand Up @@ -1217,12 +1217,17 @@ pub async fn leaf_search(

let split_filter = Arc::new(RwLock::new(split_filter));

let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(split_with_req.len());
let mut leaf_search_single_split_join_handles: Vec<(String, tokio::task::JoinHandle<()>)> =
Vec::with_capacity(split_with_req.len());

let merge_collector = make_merge_collector(&request, &aggregations_limits)?;
let incremental_merge_collector = IncrementalCollector::new(merge_collector);
let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector));

let query_ast: Arc<QueryAst> = serde_json::from_str::<QueryAst>(&request.query_ast)
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?
.into();

for (split, mut request) in split_with_req {
let leaf_split_search_permit = searcher_context.leaf_search_split_semaphore
.clone()
Expand All @@ -1236,44 +1241,59 @@ pub async fn leaf_search(
continue;
}

leaf_search_single_split_futures.push(tokio::spawn(
leaf_search_single_split_wrapper(
request,
searcher_context.clone(),
index_storage.clone(),
doc_mapper.clone(),
split,
split_filter.clone(),
incremental_merge_collector.clone(),
leaf_split_search_permit,
aggregations_limits.clone(),
)
.in_current_span(),
leaf_search_single_split_join_handles.push((
split.split_id.clone(),
tokio::spawn(
leaf_search_single_split_wrapper(
request,
query_ast.clone(),
searcher_context.clone(),
index_storage.clone(),
doc_mapper.clone(),
split,
split_filter.clone(),
incremental_merge_collector.clone(),
leaf_split_search_permit,
aggregations_limits.clone(),
)
.in_current_span(),
),
));
}

// TODO we could cancel running splits when !run_all_splits and the running split can no
// longer give better results after some other split answered.
let split_search_results: Vec<Result<(), _>> =
futures::future::join_all(leaf_search_single_split_futures).await;
let mut split_search_join_errors: Vec<(String, JoinError)> = Vec::new();

// There is no need to use `join_all`, as these are spawned tasks.
for (split, leaf_search_join_handle) in leaf_search_single_split_join_handles {
// splits that did not panic were already added to the collector
if let Err(join_error) = leaf_search_join_handle.await {
if join_error.is_cancelled() {
// An explicit task cancellation is not an error.
continue;
}
if join_error.is_panic() {
error!(split=%split, "leaf search task panicked");
} else {
error!(split=%split, "please report: leaf search was not cancelled, and could not extract panic. this should never happen");
}
split_search_join_errors.push((split, join_error));
}
}

// we can't use unwrap_or_clone because mutexes aren't Clone
let mut incremental_merge_collector = match Arc::try_unwrap(incremental_merge_collector) {
Ok(filter_merger) => filter_merger.into_inner().unwrap(),
Err(filter_merger) => filter_merger.lock().unwrap().clone(),
};

for result in split_search_results {
// splits that did not panic were already added to the collector
if let Err(e) = result {
incremental_merge_collector.add_failed_split(SplitSearchError {
// we could reasonably add a wrapper to the JoinHandle to give us the
// split_id anyway
split_id: "unknown".to_string(),
error: format!("{}", SearchError::from(e)),
retryable_error: true,
})
}
for (split_id, split_search_join_error) in split_search_join_errors {
incremental_merge_collector.add_failed_split(SplitSearchError {
split_id,
error: format!("{}", SearchError::from(split_search_join_error)),
retryable_error: true,
});
}

let result = crate::search_thread_pool()
Expand All @@ -1289,6 +1309,7 @@ pub async fn leaf_search(
#[instrument(skip_all, fields(split_id = split.split_id))]
async fn leaf_search_single_split_wrapper(
request: SearchRequest,
query_ast: Arc<QueryAst>,
searcher_context: Arc<SearcherContext>,
index_storage: Arc<dyn Storage>,
doc_mapper: Arc<dyn DocMapper>,
Expand All @@ -1305,6 +1326,7 @@ async fn leaf_search_single_split_wrapper(
let leaf_search_single_split_res = leaf_search_single_split(
&searcher_context,
request,
query_ast,
index_storage,
split.clone(),
doc_mapper,
Expand Down
Loading