fix cardinality aggregation performance (#2446)
* fix cardinality aggregation performance

Fix cardinality aggregation performance by fetching multiple terms from the
term dictionary at once. Sorting the term ordinals and resolving them in a
single batched pass avoids decompressing the same dictionary block repeatedly
and keeps the buffer state between terms (see the sketch after the commit
message).

add cardinality aggregation benchmark

bump rust version to 1.66

Performance comparison with the previous implementation (AllQuery):
```
full
cardinality_agg                   Memory: 3.5 MB (-0.00%)    Avg: 21.2256ms (-97.78%)    Median: 21.0042ms (-97.82%)    [20.4717ms .. 23.6206ms]
terms_few_with_cardinality_agg    Memory: 10.6 MB            Avg: 81.9293ms (-97.37%)    Median: 81.5526ms (-97.38%)    [79.7564ms .. 88.0374ms]
dense
cardinality_agg                   Memory: 3.6 MB (-0.00%)    Avg: 25.9372ms (-97.24%)    Median: 25.7744ms (-97.25%)    [24.7241ms .. 27.8793ms]
terms_few_with_cardinality_agg    Memory: 10.6 MB            Avg: 93.9897ms (-96.91%)    Median: 92.7821ms (-96.94%)    [90.3312ms .. 117.4076ms]
sparse
cardinality_agg                   Memory: 895.4 KB (-0.00%)    Avg: 22.5113ms (-95.01%)    Median: 22.5629ms (-94.99%)    [22.1628ms .. 22.9436ms]
terms_few_with_cardinality_agg    Memory: 680.2 KB             Avg: 26.4250ms (-94.85%)    Median: 26.4135ms (-94.86%)    [26.3210ms .. 26.6774ms]
```

* clippy

* assert for sorted ordinals
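
Conceptually, the change is to the term-dictionary access pattern in the cardinality collector's flush: instead of resolving each term ordinal to its term one call at a time — decompressing whatever dictionary block the ordinal lands in on every call — the collector now gathers all ordinals, sorts them, and resolves them in one batched pass, so each block is decoded at most once. The following self-contained sketch illustrates the effect; `BlockDict` and its methods are illustrative stand-ins, not tantivy's actual dictionary API:

```rust
// Illustrative stand-in for the SSTable term dictionary: terms live in
// fixed-size blocks, and resolving an ordinal requires decoding the block
// that contains it. All names here are hypothetical.
struct BlockDict {
    blocks: Vec<Vec<String>>, // each inner Vec is one "compressed" block
    block_decodes: usize,     // counts how often a block gets decoded
}

impl BlockDict {
    const BLOCK_SIZE: u64 = 128;

    /// Old access pattern: one dictionary lookup per ordinal, so every call
    /// pays for decoding the block the ordinal falls into.
    fn ord_to_term(&mut self, ord: u64) -> &str {
        self.block_decodes += 1;
        let block = &self.blocks[(ord / Self::BLOCK_SIZE) as usize];
        &block[(ord % Self::BLOCK_SIZE) as usize]
    }

    /// New access pattern: the caller passes ordinals in ascending order,
    /// so each block is decoded at most once and reused for all ordinals
    /// that fall into it.
    fn sorted_ords_to_term_cb(&mut self, ords: &[u64], mut cb: impl FnMut(&str)) {
        assert!(ords.windows(2).all(|w| w[0] <= w[1]), "ordinals must be sorted");
        let mut decoded_block = u64::MAX;
        for &ord in ords {
            let block_id = ord / Self::BLOCK_SIZE;
            if block_id != decoded_block {
                self.block_decodes += 1; // decode each block at most once
                decoded_block = block_id;
            }
            cb(&self.blocks[block_id as usize][(ord % Self::BLOCK_SIZE) as usize]);
        }
    }
}

fn main() {
    let blocks: Vec<Vec<String>> = (0..8)
        .map(|b| (0..128).map(|i| format!("term_{b}_{i}")).collect())
        .collect();
    let mut dict = BlockDict { blocks, block_decodes: 0 };

    // Ordinals as they might come out of a hash set: effectively unordered.
    let mut ords: Vec<u64> = (0..1024u64).rev().collect();
    for &ord in &ords {
        let _ = dict.ord_to_term(ord);
    }
    println!("per-term lookups: {} block decodes", dict.block_decodes); // 1024

    dict.block_decodes = 0;
    ords.sort_unstable();
    dict.sorted_ords_to_term_cb(&ords, |_term| {});
    println!("batched + sorted: {} block decodes", dict.block_decodes); // 8
}
```

The batched path only works if the ordinals arrive in ascending order, which is what the "assert for sorted ordinals" change above guards.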
PSeitz committed Jul 2, 2024
1 parent 0f4c2e2 commit 56d79cb
Showing 19 changed files with 211 additions and 50 deletions.
4 changes: 2 additions & 2 deletions — Cargo.toml

```diff
@@ -11,7 +11,7 @@ repository = "https://github.com/quickwit-oss/tantivy"
 readme = "README.md"
 keywords = ["search", "information", "retrieval"]
 edition = "2021"
-rust-version = "1.63"
+rust-version = "1.66"
 exclude = ["benches/*.json", "benches/*.txt"]
 
 [dependencies]
@@ -38,7 +38,7 @@ levenshtein_automata = "0.2.1"
 uuid = { version = "1.0.0", features = ["v4", "serde"] }
 crossbeam-channel = "0.5.4"
 rust-stemmers = "1.2.0"
-downcast-rs = "1.2.0"
+downcast-rs = "1.2.1"
 bitpacking = { version = "0.9.2", default-features = false, features = [
     "bitpacker4x",
 ] }
```
36 changes: 34 additions & 2 deletions — benches/agg_bench.rs

```diff
@@ -54,7 +54,11 @@ fn bench_agg(mut group: InputGroup<Index>) {
     register!(group, terms_many_order_by_term);
     register!(group, terms_many_with_top_hits);
     register!(group, terms_many_with_avg_sub_agg);
-    register!(group, terms_many_json_mixed_type_with_sub_agg_card);
+    register!(group, terms_many_json_mixed_type_with_avg_sub_agg);
+
+    register!(group, cardinality_agg);
+    register!(group, terms_few_with_cardinality_agg);
+
     register!(group, range_agg);
     register!(group, range_agg_with_avg_sub_agg);
     register!(group, range_agg_with_term_agg_few);
@@ -123,6 +127,33 @@ fn percentiles_f64(index: &Index) {
     });
     execute_agg(index, agg_req);
 }
+
+fn cardinality_agg(index: &Index) {
+    let agg_req = json!({
+        "cardinality": {
+            "cardinality": {
+                "field": "text_many_terms"
+            },
+        }
+    });
+    execute_agg(index, agg_req);
+}
+fn terms_few_with_cardinality_agg(index: &Index) {
+    let agg_req = json!({
+        "my_texts": {
+            "terms": { "field": "text_few_terms" },
+            "aggs": {
+                "cardinality": {
+                    "cardinality": {
+                        "field": "text_many_terms"
+                    },
+                }
+            }
+        },
+    });
+    execute_agg(index, agg_req);
+}
+
 fn terms_few(index: &Index) {
     let agg_req = json!({
         "my_texts": { "terms": { "field": "text_few_terms" } },
@@ -171,7 +202,7 @@ fn terms_many_with_avg_sub_agg(index: &Index) {
     });
     execute_agg(index, agg_req);
 }
-fn terms_many_json_mixed_type_with_sub_agg_card(index: &Index) {
+fn terms_many_json_mixed_type_with_avg_sub_agg(index: &Index) {
     let agg_req = json!({
         "my_texts": {
             "terms": { "field": "json.mixed_type" },
@@ -268,6 +299,7 @@ fn range_agg_with_term_agg_many(index: &Index) {
     });
     execute_agg(index, agg_req);
 }
+
 fn histogram(index: &Index) {
     let agg_req = json!({
         "rangef64": {
```
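
For context, each bench function hands its JSON request to an `execute_agg` helper that is not part of this diff. Below is a plausible reconstruction of what such a helper does — parse the request into `Aggregations`, wrap it in an `AggregationCollector`, and run it under `AllQuery` — using tantivy's public aggregation API; the helper body is an assumption, not the bench's actual code:

```rust
use tantivy::aggregation::agg_req::Aggregations;
use tantivy::aggregation::AggregationCollector;
use tantivy::query::AllQuery;
use tantivy::{Index, TantivyError};

// Hypothetical reconstruction of the bench helper: parse the JSON aggregation
// request, wrap it in a collector, and execute it over every document.
fn execute_agg(index: &Index, agg_req: serde_json::Value) -> Result<(), TantivyError> {
    let aggs: Aggregations = serde_json::from_value(agg_req)
        .map_err(|err| TantivyError::InvalidArgument(err.to_string()))?;
    let collector = AggregationCollector::from_aggs(aggs, Default::default());
    let searcher = index.reader()?.searcher();
    let _agg_result = searcher.search(&AllQuery, &collector)?;
    Ok(())
}

// Usage mirroring the new cardinality_agg bench:
//     execute_agg(&index, serde_json::json!({
//         "cardinality": { "cardinality": { "field": "text_many_terms" } }
//     }))?;
```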
6 changes: 3 additions & 3 deletions — src/aggregation/agg_req.rs

```diff
@@ -36,7 +36,7 @@ use super::bucket::{
 use super::metric::{
     AverageAggregation, CardinalityAggregationReq, CountAggregation, ExtendedStatsAggregation,
     MaxAggregation, MinAggregation, PercentilesAggregationReq, StatsAggregation, SumAggregation,
-    TopHitsAggregation,
+    TopHitsAggregationReq,
 };
 
 /// The top-level aggregation request structure, which contains [`Aggregation`] and their user
@@ -160,7 +160,7 @@ pub enum AggregationVariants {
     Percentiles(PercentilesAggregationReq),
     /// Finds the top k values matching some order
     #[serde(rename = "top_hits")]
-    TopHits(TopHitsAggregation),
+    TopHits(TopHitsAggregationReq),
     /// Computes an estimate of the number of unique values
     #[serde(rename = "cardinality")]
     Cardinality(CardinalityAggregationReq),
@@ -208,7 +208,7 @@ impl AggregationVariants {
             _ => None,
         }
    }
-    pub(crate) fn as_top_hits(&self) -> Option<&TopHitsAggregation> {
+    pub(crate) fn as_top_hits(&self) -> Option<&TopHitsAggregationReq> {
         match &self {
             AggregationVariants::TopHits(top_hits) => Some(top_hits),
             _ => None,
```
76 changes: 62 additions & 14 deletions — src/aggregation/metric/cardinality.rs

```diff
@@ -2,7 +2,7 @@ use std::collections::hash_map::DefaultHasher;
 use std::hash::{BuildHasher, Hasher};
 
 use columnar::column_values::CompactSpaceU64Accessor;
-use columnar::{BytesColumn, StrColumn};
+use columnar::Dictionary;
 use common::f64_to_u64;
 use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
 use rustc_hash::FxHashSet;
@@ -38,7 +38,53 @@ impl BuildHasher for BuildSaltedHasher {
 ///
 /// The cardinality aggregation allows for computing an estimate
 /// of the number of different values in a data set based on the
-/// HyperLogLog++ alogrithm.
+/// HyperLogLog++ algorithm. This is particularly useful for understanding the
+/// uniqueness of values in a large dataset where counting each unique value
+/// individually would be computationally expensive.
+///
+/// For example, you might use a cardinality aggregation to estimate the number
+/// of unique visitors to a website by aggregating on a field that contains
+/// user IDs or session IDs.
+///
+/// To use the cardinality aggregation, you'll need to provide a field to
+/// aggregate on. The following example demonstrates a request for the cardinality
+/// of the "user_id" field:
+///
+/// ```JSON
+/// {
+///     "cardinality": {
+///         "field": "user_id"
+///     }
+/// }
+/// ```
+///
+/// This request will return an estimate of the number of unique values in the
+/// "user_id" field.
+///
+/// ## Missing Values
+///
+/// The `missing` parameter defines how documents that are missing a value should be treated.
+/// By default, documents without a value for the specified field are ignored. However, you can
+/// specify a default value for these documents using the `missing` parameter. This can be useful
+/// when you want to include documents with missing values in the aggregation.
+///
+/// For example, the following request treats documents with missing values in the "user_id"
+/// field as if they had a value of "unknown":
+///
+/// ```JSON
+/// {
+///     "cardinality": {
+///         "field": "user_id",
+///         "missing": "unknown"
+///     }
+/// }
+/// ```
+///
+/// # Estimation Accuracy
+///
+/// The cardinality aggregation provides an approximate count, which is usually
+/// accurate within a small error range. This trade-off allows for efficient
+/// computation even on very large datasets.
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 pub struct CardinalityAggregationReq {
     /// The field name to compute the percentiles on.
@@ -108,27 +154,29 @@ impl SegmentCardinalityCollector {
         agg_with_accessor: &AggregationWithAccessor,
     ) -> crate::Result<IntermediateMetricResult> {
         if self.column_type == ColumnType::Str {
-            let mut buffer = String::new();
-            let term_dict = agg_with_accessor
+            let fallback_dict = Dictionary::empty();
+            let dict = agg_with_accessor
                 .str_dict_column
                 .as_ref()
-                .cloned()
-                .unwrap_or_else(|| {
-                    StrColumn::wrap(BytesColumn::empty(agg_with_accessor.accessor.num_docs()))
-                });
+                .map(|el| el.dictionary())
+                .unwrap_or_else(|| &fallback_dict);
             let mut has_missing = false;
+
+            // TODO: replace FxHashSet with something that allows iterating in order
+            // (e.g. sparse bitvec)
+            let mut term_ids = Vec::new();
             for term_ord in self.entries.into_iter() {
                 if term_ord == u64::MAX {
                     has_missing = true;
                 } else {
-                    if !term_dict.ord_to_str(term_ord, &mut buffer)? {
-                        return Err(TantivyError::InternalError(format!(
-                            "Couldn't find term_ord {term_ord} in dict"
-                        )));
-                    }
-                    self.cardinality.sketch.insert_any(&buffer);
+                    // we can reasonably exclude values above u32::MAX
+                    term_ids.push(term_ord as u32);
                 }
             }
+            term_ids.sort_unstable();
+            dict.sorted_ords_to_term_cb(term_ids.iter().map(|term| *term as u64), |term| {
+                self.cardinality.sketch.insert_any(&term);
+            })?;
             if has_missing {
                 let missing_key = self
                     .missing
```
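
For reference, the sketch that produces the estimate is a `HyperLogLogPlus` from the `hyperloglogplus` crate, built with the `BuildSaltedHasher` whose `BuildHasher` impl appears at the top of this file: the salt lets different aggregations hash identical values differently. A toy, self-contained version of that combination — the struct body and the precision value are reconstructed for the demo, not copied from tantivy:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{BuildHasher, Hasher};

use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};

// Reconstructed for this demo: mirrors the BuildSaltedHasher idea above.
// Writing the salt first means two sketches with different salts see
// uncorrelated hash values for the same input.
#[derive(Clone, Copy)]
struct BuildSaltedHasher {
    salt: u8,
}

impl BuildHasher for BuildSaltedHasher {
    type Hasher = DefaultHasher;

    fn build_hasher(&self) -> DefaultHasher {
        let mut hasher = DefaultHasher::new();
        hasher.write_u8(self.salt);
        hasher
    }
}

fn main() {
    // Precision 16 is an arbitrary pick for the demo, not tantivy's setting.
    let mut sketch: HyperLogLogPlus<u64, BuildSaltedHasher> =
        HyperLogLogPlus::new(16, BuildSaltedHasher { salt: 0 }).unwrap();
    for term in ["red", "green", "red", "blue"] {
        // insert_any is the same entry point the collector's flush uses above.
        sketch.insert_any(&term);
    }
    println!("estimated distinct terms: {}", sketch.count()); // ~3
}
```

Because HyperLogLog++ only keeps hash-derived register buckets, memory stays small no matter how many distinct terms stream through `insert_any`, which is why the flush above can afford to feed every resolved term into the sketch.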
14 changes: 7 additions & 7 deletions — src/aggregation/metric/top_hits.rs

```diff
@@ -89,7 +89,7 @@ use crate::{DocAddress, DocId, SegmentOrdinal};
 /// }
 /// ```
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
-pub struct TopHitsAggregation {
+pub struct TopHitsAggregationReq {
     sort: Vec<KeyOrder>,
     size: usize,
     from: Option<usize>,
@@ -164,7 +164,7 @@ fn unsupported_err(parameter: &str) -> crate::Result<()> {
     ))
 }
 
-impl TopHitsAggregation {
+impl TopHitsAggregationReq {
     /// Validate and resolve field retrieval parameters
     pub fn validate_and_resolve_field_names(
         &mut self,
@@ -431,7 +431,7 @@ impl Eq for DocSortValuesAndFields {}
 /// The TopHitsCollector used for collecting over segments and merging results.
 #[derive(Clone, Serialize, Deserialize, Debug)]
 pub struct TopHitsTopNComputer {
-    req: TopHitsAggregation,
+    req: TopHitsAggregationReq,
     top_n: TopNComputer<DocSortValuesAndFields, DocAddress, false>,
 }
 
@@ -443,7 +443,7 @@ impl std::cmp::PartialEq for TopHitsTopNComputer {
 
 impl TopHitsTopNComputer {
     /// Create a new TopHitsCollector
-    pub fn new(req: &TopHitsAggregation) -> Self {
+    pub fn new(req: &TopHitsAggregationReq) -> Self {
         Self {
             top_n: TopNComputer::new(req.size + req.from.unwrap_or(0)),
             req: req.clone(),
@@ -496,7 +496,7 @@ pub(crate) struct TopHitsSegmentCollector {
 
 impl TopHitsSegmentCollector {
     pub fn from_req(
-        req: &TopHitsAggregation,
+        req: &TopHitsAggregationReq,
         accessor_idx: usize,
         segment_ordinal: SegmentOrdinal,
     ) -> Self {
@@ -509,7 +509,7 @@ impl TopHitsSegmentCollector {
     fn into_top_hits_collector(
         self,
         value_accessors: &HashMap<String, Vec<DynamicColumn>>,
-        req: &TopHitsAggregation,
+        req: &TopHitsAggregationReq,
     ) -> TopHitsTopNComputer {
         let mut top_hits_computer = TopHitsTopNComputer::new(req);
         let top_results = self.top_n.into_vec();
@@ -532,7 +532,7 @@ impl TopHitsSegmentCollector {
     fn collect_with(
         &mut self,
         doc_id: crate::DocId,
-        req: &TopHitsAggregation,
+        req: &TopHitsAggregationReq,
         accessors: &[(Column<u64>, ColumnType)],
     ) -> crate::Result<()> {
         let sorts: Vec<DocValueAndOrder> = req
```
3 changes: 3 additions & 0 deletions — src/aggregation/mod.rs

```diff
@@ -44,11 +44,14 @@
 //! - [Metric](metric)
 //!     - [Average](metric::AverageAggregation)
 //!     - [Stats](metric::StatsAggregation)
+//!     - [ExtendedStats](metric::ExtendedStatsAggregation)
 //!     - [Min](metric::MinAggregation)
 //!     - [Max](metric::MaxAggregation)
 //!     - [Sum](metric::SumAggregation)
 //!     - [Count](metric::CountAggregation)
 //!     - [Percentiles](metric::PercentilesAggregationReq)
+//!     - [Cardinality](metric::CardinalityAggregationReq)
+//!     - [TopHits](metric::TopHitsAggregationReq)
 //!
 //! # Example
 //! Compute the average metric, by building [`agg_req::Aggregations`], which is built from an
```
4 changes: 2 additions & 2 deletions — src/indexer/index_writer.rs

```diff
@@ -1574,11 +1574,11 @@ mod tests {
                     deleted_ids.remove(id);
                 }
                 IndexingOp::DeleteDoc { id } => {
-                    existing_ids.remove(&id);
+                    existing_ids.remove(id);
                     deleted_ids.insert(*id);
                 }
                 IndexingOp::DeleteDocQuery { id } => {
-                    existing_ids.remove(&id);
+                    existing_ids.remove(id);
                     deleted_ids.insert(*id);
                 }
                 _ => {}
```
2 changes: 1 addition & 1 deletion — src/positions/mod.rs

```diff
@@ -3,7 +3,7 @@
 //! In "The beauty and the beast", the term "the" appears in position 0 and position 3.
 //! This information is useful to run phrase queries.
 //!
-//! The [position](crate::SegmentComponent::Positions) file contains all of the
+//! The [position](crate::index::SegmentComponent::Positions) file contains all of the
 //! bitpacked positions delta, for all terms of a given field, one term after the other.
 //!
 //! Each term is encoded independently.
```
3 changes: 1 addition & 2 deletions — src/query/boolean_query/boolean_query.rs

```diff
@@ -144,8 +144,7 @@ impl Clone for BooleanQuery {
             .subqueries
             .iter()
             .map(|(occur, subquery)| (*occur, subquery.box_clone()))
-            .collect::<Vec<_>>()
-            .into();
+            .collect::<Vec<_>>();
         Self {
             subqueries,
             minimum_number_should_match: self.minimum_number_should_match,
```
2 changes: 1 addition & 1 deletion — src/query/boolean_query/boolean_weight.rs

```diff
@@ -170,7 +170,7 @@ impl<TScoreCombiner: ScoreCombiner> BooleanWeight<TScoreCombiner> {
                 scorer_union(should_scorers, &score_combiner_fn),
                 &score_combiner_fn,
             )),
-            n @ _ if num_of_should_scorers == n => {
+            n if num_of_should_scorers == n => {
                 // When num_of_should_scorers equals the number of should clauses,
                 // they are no different from must clauses.
                 must_scorers = match must_scorers.take() {
```
2 changes: 1 addition & 1 deletion — src/query/disjunction.rs

```diff
@@ -131,7 +131,7 @@ impl<TScorer: Scorer, TScoreCombiner: ScoreCombiner> DocSet
             self.current_doc = TERMINATED;
         }
         self.current_score = self.score_combiner.score();
-        return self.current_doc;
+        self.current_doc
     }
 
     #[inline]
```
4 changes: 2 additions & 2 deletions — src/schema/document/default_document.rs

```diff
@@ -508,7 +508,7 @@ impl std::fmt::Debug for ValueAddr {
 
 /// A enum representing a value for tantivy to index.
 ///
-/// Any changes need to be reflected in `BinarySerializable` for `ValueType`
+/// ** Any changes need to be reflected in `BinarySerializable` for `ValueType` **
 ///
 /// We can't use [schema::Type] or [columnar::ColumnType] here, because they are missing
 /// some items like Array and PreTokStr.
@@ -553,7 +553,7 @@ impl BinarySerializable for ValueType {
     fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
         let num = u8::deserialize(reader)?;
         let type_id = if (0..=12).contains(&num) {
-            unsafe { std::mem::transmute(num) }
+            unsafe { std::mem::transmute::<u8, ValueType>(num) }
         } else {
             return Err(io::Error::new(
                 io::ErrorKind::InvalidData,
```
4 changes: 1 addition & 3 deletions — sstable/benches/ord_to_term.rs

```diff
@@ -16,9 +16,7 @@ fn make_test_sstable(suffix: &str) -> FileSlice {
 
     let table = builder.finish().unwrap();
     let table = Arc::new(OwnedBytes::new(table));
-    let slice = common::file_slice::FileSlice::new(table.clone());
-
-    slice
+    common::file_slice::FileSlice::new(table.clone())
 }
 
 pub fn criterion_benchmark(c: &mut Criterion) {
```
2 changes: 1 addition & 1 deletion — sstable/benches/stream_bench.rs

```diff
@@ -7,7 +7,7 @@ use rand::rngs::StdRng;
 use rand::{Rng, SeedableRng};
 use tantivy_sstable::{Dictionary, MonotonicU64SSTable};
 
-const CHARSET: &'static [u8] = b"abcdefghij";
+const CHARSET: &[u8] = b"abcdefghij";
 
 fn generate_key(rng: &mut impl Rng) -> String {
     let len = rng.gen_range(3..12);
```