Skip to content

Commit

Permalink
expose collect_block buffer size (#2326)
Browse files Browse the repository at this point in the history
* expose buffer of collect_block

* flip shard_size segment_size
  • Loading branch information
PSeitz authored Mar 15, 2024
1 parent 7ce950f commit 67ebba3
Show file tree
Hide file tree
Showing 11 changed files with 43 additions and 36 deletions.
6 changes: 3 additions & 3 deletions src/aggregation/bucket/term_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ pub struct TermsAggregation {
///
/// Defaults to 10 * size.
#[serde(skip_serializing_if = "Option::is_none", default)]
#[serde(alias = "segment_size")]
#[serde(alias = "shard_size")]
#[serde(alias = "split_size")]
pub shard_size: Option<u32>,
pub segment_size: Option<u32>,

/// If you set the `show_term_doc_count_error` parameter to true, the terms aggregation will
/// include doc_count_error_upper_bound, which is an upper bound to the error on the
Expand Down Expand Up @@ -200,7 +200,7 @@ impl TermsAggregationInternal {
pub(crate) fn from_req(req: &TermsAggregation) -> Self {
let size = req.size.unwrap_or(10);

let mut segment_size = req.shard_size.unwrap_or(size * 10);
let mut segment_size = req.segment_size.unwrap_or(size * 10);

let order = req.order.clone().unwrap_or_default();
segment_size = segment_size.max(size);
Expand Down
4 changes: 4 additions & 0 deletions src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ pub trait SegmentCollector: 'static {
fn collect(&mut self, doc: DocId, score: Score);

/// The query pushes the scored document to the collector via this method.
/// This method is used when the collector does not require scoring.
///
/// See [`COLLECT_BLOCK_BUFFER_LEN`](crate::COLLECT_BLOCK_BUFFER_LEN) for the
/// buffer size passed to the collector.
fn collect_block(&mut self, docs: &[DocId]) {
for doc in docs {
self.collect(*doc, 0.0);
Expand Down
9 changes: 6 additions & 3 deletions src/docset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use crate::DocId;
/// to compare `[u32; 4]`.
pub const TERMINATED: DocId = i32::MAX as u32;

pub const BUFFER_LEN: usize = 64;
/// The collect_block method on `SegmentCollector` uses a buffer of this size.
/// Passed results to `collect_block` will not exceed this size and will be
/// exactly this size as long as we can fill the buffer.
pub const COLLECT_BLOCK_BUFFER_LEN: usize = 64;

/// Represents an iterable set of sorted doc ids.
pub trait DocSet: Send {
Expand Down Expand Up @@ -61,7 +64,7 @@ pub trait DocSet: Send {
/// This method is only here for specific high-performance
/// use case where batching. The normal way to
/// go through the `DocId`'s is to call `.advance()`.
fn fill_buffer(&mut self, buffer: &mut [DocId; BUFFER_LEN]) -> usize {
fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {
if self.doc() == TERMINATED {
return 0;
}
Expand Down Expand Up @@ -151,7 +154,7 @@ impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
unboxed.seek(target)
}

fn fill_buffer(&mut self, buffer: &mut [DocId; BUFFER_LEN]) -> usize {
fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {
let unboxed: &mut TDocSet = self.borrow_mut();
unboxed.fill_buffer(buffer)
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ pub use common::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64, HasLen};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};

pub use self::docset::{DocSet, TERMINATED};
pub use self::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN, TERMINATED};
#[deprecated(
since = "0.22.0",
note = "Will be removed in tantivy 0.23. Use export from snippet module instead"
Expand Down
20 changes: 10 additions & 10 deletions src/query/all_query.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::docset::{DocSet, BUFFER_LEN, TERMINATED};
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN, TERMINATED};
use crate::index::SegmentReader;
use crate::query::boost_query::BoostScorer;
use crate::query::explanation::does_not_match;
Expand Down Expand Up @@ -54,7 +54,7 @@ impl DocSet for AllScorer {
self.doc
}

fn fill_buffer(&mut self, buffer: &mut [DocId; BUFFER_LEN]) -> usize {
fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {
if self.doc() == TERMINATED {
return 0;
}
Expand Down Expand Up @@ -96,7 +96,7 @@ impl Scorer for AllScorer {
#[cfg(test)]
mod tests {
use super::AllQuery;
use crate::docset::{DocSet, BUFFER_LEN, TERMINATED};
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN, TERMINATED};
use crate::query::{AllScorer, EnableScoring, Query};
use crate::schema::{Schema, TEXT};
use crate::{Index, IndexWriter};
Expand Down Expand Up @@ -162,16 +162,16 @@ mod tests {
pub fn test_fill_buffer() {
let mut postings = AllScorer {
doc: 0u32,
max_doc: BUFFER_LEN as u32 * 2 + 9,
max_doc: COLLECT_BLOCK_BUFFER_LEN as u32 * 2 + 9,
};
let mut buffer = [0u32; BUFFER_LEN];
assert_eq!(postings.fill_buffer(&mut buffer), BUFFER_LEN);
for i in 0u32..BUFFER_LEN as u32 {
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
assert_eq!(postings.fill_buffer(&mut buffer), COLLECT_BLOCK_BUFFER_LEN);
for i in 0u32..COLLECT_BLOCK_BUFFER_LEN as u32 {
assert_eq!(buffer[i as usize], i);
}
assert_eq!(postings.fill_buffer(&mut buffer), BUFFER_LEN);
for i in 0u32..BUFFER_LEN as u32 {
assert_eq!(buffer[i as usize], i + BUFFER_LEN as u32);
assert_eq!(postings.fill_buffer(&mut buffer), COLLECT_BLOCK_BUFFER_LEN);
for i in 0u32..COLLECT_BLOCK_BUFFER_LEN as u32 {
assert_eq!(buffer[i as usize], i + COLLECT_BLOCK_BUFFER_LEN as u32);
}
assert_eq!(postings.fill_buffer(&mut buffer), 9);
}
Expand Down
4 changes: 2 additions & 2 deletions src/query/boolean_query/boolean_weight.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;

use crate::docset::BUFFER_LEN;
use crate::docset::COLLECT_BLOCK_BUFFER_LEN;
use crate::index::SegmentReader;
use crate::postings::FreqReadingOption;
use crate::query::explanation::does_not_match;
Expand Down Expand Up @@ -228,7 +228,7 @@ impl<TScoreCombiner: ScoreCombiner + Sync> Weight for BooleanWeight<TScoreCombin
callback: &mut dyn FnMut(&[DocId]),
) -> crate::Result<()> {
let scorer = self.complex_scorer(reader, 1.0, || DoNothingCombiner)?;
let mut buffer = [0u32; BUFFER_LEN];
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];

match scorer {
SpecializedScorer::TermUnion(term_scorers) => {
Expand Down
4 changes: 2 additions & 2 deletions src/query/boost_query.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt;

use crate::docset::BUFFER_LEN;
use crate::docset::COLLECT_BLOCK_BUFFER_LEN;
use crate::fastfield::AliveBitSet;
use crate::query::{EnableScoring, Explanation, Query, Scorer, Weight};
use crate::{DocId, DocSet, Score, SegmentReader, Term};
Expand Down Expand Up @@ -105,7 +105,7 @@ impl<S: Scorer> DocSet for BoostScorer<S> {
self.underlying.seek(target)
}

fn fill_buffer(&mut self, buffer: &mut [DocId; BUFFER_LEN]) -> usize {
fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {
self.underlying.fill_buffer(buffer)
}

Expand Down
4 changes: 2 additions & 2 deletions src/query/const_score_query.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt;

use crate::docset::BUFFER_LEN;
use crate::docset::COLLECT_BLOCK_BUFFER_LEN;
use crate::query::{EnableScoring, Explanation, Query, Scorer, Weight};
use crate::{DocId, DocSet, Score, SegmentReader, TantivyError, Term};

Expand Down Expand Up @@ -119,7 +119,7 @@ impl<TDocSet: DocSet> DocSet for ConstScorer<TDocSet> {
self.docset.seek(target)
}

fn fill_buffer(&mut self, buffer: &mut [DocId; BUFFER_LEN]) -> usize {
fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {
self.docset.fill_buffer(buffer)
}

Expand Down
4 changes: 2 additions & 2 deletions src/query/term_query/term_weight.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::term_scorer::TermScorer;
use crate::docset::{DocSet, BUFFER_LEN};
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN};
use crate::fieldnorm::FieldNormReader;
use crate::index::SegmentReader;
use crate::postings::SegmentPostings;
Expand Down Expand Up @@ -64,7 +64,7 @@ impl Weight for TermWeight {
callback: &mut dyn FnMut(&[DocId]),
) -> crate::Result<()> {
let mut scorer = self.specialized_scorer(reader, 1.0)?;
let mut buffer = [0u32; BUFFER_LEN];
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
for_each_docset_buffered(&mut scorer, &mut buffer, callback);
Ok(())
}
Expand Down
16 changes: 8 additions & 8 deletions src/query/vec_docset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl HasLen for VecDocSet {
pub mod tests {

use super::*;
use crate::docset::{DocSet, BUFFER_LEN};
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN};
use crate::DocId;

#[test]
Expand All @@ -72,16 +72,16 @@ pub mod tests {

#[test]
pub fn test_fill_buffer() {
let doc_ids: Vec<DocId> = (1u32..=(BUFFER_LEN as u32 * 2 + 9)).collect();
let doc_ids: Vec<DocId> = (1u32..=(COLLECT_BLOCK_BUFFER_LEN as u32 * 2 + 9)).collect();
let mut postings = VecDocSet::from(doc_ids);
let mut buffer = [0u32; BUFFER_LEN];
assert_eq!(postings.fill_buffer(&mut buffer), BUFFER_LEN);
for i in 0u32..BUFFER_LEN as u32 {
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
assert_eq!(postings.fill_buffer(&mut buffer), COLLECT_BLOCK_BUFFER_LEN);
for i in 0u32..COLLECT_BLOCK_BUFFER_LEN as u32 {
assert_eq!(buffer[i as usize], i + 1);
}
assert_eq!(postings.fill_buffer(&mut buffer), BUFFER_LEN);
for i in 0u32..BUFFER_LEN as u32 {
assert_eq!(buffer[i as usize], i + 1 + BUFFER_LEN as u32);
assert_eq!(postings.fill_buffer(&mut buffer), COLLECT_BLOCK_BUFFER_LEN);
for i in 0u32..COLLECT_BLOCK_BUFFER_LEN as u32 {
assert_eq!(buffer[i as usize], i + 1 + COLLECT_BLOCK_BUFFER_LEN as u32);
}
assert_eq!(postings.fill_buffer(&mut buffer), 9);
}
Expand Down
6 changes: 3 additions & 3 deletions src/query/weight.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::Scorer;
use crate::docset::BUFFER_LEN;
use crate::docset::COLLECT_BLOCK_BUFFER_LEN;
use crate::index::SegmentReader;
use crate::query::Explanation;
use crate::{DocId, DocSet, Score, TERMINATED};
Expand All @@ -22,7 +22,7 @@ pub(crate) fn for_each_scorer<TScorer: Scorer + ?Sized>(
#[inline]
pub(crate) fn for_each_docset_buffered<T: DocSet + ?Sized>(
docset: &mut T,
buffer: &mut [DocId; BUFFER_LEN],
buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
mut callback: impl FnMut(&[DocId]),
) {
loop {
Expand Down Expand Up @@ -105,7 +105,7 @@ pub trait Weight: Send + Sync + 'static {
) -> crate::Result<()> {
let mut docset = self.scorer(reader, 1.0)?;

let mut buffer = [0u32; BUFFER_LEN];
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
for_each_docset_buffered(&mut docset, &mut buffer, callback);
Ok(())
}
Expand Down

0 comments on commit 67ebba3

Please sign in to comment.