Update to Tantivy 0.22.0 #148

Merged
3 commits merged on Jun 3, 2024
1,203 changes: 664 additions & 539 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion lnx-engine/aexecutor/Cargo.toml
@@ -10,6 +10,6 @@ edition = "2018"
 async-channel = "1.6.1"
 anyhow = "1"
 rayon = "1.5.1"
-tantivy = "0.18.0"
+tantivy = "0.22.0"
 crossbeam = "0.8"
 tokio = { version = "1.11", features = ["sync"] }
6 changes: 3 additions & 3 deletions lnx-engine/aexecutor/src/lib.rs
@@ -3,7 +3,7 @@ mod reader_executor
 use std::borrow::Borrow;

 use anyhow::{Error, Result};
-use tantivy::{LeasedItem, Searcher};
+use tantivy::Searcher;
 use tokio::sync::{oneshot, Semaphore};

 use crate::reader_executor::TantivyExecutorPool;
@@ -96,7 +96,7 @@ impl SearcherExecutorPool
     /// the results once complete.
     pub async fn spawn<F, T>(&self, func: F) -> Result<T>
     where
-        F: FnOnce(LeasedItem<Searcher>, &tantivy::Executor) -> T + Send + 'static,
+        F: FnOnce(Searcher, &tantivy::Executor) -> T + Send + 'static,
         T: Sync + Send + 'static,
     {
         let _permit = self.limiter.acquire().await?;
@@ -117,7 +117,7 @@ impl SearcherExecutorPool
     }

     #[inline]
-    pub fn searcher(&self) -> LeasedItem<Searcher> {
+    pub fn searcher(&self) -> Searcher {
         self.reader.searcher()
     }
 }
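
By 0.22 the `LeasedItem` wrapper is gone from tantivy's public API: `IndexReader::searcher()` hands back a `Searcher` directly, which is why the pool's `spawn` and `searcher` signatures drop the wrapper. A minimal sketch of the new shape (the index setup is illustrative, not from this PR):

use tantivy::schema::{Schema, TEXT};
use tantivy::{Index, IndexReader, Searcher};

fn acquire_searcher() -> tantivy::Result<Searcher> {
    let mut builder = Schema::builder();
    builder.add_text_field("title", TEXT);
    let index = Index::create_in_ram(builder.build());

    let reader: IndexReader = index.reader()?;
    // 0.18 returned LeasedItem<Searcher>; 0.22 returns Searcher by value.
    Ok(reader.searcher())
}
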
2 changes: 1 addition & 1 deletion lnx-engine/engine/src/lib.rs
@@ -47,7 +47,7 @@ impl Engine
             indexes = guard.as_ref().clone();
         }

-        if !override_if_exists & indexes.get(index.name()).is_some() {
+        if !override_if_exists & indexes.contains_key(index.name()) {
            return Err(Error::msg("index already exists."));
        }

6 changes: 3 additions & 3 deletions lnx-engine/search-index/Cargo.toml
@@ -10,12 +10,12 @@ edition = "2018"
 crossbeam = { version = "0.8.1", default-features = false, features = ["crossbeam-channel", "crossbeam-queue"] }
 time = { version = "0.3.9", features = ["serde", "parsing", "formatting"] }
 serde = { version = "1", features = ["derive"] }
-sled = { version = "0.34.7", features = ["compression"] }
+sled = { version = "0.34.7" }
 tokio = { version = "1.12", features = ["sync", "fs", "rt"] }
-compose = { git = "https://github.com/lnx-search/compose.git", rev = "77099ad" }
+compose = { git = "https://github.com/lnx-search/compose.git", rev = "a3cf2ef" }

 deunicode = "1.6.0"
-tantivy = "0.18.0"
+tantivy = "0.22.0"
 tracing = "0.1.29"
 crc32fast = "1.3.0"
 bincode = "1.3"
2 changes: 1 addition & 1 deletion lnx-engine/search-index/build.rs
@@ -39,7 +39,7 @@ fn compress_stop_words() -> Result<()>
     encoder.write_all(&data)?;
     let data = encoder.finish()?;

-    fs::write("./_dist/stop_words", &data)?;
+    fs::write("./_dist/stop_words", data)?;

     Ok(())
 }
4 changes: 2 additions & 2 deletions lnx-engine/search-index/src/index.rs
@@ -403,8 +403,8 @@ mod tests
     use crate::structures::{DocumentValue, IndexDeclaration};

     fn init_state() {
-        let _ = std::env::set_var("RUST_LOG", "debug,sled=info");
-        let _ = pretty_env_logger::try_init_timed();
+        std::env::set_var("RUST_LOG", "debug,sled=info");
+        let _ = tracing_subscriber::fmt::try_init();
     }

     async fn get_index_with(value: serde_json::Value) -> Result<Index> {
6 changes: 4 additions & 2 deletions lnx-engine/search-index/src/query.rs
@@ -13,6 +13,7 @@ use tantivy::query::{
     BooleanQuery,
     BoostQuery,
     EmptyQuery,
+    EnableScoring,
     FuzzyTermQuery,
     MoreLikeThisQuery,
     Query,
@@ -559,7 +560,7 @@ impl QueryBuilder
                 .schema
                 .get_field(&v)
                 .map(|field| (field, 0.0f32))
-                .ok_or_else(|| {
+                .map_err(|_| {
                     anyhow!(
                         "Unknown field {:?} in fuzzy query config {:?}.",
                         v,
@@ -691,6 +692,7 @@ impl QueryBuilder
                 &qry,
                 &TopDocs::with_limit(1),
                 executor,
+                EnableScoring::enabled_from_searcher(&searcher),
             )?;
             if results.is_empty() {
                 return Err(Error::msg(format!(
@@ -796,7 +798,7 @@ impl QueryBuilder
     }

     fn get_searchable_field(&self, field: &str) -> Result<Field> {
-        let field = self.schema.get_field(field).ok_or_else(|| {
+        let field = self.schema.get_field(field).map_err(|_| {
             Error::msg(format!("no field exists with name: {:?}", field))
         })?;

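The `ok_or_else` → `map_err` swaps mirror `Schema::get_field` changing from `Option<Field>` in 0.18 to `Result<Field, TantivyError>` in 0.22, so the `Option` combinators become `Result` ones. A small sketch under that assumption (the "title" field is illustrative):

use anyhow::anyhow;
use tantivy::schema::{Field, Schema, TEXT};

fn lookup_field() -> anyhow::Result<Field> {
    let mut builder = Schema::builder();
    builder.add_text_field("title", TEXT);
    let schema = builder.build();

    // 0.18: schema.get_field("title").ok_or_else(...) on an Option<Field>.
    // 0.22: the lookup yields Result, so error mapping uses map_err instead.
    schema
        .get_field("title")
        .map_err(|_| anyhow!("no field exists with name: \"title\""))
}
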
73 changes: 46 additions & 27 deletions lnx-engine/search-index/src/reader.rs
@@ -7,20 +7,21 @@ use aexecutor::SearcherExecutorPool
 use anyhow::{anyhow, Error, Result};
 use serde::{Deserialize, Serialize};
 use tantivy::collector::{Count, TopDocs};
-use tantivy::fastfield::FastFieldReader;
-use tantivy::query::{Query, TermQuery};
-use tantivy::schema::{Field, FieldType, IndexRecordOption, Schema, Value};
+use tantivy::query::{EnableScoring, Query, TermQuery};
+use tantivy::schema::{Field, FieldType, IndexRecordOption, OwnedValue, Schema};
 use tantivy::{
     DateTime,
     DocAddress,
     DocId,
-    Document,
     Executor,
     IndexReader,
-    LeasedItem,
+    Order,
     ReloadPolicy,
     Score,
     Searcher,
     SegmentReader,
+    TantivyDocument,
     Term,
 };

@@ -146,11 +147,17 @@ fn order_and_search<R: AsScore + tantivy::fastfield::FastValue>(
     executor: &Executor,
 ) -> Result<(Vec<(R, DocAddress)>, usize)> {
     let collector = TopDocs::with_limit(limit + offset);
-    let collector = collector.order_by_fast_field(field);
+    let field_name = searcher.schema().get_field_name(field);
+    let collector = collector.order_by_fast_field(field_name, Order::Desc);
     let collector = (collector, Count);

     let (result_addresses, count) = searcher
-        .search_with_executor(&query, &collector, executor)
+        .search_with_executor(
+            &query,
+            &collector,
+            executor,
+            EnableScoring::enabled_from_searcher(searcher),
+        )
         .map_err(Error::from)?;

     let results = result_addresses
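
The collector rewrite above tracks 0.22's `TopDocs::order_by_fast_field`, which now takes the fast field's name and an explicit `Order` rather than a `Field` handle, with the fast value type resolved by inference. A sketch of the new call shape (the "timestamp" field and u64 value type are assumed for illustration):

use tantivy::collector::{Collector, TopDocs};
use tantivy::{DocAddress, Order};

fn sorted_collector() -> impl Collector<Fruit = Vec<(u64, DocAddress)>> {
    // 0.18: TopDocs::with_limit(n).order_by_fast_field(field) took a Field
    // handle and implied the order; 0.22 is keyed by name with the order
    // explicit, and the fast value type comes from the surrounding types.
    TopDocs::with_limit(10).order_by_fast_field("timestamp", Order::Desc)
}
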
@@ -167,7 +174,12 @@ macro_rules! execute_staged_search {
         let collector = ($collector, Count);

         let (results, count) = $searcher
-            .search_with_executor(&$query, &collector, $executor)
+            .search_with_executor(
+                &$query,
+                &collector,
+                $executor,
+                tantivy::query::EnableScoring::enabled_from_searcher(&$searcher),
+            )
             .map_err(Error::from)?;

         let results = results
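
Both search call sites gain the fourth `EnableScoring` argument that `Searcher::search_with_executor` requires in 0.22, so the caller now states up front whether scores are needed. An end-to-end sketch under illustrative schema and query assumptions:

use anyhow::Result;
use tantivy::collector::TopDocs;
use tantivy::query::{EnableScoring, QueryParser};
use tantivy::schema::{Schema, TEXT};
use tantivy::{doc, Executor, Index};

fn main() -> Result<()> {
    let mut builder = Schema::builder();
    let title = builder.add_text_field("title", TEXT);
    let index = Index::create_in_ram(builder.build());

    let mut writer = index.writer(50_000_000)?;
    writer.add_document(doc!(title => "hello tantivy 0.22"))?;
    writer.commit()?;

    let searcher = index.reader()?.searcher();
    let query = QueryParser::for_index(&index, vec![title]).parse_query("hello")?;

    let hits = searcher.search_with_executor(
        &query,
        &TopDocs::with_limit(10),
        &Executor::single_thread(),
        // New in the 0.22 signature: scoring must be opted into (or disabled).
        EnableScoring::enabled_from_searcher(&searcher),
    )?;
    assert_eq!(hits.len(), 1);
    Ok(())
}
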
@@ -189,13 +201,13 @@ fn process_search<S: AsScore>(
 ) -> Result<Vec<DocumentHit>> {
     let mut hits = Vec::with_capacity(top_docs.len());
     for (ratio, ref_address) in top_docs {
-        let retrieved_doc = searcher.doc(ref_address)?;
-        let mut doc = schema.to_named_doc(&retrieved_doc);
+        let retrieved_doc: TantivyDocument = searcher.doc(ref_address)?;
+        let mut doc = retrieved_doc.to_named_doc(schema);
         let id = doc.0
             .remove("_id")
             .ok_or_else(|| Error::msg("document has been missed labeled (missing primary key '_id'), the dataset is invalid"))?;

-        if let Value::U64(doc_id) = id[0] {
+        if let OwnedValue::U64(doc_id) = id[0] {
             hits.push(DocumentHit::from_tantivy_document(
                 ctx,
                 doc_id,
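
The retrieval changes follow 0.22 turning `Document` into a trait: the concrete type is `TantivyDocument`, `Searcher::doc` is generic over it (hence the explicit annotation), `to_named_doc` moved onto the document itself, and stored values surface as `OwnedValue`. A sketch under those assumptions (the "_id" key comes from the code above):

use tantivy::schema::OwnedValue;
use tantivy::{DocAddress, Searcher, TantivyDocument};

fn primary_key(searcher: &Searcher, addr: DocAddress) -> tantivy::Result<Option<u64>> {
    // 0.22: Searcher::doc is generic over the document type, so the concrete
    // TantivyDocument (the old `Document` struct) must be named explicitly.
    let doc: TantivyDocument = searcher.doc(addr)?;
    // to_named_doc now lives on the document and borrows the schema; the
    // NamedFieldDocument map holds Vec<OwnedValue> per field.
    let named = doc.to_named_doc(searcher.schema());
    Ok(match named.0.get("_id").and_then(|values| values.first()) {
        Some(OwnedValue::U64(id)) => Some(*id),
        _ => None,
    })
}
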
@@ -226,9 +238,8 @@ fn order_and_sort(
     offset: usize,
     executor: &Executor,
 ) -> Result<(Vec<DocumentHit>, usize)> {
-    let is_multi_value = ctx
-        .multi_value_fields()
-        .contains(schema.get_field_name(field));
+    let field_name = schema.get_field_name(field);
+    let is_multi_value = ctx.multi_value_fields().contains(field_name);

     if is_multi_value {
         return Err(anyhow!(
@@ -266,15 +277,17 @@ fn order_and_sort(
     let collector = TopDocs::with_limit(limit + offset);
     let out = match field_type {
         FieldType::I64(_) => {
+            let field_name = field_name.to_owned();
             let collector =
                 collector.custom_score(move |segment_reader: &SegmentReader| {
                     let reader = segment_reader
                         .fast_fields()
-                        .i64(field)
+                        .i64(&field_name)
                         .expect("field exists");

                     move |doc: DocId| {
-                        let value: i64 = reader.get(doc);
+                        let value: i64 =
+                            reader.first(doc).expect("Col must not be None");
                         std::cmp::Reverse(value)
                     }
                 });
@@ -285,15 +298,17 @@ fn order_and_sort(
             (process_search(ctx, searcher, schema, out.0)?, out.1)
         },
         FieldType::U64(_) => {
+            let field_name = field_name.to_owned();
             let collector =
                 collector.custom_score(move |segment_reader: &SegmentReader| {
                     let reader = segment_reader
                         .fast_fields()
-                        .u64(field)
+                        .u64(&field_name)
                         .expect("field exists");

                     move |doc: DocId| {
-                        let value: u64 = reader.get(doc);
+                        let value: u64 =
+                            reader.first(doc).expect("Col must not be None");
                         std::cmp::Reverse(value)
                     }
                 });
@@ -304,15 +319,17 @@ fn order_and_sort(
             (process_search(ctx, searcher, schema, out.0)?, out.1)
         },
         FieldType::F64(_) => {
+            let field_name = field_name.to_owned();
             let collector =
                 collector.custom_score(move |segment_reader: &SegmentReader| {
                     let reader = segment_reader
                         .fast_fields()
-                        .f64(field)
+                        .f64(&field_name)
                         .expect("field exists");

                     move |doc: DocId| {
-                        let value: f64 = reader.get(doc);
+                        let value: f64 =
+                            reader.first(doc).expect("Col must not be None");
                         std::cmp::Reverse(value)
                     }
                 });
@@ -323,15 +340,17 @@ fn order_and_sort(
             (process_search(ctx, searcher, schema, out.0)?, out.1)
         },
         FieldType::Date(_) => {
+            let field_name = field_name.to_owned();
             let collector =
                 collector.custom_score(move |segment_reader: &SegmentReader| {
                     let reader = segment_reader
                         .fast_fields()
-                        .date(field)
+                        .date(&field_name)
                         .expect("field exists");

                     move |doc: DocId| {
-                        let value: DateTime = reader.get(doc);
+                        let value: DateTime =
+                            reader.first(doc).expect("Col must not be None");
                         std::cmp::Reverse(value)
                     }
                 });
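
All four branches track the same 0.22 change: fast fields are now columnar, so `fast_fields().i64/u64/f64/date` take the field name, return a `Column`, and per-document reads go through `first(doc)`, which returns an `Option` because a column may hold zero or many values per document. A single-branch sketch with an assumed "popularity" u64 fast field:

use tantivy::{DocId, SegmentReader};

fn popularity(segment: &SegmentReader, doc: DocId) -> u64 {
    // 0.22: lookup by name yields a columnar Column<u64> (0.18 took a Field
    // handle and returned a reader whose get(doc) was infallible).
    let column = segment
        .fast_fields()
        .u64("popularity")
        .expect("field exists and is a u64 fast field");
    // first() returns the first value for the doc, or None if it has none.
    column.first(doc).unwrap_or(0)
}
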
@@ -373,8 +392,7 @@ impl Reader
         let reader: IndexReader = ctx
             .index
             .reader_builder()
-            .reload_policy(ReloadPolicy::OnCommit)
-            .num_searchers(ctx.reader_ctx.max_concurrency)
+            .reload_policy(ReloadPolicy::OnCommitWithDelay)
             .try_into()?;
         info!(
             "index reader created with reload policy=OnCommit, num_searchers={}",
@@ -441,6 +459,7 @@ impl Reader
                 &qry,
                 &TopDocs::with_limit(1),
                 executor,
+                EnableScoring::enabled_from_searcher(&searcher),
             )?;
             if results.is_empty() {
                 return Err(Error::msg(format!(
@@ -450,10 +469,10 @@ impl Reader
             }

             let (_, addr) = results.remove(0);
-            let doc = searcher.doc(addr)?;
+            let doc: TantivyDocument = searcher.doc(addr)?;
             let schema = searcher.schema();

-            Ok(schema.to_named_doc(&doc))
+            Ok(doc.to_named_doc(schema))
         })
         .await??;

@@ -487,7 +506,7 @@ impl Reader
         let schema = searcher.schema();
         let order_by = order_by.map(|v| schema.get_field(&v));

-        let (hits, count) = if let Some(Some(field)) = order_by {
+        let (hits, count) = if let Some(Ok(field)) = order_by {
             order_and_sort(
                 sort,
                 field,
@@ -541,7 +560,7 @@ impl Reader
     ///
     /// This is an internal export to allow the writer
     /// to have access to the segment reader information.
-    pub(crate) fn get_searcher(&self) -> LeasedItem<Searcher> {
+    pub(crate) fn get_searcher(&self) -> Searcher {
         self.pool.searcher()
     }
 }