Commit
Allow btree indices to update, similar to vector indices, on a call to optimize indices
westonpace committed Nov 14, 2023
1 parent ccb53c1 commit 11435c6
Showing 17 changed files with 543 additions and 129 deletions.
3 changes: 3 additions & 0 deletions rust/Cargo.toml
@@ -70,6 +70,9 @@ bytes = "1.4"
byteorder = "1.5"
chrono = "0.4.23"
criterion = { version = "0.5", features = ["async", "async_tokio"] }
datafusion = { version = "32.0.0", default-features = false, features = [
"regex_expressions",
] }
datafusion-common = "32.0"
datafusion-sql = "32.0"
either = "1.0"
2 changes: 2 additions & 0 deletions rust/lance-index/Cargo.toml
@@ -20,6 +20,7 @@ arrow-arith.workspace = true
arrow-select.workspace = true
async-recursion.workspace = true
async-trait.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr = "32.0.0"
datafusion-physical-expr = { version = "32.0.0", default-features = false }
@@ -33,6 +34,7 @@ nohash-hasher.workspace = true
num_cpus.workspace = true
num-traits.workspace = true
object_store.workspace = true
pin-project.workspace = true
prost.workspace = true
rand.workspace = true
roaring.workspace = true
8 changes: 8 additions & 0 deletions rust/lance-index/src/scalar.rs
@@ -19,6 +19,7 @@ use std::{any::Any, ops::Bound, sync::Arc};
use arrow_array::{RecordBatch, UInt64Array};
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_common::scalar::ScalarValue;

use lance_core::Result;
@@ -156,4 +157,11 @@ pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index {
mapping: &IntMap<u64, Option<u64>>,
dest_store: &dyn IndexStore,
) -> Result<()>;

/// Add the new data into the index, creating an updated version of the index in `dest_store`
async fn update(
&self,
new_data: SendableRecordBatchStream,
dest_store: &dyn IndexStore,
) -> Result<()>;
}
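
The new `update` method takes a DataFusion `SendableRecordBatchStream`, so callers need to wrap their new rows in that shape. A minimal sketch (not part of this diff) of one way to do that; the `append_to_index` helper and its `index`, `batch`, and `dest_store` arguments are illustrative assumptions:

use arrow_array::RecordBatch;
use datafusion::physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
use futures::stream;
use lance_core::Result;
// `ScalarIndex` and `IndexStore` are the traits defined in lance_index::scalar.

async fn append_to_index(
    index: &dyn ScalarIndex,
    batch: RecordBatch,
    dest_store: &dyn IndexStore,
) -> Result<()> {
    let schema = batch.schema();
    // Wrap the new rows in the SendableRecordBatchStream shape that `update` expects.
    let batches: Vec<datafusion_common::Result<RecordBatch>> = vec![Ok(batch)];
    let new_data: SendableRecordBatchStream =
        Box::pin(RecordBatchStreamAdapter::new(schema, stream::iter(batches)));
    // The updated index is written into `dest_store`; the existing index is left untouched.
    index.update(new_data, dest_store).await
}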
146 changes: 135 additions & 11 deletions rust/lance-index/src/scalar/btree.rs
@@ -22,21 +22,35 @@ use std::{
};

use arrow_array::{Array, RecordBatch, UInt32Array, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use arrow_schema::{DataType, Field, Schema, SortOptions};
use async_trait::async_trait;
use datafusion_common::ScalarValue;
use datafusion::physical_plan::{
repartition::RepartitionExec, sorts::sort::SortExec, stream::RecordBatchStreamAdapter,
union::UnionExec, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
};
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_expr::Accumulator;
use datafusion_physical_expr::expressions::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::{
expressions::{Column, MaxAccumulator, MinAccumulator},
PhysicalSortExpr,
};
use futures::{
stream::{self, BoxStream},
FutureExt, StreamExt, TryStreamExt,
future::BoxFuture,
stream::{self},
FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt,
};
use lance_core::{Error, Result};
use nohash_hasher::IntMap;
use serde::{Serialize, Serializer};
use snafu::{location, Location};

use crate::{Index, IndexType};
use crate::{
util::{
chunker::chunk_concat_stream,
datafusion::{execute_plan, OneShotExec},
},
Index, IndexType,
};

use super::{
flat::FlatIndexMetadata, IndexReader, IndexStore, IndexWriter, ScalarIndex, ScalarQuery,
@@ -636,9 +650,9 @@ impl BTreeLookup {
///
/// Note: this is very similar to the IVF index except we store the IVF part in a btree
/// for faster lookup
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct BTreeIndex {
page_lookup: BTreeLookup,
page_lookup: Arc<BTreeLookup>,
store: Arc<dyn IndexStore>,
sub_index: Arc<dyn BTreeSubIndex>,
}
@@ -650,7 +664,7 @@ impl BTreeIndex {
store: Arc<dyn IndexStore>,
sub_index: Arc<dyn BTreeSubIndex>,
) -> Self {
let page_lookup = BTreeLookup::new(tree, null_pages);
let page_lookup = Arc::new(BTreeLookup::new(tree, null_pages));
Self {
page_lookup,
store,
@@ -721,6 +735,21 @@ impl BTreeIndex {

Ok(Self::new(map, null_pages, store, sub_index))
}

async fn into_old_data(self) -> Result<impl RecordBatchStream> {
let reader = self.store.open_index_file(BTREE_PAGES_NAME).await?;
let pages = self.page_lookup.all_page_ids();
let schema = self.sub_index.schema().clone();
let batches = IndexReaderStream {
reader,
pages,
idx: 0,
}
.map(|fut| fut.map_err(DataFusionError::from))
.buffered(num_cpus::get())
.boxed();
Ok(RecordBatchStreamAdapter::new(schema, batches))
}
}

fn wrap_bound(bound: &Bound<ScalarValue>) -> Bound<OrderableScalarValue> {
@@ -859,6 +888,15 @@ impl ScalarIndex for BTreeIndex {
.copy_index_file(BTREE_LOOKUP_NAME, dest_store)
.await
}

async fn update(
&self,
new_data: SendableRecordBatchStream,
dest_store: &dyn IndexStore,
) -> Result<()> {
let merged_data_source = Box::new(BTreeUpdater::new(self.clone(), new_data));
train_btree_index(merged_data_source, self.sub_index.as_ref(), dest_store).await
}
}

struct BatchStats {
@@ -916,6 +954,14 @@ pub trait BTreeSubIndex: Debug + Send + Sync {
/// Deserialize a subindex from Arrow
async fn load_subindex(&self, serialized: RecordBatch) -> Result<Arc<dyn ScalarIndex>>;

/// Retrieve the data used to originally train this page
///
/// In order to perform an update we need to merge the old data in with the new data which
/// means we need to access the old data. Right now this is convenient for flat indices but
/// we may need to take a different approach if we ever decide to use a sub-index other than
/// flat
async fn retrieve_data(&self, serialized: RecordBatch) -> Result<RecordBatch>;

/// The schema of the subindex when serialized to Arrow
fn schema(&self) -> &Arc<Schema>;

@@ -971,7 +1017,7 @@ fn btree_stats_as_batch(stats: Vec<EncodedBatch>) -> Result<RecordBatch> {
}

#[async_trait]
pub trait BtreeTrainingSource: Send + Sync {
pub trait BtreeTrainingSource: Send {
/// Returns a stream of batches, ordered by the value column (in ascending order)
///
/// Each batch should have chunk_size rows
@@ -982,7 +1028,7 @@ pub trait BtreeTrainingSource: Send + Sync {
async fn scan_ordered_chunks(
self: Box<Self>,
chunk_size: u32,
) -> Result<BoxStream<'static, Result<RecordBatch>>>;
) -> Result<SendableRecordBatchStream>;
}

/// Train a btree index from a stream of sorted page-size batches of values and row ids
@@ -1018,3 +1064,81 @@ pub async fn train_btree_index(
btree_index_file.finish().await?;
Ok(())
}

struct BTreeUpdater {
index: BTreeIndex,
new_data: SendableRecordBatchStream,
}

impl BTreeUpdater {
fn new(index: BTreeIndex, new_data: SendableRecordBatchStream) -> Self {
Self { index, new_data }
}
}

impl BTreeUpdater {
fn into_old_input(index: BTreeIndex) -> Arc<dyn ExecutionPlan> {
let schema = index.sub_index.schema().clone();
let batches = index.into_old_data().into_stream().try_flatten().boxed();
let stream = Box::pin(RecordBatchStreamAdapter::new(schema, batches));
Arc::new(OneShotExec::new(stream))
}
}

#[async_trait]
impl BtreeTrainingSource for BTreeUpdater {
async fn scan_ordered_chunks(
self: Box<Self>,
chunk_size: u32,
) -> Result<SendableRecordBatchStream> {
let new_input = Arc::new(OneShotExec::new(self.new_data));
let old_input = Self::into_old_input(self.index);
debug_assert_eq!(
old_input.schema().all_fields().len(),
new_input.schema().all_fields().len()
);
let sort_expr = PhysicalSortExpr {
expr: Arc::new(Column::new("values", 0)),
options: SortOptions {
descending: false,
nulls_first: true,
},
};
// TODO: Use SortPreservingMergeExec here
let all_data = Arc::new(UnionExec::new(vec![old_input, new_input]));
// Squash back into one partition
let all_data = Arc::new(RepartitionExec::try_new(
all_data,
datafusion::physical_plan::Partitioning::RoundRobinBatch(1),
)?);
let ordered = Arc::new(SortExec::new(vec![sort_expr], all_data));
let unchunked = execute_plan(ordered)?;
Ok(chunk_concat_stream(unchunked, chunk_size as usize))
}
}
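
A possible shape for the `SortPreservingMergeExec` TODO above, sketched under the assumption that both the old index pages and the incoming `new_data` are each already sorted on the `values` column; a streaming merge could then replace the round-robin repartition plus full re-sort. This is not what the commit does today, just one way the TODO might be resolved:

use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;

fn merge_sorted_inputs(
    old_input: Arc<dyn ExecutionPlan>,
    new_input: Arc<dyn ExecutionPlan>,
    sort_expr: PhysicalSortExpr,
) -> Arc<dyn ExecutionPlan> {
    // UnionExec keeps each input as its own partition...
    let all_data = Arc::new(UnionExec::new(vec![old_input, new_input]));
    // ...and SortPreservingMergeExec merges those already-sorted partitions into a single
    // ordered partition without buffering the whole data set for a fresh sort.
    Arc::new(SortPreservingMergeExec::new(vec![sort_expr], all_data))
}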

struct IndexReaderStream {
reader: Arc<dyn IndexReader>,
pages: Vec<u32>,
idx: usize,
}

impl Stream for IndexReaderStream {
type Item = BoxFuture<'static, Result<RecordBatch>>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.get_mut();
let idx = this.idx;
if idx >= this.pages.len() {
return std::task::Poll::Ready(None);
}
let page_number = this.pages[idx];
this.idx += 1;
let reader_copy = this.reader.clone();
let read_task = async move { reader_copy.read_record_batch(page_number).await }.boxed();
std::task::Poll::Ready(Some(read_task))
}
}
14 changes: 14 additions & 0 deletions rust/lance-index/src/scalar/flat.rs
@@ -20,6 +20,7 @@ use arrow_array::{
use arrow_schema::{DataType, Field, Schema};
use async_trait::async_trait;

use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_physical_expr::expressions::{in_list, lit, Column};
use lance_core::Result;
use nohash_hasher::IntMap;
@@ -125,6 +126,10 @@ impl BTreeSubIndex for FlatIndexMetadata {
) -> Result<RecordBatch> {
remap_batch(serialized, mapping)
}

async fn retrieve_data(&self, serialized: RecordBatch) -> Result<RecordBatch> {
Ok(serialized)
}
}

#[derive(Serialize)]
@@ -246,6 +251,15 @@ impl ScalarIndex for FlatIndex {
writer.finish().await?;
Ok(())
}

async fn update(
&self,
_new_data: SendableRecordBatchStream,
_dest_store: &dyn IndexStore,
) -> Result<()> {
// If this was desired, then you would need to merge new_data and data and write it back out
todo!()
}
}
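
For reference, a hedged sketch of what a flat-index update could look like if it were ever needed. It assumes `FlatIndex` keeps its rows in a `data: Arc<RecordBatch>` field, that `IndexStore`/`IndexWriter` expose `new_index_file`, `write_record_batch`, and `finish`, and that the "flat_data" file name is made up for illustration (with `futures::TryStreamExt` and `arrow_select::concat::concat_batches` in scope):

async fn update(
    &self,
    new_data: SendableRecordBatchStream,
    dest_store: &dyn IndexStore,
) -> Result<()> {
    // Gather the incoming rows and append them to the existing flat data.
    let new_batches: Vec<RecordBatch> = new_data.try_collect().await?;
    let mut all_batches = vec![self.data.as_ref().clone()];
    all_batches.extend(new_batches);
    let merged = concat_batches(&self.data.schema(), &all_batches)?;
    // Write the merged rows out as a brand-new flat index in `dest_store` (hypothetical API).
    let mut writer = dest_store
        .new_index_file("flat_data", self.data.schema())
        .await?;
    writer.write_record_batch(merged).await?;
    writer.finish().await?;
    Ok(())
}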

#[cfg(test)]