From c6a4674ec55c26ec1d5c520cc16309c1d15e1aee Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 10 Nov 2023 08:41:52 -0800 Subject: [PATCH 1/4] Allow btree indices to update, similar to vector indices, on a call to optimize indices --- rust/Cargo.toml | 3 + rust/lance-index/Cargo.toml | 2 + rust/lance-index/src/scalar.rs | 8 + rust/lance-index/src/scalar/btree.rs | 146 +++++++++++++++-- rust/lance-index/src/scalar/flat.rs | 14 ++ rust/lance-index/src/scalar/lance_format.rs | 75 +++++++-- rust/lance-index/src/util.rs | 1 + .../src/util}/chunker.rs | 26 ++- rust/lance-index/src/util/datafusion.rs | 148 +++++++++++++++++- rust/lance/Cargo.toml | 4 +- rust/lance/benches/scalar_index.rs | 28 ++-- rust/lance/src/dataset.rs | 2 - rust/lance/src/dataset/fragment.rs | 2 +- rust/lance/src/dataset/scanner.rs | 104 ++++++++---- rust/lance/src/dataset/write.rs | 3 +- rust/lance/src/index/append.rs | 68 ++++++-- rust/lance/src/index/scalar.rs | 40 ++--- 17 files changed, 544 insertions(+), 130 deletions(-) rename rust/{lance/src/dataset => lance-index/src/util}/chunker.rs (80%) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index c8a8ed4063..384a50fbf1 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -70,6 +70,9 @@ bytes = "1.4" byteorder = "1.5" chrono = "0.4.23" criterion = { version = "0.5", features = ["async", "async_tokio"] } +datafusion = { version = "32.0.0", default-features = false, features = [ + "regex_expressions", +] } datafusion-common = "32.0" datafusion-sql = "32.0" either = "1.0" diff --git a/rust/lance-index/Cargo.toml b/rust/lance-index/Cargo.toml index e657c87438..bc01df35ac 100644 --- a/rust/lance-index/Cargo.toml +++ b/rust/lance-index/Cargo.toml @@ -20,6 +20,7 @@ arrow-arith.workspace = true arrow-select.workspace = true async-recursion.workspace = true async-trait.workspace = true +datafusion.workspace = true datafusion-common.workspace = true datafusion-expr = "32.0.0" datafusion-physical-expr = { version = "32.0.0", default-features = false } @@ -33,6 +34,7 @@ nohash-hasher.workspace = true num_cpus.workspace = true num-traits.workspace = true object_store.workspace = true +pin-project.workspace = true prost.workspace = true rand.workspace = true roaring.workspace = true diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index bc4b98511d..88a0713d45 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -19,6 +19,7 @@ use std::{any::Any, ops::Bound, sync::Arc}; use arrow_array::{RecordBatch, UInt64Array}; use arrow_schema::Schema; use async_trait::async_trait; +use datafusion::physical_plan::SendableRecordBatchStream; use datafusion_common::scalar::ScalarValue; use lance_core::Result; @@ -156,4 +157,11 @@ pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index { mapping: &IntMap>, dest_store: &dyn IndexStore, ) -> Result<()>; + + /// Add the new data into the index, creating an updated version of the index in `dest_store` + async fn update( + &self, + new_data: SendableRecordBatchStream, + dest_store: &dyn IndexStore, + ) -> Result<()>; } diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index a011d616a9..905c7d55ff 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -22,14 +22,22 @@ use std::{ }; use arrow_array::{Array, RecordBatch, UInt32Array, UInt64Array}; -use arrow_schema::{DataType, Field, Schema}; +use arrow_schema::{DataType, Field, Schema, SortOptions}; use async_trait::async_trait; -use 
datafusion_common::ScalarValue; +use datafusion::physical_plan::{ + repartition::RepartitionExec, sorts::sort::SortExec, stream::RecordBatchStreamAdapter, + union::UnionExec, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, +}; +use datafusion_common::{DataFusionError, ScalarValue}; use datafusion_expr::Accumulator; -use datafusion_physical_expr::expressions::{MaxAccumulator, MinAccumulator}; +use datafusion_physical_expr::{ + expressions::{Column, MaxAccumulator, MinAccumulator}, + PhysicalSortExpr, +}; use futures::{ - stream::{self, BoxStream}, - FutureExt, StreamExt, TryStreamExt, + future::BoxFuture, + stream::{self}, + FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, }; use lance_core::{Error, Result}; use nohash_hasher::IntMap; @@ -37,7 +45,13 @@ use roaring::RoaringBitmap; use serde::{Serialize, Serializer}; use snafu::{location, Location}; -use crate::{Index, IndexType}; +use crate::{ + util::{ + chunker::chunk_concat_stream, + datafusion::{execute_plan, OneShotExec}, + }, + Index, IndexType, +}; use super::{ flat::FlatIndexMetadata, IndexReader, IndexStore, IndexWriter, ScalarIndex, ScalarQuery, @@ -637,9 +651,9 @@ impl BTreeLookup { /// /// Note: this is very similar to the IVF index except we store the IVF part in a btree /// for faster lookup -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct BTreeIndex { - page_lookup: BTreeLookup, + page_lookup: Arc, store: Arc, sub_index: Arc, } @@ -651,7 +665,7 @@ impl BTreeIndex { store: Arc, sub_index: Arc, ) -> Self { - let page_lookup = BTreeLookup::new(tree, null_pages); + let page_lookup = Arc::new(BTreeLookup::new(tree, null_pages)); Self { page_lookup, store, @@ -722,6 +736,21 @@ impl BTreeIndex { Ok(Self::new(map, null_pages, store, sub_index)) } + + async fn into_old_data(self) -> Result { + let reader = self.store.open_index_file(BTREE_PAGES_NAME).await?; + let pages = self.page_lookup.all_page_ids(); + let schema = self.sub_index.schema().clone(); + let batches = IndexReaderStream { + reader, + pages, + idx: 0, + } + .map(|fut| fut.map_err(DataFusionError::from)) + .buffered(num_cpus::get()) + .boxed(); + Ok(RecordBatchStreamAdapter::new(schema, batches)) + } } fn wrap_bound(bound: &Bound) -> Bound { @@ -874,6 +903,15 @@ impl ScalarIndex for BTreeIndex { .copy_index_file(BTREE_LOOKUP_NAME, dest_store) .await } + + async fn update( + &self, + new_data: SendableRecordBatchStream, + dest_store: &dyn IndexStore, + ) -> Result<()> { + let merged_data_source = Box::new(BTreeUpdater::new(self.clone(), new_data)); + train_btree_index(merged_data_source, self.sub_index.as_ref(), dest_store).await + } } struct BatchStats { @@ -931,6 +969,14 @@ pub trait BTreeSubIndex: Debug + Send + Sync { /// Deserialize a subindex from Arrow async fn load_subindex(&self, serialized: RecordBatch) -> Result>; + /// Retrieve the data used to originally train this page + /// + /// In order to perform an update we need to merge the old data in with the new data which + /// means we need to access the new data. 
Right now this is convenient for flat indices but + /// we may need to take a different approach if we ever decide to use a sub-index other than + /// flat + async fn retrieve_data(&self, serialized: RecordBatch) -> Result; + /// The schema of the subindex when serialized to Arrow fn schema(&self) -> &Arc; @@ -986,7 +1032,7 @@ fn btree_stats_as_batch(stats: Vec) -> Result { } #[async_trait] -pub trait BtreeTrainingSource: Send + Sync { +pub trait BtreeTrainingSource: Send { /// Returns a stream of batches, ordered by the value column (in ascending order) /// /// Each batch should have chunk_size rows @@ -997,7 +1043,7 @@ pub trait BtreeTrainingSource: Send + Sync { async fn scan_ordered_chunks( self: Box, chunk_size: u32, - ) -> Result>>; + ) -> Result; } /// Train a btree index from a stream of sorted page-size batches of values and row ids @@ -1033,3 +1079,81 @@ pub async fn train_btree_index( btree_index_file.finish().await?; Ok(()) } + +struct BTreeUpdater { + index: BTreeIndex, + new_data: SendableRecordBatchStream, +} + +impl BTreeUpdater { + fn new(index: BTreeIndex, new_data: SendableRecordBatchStream) -> Self { + Self { index, new_data } + } +} + +impl BTreeUpdater { + fn into_old_input(index: BTreeIndex) -> Arc { + let schema = index.sub_index.schema().clone(); + let batches = index.into_old_data().into_stream().try_flatten().boxed(); + let stream = Box::pin(RecordBatchStreamAdapter::new(schema, batches)); + Arc::new(OneShotExec::new(stream)) + } +} + +#[async_trait] +impl BtreeTrainingSource for BTreeUpdater { + async fn scan_ordered_chunks( + self: Box, + chunk_size: u32, + ) -> Result { + let new_input = Arc::new(OneShotExec::new(self.new_data)); + let old_input = Self::into_old_input(self.index); + debug_assert_eq!( + old_input.schema().all_fields().len(), + new_input.schema().all_fields().len() + ); + let sort_expr = PhysicalSortExpr { + expr: Arc::new(Column::new("values", 0)), + options: SortOptions { + descending: false, + nulls_first: true, + }, + }; + // TODO: Use SortPreservingMergeExec here + let all_data = Arc::new(UnionExec::new(vec![old_input, new_input])); + // Squash back into one partition + let all_data = Arc::new(RepartitionExec::try_new( + all_data, + datafusion::physical_plan::Partitioning::RoundRobinBatch(1), + )?); + let ordered = Arc::new(SortExec::new(vec![sort_expr], all_data)); + let unchunked = execute_plan(ordered)?; + Ok(chunk_concat_stream(unchunked, chunk_size as usize)) + } +} + +struct IndexReaderStream { + reader: Arc, + pages: Vec, + idx: usize, +} + +impl Stream for IndexReaderStream { + type Item = BoxFuture<'static, Result>; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + let idx = this.idx; + if idx >= this.pages.len() { + return std::task::Poll::Ready(None); + } + let page_number = this.pages[idx]; + this.idx += 1; + let reader_copy = this.reader.clone(); + let read_task = async move { reader_copy.read_record_batch(page_number).await }.boxed(); + std::task::Poll::Ready(Some(read_task)) + } +} diff --git a/rust/lance-index/src/scalar/flat.rs b/rust/lance-index/src/scalar/flat.rs index 46fd90b1e2..58cbb86be2 100644 --- a/rust/lance-index/src/scalar/flat.rs +++ b/rust/lance-index/src/scalar/flat.rs @@ -20,6 +20,7 @@ use arrow_array::{ use arrow_schema::{DataType, Field, Schema}; use async_trait::async_trait; +use datafusion::physical_plan::SendableRecordBatchStream; use datafusion_physical_expr::expressions::{in_list, lit, Column}; use 
lance_core::{format::RowAddress, Result}; use nohash_hasher::IntMap; @@ -126,6 +127,10 @@ impl BTreeSubIndex for FlatIndexMetadata { ) -> Result { remap_batch(serialized, mapping) } + + async fn retrieve_data(&self, serialized: RecordBatch) -> Result { + Ok(serialized) + } } #[derive(Serialize)] @@ -260,6 +265,15 @@ impl ScalarIndex for FlatIndex { writer.finish().await?; Ok(()) } + + async fn update( + &self, + _new_data: SendableRecordBatchStream, + _dest_store: &dyn IndexStore, + ) -> Result<()> { + // If this was desired, then you would need to merge new_data and data and write it back out + todo!() + } } #[cfg(test)] diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs index 872eacdd3b..9d36682b93 100644 --- a/rust/lance-index/src/scalar/lance_format.rs +++ b/rust/lance-index/src/scalar/lance_format.rs @@ -148,10 +148,13 @@ mod tests { use std::{ops::Bound, path::Path}; - use crate::scalar::{ - btree::{train_btree_index, BTreeIndex, BtreeTrainingSource}, - flat::FlatIndexMetadata, - ScalarIndex, ScalarQuery, + use crate::{ + scalar::{ + btree::{train_btree_index, BTreeIndex, BtreeTrainingSource}, + flat::FlatIndexMetadata, + ScalarIndex, ScalarQuery, + }, + util::datafusion::reader_to_stream, }; use super::*; @@ -162,12 +165,9 @@ mod tests { }; use arrow_schema::{DataType, Field}; use arrow_select::take::TakeOptions; + use datafusion::physical_plan::SendableRecordBatchStream; use datafusion_common::ScalarValue; - use futures::{ - stream::{self, BoxStream}, - StreamExt, - }; - use lance_core::{io::object_store::ObjectStoreParams, Error}; + use lance_core::io::object_store::ObjectStoreParams; use lance_datagen::{array, gen, BatchCount, RowCount}; use tempfile::{tempdir, TempDir}; @@ -181,23 +181,25 @@ mod tests { Arc::new(LanceIndexStore::new(object_store, test_path.to_owned())) } - struct MockTrainingSource { - data: R, + struct MockTrainingSource { + data: SendableRecordBatchStream, } - impl MockTrainingSource { - fn new(data: R) -> Self { - Self { data } + impl MockTrainingSource { + fn new(data: impl RecordBatchReader + Send + 'static) -> Self { + Self { + data: reader_to_stream(Box::new(data)), + } } } #[async_trait] - impl BtreeTrainingSource for MockTrainingSource { + impl BtreeTrainingSource for MockTrainingSource { async fn scan_ordered_chunks( self: Box, _chunk_size: u32, - ) -> Result>> { - Ok(stream::iter(self.data.map(|batch| batch.map_err(Error::from))).boxed()) + ) -> Result { + Ok(self.data) } } @@ -254,6 +256,45 @@ mod tests { assert_eq!(100, row_ids.len()); } + #[tokio::test] + async fn test_btree_update() { + let index_dir = tempdir().unwrap(); + let index_store = test_store(&index_dir); + let data = gen() + .col(Some("values".to_string()), array::step::()) + .col(Some("row_ids".to_string()), array::step::()) + .into_reader_rows(RowCount::from(4096), BatchCount::from(100)); + train_index(&index_store, data, DataType::Int32).await; + let index = BTreeIndex::load(index_store).await.unwrap(); + + let data = gen() + .col(Some("values".to_string()), array::step::()) + .col(Some("row_ids".to_string()), array::step::()) + .into_reader_rows(RowCount::from(4096), BatchCount::from(100)); + + let updated_index_dir = tempdir().unwrap(); + let updated_index_store = test_store(&updated_index_dir); + index + .update( + reader_to_stream(Box::new(data)), + updated_index_store.as_ref(), + ) + .await + .unwrap(); + let updated_index = BTreeIndex::load(updated_index_store).await.unwrap(); + + let row_ids = updated_index + 
.search(&ScalarQuery::Equals(ScalarValue::Int32(Some(10000)))) + .await + .unwrap(); + + assert_eq!(2, row_ids.len()); + assert_eq!( + vec![10000, 10000], + row_ids.values().into_iter().copied().collect::>() + ); + } + async fn check(index: &BTreeIndex, query: ScalarQuery, expected: &[u64]) { let results = index.search(&query).await.unwrap(); let expected_arr = UInt64Array::from_iter_values(expected.iter().copied()); diff --git a/rust/lance-index/src/util.rs b/rust/lance-index/src/util.rs index 3400f8f2b3..b82b10f8d3 100644 --- a/rust/lance-index/src/util.rs +++ b/rust/lance-index/src/util.rs @@ -12,4 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod chunker; pub mod datafusion; diff --git a/rust/lance/src/dataset/chunker.rs b/rust/lance-index/src/util/chunker.rs similarity index 80% rename from rust/lance/src/dataset/chunker.rs rename to rust/lance-index/src/util/chunker.rs index f988457472..e9499444ec 100644 --- a/rust/lance/src/dataset/chunker.rs +++ b/rust/lance-index/src/util/chunker.rs @@ -15,9 +15,11 @@ use std::collections::VecDeque; use std::pin::Pin; +use arrow::compute::kernels; use arrow_array::RecordBatch; -use datafusion::physical_plan::SendableRecordBatchStream; -use futures::{Stream, StreamExt}; +use datafusion::physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream}; +use datafusion_common::DataFusionError; +use futures::{Stream, StreamExt, TryStreamExt}; use crate::Result; @@ -122,3 +124,23 @@ pub fn chunk_stream( }) .boxed() } + +pub fn chunk_concat_stream( + stream: SendableRecordBatchStream, + chunk_size: usize, +) -> SendableRecordBatchStream { + let schema = stream.schema().clone(); + let schema_copy = schema.clone(); + let chunked = chunk_stream(stream, chunk_size); + let chunk_concat = chunked + .and_then(move |batches| { + std::future::ready( + // chunk_stream is zero-copy and so it gives us pieces of batches. However, the btree + // index needs 1 batch-per-page and so we concatenate here. + kernels::concat::concat_batches(&schema, batches.iter()).map_err(|e| e.into()), + ) + }) + .map_err(DataFusionError::from) + .boxed(); + Box::pin(RecordBatchStreamAdapter::new(schema_copy, chunk_concat)) +} diff --git a/rust/lance-index/src/util/datafusion.rs b/rust/lance-index/src/util/datafusion.rs index 188b232365..ce5240368d 100644 --- a/rust/lance-index/src/util/datafusion.rs +++ b/rust/lance-index/src/util/datafusion.rs @@ -12,8 +12,30 @@ // See the License for the specific language governing permissions and // limitations under the License. -use arrow_schema::DataType; -use datafusion_common::ScalarValue; +use std::{ + cell::RefCell, + sync::{Arc, Mutex}, +}; + +use arrow_array::RecordBatchReader; +use arrow_schema::{DataType, Schema}; +use datafusion::{ + execution::{ + context::SessionState, + runtime_env::{RuntimeConfig, RuntimeEnv}, + }, + physical_plan::{ + stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionPlan, + SendableRecordBatchStream, + }, + prelude::SessionConfig, +}; +use datafusion_common::{DataFusionError, ScalarValue}; +use datafusion_physical_expr::Partitioning; + +use futures::{stream, TryStreamExt}; +use lance_arrow::SchemaExt; +use lance_core::Result; // This is slightly tedious but when we convert expressions from SQL strings to logical // datafusion expressions there is no type coercion that happens. 
In other words "x = 7" @@ -242,3 +264,125 @@ pub fn safe_coerce_scalar(value: &ScalarValue, ty: &DataType) -> Option None, } } + +pub struct OneShotExec { + stream: Mutex>>, + // We save off a copy of the schema to speed up formatting and so ExecutionPlan::schema & display_as + // can still function after exhuasted + schema: Arc, +} + +impl OneShotExec { + pub fn new(stream: SendableRecordBatchStream) -> Self { + let schema = stream.schema().clone(); + Self { + stream: Mutex::new(RefCell::new(Some(stream))), + schema, + } + } +} + +impl std::fmt::Debug for OneShotExec { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let val_guard = self.stream.lock().unwrap(); + let stream = val_guard.borrow(); + f.debug_struct("OneShotExec") + .field("exhausted", &stream.is_none()) + .field("schema", self.schema.as_ref()) + .finish() + } +} + +impl DisplayAs for OneShotExec { + fn fmt_as( + &self, + t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + let val_guard = self.stream.lock().unwrap(); + let stream = val_guard.borrow(); + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let exhausted = if stream.is_some() { "" } else { "EXHUASTED" }; + let columns = self + .schema + .field_names() + .iter() + .map(|s| s.to_string()) + .collect::>(); + write!( + f, + "OneShotStream: {} columns=[{}]", + exhausted, + columns.join(",") + ) + } + } + } +} + +/// Convert a sendable RecordBatchReader to a sendable RecordBatchStream +pub fn reader_to_stream(reader: Box) -> SendableRecordBatchStream { + let schema = reader.schema().clone(); + let stream = stream::iter(reader).map_err(DataFusionError::from); + Box::pin(RecordBatchStreamAdapter::new(schema, stream)) +} + +impl ExecutionPlan for OneShotExec { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> arrow_schema::SchemaRef { + self.schema.clone() + } + + fn output_partitioning(&self) -> datafusion_physical_expr::Partitioning { + Partitioning::RoundRobinBatch(1) + } + + fn output_ordering(&self) -> Option<&[datafusion_physical_expr::PhysicalSortExpr]> { + None + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> datafusion_common::Result> { + todo!() + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> datafusion_common::Result { + let mut val_guard = self.stream.lock().unwrap(); + let stream = val_guard.get_mut(); + let stream = stream.take(); + if let Some(stream) = stream { + Ok(stream) + } else { + panic!("Attempt to use OneShotExec more than once"); + } + } + + fn statistics(&self) -> datafusion_common::Statistics { + todo!() + } +} + +pub fn execute_plan(plan: Arc) -> Result { + let session_config = SessionConfig::new(); + let runtime_config = RuntimeConfig::new(); + let runtime_env = Arc::new(RuntimeEnv::new(runtime_config)?); + let session_state = SessionState::new_with_config_rt(session_config, runtime_env); + // NOTE: we are only executing the first partition here. Therefore, if + // the plan has more than one partition, we will be missing data. + assert_eq!(plan.output_partitioning().partition_count(), 1); + Ok(plan.execute(0, session_state.task_ctx())?) 
+} diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index 353a8adcc5..419e95e7e9 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -59,9 +59,7 @@ arrow = { version = "47.0.0", features = ["prettyprint"] } nohash-hasher.workspace = true num_cpus.workspace = true # TODO: use datafusion sub-modules to reduce build size? -datafusion = { version = "32.0.0", default-features = false, features = [ - "regex_expressions", -] } +datafusion.workspace = true lapack = { version = "0.19.0", optional = true } cblas = { version = "0.4.0", optional = true } lru_time_cache = "0.11" diff --git a/rust/lance/benches/scalar_index.rs b/rust/lance/benches/scalar_index.rs index 5ff313b1db..870d2c4740 100644 --- a/rust/lance/benches/scalar_index.rs +++ b/rust/lance/benches/scalar_index.rs @@ -2,26 +2,26 @@ use std::sync::Arc; use arrow_array::{ types::{UInt32Type, UInt64Type}, - RecordBatch, RecordBatchReader, + RecordBatchReader, }; use async_trait::async_trait; use criterion::{criterion_group, criterion_main, Criterion}; -use datafusion::scalar::ScalarValue; -use futures::{ - stream::{self, BoxStream}, - StreamExt, TryStreamExt, -}; +use datafusion::{physical_plan::SendableRecordBatchStream, scalar::ScalarValue}; +use futures::TryStreamExt; use lance::{ io::{object_store::ObjectStoreParams, ObjectStore}, Dataset, }; -use lance_core::{Error, Result}; +use lance_core::Result; use lance_datagen::{array, gen, BatchCount, RowCount}; -use lance_index::scalar::{ - btree::{train_btree_index, BTreeIndex, BtreeTrainingSource}, - flat::FlatIndexMetadata, - lance_format::LanceIndexStore, - IndexStore, ScalarIndex, ScalarQuery, +use lance_index::{ + scalar::{ + btree::{train_btree_index, BTreeIndex, BtreeTrainingSource}, + flat::FlatIndexMetadata, + lance_format::LanceIndexStore, + IndexStore, ScalarIndex, ScalarQuery, + }, + util::datafusion::reader_to_stream, }; #[cfg(target_os = "linux")] use pprof::criterion::{Output, PProfProfiler}; @@ -49,8 +49,8 @@ impl BtreeTrainingSource for BenchmarkDataSource { async fn scan_ordered_chunks( self: Box, _chunk_size: u32, - ) -> Result>> { - Ok(stream::iter(Self::test_data().map(|batch| batch.map_err(Error::from))).boxed()) + ) -> Result { + Ok(reader_to_stream(Box::new(Self::test_data()))) } } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 3ed0697cc1..5742183931 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -48,8 +48,6 @@ use std::sync::Arc; use tracing::instrument; pub mod builder; -pub(crate) mod chunker; - pub mod cleanup; mod feature_flags; pub mod fragment; diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 1e874cc40c..c622e39375 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -33,11 +33,11 @@ use lance_core::{ }, Error, Result, ROW_ID, }; +use lance_index::util::chunker::chunk_stream; use object_store::path::Path; use snafu::{location, Location}; use uuid::Uuid; -use super::chunker::chunk_stream; use super::hash_joiner::HashJoiner; use super::scanner::Scanner; use super::updater::Updater; diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 75e6dcb023..1e89912f44 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -19,10 +19,6 @@ use std::task::{Context, Poll}; use arrow_array::{Array, Float32Array, Int64Array, RecordBatch}; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef, SortOptions}; use 
async_recursion::async_recursion; -use datafusion::execution::{ - context::SessionState, - runtime_env::{RuntimeConfig, RuntimeEnv}, -}; use datafusion::logical_expr::AggregateFunction; use datafusion::physical_expr::PhysicalSortExpr; use datafusion::physical_plan::expressions; @@ -37,11 +33,11 @@ use datafusion::physical_plan::{ union::UnionExec, ExecutionPlan, SendableRecordBatchStream, }; -use datafusion::prelude::*; use datafusion::scalar::ScalarValue; use futures::stream::{Stream, StreamExt}; use lance_core::ROW_ID_FIELD; use lance_index::scalar::expression::ScalarIndexExpr; +use lance_index::util::datafusion::execute_plan; use lance_index::vector::{Query, DIST_COL}; use lance_linalg::distance::MetricType; use log::debug; @@ -489,22 +485,16 @@ impl Scanner { Ok(Arc::new(schema)) } - fn execute_plan(&self, plan: Arc) -> Result { - let session_config = SessionConfig::new(); - let runtime_config = RuntimeConfig::new(); - let runtime_env = Arc::new(RuntimeEnv::new(runtime_config)?); - let session_state = SessionState::new_with_config_rt(session_config, runtime_env); - // NOTE: we are only executing the first partition here. Therefore, if - // the plan has more than one partition, we will be missing data. - assert_eq!(plan.output_partitioning().partition_count(), 1); - Ok(plan.execute(0, session_state.task_ctx())?) - } - /// Create a stream from the Scanner. #[instrument(skip_all)] pub async fn try_into_stream(&self) -> Result { let plan = self.create_plan().await?; - Ok(DatasetRecordBatchStream::new(self.execute_plan(plan)?)) + Ok(DatasetRecordBatchStream::new(execute_plan(plan)?)) + } + + pub(crate) async fn try_into_dfstream(&self) -> Result { + let plan = self.create_plan().await?; + execute_plan(plan) } /// Scan and return the number of matching rows @@ -531,7 +521,7 @@ impl Scanner { plan, plan_schema, )?); - let mut stream = self.execute_plan(count_plan)?; + let mut stream = execute_plan(count_plan)?; // A count plan will always return a single batch with a single row. 
if let Some(first_batch) = stream.next().await { @@ -2736,10 +2726,17 @@ mod test { dataset: Dataset, sample_query: Arc, delete_query: Arc, + // The original version of the data, two fragments, rows 0-1000 original_version: u64, + // The original version of the data, 1 row deleted, compacted to a single fragment compact_version: u64, + // The original version of the data + an extra 1000 unindexed append_version: u64, + // The original version of the data + an extra 1000 rows, with indices updated so all rows indexed + updated_version: u64, + // The original version of the data with 1 deleted row delete_version: u64, + // The original version of the data + an extra 1000 uindexed + 1 deleted row append_then_delete_version: u64, } @@ -2751,6 +2748,7 @@ mod test { use_new_data: bool, with_row_id: bool, use_compaction: bool, + use_updated: bool, } impl ScalarIndexTestFixture { @@ -2835,8 +2833,16 @@ mod test { let append_version = dataset.version().version; + // UPDATE + + dataset.optimize_indices().await.unwrap(); + let updated_version = dataset.version().version; + // APPEND -> DELETE + dataset.checkout_version(append_version).await.unwrap(); + dataset.restore(None).await.unwrap(); + dataset.delete("not_indexed = 75").await.unwrap(); let append_then_delete_version = dataset.version().version; @@ -2867,6 +2873,7 @@ mod test { original_version, compact_version, append_version, + updated_version, delete_version, append_then_delete_version, } @@ -2882,11 +2889,23 @@ mod test { async fn get_dataset(&self, params: &ScalarTestParams) -> Dataset { let version = if params.use_compaction { - if params.use_deleted_data || params.use_new_data { - panic!("There is no test data combining new/deleted data with compaction"); + // These combinations should not be possible + if params.use_deleted_data || params.use_new_data || params.use_updated { + panic!( + "There is no test data combining new/deleted/updated data with compaction" + ); } else { self.compact_version } + } else if params.use_updated { + // These combinations should not be possible + if params.use_deleted_data || params.use_new_data || params.use_compaction { + panic!( + "There is no test data combining updated data with new/deleted/compaction" + ); + } else { + self.updated_version + } } else { match (params.use_new_data, params.use_deleted_data) { (false, false) => self.original_version, @@ -3014,7 +3033,7 @@ mod test { ); } // If there is new data then the dupe of row 50 should be in the results - if params.use_new_data { + if params.use_new_data || params.use_updated { self.assert_one( &batch, |val| val == 1050, @@ -3088,14 +3107,18 @@ mod test { |val| val == 50, "The query contained 50 even though it was filtered", ); - let mut expected_num_rows = if params.use_new_data { 1999 } else { 999 }; + let mut expected_num_rows = if params.use_new_data || params.use_updated { + 1999 + } else { + 999 + }; if params.use_deleted_data || params.use_compaction { expected_num_rows -= 1; } assert_eq!(batch.num_rows(), expected_num_rows); // Let's also make sure our filter can target something in the new data only - if params.use_new_data { + if params.use_new_data || params.use_updated { let (_, batch) = self.run_query("indexed == 1050", None, params).await; assert_eq!(batch.num_rows(), 1); } @@ -3131,7 +3154,11 @@ mod test { "The non-indexed refine filter was not applied", ); - let mut expected_num_rows = if params.use_new_data { 199 } else { 99 }; + let mut expected_num_rows = if params.use_new_data || params.use_updated { + 199 + } else { + 99 + }; if 
params.use_deleted_data || params.use_compaction { expected_num_rows -= 1; } @@ -3164,17 +3191,26 @@ mod test { vec![false, true] }; for use_compaction in compaction_choices { - for with_row_id in [false, true] { - let params = ScalarTestParams { - use_index, - use_projection, - use_deleted_data, - use_new_data, - with_row_id, - use_compaction, + let updated_choices = + if use_deleted_data || use_new_data || use_compaction { + vec![false] + } else { + vec![false, true] }; - fixture.check_vector_queries(¶ms).await; - fixture.check_simple_queries(¶ms).await; + for use_updated in updated_choices { + for with_row_id in [false, true] { + let params = ScalarTestParams { + use_index, + use_projection, + use_deleted_data, + use_new_data, + with_row_id, + use_compaction, + use_updated, + }; + fixture.check_vector_queries(¶ms).await; + fixture.check_simple_queries(¶ms).await; + } } } } diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index cf4478018c..026789dd8e 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -31,12 +31,13 @@ use lance_core::{ }, Error, Result, }; +use lance_index::util::chunker::chunk_stream; use object_store::path::Path; use tracing::instrument; use uuid::Uuid; use super::progress::{NoopFragmentWriteProgress, WriteFragmentProgress}; -use super::{chunker::chunk_stream, DATA_DIR}; +use super::DATA_DIR; /// The mode to write dataset. #[derive(Debug, Clone, Copy)] diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index 9aa7be08e1..b23584cb9f 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -15,12 +15,15 @@ use std::sync::Arc; use lance_core::{format::Index as IndexMetadata, Error, Result}; +use lance_index::scalar::lance_format::LanceIndexStore; +use lance_index::IndexType; use log::info; use roaring::RoaringBitmap; use snafu::{location, Location}; use uuid::Uuid; use crate::dataset::index::unindexed_fragments; +use crate::dataset::scanner::ColumnOrdering; use crate::dataset::Dataset; use crate::index::vector::ivf::IVFIndex; @@ -56,25 +59,56 @@ pub async fn append_index( })?; let index = dataset - .open_vector_index(&column.name, old_index.uuid.to_string().as_str()) + .open_generic_index(&column.name, &old_index.uuid.to_string()) .await?; - let Some(ivf_idx) = index.as_any().downcast_ref::() else { - info!("Index type: {:?} does not support append", index); - return Ok(None); - }; - - let mut scanner = dataset.scan(); - scanner.with_fragments(unindexed); - scanner.with_row_id(); - scanner.project(&[&column.name])?; - let stream = scanner.try_into_stream().await?; - - let new_index = ivf_idx - .append(dataset.as_ref(), stream, old_index, &column.name) - .await?; - - Ok(Some((new_index, frag_bitmap))) + match index.index_type() { + IndexType::Scalar => { + let index = dataset + .open_scalar_index(&column.name, &old_index.uuid.to_string()) + .await?; + + let mut scanner = dataset.scan(); + scanner + .with_fragments(unindexed) + .with_row_id() + .order_by(Some(vec![ColumnOrdering::asc_nulls_first( + column.name.clone(), + )]))? 
+ .project(&[&column.name])?; + let new_data_stream = scanner.try_into_stream().await?; + + let new_uuid = Uuid::new_v4(); + + let index_dir = dataset.indices_dir().child(new_uuid.to_string()); + let new_store = LanceIndexStore::new((*dataset.object_store).clone(), index_dir); + + index.update(new_data_stream.into(), &new_store).await?; + + Ok(Some((new_uuid, frag_bitmap))) + } + IndexType::Vector => { + let mut scanner = dataset.scan(); + scanner.with_fragments(unindexed); + scanner.with_row_id(); + let new_data_stream = scanner.try_into_stream().await?; + + let index = dataset + .open_vector_index(&column.name, old_index.uuid.to_string().as_str()) + .await?; + + let Some(ivf_idx) = index.as_any().downcast_ref::() else { + info!("Index type: {:?} does not support append", index); + return Ok(None); + }; + + let new_index = ivf_idx + .append(dataset.as_ref(), new_data_stream, old_index, &column.name) + .await?; + + Ok(Some((new_index, frag_bitmap))) + } + } } #[cfg(test)] diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 139c3d3e05..96cfd53c0d 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -15,27 +15,25 @@ //! Utilities for integrating scalar indices with datasets //! -use std::{future, sync::Arc}; +use std::sync::Arc; -use arrow::compute::kernels; -use arrow_array::RecordBatch; use async_trait::async_trait; -use futures::{stream::BoxStream, StreamExt, TryStreamExt}; -use lance_index::scalar::{ - btree::{train_btree_index, BTreeIndex, BtreeTrainingSource}, - flat::FlatIndexMetadata, - lance_format::LanceIndexStore, - ScalarIndex, +use datafusion::physical_plan::SendableRecordBatchStream; +use lance_index::{ + scalar::{ + btree::{train_btree_index, BTreeIndex, BtreeTrainingSource}, + flat::FlatIndexMetadata, + lance_format::LanceIndexStore, + ScalarIndex, + }, + util::chunker::chunk_concat_stream, }; use snafu::{location, Location}; use tracing::instrument; use lance_core::{Error, Result}; -use crate::{ - dataset::{chunker, scanner::ColumnOrdering}, - Dataset, -}; +use crate::{dataset::scanner::ColumnOrdering, Dataset}; use super::IndexParams; @@ -58,7 +56,7 @@ impl BtreeTrainingSource for TrainingRequest { async fn scan_ordered_chunks( self: Box, chunk_size: u32, - ) -> Result>> { + ) -> Result { let mut scan = self.dataset.scan(); let scan = scan .with_row_id() @@ -67,18 +65,8 @@ impl BtreeTrainingSource for TrainingRequest { )]))? .project(&[&self.column])?; - let schema = scan.schema()?; - let ordered_batches = scan.try_into_stream().await?; - let chunked = chunker::chunk_stream(ordered_batches.into(), chunk_size as usize); - Ok(chunked - .and_then(move |batches| { - future::ready( - // chunk_stream is zero-copy and so it gives us pieces of batches. However, the btree - // index needs 1 batch-per-page and so we concatenate here. 
- kernels::concat::concat_batches(&schema, batches.iter()).map_err(|e| e.into()), - ) - }) - .boxed()) + let ordered_batches = scan.try_into_dfstream().await?; + Ok(chunk_concat_stream(ordered_batches, chunk_size as usize)) } } From 47c51376d2b62313eb70d836c349adc7994f5641 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 14 Nov 2023 10:04:11 -0800 Subject: [PATCH 2/4] Refactor datafusion utilities into a separate crate, rebase --- rust/Cargo.toml | 1 + rust/lance-datafusion/Cargo.toml | 22 +++ .../util => lance-datafusion/src}/chunker.rs | 2 +- rust/lance-datafusion/src/exec.rs | 185 ++++++++++++++++++ .../src/expr.rs} | 148 +------------- rust/lance-datafusion/src/lib.rs | 3 + rust/lance-index/Cargo.toml | 1 + rust/lance-index/src/lib.rs | 1 - rust/lance-index/src/scalar/btree.rs | 12 +- rust/lance-index/src/scalar/expression.rs | 3 +- rust/lance-index/src/scalar/lance_format.rs | 16 +- rust/lance-index/src/util.rs | 16 -- rust/lance/Cargo.toml | 1 + rust/lance/benches/scalar_index.rs | 16 +- rust/lance/src/datafusion/logical_expr.rs | 2 +- rust/lance/src/dataset/fragment.rs | 2 +- rust/lance/src/dataset/scanner.rs | 2 +- rust/lance/src/dataset/write.rs | 2 +- rust/lance/src/index/append.rs | 1 + rust/lance/src/index/scalar.rs | 14 +- 20 files changed, 248 insertions(+), 202 deletions(-) create mode 100644 rust/lance-datafusion/Cargo.toml rename rust/{lance-index/src/util => lance-datafusion/src}/chunker.rs (99%) create mode 100644 rust/lance-datafusion/src/exec.rs rename rust/{lance-index/src/util/datafusion.rs => lance-datafusion/src/expr.rs} (72%) create mode 100644 rust/lance-datafusion/src/lib.rs delete mode 100644 rust/lance-index/src/util.rs diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 384a50fbf1..073d712e82 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -39,6 +39,7 @@ rust-version = "1.70" lance = { version = "=0.8.16", path = "./lance" } lance-arrow = { version = "=0.8.16", path = "./lance-arrow" } lance-core = { version = "=0.8.16", path = "./lance-core" } +lance-datafusion = { version = "=0.8.16", path = "./lance-datafusion" } lance-datagen = { version = "=0.8.16", path = "./lance-datagen" } lance-index = { version = "=0.8.16", path = "./lance-index" } lance-linalg = { version = "=0.8.16", path = "./lance-linalg" } diff --git a/rust/lance-datafusion/Cargo.toml b/rust/lance-datafusion/Cargo.toml new file mode 100644 index 0000000000..1ed27a6d1d --- /dev/null +++ b/rust/lance-datafusion/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "lance-datafusion" +version.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +readme.workspace = true +keywords.workspace = true +categories.workspace = true +description = "Internal utilities used by other lance modules to simplify working with datafusion" + +[dependencies] +arrow.workspace = true +arrow-array.workspace = true +arrow-schema.workspace = true +datafusion.workspace = true +datafusion-common.workspace = true +datafusion-expr = "32.0.0" +datafusion-physical-expr = { version = "32.0.0", default-features = false } +futures.workspace = true +lance-arrow.workspace = true +lance-core.workspace = true diff --git a/rust/lance-index/src/util/chunker.rs b/rust/lance-datafusion/src/chunker.rs similarity index 99% rename from rust/lance-index/src/util/chunker.rs rename to rust/lance-datafusion/src/chunker.rs index e9499444ec..5bcfb06044 100644 --- a/rust/lance-index/src/util/chunker.rs +++ b/rust/lance-datafusion/src/chunker.rs @@ -21,7 +21,7 @@ use 
datafusion::physical_plan::{stream::RecordBatchStreamAdapter, SendableRecord use datafusion_common::DataFusionError; use futures::{Stream, StreamExt, TryStreamExt}; -use crate::Result; +use lance_core::Result; /// Wraps a [`SendableRecordBatchStream`] into a stream of RecordBatch chunks of /// a given size. This slices but does not copy any buffers. diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs new file mode 100644 index 0000000000..14e30f37c0 --- /dev/null +++ b/rust/lance-datafusion/src/exec.rs @@ -0,0 +1,185 @@ +// Copyright 2023 Lance Developers. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities for working with datafusion execution plans + +use std::{ + cell::RefCell, + sync::{Arc, Mutex}, +}; + +use arrow_array::RecordBatchReader; +use arrow_schema::Schema as ArrowSchema; +use datafusion::{ + execution::{ + context::{SessionConfig, SessionState}, + runtime_env::{RuntimeConfig, RuntimeEnv}, + }, + physical_plan::{ + stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionPlan, + SendableRecordBatchStream, + }, +}; +use datafusion_common::DataFusionError; +use datafusion_physical_expr::Partitioning; +use futures::TryStreamExt; + +use lance_arrow::SchemaExt; +use lance_core::{datatypes::Schema, Error, Result}; + +pub struct OneShotExec { + stream: Mutex>>, + // We save off a copy of the schema to speed up formatting and so ExecutionPlan::schema & display_as + // can still function after exhuasted + schema: Arc, +} + +impl OneShotExec { + pub fn new(stream: SendableRecordBatchStream) -> Self { + let schema = stream.schema().clone(); + Self { + stream: Mutex::new(RefCell::new(Some(stream))), + schema, + } + } +} + +impl std::fmt::Debug for OneShotExec { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let val_guard = self.stream.lock().unwrap(); + let stream = val_guard.borrow(); + f.debug_struct("OneShotExec") + .field("exhausted", &stream.is_none()) + .field("schema", self.schema.as_ref()) + .finish() + } +} + +impl DisplayAs for OneShotExec { + fn fmt_as( + &self, + t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + let val_guard = self.stream.lock().unwrap(); + let stream = val_guard.borrow(); + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let exhausted = if stream.is_some() { "" } else { "EXHUASTED" }; + let columns = self + .schema + .field_names() + .iter() + .map(|s| s.to_string()) + .collect::>(); + write!( + f, + "OneShotStream: {} columns=[{}]", + exhausted, + columns.join(",") + ) + } + } + } +} + +/// Convert reader to a stream and a schema. +/// +/// Will peek the first batch to get the dictionaries for dictionary columns. +/// +/// NOTE: this does not validate the schema. For example, for appends the schema +/// should be checked to make sure it matches the existing dataset schema before +/// writing. 
+pub fn reader_to_stream( + batches: Box, +) -> Result<(SendableRecordBatchStream, Schema)> { + let arrow_schema = batches.schema(); + let mut schema: Schema = Schema::try_from(batches.schema().as_ref())?; + let mut peekable = batches.peekable(); + if let Some(batch) = peekable.peek() { + if let Ok(b) = batch { + schema.set_dictionary(b)?; + } else { + return Err(Error::from(batch.as_ref().unwrap_err())); + } + } + schema.validate()?; + + let stream = RecordBatchStreamAdapter::new( + arrow_schema, + futures::stream::iter(peekable).map_err(DataFusionError::from), + ); + let stream = Box::pin(stream) as SendableRecordBatchStream; + + Ok((stream, schema)) +} + +impl ExecutionPlan for OneShotExec { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> arrow_schema::SchemaRef { + self.schema.clone() + } + + fn output_partitioning(&self) -> datafusion_physical_expr::Partitioning { + Partitioning::RoundRobinBatch(1) + } + + fn output_ordering(&self) -> Option<&[datafusion_physical_expr::PhysicalSortExpr]> { + None + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> datafusion_common::Result> { + todo!() + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> datafusion_common::Result { + let mut val_guard = self.stream.lock().unwrap(); + let stream = val_guard.get_mut(); + let stream = stream.take(); + if let Some(stream) = stream { + Ok(stream) + } else { + panic!("Attempt to use OneShotExec more than once"); + } + } + + fn statistics(&self) -> datafusion_common::Statistics { + todo!() + } +} + +pub fn execute_plan(plan: Arc) -> Result { + let session_config = SessionConfig::new(); + let runtime_config = RuntimeConfig::new(); + let runtime_env = Arc::new(RuntimeEnv::new(runtime_config)?); + let session_state = SessionState::new_with_config_rt(session_config, runtime_env); + // NOTE: we are only executing the first partition here. Therefore, if + // the plan has more than one partition, we will be missing data. + assert_eq!(plan.output_partitioning().partition_count(), 1); + Ok(plan.execute(0, session_state.task_ctx())?) +} diff --git a/rust/lance-index/src/util/datafusion.rs b/rust/lance-datafusion/src/expr.rs similarity index 72% rename from rust/lance-index/src/util/datafusion.rs rename to rust/lance-datafusion/src/expr.rs index ce5240368d..ed71f9fc9f 100644 --- a/rust/lance-index/src/util/datafusion.rs +++ b/rust/lance-datafusion/src/expr.rs @@ -12,30 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{ - cell::RefCell, - sync::{Arc, Mutex}, -}; +//! Utilities for working with datafusion expressions -use arrow_array::RecordBatchReader; -use arrow_schema::{DataType, Schema}; -use datafusion::{ - execution::{ - context::SessionState, - runtime_env::{RuntimeConfig, RuntimeEnv}, - }, - physical_plan::{ - stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionPlan, - SendableRecordBatchStream, - }, - prelude::SessionConfig, -}; -use datafusion_common::{DataFusionError, ScalarValue}; -use datafusion_physical_expr::Partitioning; - -use futures::{stream, TryStreamExt}; -use lance_arrow::SchemaExt; -use lance_core::Result; +use arrow_schema::DataType; +use datafusion_common::ScalarValue; // This is slightly tedious but when we convert expressions from SQL strings to logical // datafusion expressions there is no type coercion that happens. 
In other words "x = 7" @@ -264,125 +244,3 @@ pub fn safe_coerce_scalar(value: &ScalarValue, ty: &DataType) -> Option None, } } - -pub struct OneShotExec { - stream: Mutex>>, - // We save off a copy of the schema to speed up formatting and so ExecutionPlan::schema & display_as - // can still function after exhuasted - schema: Arc, -} - -impl OneShotExec { - pub fn new(stream: SendableRecordBatchStream) -> Self { - let schema = stream.schema().clone(); - Self { - stream: Mutex::new(RefCell::new(Some(stream))), - schema, - } - } -} - -impl std::fmt::Debug for OneShotExec { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let val_guard = self.stream.lock().unwrap(); - let stream = val_guard.borrow(); - f.debug_struct("OneShotExec") - .field("exhausted", &stream.is_none()) - .field("schema", self.schema.as_ref()) - .finish() - } -} - -impl DisplayAs for OneShotExec { - fn fmt_as( - &self, - t: datafusion::physical_plan::DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - let val_guard = self.stream.lock().unwrap(); - let stream = val_guard.borrow(); - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - let exhausted = if stream.is_some() { "" } else { "EXHUASTED" }; - let columns = self - .schema - .field_names() - .iter() - .map(|s| s.to_string()) - .collect::>(); - write!( - f, - "OneShotStream: {} columns=[{}]", - exhausted, - columns.join(",") - ) - } - } - } -} - -/// Convert a sendable RecordBatchReader to a sendable RecordBatchStream -pub fn reader_to_stream(reader: Box) -> SendableRecordBatchStream { - let schema = reader.schema().clone(); - let stream = stream::iter(reader).map_err(DataFusionError::from); - Box::pin(RecordBatchStreamAdapter::new(schema, stream)) -} - -impl ExecutionPlan for OneShotExec { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn schema(&self) -> arrow_schema::SchemaRef { - self.schema.clone() - } - - fn output_partitioning(&self) -> datafusion_physical_expr::Partitioning { - Partitioning::RoundRobinBatch(1) - } - - fn output_ordering(&self) -> Option<&[datafusion_physical_expr::PhysicalSortExpr]> { - None - } - - fn children(&self) -> Vec> { - vec![] - } - - fn with_new_children( - self: Arc, - _children: Vec>, - ) -> datafusion_common::Result> { - todo!() - } - - fn execute( - &self, - _partition: usize, - _context: Arc, - ) -> datafusion_common::Result { - let mut val_guard = self.stream.lock().unwrap(); - let stream = val_guard.get_mut(); - let stream = stream.take(); - if let Some(stream) = stream { - Ok(stream) - } else { - panic!("Attempt to use OneShotExec more than once"); - } - } - - fn statistics(&self) -> datafusion_common::Statistics { - todo!() - } -} - -pub fn execute_plan(plan: Arc) -> Result { - let session_config = SessionConfig::new(); - let runtime_config = RuntimeConfig::new(); - let runtime_env = Arc::new(RuntimeEnv::new(runtime_config)?); - let session_state = SessionState::new_with_config_rt(session_config, runtime_env); - // NOTE: we are only executing the first partition here. Therefore, if - // the plan has more than one partition, we will be missing data. - assert_eq!(plan.output_partitioning().partition_count(), 1); - Ok(plan.execute(0, session_state.task_ctx())?) 
-} diff --git a/rust/lance-datafusion/src/lib.rs b/rust/lance-datafusion/src/lib.rs new file mode 100644 index 0000000000..2a96b8fff6 --- /dev/null +++ b/rust/lance-datafusion/src/lib.rs @@ -0,0 +1,3 @@ +pub mod chunker; +pub mod exec; +pub mod expr; diff --git a/rust/lance-index/Cargo.toml b/rust/lance-index/Cargo.toml index bc01df35ac..c8d55f7c4b 100644 --- a/rust/lance-index/Cargo.toml +++ b/rust/lance-index/Cargo.toml @@ -28,6 +28,7 @@ futures.workspace = true half.workspace = true lance-arrow.workspace = true lance-core.workspace = true +lance-datafusion.workspace = true lance-linalg.workspace = true log.workspace = true nohash-hasher.workspace = true diff --git a/rust/lance-index/src/lib.rs b/rust/lance-index/src/lib.rs index 5d756fdf7e..031afb6f08 100644 --- a/rust/lance-index/src/lib.rs +++ b/rust/lance-index/src/lib.rs @@ -21,7 +21,6 @@ use lance_core::Result; use roaring::RoaringBitmap; pub mod scalar; -pub mod util; pub mod vector; pub const INDEX_FILE_NAME: &str = "index.idx"; diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index 905c7d55ff..1af3c90093 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -40,18 +40,16 @@ use futures::{ FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, }; use lance_core::{Error, Result}; +use lance_datafusion::{ + chunker::chunk_concat_stream, + exec::{execute_plan, OneShotExec}, +}; use nohash_hasher::IntMap; use roaring::RoaringBitmap; use serde::{Serialize, Serializer}; use snafu::{location, Location}; -use crate::{ - util::{ - chunker::chunk_concat_stream, - datafusion::{execute_plan, OneShotExec}, - }, - Index, IndexType, -}; +use crate::{Index, IndexType}; use super::{ flat::FlatIndexMetadata, IndexReader, IndexStore, IndexWriter, ScalarIndex, ScalarQuery, diff --git a/rust/lance-index/src/scalar/expression.rs b/rust/lance-index/src/scalar/expression.rs index 540b4e2e04..f5301b94ac 100644 --- a/rust/lance-index/src/scalar/expression.rs +++ b/rust/lance-index/src/scalar/expression.rs @@ -22,10 +22,9 @@ use datafusion_expr::{expr::InList, Between, BinaryExpr, Expr, Operator}; use futures::join; use lance_core::{utils::mask::RowIdMask, Result}; +use lance_datafusion::expr::safe_coerce_scalar; use roaring::RoaringTreemap; -use crate::util::datafusion::safe_coerce_scalar; - use super::{ScalarIndex, ScalarQuery}; /// An indexed expression consists of a scalar index query with a post-scan filter diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs index 9d36682b93..88ff736fd5 100644 --- a/rust/lance-index/src/scalar/lance_format.rs +++ b/rust/lance-index/src/scalar/lance_format.rs @@ -148,13 +148,10 @@ mod tests { use std::{ops::Bound, path::Path}; - use crate::{ - scalar::{ - btree::{train_btree_index, BTreeIndex, BtreeTrainingSource}, - flat::FlatIndexMetadata, - ScalarIndex, ScalarQuery, - }, - util::datafusion::reader_to_stream, + use crate::scalar::{ + btree::{train_btree_index, BTreeIndex, BtreeTrainingSource}, + flat::FlatIndexMetadata, + ScalarIndex, ScalarQuery, }; use super::*; @@ -168,6 +165,7 @@ mod tests { use datafusion::physical_plan::SendableRecordBatchStream; use datafusion_common::ScalarValue; use lance_core::io::object_store::ObjectStoreParams; + use lance_datafusion::exec::reader_to_stream; use lance_datagen::{array, gen, BatchCount, RowCount}; use tempfile::{tempdir, TempDir}; @@ -188,7 +186,7 @@ mod tests { impl MockTrainingSource { fn new(data: impl RecordBatchReader + Send + 'static) 
-> Self { Self { - data: reader_to_stream(Box::new(data)), + data: reader_to_stream(Box::new(data)).unwrap().0, } } } @@ -276,7 +274,7 @@ mod tests { let updated_index_store = test_store(&updated_index_dir); index .update( - reader_to_stream(Box::new(data)), + reader_to_stream(Box::new(data)).unwrap().0, updated_index_store.as_ref(), ) .await diff --git a/rust/lance-index/src/util.rs b/rust/lance-index/src/util.rs deleted file mode 100644 index b82b10f8d3..0000000000 --- a/rust/lance-index/src/util.rs +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2023 Lance Developers. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub mod chunker; -pub mod datafusion; diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index 419e95e7e9..2c050cc505 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -18,6 +18,7 @@ no-default-features = true [dependencies] lance-arrow = { workspace = true } lance-core = { workspace = true } +lance-datafusion = { workspace = true } lance-datagen = { workspace = true } lance-linalg = { workspace = true } lance-index = { workspace = true } diff --git a/rust/lance/benches/scalar_index.rs b/rust/lance/benches/scalar_index.rs index 870d2c4740..77d76e0748 100644 --- a/rust/lance/benches/scalar_index.rs +++ b/rust/lance/benches/scalar_index.rs @@ -13,15 +13,13 @@ use lance::{ Dataset, }; use lance_core::Result; +use lance_datafusion::exec::reader_to_stream; use lance_datagen::{array, gen, BatchCount, RowCount}; -use lance_index::{ - scalar::{ - btree::{train_btree_index, BTreeIndex, BtreeTrainingSource}, - flat::FlatIndexMetadata, - lance_format::LanceIndexStore, - IndexStore, ScalarIndex, ScalarQuery, - }, - util::datafusion::reader_to_stream, +use lance_index::scalar::{ + btree::{train_btree_index, BTreeIndex, BtreeTrainingSource}, + flat::FlatIndexMetadata, + lance_format::LanceIndexStore, + IndexStore, ScalarIndex, ScalarQuery, }; #[cfg(target_os = "linux")] use pprof::criterion::{Output, PProfProfiler}; @@ -50,7 +48,7 @@ impl BtreeTrainingSource for BenchmarkDataSource { self: Box, _chunk_size: u32, ) -> Result { - Ok(reader_to_stream(Box::new(Self::test_data()))) + Ok(reader_to_stream(Box::new(Self::test_data()))?.0) } } diff --git a/rust/lance/src/datafusion/logical_expr.rs b/rust/lance/src/datafusion/logical_expr.rs index 5c04862de6..4866e7a2be 100644 --- a/rust/lance/src/datafusion/logical_expr.rs +++ b/rust/lance/src/datafusion/logical_expr.rs @@ -23,7 +23,7 @@ use datafusion::logical_expr::{ use datafusion::prelude::*; use datafusion::scalar::ScalarValue; use lance_arrow::DataTypeExt; -use lance_index::util::datafusion::safe_coerce_scalar; +use lance_datafusion::expr::safe_coerce_scalar; use crate::datatypes::Schema; use crate::{Error, Result}; diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index c622e39375..66b69a3c5d 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -33,7 +33,7 @@ use lance_core::{ }, Error, Result, ROW_ID, }; -use 
lance_index::util::chunker::chunk_stream; +use lance_datafusion::chunker::chunk_stream; use object_store::path::Path; use snafu::{location, Location}; use uuid::Uuid; diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 1e89912f44..c72bd09b52 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -36,8 +36,8 @@ use datafusion::physical_plan::{ use datafusion::scalar::ScalarValue; use futures::stream::{Stream, StreamExt}; use lance_core::ROW_ID_FIELD; +use lance_datafusion::exec::execute_plan; use lance_index::scalar::expression::ScalarIndexExpr; -use lance_index::util::datafusion::execute_plan; use lance_index::vector::{Query, DIST_COL}; use lance_linalg::distance::MetricType; use log::debug; diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 026789dd8e..a14d412ed4 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -31,7 +31,7 @@ use lance_core::{ }, Error, Result, }; -use lance_index::util::chunker::chunk_stream; +use lance_datafusion::chunker::chunk_stream; use object_store::path::Path; use tracing::instrument; use uuid::Uuid; diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index b23584cb9f..2a60a2b9a8 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -91,6 +91,7 @@ pub async fn append_index( let mut scanner = dataset.scan(); scanner.with_fragments(unindexed); scanner.with_row_id(); + scanner.project(&[&column.name])?; let new_data_stream = scanner.try_into_stream().await?; let index = dataset diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 96cfd53c0d..c31b9ffea4 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -19,14 +19,12 @@ use std::sync::Arc; use async_trait::async_trait; use datafusion::physical_plan::SendableRecordBatchStream; -use lance_index::{ - scalar::{ - btree::{train_btree_index, BTreeIndex, BtreeTrainingSource}, - flat::FlatIndexMetadata, - lance_format::LanceIndexStore, - ScalarIndex, - }, - util::chunker::chunk_concat_stream, +use lance_datafusion::chunker::chunk_concat_stream; +use lance_index::scalar::{ + btree::{train_btree_index, BTreeIndex, BtreeTrainingSource}, + flat::FlatIndexMetadata, + lance_format::LanceIndexStore, + ScalarIndex, }; use snafu::{location, Location}; use tracing::instrument; From e2d04e04a25853e90c7952f6fb5b4fb19080f3c6 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 16 Nov 2023 15:16:45 -0800 Subject: [PATCH 3/4] Switch to using SortPreservingMergeExec for merging the btree streams --- rust/lance-datafusion/src/exec.rs | 74 +++++++++++++++------------- rust/lance-index/src/scalar/btree.rs | 27 +++++----- 2 files changed, 57 insertions(+), 44 deletions(-) diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs index 14e30f37c0..50b2275585 100644 --- a/rust/lance-datafusion/src/exec.rs +++ b/rust/lance-datafusion/src/exec.rs @@ -38,6 +38,41 @@ use futures::TryStreamExt; use lance_arrow::SchemaExt; use lance_core::{datatypes::Schema, Error, Result}; +/// Convert reader to a stream and a schema. +/// +/// Will peek the first batch to get the dictionaries for dictionary columns. +/// +/// NOTE: this does not validate the schema. For example, for appends the schema +/// should be checked to make sure it matches the existing dataset schema before +/// writing. 
+pub fn reader_to_stream( + batches: Box, +) -> Result<(SendableRecordBatchStream, Schema)> { + let arrow_schema = batches.schema(); + let mut schema: Schema = Schema::try_from(batches.schema().as_ref())?; + let mut peekable = batches.peekable(); + if let Some(batch) = peekable.peek() { + if let Ok(b) = batch { + schema.set_dictionary(b)?; + } else { + return Err(Error::from(batch.as_ref().unwrap_err())); + } + } + schema.validate()?; + + let stream = RecordBatchStreamAdapter::new( + arrow_schema, + futures::stream::iter(peekable).map_err(DataFusionError::from), + ); + let stream = Box::pin(stream) as SendableRecordBatchStream; + + Ok((stream, schema)) +} + +/// An source execution node created from an existing stream +/// +/// It can only be used once, and will return the stream. After that the node +/// is exhuasted. pub struct OneShotExec { stream: Mutex>>, // We save off a copy of the schema to speed up formatting and so ExecutionPlan::schema & display_as @@ -46,6 +81,7 @@ pub struct OneShotExec { } impl OneShotExec { + /// Create a new instance from a given stream pub fn new(stream: SendableRecordBatchStream) -> Self { let schema = stream.schema().clone(); Self { @@ -76,7 +112,7 @@ impl DisplayAs for OneShotExec { let stream = val_guard.borrow(); match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - let exhausted = if stream.is_some() { "" } else { "EXHUASTED" }; + let exhausted = if stream.is_some() { "" } else { "EXHUASTED " }; let columns = self .schema .field_names() @@ -85,7 +121,7 @@ impl DisplayAs for OneShotExec { .collect::>(); write!( f, - "OneShotStream: {} columns=[{}]", + "OneShotStream: {}columns=[{}]", exhausted, columns.join(",") ) @@ -94,37 +130,6 @@ impl DisplayAs for OneShotExec { } } -/// Convert reader to a stream and a schema. -/// -/// Will peek the first batch to get the dictionaries for dictionary columns. -/// -/// NOTE: this does not validate the schema. For example, for appends the schema -/// should be checked to make sure it matches the existing dataset schema before -/// writing. -pub fn reader_to_stream( - batches: Box, -) -> Result<(SendableRecordBatchStream, Schema)> { - let arrow_schema = batches.schema(); - let mut schema: Schema = Schema::try_from(batches.schema().as_ref())?; - let mut peekable = batches.peekable(); - if let Some(batch) = peekable.peek() { - if let Ok(b) = batch { - schema.set_dictionary(b)?; - } else { - return Err(Error::from(batch.as_ref().unwrap_err())); - } - } - schema.validate()?; - - let stream = RecordBatchStreamAdapter::new( - arrow_schema, - futures::stream::iter(peekable).map_err(DataFusionError::from), - ); - let stream = Box::pin(stream) as SendableRecordBatchStream; - - Ok((stream, schema)) -} - impl ExecutionPlan for OneShotExec { fn as_any(&self) -> &dyn std::any::Any { self @@ -173,6 +178,9 @@ impl ExecutionPlan for OneShotExec { } } +/// Executes a plan using default session & runtime configuration +/// +/// Only executes a single partition. Panics if the plan has more than one partition. 
pub fn execute_plan(plan: Arc) -> Result { let session_config = SessionConfig::new(); let runtime_config = RuntimeConfig::new(); diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index 1af3c90093..4239fcf557 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -25,8 +25,11 @@ use arrow_array::{Array, RecordBatch, UInt32Array, UInt64Array}; use arrow_schema::{DataType, Field, Schema, SortOptions}; use async_trait::async_trait; use datafusion::physical_plan::{ - repartition::RepartitionExec, sorts::sort::SortExec, stream::RecordBatchStreamAdapter, - union::UnionExec, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, + repartition::RepartitionExec, + sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec}, + stream::RecordBatchStreamAdapter, + union::UnionExec, + ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, }; use datafusion_common::{DataFusionError, ScalarValue}; use datafusion_expr::Accumulator; @@ -735,7 +738,8 @@ impl BTreeIndex { Ok(Self::new(map, null_pages, store, sub_index)) } - async fn into_old_data(self) -> Result { + /// Create a stream of all the data in the index, in the same format used to train the index + async fn into_data_stream(self) -> Result { let reader = self.store.open_index_file(BTREE_PAGES_NAME).await?; let pages = self.page_lookup.all_page_ids(); let schema = self.sub_index.schema().clone(); @@ -907,6 +911,7 @@ impl ScalarIndex for BTreeIndex { new_data: SendableRecordBatchStream, dest_store: &dyn IndexStore, ) -> Result<()> { + // Merge the existing index data with the new data and then retrain the index on the merged stream let merged_data_source = Box::new(BTreeUpdater::new(self.clone(), new_data)); train_btree_index(merged_data_source, self.sub_index.as_ref(), dest_store).await } @@ -1078,6 +1083,7 @@ pub async fn train_btree_index( Ok(()) } +/// A source of training data created by merging existing data with new data struct BTreeUpdater { index: BTreeIndex, new_data: SendableRecordBatchStream, @@ -1092,7 +1098,7 @@ impl BTreeUpdater { impl BTreeUpdater { fn into_old_input(index: BTreeIndex) -> Arc { let schema = index.sub_index.schema().clone(); - let batches = index.into_old_data().into_stream().try_flatten().boxed(); + let batches = index.into_data_stream().into_stream().try_flatten().boxed(); let stream = Box::pin(RecordBatchStreamAdapter::new(schema, batches)); Arc::new(OneShotExec::new(stream)) } @@ -1117,19 +1123,18 @@ impl BtreeTrainingSource for BTreeUpdater { nulls_first: true, }, }; - // TODO: Use SortPreservingMergeExec here + // The UnionExec creates multiple partitions but the SortPreservingMergeExec merges + // them back into a single partition. 
let all_data = Arc::new(UnionExec::new(vec![old_input, new_input])); - // Squash back into one partition - let all_data = Arc::new(RepartitionExec::try_new( - all_data, - datafusion::physical_plan::Partitioning::RoundRobinBatch(1), - )?); - let ordered = Arc::new(SortExec::new(vec![sort_expr], all_data)); + let ordered = Arc::new(SortPreservingMergeExec::new(vec![sort_expr], all_data)); let unchunked = execute_plan(ordered)?; Ok(chunk_concat_stream(unchunked, chunk_size as usize)) } } +/// A stream that reads the original training data back out of the index +/// +/// This is used for updating the index struct IndexReaderStream { reader: Arc, pages: Vec, From cdb2c4ff3360552ffa8d1eb2408208124cccd02b Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 16 Nov 2023 15:18:50 -0800 Subject: [PATCH 4/4] Remove unused imports --- rust/lance-index/src/scalar/btree.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index 4239fcf557..7316fa570f 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -25,11 +25,8 @@ use arrow_array::{Array, RecordBatch, UInt32Array, UInt64Array}; use arrow_schema::{DataType, Field, Schema, SortOptions}; use async_trait::async_trait; use datafusion::physical_plan::{ - repartition::RepartitionExec, - sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec}, - stream::RecordBatchStreamAdapter, - union::UnionExec, - ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, + sorts::sort_preserving_merge::SortPreservingMergeExec, stream::RecordBatchStreamAdapter, + union::UnionExec, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, }; use datafusion_common::{DataFusionError, ScalarValue}; use datafusion_expr::Accumulator;
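
Note (not part of the patch series): a minimal usage sketch of the update path these
commits add, pieced together from the call sites in the tests and benchmarks above.
The helper's name, the boxed reader type, and how the index/store are obtained are
illustrative assumptions; only reader_to_stream and ScalarIndex::update come from the
diffs themselves.

    use std::sync::Arc;

    use arrow_array::RecordBatchReader;
    use lance_core::Result;
    use lance_datafusion::exec::reader_to_stream;
    use lance_index::scalar::{IndexStore, ScalarIndex};

    /// Fold `new_rows` into an existing scalar index, writing the refreshed
    /// copy of the index into `dest_store`.
    async fn append_to_scalar_index(
        index: Arc<dyn ScalarIndex>,
        new_rows: Box<dyn RecordBatchReader + Send>,
        dest_store: &dyn IndexStore,
    ) -> Result<()> {
        // reader_to_stream peeks the first batch to pick up dictionary metadata
        // and returns (stream, schema); only the stream is needed here.
        let (new_data, _schema) = reader_to_stream(new_rows)?;
        // For a BTreeIndex this merges the existing pages with the new data
        // (UnionExec feeding SortPreservingMergeExec) and retrains the index
        // into dest_store.
        index.update(new_data, dest_store).await
    }

As the PATCH 3/4 hunks show, the sort-preserving merge both collapses the union's
partitions back into a single stream and keeps that stream ordered on the value column,
replacing the earlier RepartitionExec + SortExec pair.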