diff --git a/Cargo.toml b/Cargo.toml index ff1a05feaa..18f0b66936 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ repository = "https://github.com/quickwit-oss/tantivy" readme = "README.md" keywords = ["search", "information", "retrieval"] edition = "2021" -rust-version = "1.63" +rust-version = "1.66" exclude = ["benches/*.json", "benches/*.txt"] [dependencies] @@ -38,7 +38,7 @@ levenshtein_automata = "0.2.1" uuid = { version = "1.0.0", features = ["v4", "serde"] } crossbeam-channel = "0.5.4" rust-stemmers = "1.2.0" -downcast-rs = "1.2.0" +downcast-rs = "1.2.1" bitpacking = { version = "0.9.2", default-features = false, features = [ "bitpacker4x", ] } diff --git a/benches/agg_bench.rs b/benches/agg_bench.rs index f7a06608f3..05379bea11 100644 --- a/benches/agg_bench.rs +++ b/benches/agg_bench.rs @@ -54,7 +54,11 @@ fn bench_agg(mut group: InputGroup<Index>) { register!(group, terms_many_order_by_term); register!(group, terms_many_with_top_hits); register!(group, terms_many_with_avg_sub_agg); - register!(group, terms_many_json_mixed_type_with_sub_agg_card); + register!(group, terms_many_json_mixed_type_with_avg_sub_agg); + + register!(group, cardinality_agg); + register!(group, terms_few_with_cardinality_agg); + register!(group, range_agg); register!(group, range_agg_with_avg_sub_agg); register!(group, range_agg_with_term_agg_few); @@ -123,6 +127,33 @@ fn percentiles_f64(index: &Index) { }); execute_agg(index, agg_req); } + +fn cardinality_agg(index: &Index) { + let agg_req = json!({ + "cardinality": { + "cardinality": { + "field": "text_many_terms" + }, + } + }); + execute_agg(index, agg_req); +} +fn terms_few_with_cardinality_agg(index: &Index) { + let agg_req = json!({ + "my_texts": { + "terms": { "field": "text_few_terms" }, + "aggs": { + "cardinality": { + "cardinality": { + "field": "text_many_terms" + }, + } + } + }, + }); + execute_agg(index, agg_req); +} + fn terms_few(index: &Index) { let agg_req = json!({ "my_texts": { "terms": { "field": "text_few_terms" } }, @@ -171,7 +202,7 @@ fn terms_many_with_avg_sub_agg(index: &Index) { }); execute_agg(index, agg_req); } -fn terms_many_json_mixed_type_with_sub_agg_card(index: &Index) { +fn terms_many_json_mixed_type_with_avg_sub_agg(index: &Index) { let agg_req = json!({ "my_texts": { "terms": { "field": "json.mixed_type" }, @@ -268,6 +299,7 @@ fn range_agg_with_term_agg_many(index: &Index) { }); execute_agg(index, agg_req); } + fn histogram(index: &Index) { let agg_req = json!({ "rangef64": { diff --git a/src/aggregation/agg_req.rs b/src/aggregation/agg_req.rs index f2a7a6aed7..7d06fae2b3 100644 --- a/src/aggregation/agg_req.rs +++ b/src/aggregation/agg_req.rs @@ -36,7 +36,7 @@ use super::bucket::{ use super::metric::{ AverageAggregation, CardinalityAggregationReq, CountAggregation, ExtendedStatsAggregation, MaxAggregation, MinAggregation, PercentilesAggregationReq, StatsAggregation, SumAggregation, - TopHitsAggregation, + TopHitsAggregationReq, }; /// The top-level aggregation request structure, which contains [`Aggregation`] and their user @@ -160,7 +160,7 @@ pub enum AggregationVariants { Percentiles(PercentilesAggregationReq), /// Finds the top k values matching some order #[serde(rename = "top_hits")] - TopHits(TopHitsAggregation), + TopHits(TopHitsAggregationReq), /// Computes an estimate of the number of unique values #[serde(rename = "cardinality")] Cardinality(CardinalityAggregationReq), @@ -208,7 +208,7 @@ impl AggregationVariants { _ => None, } } - pub(crate) fn as_top_hits(&self) -> Option<&TopHitsAggregation> { + 
pub(crate) fn as_top_hits(&self) -> Option<&TopHitsAggregationReq> { match &self { AggregationVariants::TopHits(top_hits) => Some(top_hits), _ => None, diff --git a/src/aggregation/metric/cardinality.rs b/src/aggregation/metric/cardinality.rs index 7e0dfc2ab1..d8b2c496a3 100644 --- a/src/aggregation/metric/cardinality.rs +++ b/src/aggregation/metric/cardinality.rs @@ -2,7 +2,7 @@ use std::collections::hash_map::DefaultHasher; use std::hash::{BuildHasher, Hasher}; use columnar::column_values::CompactSpaceU64Accessor; -use columnar::{BytesColumn, StrColumn}; +use columnar::Dictionary; use common::f64_to_u64; use hyperloglogplus::{HyperLogLog, HyperLogLogPlus}; use rustc_hash::FxHashSet; @@ -38,7 +38,53 @@ impl BuildHasher for BuildSaltedHasher { /// /// The cardinality aggregation allows for computing an estimate /// of the number of different values in a data set based on the -/// HyperLogLog++ alogrithm. +/// HyperLogLog++ algorithm. This is particularly useful for understanding the +/// uniqueness of values in a large dataset where counting each unique value +/// individually would be computationally expensive. +/// +/// For example, you might use a cardinality aggregation to estimate the number +/// of unique visitors to a website by aggregating on a field that contains +/// user IDs or session IDs. +/// +/// To use the cardinality aggregation, you'll need to provide a field to +/// aggregate on. The following example demonstrates a request for the cardinality +/// of the "user_id" field: +/// +/// ```JSON +/// { +/// "cardinality": { +/// "field": "user_id" +/// } +/// } +/// ``` +/// +/// This request will return an estimate of the number of unique values in the +/// "user_id" field. +/// +/// # Missing Values +/// +/// The `missing` parameter defines how documents that are missing a value should be treated. +/// By default, documents without a value for the specified field are ignored. However, you can +/// specify a default value for these documents using the `missing` parameter. This can be useful +/// when you want to include documents with missing values in the aggregation. +/// +/// For example, the following request treats documents with missing values in the "user_id" +/// field as if they had a value of "unknown": +/// +/// ```JSON +/// { +/// "cardinality": { +/// "field": "user_id", +/// "missing": "unknown" +/// } +/// } +/// ``` +/// +/// # Estimation Accuracy +/// +/// The cardinality aggregation provides an approximate count, which is usually +/// accurate within a small error range. This trade-off allows for efficient +/// computation even on very large datasets. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct CardinalityAggregationReq { /// The field name to compute the cardinality on. @@ -108,27 +154,29 @@ impl SegmentCardinalityCollector { agg_with_accessor: &AggregationWithAccessor, ) -> crate::Result<IntermediateMetricResult> { if self.column_type == ColumnType::Str { - let mut buffer = String::new(); - let term_dict = agg_with_accessor + let fallback_dict = Dictionary::empty(); + let dict = agg_with_accessor .str_dict_column .as_ref() - .cloned() - .unwrap_or_else(|| { - StrColumn::wrap(BytesColumn::empty(agg_with_accessor.accessor.num_docs())) - }); + .map(|el| el.dictionary()) + .unwrap_or_else(|| &fallback_dict); let mut has_missing = false; + + // TODO: replace FxHashSet with something that allows iterating in order + // (e.g. 
sparse bitvec) + let mut term_ids = Vec::new(); for term_ord in self.entries.into_iter() { if term_ord == u64::MAX { has_missing = true; } else { - if !term_dict.ord_to_str(term_ord, &mut buffer)? { - return Err(TantivyError::InternalError(format!( - "Couldn't find term_ord {term_ord} in dict" - ))); - } - self.cardinality.sketch.insert_any(&buffer); + // we can reasonably exclude values above u32::MAX + term_ids.push(term_ord as u32); } } + term_ids.sort_unstable(); + dict.sorted_ords_to_term_cb(term_ids.iter().map(|term| *term as u64), |term| { + self.cardinality.sketch.insert_any(&term); + })?; if has_missing { let missing_key = self .missing diff --git a/src/aggregation/metric/top_hits.rs b/src/aggregation/metric/top_hits.rs index 6c18805162..ab25efd2af 100644 --- a/src/aggregation/metric/top_hits.rs +++ b/src/aggregation/metric/top_hits.rs @@ -89,7 +89,7 @@ use crate::{DocAddress, DocId, SegmentOrdinal}; /// } /// ``` #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)] -pub struct TopHitsAggregation { +pub struct TopHitsAggregationReq { sort: Vec<KeyOrder>, size: usize, from: Option<usize>, @@ -164,7 +164,7 @@ fn unsupported_err(parameter: &str) -> crate::Result<()> { )) } -impl TopHitsAggregation { +impl TopHitsAggregationReq { /// Validate and resolve field retrieval parameters pub fn validate_and_resolve_field_names( &mut self, @@ -431,7 +431,7 @@ impl Eq for DocSortValuesAndFields {} /// The TopHitsCollector used for collecting over segments and merging results. #[derive(Clone, Serialize, Deserialize, Debug)] pub struct TopHitsTopNComputer { - req: TopHitsAggregation, + req: TopHitsAggregationReq, top_n: TopNComputer<DocSortValuesAndFields, DocAddress, false>, } @@ -443,7 +443,7 @@ impl std::cmp::PartialEq for TopHitsTopNComputer { impl TopHitsTopNComputer { /// Create a new TopHitsCollector - pub fn new(req: &TopHitsAggregation) -> Self { + pub fn new(req: &TopHitsAggregationReq) -> Self { Self { top_n: TopNComputer::new(req.size + req.from.unwrap_or(0)), req: req.clone(), @@ -496,7 +496,7 @@ pub(crate) struct TopHitsSegmentCollector { impl TopHitsSegmentCollector { pub fn from_req( - req: &TopHitsAggregation, + req: &TopHitsAggregationReq, accessor_idx: usize, segment_ordinal: SegmentOrdinal, ) -> Self { @@ -509,7 +509,7 @@ impl TopHitsSegmentCollector { fn into_top_hits_collector( self, value_accessors: &HashMap<String, Vec<DynamicColumn>>, - req: &TopHitsAggregation, + req: &TopHitsAggregationReq, ) -> TopHitsTopNComputer { let mut top_hits_computer = TopHitsTopNComputer::new(req); let top_results = self.top_n.into_vec(); @@ -532,7 +532,7 @@ impl TopHitsSegmentCollector { fn collect_with( &mut self, doc_id: crate::DocId, - req: &TopHitsAggregation, + req: &TopHitsAggregationReq, accessors: &[(Column<u64>, ColumnType)], ) -> crate::Result<()> { let sorts: Vec = req diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs index d83059a20c..72a37703c2 100644 --- a/src/aggregation/mod.rs +++ b/src/aggregation/mod.rs @@ -44,11 +44,14 @@ //! - [Metric](metric) //! - [Average](metric::AverageAggregation) //! - [Stats](metric::StatsAggregation) +//! - [ExtendedStats](metric::ExtendedStatsAggregation) //! - [Min](metric::MinAggregation) //! - [Max](metric::MaxAggregation) //! - [Sum](metric::SumAggregation) //! - [Count](metric::CountAggregation) //! - [Percentiles](metric::PercentilesAggregationReq) +//! - [Cardinality](metric::CardinalityAggregationReq) +//! - [TopHits](metric::TopHitsAggregationReq) //! //! # Example //! 
Compute the average metric, by building [`agg_req::Aggregations`], which is built from an diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 3149127a08..c835e43b85 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -1574,11 +1574,11 @@ mod tests { deleted_ids.remove(id); } IndexingOp::DeleteDoc { id } => { - existing_ids.remove(&id); + existing_ids.remove(id); deleted_ids.insert(*id); } IndexingOp::DeleteDocQuery { id } => { - existing_ids.remove(&id); + existing_ids.remove(id); deleted_ids.insert(*id); } _ => {} diff --git a/src/positions/mod.rs b/src/positions/mod.rs index cd98eb69d2..a02ce71bed 100644 --- a/src/positions/mod.rs +++ b/src/positions/mod.rs @@ -3,7 +3,7 @@ //! In "The beauty and the beast", the term "the" appears in position 0 and position 3. //! This information is useful to run phrase queries. //! -//! The [position](crate::SegmentComponent::Positions) file contains all of the +//! The [position](crate::index::SegmentComponent::Positions) file contains all of the //! bitpacked positions delta, for all terms of a given field, one term after the other. //! //! Each term is encoded independently. diff --git a/src/query/boolean_query/boolean_query.rs b/src/query/boolean_query/boolean_query.rs index 1c39d3d50e..ac225be20f 100644 --- a/src/query/boolean_query/boolean_query.rs +++ b/src/query/boolean_query/boolean_query.rs @@ -144,8 +144,7 @@ impl Clone for BooleanQuery { .subqueries .iter() .map(|(occur, subquery)| (*occur, subquery.box_clone())) - .collect::<Vec<_>>() - .into(); + .collect::<Vec<_>>(); Self { subqueries, minimum_number_should_match: self.minimum_number_should_match, diff --git a/src/query/boolean_query/boolean_weight.rs b/src/query/boolean_query/boolean_weight.rs index 77f847063c..c0a5e2c37f 100644 --- a/src/query/boolean_query/boolean_weight.rs +++ b/src/query/boolean_query/boolean_weight.rs @@ -170,7 +170,7 @@ impl BooleanWeight { scorer_union(should_scorers, &score_combiner_fn), &score_combiner_fn, )), - n @ _ if num_of_should_scorers == n => { + n if num_of_should_scorers == n => { // When num_of_should_scorers equals the number of should clauses, // they are no different from must clauses. must_scorers = match must_scorers.take() { diff --git a/src/query/disjunction.rs b/src/query/disjunction.rs index 2b631e5522..b2d9d67ffa 100644 --- a/src/query/disjunction.rs +++ b/src/query/disjunction.rs @@ -131,7 +131,7 @@ impl DocSet self.current_doc = TERMINATED; } self.current_score = self.score_combiner.score(); - return self.current_doc; + self.current_doc } #[inline] diff --git a/src/schema/document/default_document.rs b/src/schema/document/default_document.rs index ee87785fb3..f3abb40012 100644 --- a/src/schema/document/default_document.rs +++ b/src/schema/document/default_document.rs @@ -508,7 +508,7 @@ impl std::fmt::Debug for ValueAddr { /// An enum representing a value for tantivy to index. /// -/// Any changes need to be reflected in `BinarySerializable` for `ValueType` +/// ** Any changes need to be reflected in `BinarySerializable` for `ValueType` ** /// /// We can't use [schema::Type] or [columnar::ColumnType] here, because they are missing /// some items like Array and PreTokStr. 
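Aside on the `ValueType` hunk that follows: it replaces a bare `transmute` with an explicit `transmute::<u8, ValueType>`, so the source and target types are visible at the call site. The pattern is sound only because the preceding range check guarantees a valid discriminant. A minimal self-contained sketch of the same pattern, using a hypothetical `ToyValueType` stand-in rather than tantivy's actual enum:

```rust
/// Hypothetical stand-in for tantivy's `ValueType` (illustration only).
#[derive(Debug, Clone, Copy, PartialEq)]
#[repr(u8)]
enum ToyValueType {
    Null = 0,
    Str = 1,
    U64 = 2,
}

fn toy_value_type_from_u8(num: u8) -> Option<ToyValueType> {
    // The transmute is sound only because every value in 0..=2 is a valid
    // discriminant of this #[repr(u8)] enum; the turbofish documents the
    // source and target types at the call site.
    if (0..=2).contains(&num) {
        Some(unsafe { std::mem::transmute::<u8, ToyValueType>(num) })
    } else {
        None
    }
}
```

A plain `match` over the variants would avoid `unsafe` entirely at the cost of repeating every discriminant; the range check plus transmute keeps the conversion in one place but must be kept in sync with the enum, which is exactly what the "Any changes need to be reflected in `BinarySerializable` for `ValueType`" warning above is about.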
@@ -553,7 +553,7 @@ impl BinarySerializable for ValueType { fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> { let num = u8::deserialize(reader)?; let type_id = if (0..=12).contains(&num) { - unsafe { std::mem::transmute(num) } + unsafe { std::mem::transmute::<u8, ValueType>(num) } } else { return Err(io::Error::new( io::ErrorKind::InvalidData, diff --git a/sstable/benches/ord_to_term.rs b/sstable/benches/ord_to_term.rs index 9285af2e4e..db1823dd3b 100644 --- a/sstable/benches/ord_to_term.rs +++ b/sstable/benches/ord_to_term.rs @@ -16,9 +16,7 @@ fn make_test_sstable(suffix: &str) -> FileSlice { let table = builder.finish().unwrap(); let table = Arc::new(OwnedBytes::new(table)); - let slice = common::file_slice::FileSlice::new(table.clone()); - - slice + common::file_slice::FileSlice::new(table.clone()) } pub fn criterion_benchmark(c: &mut Criterion) { diff --git a/sstable/benches/stream_bench.rs b/sstable/benches/stream_bench.rs index d8df433e9a..23c8a22002 100644 --- a/sstable/benches/stream_bench.rs +++ b/sstable/benches/stream_bench.rs @@ -7,7 +7,7 @@ use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; use tantivy_sstable::{Dictionary, MonotonicU64SSTable}; -const CHARSET: &'static [u8] = b"abcdefghij"; +const CHARSET: &[u8] = b"abcdefghij"; fn generate_key(rng: &mut impl Rng) -> String { let len = rng.gen_range(3..12); diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs index b4821fe24a..de72f2a4b9 100644 --- a/sstable/src/dictionary.rs +++ b/sstable/src/dictionary.rs @@ -338,6 +338,45 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> { Ok(true) } + /// Returns the terms for a _sorted_ list of term ordinals. + /// + /// Returns true if and only if all terms have been found. + pub fn sorted_ords_to_term_cb<F: FnMut(&[u8])>( + &self, + ord: impl Iterator<Item = TermOrdinal>, + mut cb: F, + ) -> io::Result<bool> { + let mut bytes = Vec::new(); + let mut current_block_addr = self.sstable_index.get_block_with_ord(0); + let mut current_sstable_delta_reader = + self.sstable_delta_reader_block(current_block_addr.clone())?; + let mut current_ordinal = 0; + for ord in ord { + assert!(ord >= current_ordinal); + // check if block changed for new term_ord + let new_block_addr = self.sstable_index.get_block_with_ord(ord); + if new_block_addr != current_block_addr { + current_block_addr = new_block_addr; + current_ordinal = current_block_addr.first_ordinal; + current_sstable_delta_reader = + self.sstable_delta_reader_block(current_block_addr.clone())?; + bytes.clear(); + } + + // move to ord inside that block + for _ in current_ordinal..=ord { + if !current_sstable_delta_reader.advance()? { + return Ok(false); + } + bytes.truncate(current_sstable_delta_reader.common_prefix_len()); + bytes.extend_from_slice(current_sstable_delta_reader.suffix()); + } + current_ordinal = ord + 1; + cb(&bytes); + } + Ok(true) + } + /// Returns the number of terms in the dictionary. 
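The `sorted_ords_to_term_cb` implementation above rebuilds each term incrementally from the delta reader: every entry contributes the length of the prefix it shares with the previous term plus a fresh suffix. A self-contained sketch of just that decoding step, over a toy `(common_prefix_len, suffix)` list rather than the real sstable blocks:

```rust
// Toy illustration of prefix-delta decoding (not the sstable wire format):
// each entry stores how many leading bytes it shares with the previous term,
// followed by the bytes that differ.
fn decode_prefix_deltas(deltas: &[(usize, &[u8])]) -> Vec<Vec<u8>> {
    let mut current = Vec::new();
    let mut terms = Vec::new();
    for &(common_prefix_len, suffix) in deltas {
        // Same two steps as the loop above: keep the shared prefix, append the suffix.
        current.truncate(common_prefix_len);
        current.extend_from_slice(suffix);
        terms.push(current.clone());
    }
    terms
}

fn main() {
    let deltas: &[(usize, &[u8])] = &[(0, b"abc"), (2, b"d"), (3, b"x")];
    // "abc", then "ab" + "d" = "abd", then "abd" + "x" = "abdx".
    assert_eq!(
        decode_prefix_deltas(deltas),
        vec![b"abc".to_vec(), b"abd".to_vec(), b"abdx".to_vec()]
    );
}
```

This is also why the function requires sorted ordinals: decoding is stateful and forward-only within a block, so an out-of-order ordinal would mean re-reading the block from its start.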
pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> io::Result<Option<TermInfo>> { // find block in which the term would be @@ -551,6 +590,49 @@ mod tests { assert!(dic.term_ord(b"1000").unwrap().is_none()); } + #[test] + fn test_ords_term() { + let (dic, _slice) = make_test_sstable(); + + // Single term + let mut terms = Vec::new(); + assert!(dic + .sorted_ords_to_term_cb(100_000..100_001, |term| { terms.push(term.to_vec()) }) + .unwrap()); + assert_eq!(terms, vec![format!("{:05X}", 100_000).into_bytes(),]); + // Single term + let mut terms = Vec::new(); + assert!(dic + .sorted_ords_to_term_cb(100_001..100_002, |term| { terms.push(term.to_vec()) }) + .unwrap()); + assert_eq!(terms, vec![format!("{:05X}", 100_001).into_bytes(),]); + // both terms + let mut terms = Vec::new(); + assert!(dic + .sorted_ords_to_term_cb(100_000..100_002, |term| { terms.push(term.to_vec()) }) + .unwrap()); + assert_eq!( + terms, + vec![ + format!("{:05X}", 100_000).into_bytes(), + format!("{:05X}", 100_001).into_bytes(), + ] + ); + // Test cross block + let mut terms = Vec::new(); + assert!(dic + .sorted_ords_to_term_cb(98653..=98655, |term| { terms.push(term.to_vec()) }) + .unwrap()); + assert_eq!( + terms, + vec![ + format!("{:05X}", 98653).into_bytes(), + format!("{:05X}", 98654).into_bytes(), + format!("{:05X}", 98655).into_bytes(), + ] + ); + } + #[test] fn test_range() { let (dic, slice) = make_test_sstable(); diff --git a/sstable/src/value/range.rs b/sstable/src/value/range.rs index f2591c54bb..ca24a4be78 100644 --- a/sstable/src/value/range.rs +++ b/sstable/src/value/range.rs @@ -78,6 +78,7 @@ impl ValueWriter for RangeValueWriter { } #[cfg(test)] +#[allow(clippy::single_range_in_vec_init)] mod tests { use super::*; diff --git a/sstable/src/vint.rs b/sstable/src/vint.rs index e15988d8bc..acf4d9f3bb 100644 --- a/sstable/src/vint.rs +++ b/sstable/src/vint.rs @@ -39,8 +39,6 @@ pub fn deserialize_read(buf: &[u8]) -> (usize, u64) { #[cfg(test)] mod tests { - use std::u64; - use super::{deserialize_read, serialize}; fn aux_test_int(val: u64, expect_len: usize) { diff --git a/stacker/benches/crit_bench.rs b/stacker/benches/crit_bench.rs index 7d29df235a..a7618883b6 100644 --- a/stacker/benches/crit_bench.rs +++ b/stacker/benches/crit_bench.rs @@ -54,7 +54,7 @@ fn bench_hashmap_throughput(c: &mut Criterion) { ); // numbers - let input_bytes = 1_000_000 * 8 as u64; + let input_bytes = 1_000_000 * 8; group.throughput(Throughput::Bytes(input_bytes)); let numbers: Vec<[u8; 8]> = (0..1_000_000u64).map(|el| el.to_le_bytes()).collect(); @@ -82,7 +82,7 @@ fn bench_hashmap_throughput(c: &mut Criterion) { let mut rng = StdRng::from_seed([3u8; 32]); let zipf = zipf::ZipfDistribution::new(10_000, 1.03).unwrap(); - let input_bytes = 1_000_000 * 8 as u64; + let input_bytes = 1_000_000 * 8; group.throughput(Throughput::Bytes(input_bytes)); let zipf_numbers: Vec<[u8; 8]> = (0..1_000_000u64) .map(|_| zipf.sample(&mut rng).to_le_bytes()) .collect(); @@ -110,7 +110,7 @@ impl DocIdRecorder { } } -fn create_hash_map<'a, T: AsRef<[u8]>>(terms: impl Iterator<Item = T>) -> ArenaHashMap { +fn create_hash_map<T: AsRef<[u8]>>(terms: impl Iterator<Item = T>) -> ArenaHashMap { let mut map = ArenaHashMap::with_capacity(HASHMAP_SIZE); for term in terms { map.mutate_or_create(term.as_ref(), |val| { @@ -126,7 +126,7 @@ fn create_hash_map<'a, T: AsRef<[u8]>>(terms: impl Iterator<Item = T>) -> ArenaH map } -fn create_hash_map_with_expull<'a, T: AsRef<[u8]>>( +fn create_hash_map_with_expull<T: AsRef<[u8]>>( terms: impl Iterator<Item = T>, ) -> ArenaHashMap { let mut memory_arena = MemoryArena::default(); @@ -145,7 +145,7 @@ fn 
create_hash_map_with_expull<'a, T: AsRef<[u8]>>( map } -fn create_fx_hash_ref_map_with_expull<'a>( +fn create_fx_hash_ref_map_with_expull( terms: impl Iterator<Item = &'static [u8]>, ) -> FxHashMap<&'static [u8], Vec<u32>> { let terms = terms.enumerate(); map } -fn create_fx_hash_owned_map_with_expull<'a>( +fn create_fx_hash_owned_map_with_expull( terms: impl Iterator<Item = &'static [u8]>, ) -> FxHashMap<Vec<u8>, Vec<u32>> { let terms = terms.enumerate(); diff --git a/stacker/example/hashmap.rs b/stacker/example/hashmap.rs index 568392cdae..c8b84af1f8 100644 --- a/stacker/example/hashmap.rs +++ b/stacker/example/hashmap.rs @@ -10,7 +10,7 @@ fn main() { } } -fn create_hash_map<'a, T: AsRef<str>>(terms: impl Iterator<Item = T>) -> ArenaHashMap { +fn create_hash_map<T: AsRef<str>>(terms: impl Iterator<Item = T>) -> ArenaHashMap { let mut map = ArenaHashMap::with_capacity(4); for term in terms { map.mutate_or_create(term.as_ref().as_bytes(), |val| {
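For readers new to the `ArenaHashMap` API exercised above: `mutate_or_create` takes the key as raw bytes plus a closure that receives `None` on first insertion and `Some(previous_value)` afterwards, and stores the closure's return value. A minimal counting sketch in the same style as the benches (the crate path and the `Option<u32>`-in/`u32`-out closure shape are inferred from the bench and example code above, not from documented guarantees):

```rust
use tantivy_stacker::ArenaHashMap;

/// Count how often each term occurs, storing the counters inline in the arena.
fn count_terms<'a>(terms: impl Iterator<Item = &'a str>) -> ArenaHashMap {
    let mut map = ArenaHashMap::with_capacity(1 << 12);
    for term in terms {
        // `None` => first sighting, start at 1; `Some(c)` => bump the counter.
        map.mutate_or_create(term.as_bytes(), |count: Option<u32>| {
            count.map(|c| c + 1).unwrap_or(1)
        });
    }
    map
}
```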