Skip to content

Commit

Permalink
add fast path for full columns in fetch_block
Browse files Browse the repository at this point in the history
Spotted in `range_date_histogram` query in quickwit benchmark:
5% of time copying docs around, which is not needed in the full index case

remove Column to ColumnIndex deref
  • Loading branch information
PSeitz committed Mar 13, 2024
1 parent f6b0cc1 commit 729e8ab
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 32 deletions.
56 changes: 41 additions & 15 deletions columnar/src/block_accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,32 @@ impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
ColumnBlockAccessor<T>
{
#[inline]
pub fn fetch_block(&mut self, docs: &[u32], accessor: &Column<T>) {
self.docid_cache.clear();
self.row_id_cache.clear();
accessor.row_ids_for_docs(docs, &mut self.docid_cache, &mut self.row_id_cache);
self.val_cache.resize(self.row_id_cache.len(), T::default());
accessor
.values
.get_vals(&self.row_id_cache, &mut self.val_cache);
pub fn fetch_block<'a>(&'a mut self, docs: &'a [u32], accessor: &Column<T>) {
if accessor.index.get_cardinality().is_full() {
self.val_cache.resize(docs.len(), T::default());
accessor.values.get_vals(docs, &mut self.val_cache);
} else {
self.docid_cache.clear();
self.row_id_cache.clear();
accessor.row_ids_for_docs(docs, &mut self.docid_cache, &mut self.row_id_cache);
self.val_cache.resize(self.row_id_cache.len(), T::default());
accessor
.values
.get_vals(&self.row_id_cache, &mut self.val_cache);
}
}
#[inline]
pub fn fetch_block_with_missing(&mut self, docs: &[u32], accessor: &Column<T>, missing: T) {
self.fetch_block(docs, accessor);
// We can compare docid_cache with docs to find missing docs
if docs.len() != self.docid_cache.len() || accessor.index.is_multivalue() {
// no missing values
if accessor.index.get_cardinality().is_full() {
return;
}

// We can compare docid_cache length with docs to find missing docs
// For multi value columns we can't rely on the length and always need to scan
if accessor.index.get_cardinality().is_multivalue() || docs.len() != self.docid_cache.len()
{
self.missing_docids_cache.clear();
find_missing_docs(docs, &self.docid_cache, |doc| {
self.missing_docids_cache.push(doc);
Expand All @@ -44,11 +56,25 @@ impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
}

#[inline]
pub fn iter_docid_vals(&self) -> impl Iterator<Item = (DocId, T)> + '_ {
self.docid_cache
.iter()
.cloned()
.zip(self.val_cache.iter().cloned())
/// Returns an iterator over the docids and values
/// The passed in `docs` slice needs to be the same slice that was passed to `fetch_block` or
/// `fetch_block_with_missing`.
///
/// The docs is used if the column is full (each docs has exactly one value), otherwise the
/// internal docid vec is used for the iterator, which e.g. may contain duplicate docs.
pub fn iter_docid_vals<'a>(
&'a self,
docs: &'a [u32],
accessor: &Column<T>,
) -> impl Iterator<Item = (DocId, T)> + '_ {
if accessor.index.get_cardinality().is_full() {
docs.iter().cloned().zip(self.val_cache.iter().cloned())
} else {
self.docid_cache
.iter()
.cloned()
.zip(self.val_cache.iter().cloned())
}
}
}

Expand Down
13 changes: 3 additions & 10 deletions columnar/src/column/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod serialize;

use std::fmt::{self, Debug};
use std::io::Write;
use std::ops::{Deref, Range, RangeInclusive};
use std::ops::{Range, RangeInclusive};
use std::sync::Arc;

use common::BinarySerializable;
Expand Down Expand Up @@ -105,7 +105,8 @@ impl<T: PartialOrd + Copy + Debug + Send + Sync + 'static> Column<T> {
}

pub fn values_for_doc(&self, doc_id: DocId) -> impl Iterator<Item = T> + '_ {
self.value_row_ids(doc_id)
self.index
.value_row_ids(doc_id)
.map(|value_row_id: RowId| self.values.get_val(value_row_id))
}

Expand Down Expand Up @@ -147,14 +148,6 @@ impl<T: PartialOrd + Copy + Debug + Send + Sync + 'static> Column<T> {
}
}

impl<T> Deref for Column<T> {
type Target = ColumnIndex;

fn deref(&self) -> &Self::Target {
&self.index
}
}

impl BinarySerializable for Cardinality {
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> std::io::Result<()> {
self.to_code().serialize(writer)
Expand Down
4 changes: 0 additions & 4 deletions columnar/src/column_index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ impl From<MultiValueIndex> for ColumnIndex {
}

impl ColumnIndex {
#[inline]
pub fn is_multivalue(&self) -> bool {
matches!(self, ColumnIndex::Multivalued(_))
}
/// Returns the cardinality of the column index.
///
/// By convention, if the column contains no docs, we consider that it is
Expand Down
3 changes: 3 additions & 0 deletions columnar/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ impl Cardinality {
pub fn is_multivalue(&self) -> bool {
matches!(self, Cardinality::Multivalued)
}
pub fn is_full(&self) -> bool {
matches!(self, Cardinality::Full)
}
pub(crate) fn to_code(self) -> u8 {
self as u8
}
Expand Down
5 changes: 4 additions & 1 deletion src/aggregation/bucket/histogram/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,10 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
.column_block_accessor
.fetch_block(docs, &bucket_agg_accessor.accessor);

for (doc, val) in bucket_agg_accessor.column_block_accessor.iter_docid_vals() {
for (doc, val) in bucket_agg_accessor
.column_block_accessor
.iter_docid_vals(docs, &bucket_agg_accessor.accessor)
{
let val = self.f64_from_fastfield_u64(val);

let bucket_pos = get_bucket_pos(val);
Expand Down
5 changes: 4 additions & 1 deletion src/aggregation/bucket/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,10 @@ impl SegmentAggregationCollector for SegmentRangeCollector {
.column_block_accessor
.fetch_block(docs, &bucket_agg_accessor.accessor);

for (doc, val) in bucket_agg_accessor.column_block_accessor.iter_docid_vals() {
for (doc, val) in bucket_agg_accessor
.column_block_accessor
.iter_docid_vals(docs, &bucket_agg_accessor.accessor)
{
let bucket_pos = self.get_bucket_pos(val);

let bucket = &mut self.buckets[bucket_pos];
Expand Down
5 changes: 4 additions & 1 deletion src/aggregation/bucket/term_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,10 @@ impl SegmentAggregationCollector for SegmentTermCollector {
}
// has subagg
if let Some(blueprint) = self.blueprint.as_ref() {
for (doc, term_id) in bucket_agg_accessor.column_block_accessor.iter_docid_vals() {
for (doc, term_id) in bucket_agg_accessor
.column_block_accessor
.iter_docid_vals(docs, &bucket_agg_accessor.accessor)
{
let sub_aggregations = self
.term_buckets
.sub_aggs
Expand Down

0 comments on commit 729e8ab

Please sign in to comment.