From 11435c62d59c49fdaaaa9d3fdcc9d846bfbf02ed Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 10 Nov 2023 08:41:52 -0800 Subject: [PATCH] Allow btree indices to update, similar to vector indices, on a call to optimize indices --- rust/Cargo.toml | 3 + rust/lance-index/Cargo.toml | 2 + rust/lance-index/src/scalar.rs | 8 + rust/lance-index/src/scalar/btree.rs | 146 +++++++++++++++-- rust/lance-index/src/scalar/flat.rs | 14 ++ rust/lance-index/src/scalar/lance_format.rs | 75 +++++++-- rust/lance-index/src/util.rs | 1 + .../src/util}/chunker.rs | 26 ++- rust/lance-index/src/util/datafusion.rs | 148 +++++++++++++++++- rust/lance/Cargo.toml | 4 +- rust/lance/benches/scalar_index.rs | 28 ++-- rust/lance/src/dataset.rs | 2 - rust/lance/src/dataset/fragment.rs | 2 +- rust/lance/src/dataset/scanner.rs | 104 ++++++++---- rust/lance/src/dataset/write.rs | 3 +- rust/lance/src/index/append.rs | 66 ++++++-- rust/lance/src/index/scalar.rs | 40 ++--- 17 files changed, 543 insertions(+), 129 deletions(-) rename rust/{lance/src/dataset => lance-index/src/util}/chunker.rs (80%) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index d64aae056c..3958e5c308 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -70,6 +70,9 @@ bytes = "1.4" byteorder = "1.5" chrono = "0.4.23" criterion = { version = "0.5", features = ["async", "async_tokio"] } +datafusion = { version = "32.0.0", default-features = false, features = [ + "regex_expressions", +] } datafusion-common = "32.0" datafusion-sql = "32.0" either = "1.0" diff --git a/rust/lance-index/Cargo.toml b/rust/lance-index/Cargo.toml index e657c87438..bc01df35ac 100644 --- a/rust/lance-index/Cargo.toml +++ b/rust/lance-index/Cargo.toml @@ -20,6 +20,7 @@ arrow-arith.workspace = true arrow-select.workspace = true async-recursion.workspace = true async-trait.workspace = true +datafusion.workspace = true datafusion-common.workspace = true datafusion-expr = "32.0.0" datafusion-physical-expr = { version = "32.0.0", default-features = false } @@ 
-33,6 +34,7 @@ nohash-hasher.workspace = true num_cpus.workspace = true num-traits.workspace = true object_store.workspace = true +pin-project.workspace = true prost.workspace = true rand.workspace = true roaring.workspace = true diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index bc4b98511d..88a0713d45 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -19,6 +19,7 @@ use std::{any::Any, ops::Bound, sync::Arc}; use arrow_array::{RecordBatch, UInt64Array}; use arrow_schema::Schema; use async_trait::async_trait; +use datafusion::physical_plan::SendableRecordBatchStream; use datafusion_common::scalar::ScalarValue; use lance_core::Result; @@ -156,4 +157,11 @@ pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index { mapping: &IntMap>, dest_store: &dyn IndexStore, ) -> Result<()>; + + /// Add the new data into the index, creating an updated version of the index in `dest_store` + async fn update( + &self, + new_data: SendableRecordBatchStream, + dest_store: &dyn IndexStore, + ) -> Result<()>; } diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index 1acd0831b9..24111b7b4e 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -22,21 +22,35 @@ use std::{ }; use arrow_array::{Array, RecordBatch, UInt32Array, UInt64Array}; -use arrow_schema::{DataType, Field, Schema}; +use arrow_schema::{DataType, Field, Schema, SortOptions}; use async_trait::async_trait; -use datafusion_common::ScalarValue; +use datafusion::physical_plan::{ + repartition::RepartitionExec, sorts::sort::SortExec, stream::RecordBatchStreamAdapter, + union::UnionExec, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, +}; +use datafusion_common::{DataFusionError, ScalarValue}; use datafusion_expr::Accumulator; -use datafusion_physical_expr::expressions::{MaxAccumulator, MinAccumulator}; +use datafusion_physical_expr::{ + expressions::{Column, 
MaxAccumulator, MinAccumulator}, + PhysicalSortExpr, +}; use futures::{ - stream::{self, BoxStream}, - FutureExt, StreamExt, TryStreamExt, + future::BoxFuture, + stream::{self}, + FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, }; use lance_core::{Error, Result}; use nohash_hasher::IntMap; use serde::{Serialize, Serializer}; use snafu::{location, Location}; -use crate::{Index, IndexType}; +use crate::{ + util::{ + chunker::chunk_concat_stream, + datafusion::{execute_plan, OneShotExec}, + }, + Index, IndexType, +}; use super::{ flat::FlatIndexMetadata, IndexReader, IndexStore, IndexWriter, ScalarIndex, ScalarQuery, @@ -636,9 +650,9 @@ impl BTreeLookup { /// /// Note: this is very similar to the IVF index except we store the IVF part in a btree /// for faster lookup -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct BTreeIndex { - page_lookup: BTreeLookup, + page_lookup: Arc, store: Arc, sub_index: Arc, } @@ -650,7 +664,7 @@ impl BTreeIndex { store: Arc, sub_index: Arc, ) -> Self { - let page_lookup = BTreeLookup::new(tree, null_pages); + let page_lookup = Arc::new(BTreeLookup::new(tree, null_pages)); Self { page_lookup, store, @@ -721,6 +735,21 @@ impl BTreeIndex { Ok(Self::new(map, null_pages, store, sub_index)) } + + async fn into_old_data(self) -> Result { + let reader = self.store.open_index_file(BTREE_PAGES_NAME).await?; + let pages = self.page_lookup.all_page_ids(); + let schema = self.sub_index.schema().clone(); + let batches = IndexReaderStream { + reader, + pages, + idx: 0, + } + .map(|fut| fut.map_err(DataFusionError::from)) + .buffered(num_cpus::get()) + .boxed(); + Ok(RecordBatchStreamAdapter::new(schema, batches)) + } } fn wrap_bound(bound: &Bound) -> Bound { @@ -859,6 +888,15 @@ impl ScalarIndex for BTreeIndex { .copy_index_file(BTREE_LOOKUP_NAME, dest_store) .await } + + async fn update( + &self, + new_data: SendableRecordBatchStream, + dest_store: &dyn IndexStore, + ) -> Result<()> { + let merged_data_source = 
Box::new(BTreeUpdater::new(self.clone(), new_data)); + train_btree_index(merged_data_source, self.sub_index.as_ref(), dest_store).await + } } struct BatchStats { @@ -916,6 +954,14 @@ pub trait BTreeSubIndex: Debug + Send + Sync { /// Deserialize a subindex from Arrow async fn load_subindex(&self, serialized: RecordBatch) -> Result>; + /// Retrieve the data used to originally train this page + /// + /// In order to perform an update we need to merge the old data in with the new data which + /// means we need to access the old data. Right now this is convenient for flat indices but + /// we may need to take a different approach if we ever decide to use a sub-index other than + /// flat + async fn retrieve_data(&self, serialized: RecordBatch) -> Result; + /// The schema of the subindex when serialized to Arrow fn schema(&self) -> &Arc; @@ -971,7 +1017,7 @@ fn btree_stats_as_batch(stats: Vec) -> Result { } #[async_trait] -pub trait BtreeTrainingSource: Send + Sync { +pub trait BtreeTrainingSource: Send { /// Returns a stream of batches, ordered by the value column (in ascending order) /// /// Each batch should have chunk_size rows @@ -982,7 +1028,7 @@ pub trait BtreeTrainingSource: Send + Sync { async fn scan_ordered_chunks( self: Box, chunk_size: u32, - ) -> Result>>; + ) -> Result; } /// Train a btree index from a stream of sorted page-size batches of values and row ids @@ -1018,3 +1064,81 @@ pub async fn train_btree_index( btree_index_file.finish().await?; Ok(()) } + +struct BTreeUpdater { + index: BTreeIndex, + new_data: SendableRecordBatchStream, +} + +impl BTreeUpdater { + fn new(index: BTreeIndex, new_data: SendableRecordBatchStream) -> Self { + Self { index, new_data } + } +} + +impl BTreeUpdater { + fn into_old_input(index: BTreeIndex) -> Arc { + let schema = index.sub_index.schema().clone(); + let batches = index.into_old_data().into_stream().try_flatten().boxed(); + let stream = Box::pin(RecordBatchStreamAdapter::new(schema, batches)); +
Arc::new(OneShotExec::new(stream)) + } +} + +#[async_trait] +impl BtreeTrainingSource for BTreeUpdater { + async fn scan_ordered_chunks( + self: Box, + chunk_size: u32, + ) -> Result { + let new_input = Arc::new(OneShotExec::new(self.new_data)); + let old_input = Self::into_old_input(self.index); + debug_assert_eq!( + old_input.schema().all_fields().len(), + new_input.schema().all_fields().len() + ); + let sort_expr = PhysicalSortExpr { + expr: Arc::new(Column::new("values", 0)), + options: SortOptions { + descending: false, + nulls_first: true, + }, + }; + // TODO: Use SortPreservingMergeExec here + let all_data = Arc::new(UnionExec::new(vec![old_input, new_input])); + // Squash back into one partition + let all_data = Arc::new(RepartitionExec::try_new( + all_data, + datafusion::physical_plan::Partitioning::RoundRobinBatch(1), + )?); + let ordered = Arc::new(SortExec::new(vec![sort_expr], all_data)); + let unchunked = execute_plan(ordered)?; + Ok(chunk_concat_stream(unchunked, chunk_size as usize)) + } +} + +struct IndexReaderStream { + reader: Arc, + pages: Vec, + idx: usize, +} + +impl Stream for IndexReaderStream { + type Item = BoxFuture<'static, Result>; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + let idx = this.idx; + if idx >= this.pages.len() { + return std::task::Poll::Ready(None); + } + let page_number = this.pages[idx]; + this.idx += 1; + let reader_copy = this.reader.clone(); + let read_task = async move { reader_copy.read_record_batch(page_number).await }.boxed(); + std::task::Poll::Ready(Some(read_task)) + } +} diff --git a/rust/lance-index/src/scalar/flat.rs b/rust/lance-index/src/scalar/flat.rs index 7977cb9f1e..4a7de01b0f 100644 --- a/rust/lance-index/src/scalar/flat.rs +++ b/rust/lance-index/src/scalar/flat.rs @@ -20,6 +20,7 @@ use arrow_array::{ use arrow_schema::{DataType, Field, Schema}; use async_trait::async_trait; +use 
datafusion::physical_plan::SendableRecordBatchStream; use datafusion_physical_expr::expressions::{in_list, lit, Column}; use lance_core::Result; use nohash_hasher::IntMap; @@ -125,6 +126,10 @@ impl BTreeSubIndex for FlatIndexMetadata { ) -> Result { remap_batch(serialized, mapping) } + + async fn retrieve_data(&self, serialized: RecordBatch) -> Result { + Ok(serialized) + } } #[derive(Serialize)] @@ -246,6 +251,15 @@ impl ScalarIndex for FlatIndex { writer.finish().await?; Ok(()) } + + async fn update( + &self, + _new_data: SendableRecordBatchStream, + _dest_store: &dyn IndexStore, + ) -> Result<()> { + // If this was desired, then you would need to merge new_data and data and write it back out + todo!() + } } #[cfg(test)] diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs index 872eacdd3b..9d36682b93 100644 --- a/rust/lance-index/src/scalar/lance_format.rs +++ b/rust/lance-index/src/scalar/lance_format.rs @@ -148,10 +148,13 @@ mod tests { use std::{ops::Bound, path::Path}; - use crate::scalar::{ - btree::{train_btree_index, BTreeIndex, BtreeTrainingSource}, - flat::FlatIndexMetadata, - ScalarIndex, ScalarQuery, + use crate::{ + scalar::{ + btree::{train_btree_index, BTreeIndex, BtreeTrainingSource}, + flat::FlatIndexMetadata, + ScalarIndex, ScalarQuery, + }, + util::datafusion::reader_to_stream, }; use super::*; @@ -162,12 +165,9 @@ mod tests { }; use arrow_schema::{DataType, Field}; use arrow_select::take::TakeOptions; + use datafusion::physical_plan::SendableRecordBatchStream; use datafusion_common::ScalarValue; - use futures::{ - stream::{self, BoxStream}, - StreamExt, - }; - use lance_core::{io::object_store::ObjectStoreParams, Error}; + use lance_core::io::object_store::ObjectStoreParams; use lance_datagen::{array, gen, BatchCount, RowCount}; use tempfile::{tempdir, TempDir}; @@ -181,23 +181,25 @@ mod tests { Arc::new(LanceIndexStore::new(object_store, test_path.to_owned())) } - struct MockTrainingSource { - 
data: R, + struct MockTrainingSource { + data: SendableRecordBatchStream, } - impl MockTrainingSource { - fn new(data: R) -> Self { - Self { data } + impl MockTrainingSource { + fn new(data: impl RecordBatchReader + Send + 'static) -> Self { + Self { + data: reader_to_stream(Box::new(data)), + } } } #[async_trait] - impl BtreeTrainingSource for MockTrainingSource { + impl BtreeTrainingSource for MockTrainingSource { async fn scan_ordered_chunks( self: Box, _chunk_size: u32, - ) -> Result>> { - Ok(stream::iter(self.data.map(|batch| batch.map_err(Error::from))).boxed()) + ) -> Result { + Ok(self.data) } } @@ -254,6 +256,45 @@ mod tests { assert_eq!(100, row_ids.len()); } + #[tokio::test] + async fn test_btree_update() { + let index_dir = tempdir().unwrap(); + let index_store = test_store(&index_dir); + let data = gen() + .col(Some("values".to_string()), array::step::()) + .col(Some("row_ids".to_string()), array::step::()) + .into_reader_rows(RowCount::from(4096), BatchCount::from(100)); + train_index(&index_store, data, DataType::Int32).await; + let index = BTreeIndex::load(index_store).await.unwrap(); + + let data = gen() + .col(Some("values".to_string()), array::step::()) + .col(Some("row_ids".to_string()), array::step::()) + .into_reader_rows(RowCount::from(4096), BatchCount::from(100)); + + let updated_index_dir = tempdir().unwrap(); + let updated_index_store = test_store(&updated_index_dir); + index + .update( + reader_to_stream(Box::new(data)), + updated_index_store.as_ref(), + ) + .await + .unwrap(); + let updated_index = BTreeIndex::load(updated_index_store).await.unwrap(); + + let row_ids = updated_index + .search(&ScalarQuery::Equals(ScalarValue::Int32(Some(10000)))) + .await + .unwrap(); + + assert_eq!(2, row_ids.len()); + assert_eq!( + vec![10000, 10000], + row_ids.values().into_iter().copied().collect::>() + ); + } + async fn check(index: &BTreeIndex, query: ScalarQuery, expected: &[u64]) { let results = index.search(&query).await.unwrap(); let 
expected_arr = UInt64Array::from_iter_values(expected.iter().copied()); diff --git a/rust/lance-index/src/util.rs b/rust/lance-index/src/util.rs index 3400f8f2b3..b82b10f8d3 100644 --- a/rust/lance-index/src/util.rs +++ b/rust/lance-index/src/util.rs @@ -12,4 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod chunker; pub mod datafusion; diff --git a/rust/lance/src/dataset/chunker.rs b/rust/lance-index/src/util/chunker.rs similarity index 80% rename from rust/lance/src/dataset/chunker.rs rename to rust/lance-index/src/util/chunker.rs index f988457472..e9499444ec 100644 --- a/rust/lance/src/dataset/chunker.rs +++ b/rust/lance-index/src/util/chunker.rs @@ -15,9 +15,11 @@ use std::collections::VecDeque; use std::pin::Pin; +use arrow::compute::kernels; use arrow_array::RecordBatch; -use datafusion::physical_plan::SendableRecordBatchStream; -use futures::{Stream, StreamExt}; +use datafusion::physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream}; +use datafusion_common::DataFusionError; +use futures::{Stream, StreamExt, TryStreamExt}; use crate::Result; @@ -122,3 +124,23 @@ pub fn chunk_stream( }) .boxed() } + +pub fn chunk_concat_stream( + stream: SendableRecordBatchStream, + chunk_size: usize, +) -> SendableRecordBatchStream { + let schema = stream.schema().clone(); + let schema_copy = schema.clone(); + let chunked = chunk_stream(stream, chunk_size); + let chunk_concat = chunked + .and_then(move |batches| { + std::future::ready( + // chunk_stream is zero-copy and so it gives us pieces of batches. However, the btree + // index needs 1 batch-per-page and so we concatenate here. 
+ kernels::concat::concat_batches(&schema, batches.iter()).map_err(|e| e.into()), + ) + }) + .map_err(DataFusionError::from) + .boxed(); + Box::pin(RecordBatchStreamAdapter::new(schema_copy, chunk_concat)) +} diff --git a/rust/lance-index/src/util/datafusion.rs b/rust/lance-index/src/util/datafusion.rs index 188b232365..ce5240368d 100644 --- a/rust/lance-index/src/util/datafusion.rs +++ b/rust/lance-index/src/util/datafusion.rs @@ -12,8 +12,30 @@ // See the License for the specific language governing permissions and // limitations under the License. -use arrow_schema::DataType; -use datafusion_common::ScalarValue; +use std::{ + cell::RefCell, + sync::{Arc, Mutex}, +}; + +use arrow_array::RecordBatchReader; +use arrow_schema::{DataType, Schema}; +use datafusion::{ + execution::{ + context::SessionState, + runtime_env::{RuntimeConfig, RuntimeEnv}, + }, + physical_plan::{ + stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionPlan, + SendableRecordBatchStream, + }, + prelude::SessionConfig, +}; +use datafusion_common::{DataFusionError, ScalarValue}; +use datafusion_physical_expr::Partitioning; + +use futures::{stream, TryStreamExt}; +use lance_arrow::SchemaExt; +use lance_core::Result; // This is slightly tedious but when we convert expressions from SQL strings to logical // datafusion expressions there is no type coercion that happens. 
In other words "x = 7" @@ -242,3 +264,125 @@ pub fn safe_coerce_scalar(value: &ScalarValue, ty: &DataType) -> Option None, } } + +pub struct OneShotExec { + stream: Mutex>>, + // We save off a copy of the schema to speed up formatting and so ExecutionPlan::schema & display_as + // can still function after exhuasted + schema: Arc, +} + +impl OneShotExec { + pub fn new(stream: SendableRecordBatchStream) -> Self { + let schema = stream.schema().clone(); + Self { + stream: Mutex::new(RefCell::new(Some(stream))), + schema, + } + } +} + +impl std::fmt::Debug for OneShotExec { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let val_guard = self.stream.lock().unwrap(); + let stream = val_guard.borrow(); + f.debug_struct("OneShotExec") + .field("exhausted", &stream.is_none()) + .field("schema", self.schema.as_ref()) + .finish() + } +} + +impl DisplayAs for OneShotExec { + fn fmt_as( + &self, + t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + let val_guard = self.stream.lock().unwrap(); + let stream = val_guard.borrow(); + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let exhausted = if stream.is_some() { "" } else { "EXHUASTED" }; + let columns = self + .schema + .field_names() + .iter() + .map(|s| s.to_string()) + .collect::>(); + write!( + f, + "OneShotStream: {} columns=[{}]", + exhausted, + columns.join(",") + ) + } + } + } +} + +/// Convert a sendable RecordBatchReader to a sendable RecordBatchStream +pub fn reader_to_stream(reader: Box) -> SendableRecordBatchStream { + let schema = reader.schema().clone(); + let stream = stream::iter(reader).map_err(DataFusionError::from); + Box::pin(RecordBatchStreamAdapter::new(schema, stream)) +} + +impl ExecutionPlan for OneShotExec { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> arrow_schema::SchemaRef { + self.schema.clone() + } + + fn output_partitioning(&self) -> 
datafusion_physical_expr::Partitioning { + Partitioning::RoundRobinBatch(1) + } + + fn output_ordering(&self) -> Option<&[datafusion_physical_expr::PhysicalSortExpr]> { + None + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> datafusion_common::Result> { + todo!() + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> datafusion_common::Result { + let mut val_guard = self.stream.lock().unwrap(); + let stream = val_guard.get_mut(); + let stream = stream.take(); + if let Some(stream) = stream { + Ok(stream) + } else { + panic!("Attempt to use OneShotExec more than once"); + } + } + + fn statistics(&self) -> datafusion_common::Statistics { + todo!() + } +} + +pub fn execute_plan(plan: Arc) -> Result { + let session_config = SessionConfig::new(); + let runtime_config = RuntimeConfig::new(); + let runtime_env = Arc::new(RuntimeEnv::new(runtime_config)?); + let session_state = SessionState::new_with_config_rt(session_config, runtime_env); + // NOTE: we are only executing the first partition here. Therefore, if + // the plan has more than one partition, we will be missing data. + assert_eq!(plan.output_partitioning().partition_count(), 1); + Ok(plan.execute(0, session_state.task_ctx())?) +} diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index 353a8adcc5..419e95e7e9 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -59,9 +59,7 @@ arrow = { version = "47.0.0", features = ["prettyprint"] } nohash-hasher.workspace = true num_cpus.workspace = true # TODO: use datafusion sub-modules to reduce build size? 
-datafusion = { version = "32.0.0", default-features = false, features = [ - "regex_expressions", -] } +datafusion.workspace = true lapack = { version = "0.19.0", optional = true } cblas = { version = "0.4.0", optional = true } lru_time_cache = "0.11" diff --git a/rust/lance/benches/scalar_index.rs b/rust/lance/benches/scalar_index.rs index 5ff313b1db..870d2c4740 100644 --- a/rust/lance/benches/scalar_index.rs +++ b/rust/lance/benches/scalar_index.rs @@ -2,26 +2,26 @@ use std::sync::Arc; use arrow_array::{ types::{UInt32Type, UInt64Type}, - RecordBatch, RecordBatchReader, + RecordBatchReader, }; use async_trait::async_trait; use criterion::{criterion_group, criterion_main, Criterion}; -use datafusion::scalar::ScalarValue; -use futures::{ - stream::{self, BoxStream}, - StreamExt, TryStreamExt, -}; +use datafusion::{physical_plan::SendableRecordBatchStream, scalar::ScalarValue}; +use futures::TryStreamExt; use lance::{ io::{object_store::ObjectStoreParams, ObjectStore}, Dataset, }; -use lance_core::{Error, Result}; +use lance_core::Result; use lance_datagen::{array, gen, BatchCount, RowCount}; -use lance_index::scalar::{ - btree::{train_btree_index, BTreeIndex, BtreeTrainingSource}, - flat::FlatIndexMetadata, - lance_format::LanceIndexStore, - IndexStore, ScalarIndex, ScalarQuery, +use lance_index::{ + scalar::{ + btree::{train_btree_index, BTreeIndex, BtreeTrainingSource}, + flat::FlatIndexMetadata, + lance_format::LanceIndexStore, + IndexStore, ScalarIndex, ScalarQuery, + }, + util::datafusion::reader_to_stream, }; #[cfg(target_os = "linux")] use pprof::criterion::{Output, PProfProfiler}; @@ -49,8 +49,8 @@ impl BtreeTrainingSource for BenchmarkDataSource { async fn scan_ordered_chunks( self: Box, _chunk_size: u32, - ) -> Result>> { - Ok(stream::iter(Self::test_data().map(|batch| batch.map_err(Error::from))).boxed()) + ) -> Result { + Ok(reader_to_stream(Box::new(Self::test_data()))) } } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 
518ab21af4..f18957d630 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -48,8 +48,6 @@ use std::sync::Arc; use tracing::instrument; pub mod builder; -pub(crate) mod chunker; - pub mod cleanup; mod feature_flags; pub mod fragment; diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 1e874cc40c..c622e39375 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -33,11 +33,11 @@ use lance_core::{ }, Error, Result, ROW_ID, }; +use lance_index::util::chunker::chunk_stream; use object_store::path::Path; use snafu::{location, Location}; use uuid::Uuid; -use super::chunker::chunk_stream; use super::hash_joiner::HashJoiner; use super::scanner::Scanner; use super::updater::Updater; diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 75e6dcb023..1e89912f44 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -19,10 +19,6 @@ use std::task::{Context, Poll}; use arrow_array::{Array, Float32Array, Int64Array, RecordBatch}; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef, SortOptions}; use async_recursion::async_recursion; -use datafusion::execution::{ - context::SessionState, - runtime_env::{RuntimeConfig, RuntimeEnv}, -}; use datafusion::logical_expr::AggregateFunction; use datafusion::physical_expr::PhysicalSortExpr; use datafusion::physical_plan::expressions; @@ -37,11 +33,11 @@ use datafusion::physical_plan::{ union::UnionExec, ExecutionPlan, SendableRecordBatchStream, }; -use datafusion::prelude::*; use datafusion::scalar::ScalarValue; use futures::stream::{Stream, StreamExt}; use lance_core::ROW_ID_FIELD; use lance_index::scalar::expression::ScalarIndexExpr; +use lance_index::util::datafusion::execute_plan; use lance_index::vector::{Query, DIST_COL}; use lance_linalg::distance::MetricType; use log::debug; @@ -489,22 +485,16 @@ impl Scanner { Ok(Arc::new(schema)) } - fn 
execute_plan(&self, plan: Arc) -> Result { - let session_config = SessionConfig::new(); - let runtime_config = RuntimeConfig::new(); - let runtime_env = Arc::new(RuntimeEnv::new(runtime_config)?); - let session_state = SessionState::new_with_config_rt(session_config, runtime_env); - // NOTE: we are only executing the first partition here. Therefore, if - // the plan has more than one partition, we will be missing data. - assert_eq!(plan.output_partitioning().partition_count(), 1); - Ok(plan.execute(0, session_state.task_ctx())?) - } - /// Create a stream from the Scanner. #[instrument(skip_all)] pub async fn try_into_stream(&self) -> Result { let plan = self.create_plan().await?; - Ok(DatasetRecordBatchStream::new(self.execute_plan(plan)?)) + Ok(DatasetRecordBatchStream::new(execute_plan(plan)?)) + } + + pub(crate) async fn try_into_dfstream(&self) -> Result { + let plan = self.create_plan().await?; + execute_plan(plan) } /// Scan and return the number of matching rows @@ -531,7 +521,7 @@ impl Scanner { plan, plan_schema, )?); - let mut stream = self.execute_plan(count_plan)?; + let mut stream = execute_plan(count_plan)?; // A count plan will always return a single batch with a single row. 
if let Some(first_batch) = stream.next().await { @@ -2736,10 +2726,17 @@ mod test { dataset: Dataset, sample_query: Arc, delete_query: Arc, + // The original version of the data, two fragments, rows 0-1000 original_version: u64, + // The original version of the data, 1 row deleted, compacted to a single fragment compact_version: u64, + // The original version of the data + an extra 1000 unindexed append_version: u64, + // The original version of the data + an extra 1000 rows, with indices updated so all rows indexed + updated_version: u64, + // The original version of the data with 1 deleted row delete_version: u64, + // The original version of the data + an extra 1000 unindexed + 1 deleted row append_then_delete_version: u64, } @@ -2751,6 +2748,7 @@ mod test { use_new_data: bool, with_row_id: bool, use_compaction: bool, + use_updated: bool, } impl ScalarIndexTestFixture { @@ -2835,8 +2833,16 @@ mod test { let append_version = dataset.version().version; + // UPDATE + + dataset.optimize_indices().await.unwrap(); + let updated_version = dataset.version().version; + // APPEND -> DELETE + dataset.checkout_version(append_version).await.unwrap(); + dataset.restore(None).await.unwrap(); + dataset.delete("not_indexed = 75").await.unwrap(); let append_then_delete_version = dataset.version().version; @@ -2867,6 +2873,7 @@ mod test { original_version, compact_version, append_version, + updated_version, delete_version, append_then_delete_version, } @@ -2882,11 +2889,23 @@ mod test { async fn get_dataset(&self, params: &ScalarTestParams) -> Dataset { let version = if params.use_compaction { - if params.use_deleted_data || params.use_new_data { - panic!("There is no test data combining new/deleted data with compaction"); + // These combinations should not be possible + if params.use_deleted_data || params.use_new_data || params.use_updated { + panic!( + "There is no test data combining new/deleted/updated data with compaction" + ); } else { self.compact_version } + } else if
params.use_updated { + // These combinations should not be possible + if params.use_deleted_data || params.use_new_data || params.use_compaction { + panic!( + "There is no test data combining updated data with new/deleted/compaction" + ); + } else { + self.updated_version + } } else { match (params.use_new_data, params.use_deleted_data) { (false, false) => self.original_version, @@ -3014,7 +3033,7 @@ mod test { ); } // If there is new data then the dupe of row 50 should be in the results - if params.use_new_data { + if params.use_new_data || params.use_updated { self.assert_one( &batch, |val| val == 1050, @@ -3088,14 +3107,18 @@ mod test { |val| val == 50, "The query contained 50 even though it was filtered", ); - let mut expected_num_rows = if params.use_new_data { 1999 } else { 999 }; + let mut expected_num_rows = if params.use_new_data || params.use_updated { + 1999 + } else { + 999 + }; if params.use_deleted_data || params.use_compaction { expected_num_rows -= 1; } assert_eq!(batch.num_rows(), expected_num_rows); // Let's also make sure our filter can target something in the new data only - if params.use_new_data { + if params.use_new_data || params.use_updated { let (_, batch) = self.run_query("indexed == 1050", None, params).await; assert_eq!(batch.num_rows(), 1); } @@ -3131,7 +3154,11 @@ mod test { "The non-indexed refine filter was not applied", ); - let mut expected_num_rows = if params.use_new_data { 199 } else { 99 }; + let mut expected_num_rows = if params.use_new_data || params.use_updated { + 199 + } else { + 99 + }; if params.use_deleted_data || params.use_compaction { expected_num_rows -= 1; } @@ -3164,17 +3191,26 @@ mod test { vec![false, true] }; for use_compaction in compaction_choices { - for with_row_id in [false, true] { - let params = ScalarTestParams { - use_index, - use_projection, - use_deleted_data, - use_new_data, - with_row_id, - use_compaction, + let updated_choices = + if use_deleted_data || use_new_data || use_compaction { + 
vec![false] + } else { + vec![false, true] }; - fixture.check_vector_queries(&params).await; - fixture.check_simple_queries(&params).await; + for use_updated in updated_choices { + for with_row_id in [false, true] { + let params = ScalarTestParams { + use_index, + use_projection, + use_deleted_data, + use_new_data, + with_row_id, + use_compaction, + use_updated, + }; + fixture.check_vector_queries(&params).await; + fixture.check_simple_queries(&params).await; + } } } } diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index cf4478018c..026789dd8e 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -31,12 +31,13 @@ use lance_core::{ }, Error, Result, }; +use lance_index::util::chunker::chunk_stream; use object_store::path::Path; use tracing::instrument; use uuid::Uuid; use super::progress::{NoopFragmentWriteProgress, WriteFragmentProgress}; -use super::{chunker::chunk_stream, DATA_DIR}; +use super::DATA_DIR; /// The mode to write dataset. #[derive(Debug, Clone, Copy)] diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index 2f0c484fd6..3bc2677194 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -15,11 +15,14 @@ use std::sync::Arc; use lance_core::{format::Index as IndexMetadata, Error, Result}; +use lance_index::scalar::lance_format::LanceIndexStore; +use lance_index::IndexType; use log::info; use snafu::{location, Location}; use uuid::Uuid; use crate::dataset::index::unindexed_fragments; +use crate::dataset::scanner::ColumnOrdering; use crate::dataset::Dataset; use crate::index::vector::ivf::IVFIndex; @@ -47,24 +50,55 @@ pub async fn append_index( })?; let index = dataset - .open_vector_index(&column.name, old_index.uuid.to_string().as_str()) + .open_generic_index(&column.name, &old_index.uuid.to_string()) .await?; - let Some(ivf_idx) = index.as_any().downcast_ref::() else { - info!("Index type: {:?} does not support append", index); - return Ok(None); - }; -
- let mut scanner = dataset.scan(); - scanner.with_fragments(unindexed); - scanner.with_row_id(); - scanner.project(&[&column.name])?; - let stream = scanner.try_into_stream().await?; - - let new_index = ivf_idx - .append(dataset.as_ref(), stream, old_index, &column.name) - .await?; - Ok(Some(new_index)) + match index.index_type() { + IndexType::Scalar => { + let index = dataset + .open_scalar_index(&column.name, &old_index.uuid.to_string()) + .await?; + + let mut scanner = dataset.scan(); + scanner + .with_fragments(unindexed) + .with_row_id() + .order_by(Some(vec![ColumnOrdering::asc_nulls_first( + column.name.clone(), + )]))? + .project(&[&column.name])?; + let new_data_stream = scanner.try_into_stream().await?; + + let new_uuid = Uuid::new_v4(); + + let index_dir = dataset.indices_dir().child(new_uuid.to_string()); + let new_store = LanceIndexStore::new((*dataset.object_store).clone(), index_dir); + + index.update(new_data_stream.into(), &new_store).await?; + + Ok(Some(new_uuid)) + } + IndexType::Vector => { + let mut scanner = dataset.scan(); + scanner.with_fragments(unindexed); + scanner.with_row_id(); + let new_data_stream = scanner.try_into_stream().await?; + + let index = dataset + .open_vector_index(&column.name, old_index.uuid.to_string().as_str()) + .await?; + + let Some(ivf_idx) = index.as_any().downcast_ref::() else { + info!("Index type: {:?} does not support append", index); + return Ok(None); + }; + + let new_index = ivf_idx + .append(dataset.as_ref(), new_data_stream, old_index, &column.name) + .await?; + Ok(Some(new_index)) + } + } } #[cfg(test)] diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 139c3d3e05..96cfd53c0d 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -15,27 +15,25 @@ //! Utilities for integrating scalar indices with datasets //! 
-use std::{future, sync::Arc}; +use std::sync::Arc; -use arrow::compute::kernels; -use arrow_array::RecordBatch; use async_trait::async_trait; -use futures::{stream::BoxStream, StreamExt, TryStreamExt}; -use lance_index::scalar::{ - btree::{train_btree_index, BTreeIndex, BtreeTrainingSource}, - flat::FlatIndexMetadata, - lance_format::LanceIndexStore, - ScalarIndex, +use datafusion::physical_plan::SendableRecordBatchStream; +use lance_index::{ + scalar::{ + btree::{train_btree_index, BTreeIndex, BtreeTrainingSource}, + flat::FlatIndexMetadata, + lance_format::LanceIndexStore, + ScalarIndex, + }, + util::chunker::chunk_concat_stream, }; use snafu::{location, Location}; use tracing::instrument; use lance_core::{Error, Result}; -use crate::{ - dataset::{chunker, scanner::ColumnOrdering}, - Dataset, -}; +use crate::{dataset::scanner::ColumnOrdering, Dataset}; use super::IndexParams; @@ -58,7 +56,7 @@ impl BtreeTrainingSource for TrainingRequest { async fn scan_ordered_chunks( self: Box, chunk_size: u32, - ) -> Result>> { + ) -> Result { let mut scan = self.dataset.scan(); let scan = scan .with_row_id() @@ -67,18 +65,8 @@ impl BtreeTrainingSource for TrainingRequest { )]))? .project(&[&self.column])?; - let schema = scan.schema()?; - let ordered_batches = scan.try_into_stream().await?; - let chunked = chunker::chunk_stream(ordered_batches.into(), chunk_size as usize); - Ok(chunked - .and_then(move |batches| { - future::ready( - // chunk_stream is zero-copy and so it gives us pieces of batches. However, the btree - // index needs 1 batch-per-page and so we concatenate here. - kernels::concat::concat_batches(&schema, batches.iter()).map_err(|e| e.into()), - ) - }) - .boxed()) + let ordered_batches = scan.try_into_dfstream().await?; + Ok(chunk_concat_stream(ordered_batches, chunk_size as usize)) } }