feat: allow scalar indices to be updated with new data #1576

Merged · 4 commits · Nov 17, 2023
4 changes: 4 additions & 0 deletions rust/Cargo.toml
@@ -39,6 +39,7 @@ rust-version = "1.70"
lance = { version = "=0.8.16", path = "./lance" }
lance-arrow = { version = "=0.8.16", path = "./lance-arrow" }
lance-core = { version = "=0.8.16", path = "./lance-core" }
lance-datafusion = { version = "=0.8.16", path = "./lance-datafusion" }
lance-datagen = { version = "=0.8.16", path = "./lance-datagen" }
lance-index = { version = "=0.8.16", path = "./lance-index" }
lance-linalg = { version = "=0.8.16", path = "./lance-linalg" }
@@ -70,6 +71,9 @@ bytes = "1.4"
byteorder = "1.5"
chrono = "0.4.23"
criterion = { version = "0.5", features = ["async", "async_tokio"] }
datafusion = { version = "32.0.0", default-features = false, features = [
"regex_expressions",
] }
datafusion-common = "32.0"
datafusion-sql = "32.0"
either = "1.0"
22 changes: 22 additions & 0 deletions rust/lance-datafusion/Cargo.toml
@@ -0,0 +1,22 @@
[package]
name = "lance-datafusion"
version.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
readme.workspace = true
keywords.workspace = true
categories.workspace = true
description = "Internal utilities used by other lance modules to simplify working with datafusion"

[dependencies]
arrow.workspace = true
arrow-array.workspace = true
arrow-schema.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr = "32.0.0"
datafusion-physical-expr = { version = "32.0.0", default-features = false }
futures.workspace = true
lance-arrow.workspace = true
lance-core.workspace = true
@@ -15,11 +15,13 @@
use std::collections::VecDeque;
use std::pin::Pin;

+use arrow::compute::kernels;
use arrow_array::RecordBatch;
-use datafusion::physical_plan::SendableRecordBatchStream;
-use futures::{Stream, StreamExt};
+use datafusion::physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
+use datafusion_common::DataFusionError;
+use futures::{Stream, StreamExt, TryStreamExt};

-use crate::Result;
+use lance_core::Result;

/// Wraps a [`SendableRecordBatchStream`] into a stream of RecordBatch chunks of
/// a given size. This slices but does not copy any buffers.
@@ -122,3 +124,23 @@ pub fn chunk_stream(
})
.boxed()
}

pub fn chunk_concat_stream(
stream: SendableRecordBatchStream,
chunk_size: usize,
) -> SendableRecordBatchStream {
let schema = stream.schema().clone();
let schema_copy = schema.clone();
let chunked = chunk_stream(stream, chunk_size);
let chunk_concat = chunked
.and_then(move |batches| {
std::future::ready(
// chunk_stream is zero-copy and so it gives us pieces of batches. However, the btree
// index needs 1 batch-per-page and so we concatenate here.
kernels::concat::concat_batches(&schema, batches.iter()).map_err(|e| e.into()),
)
})
.map_err(DataFusionError::from)
.boxed();
Box::pin(RecordBatchStreamAdapter::new(schema_copy, chunk_concat))
}
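// --- Usage sketch (editorial example, not part of this PR) ---
// A minimal way to drive chunk_concat_stream, assuming `input` is any
// SendableRecordBatchStream; it collects the re-chunked pages eagerly.
async fn collect_pages(
    input: SendableRecordBatchStream,
) -> datafusion_common::Result<Vec<RecordBatch>> {
    // Each yielded batch is a contiguous, concatenated chunk of up to
    // 4096 rows -- i.e. one batch per btree page.
    let mut stream = chunk_concat_stream(input, 4096);
    let mut pages = Vec::new();
    while let Some(batch) = stream.try_next().await? {
        pages.push(batch);
    }
    Ok(pages)
}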
193 changes: 193 additions & 0 deletions rust/lance-datafusion/src/exec.rs
@@ -0,0 +1,193 @@
// Copyright 2023 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities for working with datafusion execution plans

use std::{
cell::RefCell,
sync::{Arc, Mutex},
};

use arrow_array::RecordBatchReader;
use arrow_schema::Schema as ArrowSchema;
use datafusion::{
execution::{
context::{SessionConfig, SessionState},
runtime_env::{RuntimeConfig, RuntimeEnv},
},
physical_plan::{
stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionPlan,
SendableRecordBatchStream,
},
};
use datafusion_common::DataFusionError;
use datafusion_physical_expr::Partitioning;
use futures::TryStreamExt;

use lance_arrow::SchemaExt;
use lance_core::{datatypes::Schema, Error, Result};

/// Convert reader to a stream and a schema.
///
/// Will peek the first batch to get the dictionaries for dictionary columns.
///
/// NOTE: this does not validate the schema. For example, for appends the schema
/// should be checked to make sure it matches the existing dataset schema before
/// writing.
pub fn reader_to_stream(
batches: Box<dyn RecordBatchReader + Send>,
) -> Result<(SendableRecordBatchStream, Schema)> {
let arrow_schema = batches.schema();
let mut schema: Schema = Schema::try_from(batches.schema().as_ref())?;
let mut peekable = batches.peekable();
if let Some(batch) = peekable.peek() {
if let Ok(b) = batch {
schema.set_dictionary(b)?;
} else {
return Err(Error::from(batch.as_ref().unwrap_err()));
}
}
schema.validate()?;

let stream = RecordBatchStreamAdapter::new(
arrow_schema,
futures::stream::iter(peekable).map_err(DataFusionError::from),
);
let stream = Box::pin(stream) as SendableRecordBatchStream;

Ok((stream, schema))
}
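// --- Usage sketch (editorial example, not part of this PR) ---
// Wraps an ordinary in-memory RecordBatchIterator; the peek above resolves
// dictionary columns before anything is written.
fn wrap_in_memory_reader() -> Result<(SendableRecordBatchStream, Schema)> {
    use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator};
    use arrow_schema::{DataType, Field};

    let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
        "id",
        DataType::Int32,
        false,
    )]));
    let batch = RecordBatch::try_new(
        arrow_schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )
    .expect("valid batch");
    let reader = RecordBatchIterator::new(vec![Ok(batch)], arrow_schema);
    reader_to_stream(Box::new(reader))
}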

/// A source execution node created from an existing stream
///
/// It can only be used once and will return the stream. After that, the node
/// is exhausted.
pub struct OneShotExec {
stream: Mutex<RefCell<Option<SendableRecordBatchStream>>>,
    // We save off a copy of the schema to speed up formatting and so ExecutionPlan::schema & display_as
    // can still function after the stream is exhausted
schema: Arc<ArrowSchema>,
}

impl OneShotExec {
/// Create a new instance from a given stream
pub fn new(stream: SendableRecordBatchStream) -> Self {
let schema = stream.schema().clone();
Self {
stream: Mutex::new(RefCell::new(Some(stream))),
schema,
}
}
}

impl std::fmt::Debug for OneShotExec {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let val_guard = self.stream.lock().unwrap();
let stream = val_guard.borrow();
f.debug_struct("OneShotExec")
.field("exhausted", &stream.is_none())
.field("schema", self.schema.as_ref())
.finish()
}
}

impl DisplayAs for OneShotExec {
fn fmt_as(
&self,
t: datafusion::physical_plan::DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
let val_guard = self.stream.lock().unwrap();
let stream = val_guard.borrow();
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
let exhausted = if stream.is_some() { "" } else { "EXHAUSTED " };
let columns = self
.schema
.field_names()
.iter()
.map(|s| s.to_string())
.collect::<Vec<_>>();
write!(
f,
"OneShotStream: {}columns=[{}]",
exhausted,
columns.join(",")
)
}
}
}
}

impl ExecutionPlan for OneShotExec {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn schema(&self) -> arrow_schema::SchemaRef {
self.schema.clone()
}

fn output_partitioning(&self) -> datafusion_physical_expr::Partitioning {
Partitioning::RoundRobinBatch(1)
}

fn output_ordering(&self) -> Option<&[datafusion_physical_expr::PhysicalSortExpr]> {
None
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}

fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
todo!()
}

fn execute(
&self,
_partition: usize,
_context: Arc<datafusion::execution::TaskContext>,
) -> datafusion_common::Result<SendableRecordBatchStream> {
let mut val_guard = self.stream.lock().unwrap();
let stream = val_guard.get_mut();
let stream = stream.take();
if let Some(stream) = stream {
Ok(stream)
} else {
panic!("Attempt to use OneShotExec more than once");
}
}

fn statistics(&self) -> datafusion_common::Statistics {
todo!()
}
}

/// Executes a plan using default session & runtime configuration
///
/// Only executes a single partition. Panics if the plan has more than one partition.
pub fn execute_plan(plan: Arc<dyn ExecutionPlan>) -> Result<SendableRecordBatchStream> {
let session_config = SessionConfig::new();
let runtime_config = RuntimeConfig::new();
let runtime_env = Arc::new(RuntimeEnv::new(runtime_config)?);
let session_state = SessionState::new_with_config_rt(session_config, runtime_env);
// NOTE: we are only executing the first partition here. Therefore, if
// the plan has more than one partition, we will be missing data.
assert_eq!(plan.output_partitioning().partition_count(), 1);
Ok(plan.execute(0, session_state.task_ctx())?)
}
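// --- Usage sketch (editorial example, not part of this PR) ---
// Ties the pieces together: wrap an existing stream in OneShotExec and run
// it back through execute_plan. Executing the node a second time panics.
fn roundtrip(stream: SendableRecordBatchStream) -> Result<SendableRecordBatchStream> {
    // OneShotExec reports a single partition (RoundRobinBatch(1)), so the
    // single-partition assert in execute_plan holds.
    let source = Arc::new(OneShotExec::new(stream));
    execute_plan(source)
}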
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities for working with datafusion expressions

use arrow_schema::DataType;
use datafusion_common::ScalarValue;

3 changes: 3 additions & 0 deletions rust/lance-datafusion/src/lib.rs
@@ -0,0 +1,3 @@
pub mod chunker;
pub mod exec;
pub mod expr;
3 changes: 3 additions & 0 deletions rust/lance-index/Cargo.toml
@@ -20,19 +20,22 @@ arrow-arith.workspace = true
arrow-select.workspace = true
async-recursion.workspace = true
async-trait.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr = "32.0.0"
datafusion-physical-expr = { version = "32.0.0", default-features = false }
futures.workspace = true
half.workspace = true
lance-arrow.workspace = true
lance-core.workspace = true
lance-datafusion.workspace = true
lance-linalg.workspace = true
log.workspace = true
nohash-hasher.workspace = true
num_cpus.workspace = true
num-traits.workspace = true
object_store.workspace = true
pin-project.workspace = true
prost.workspace = true
rand.workspace = true
roaring.workspace = true
1 change: 0 additions & 1 deletion rust/lance-index/src/lib.rs
@@ -21,7 +21,6 @@ use lance_core::Result;
use roaring::RoaringBitmap;

pub mod scalar;
-pub mod util;
pub mod vector;

pub const INDEX_FILE_NAME: &str = "index.idx";
8 changes: 8 additions & 0 deletions rust/lance-index/src/scalar.rs
@@ -19,6 +19,7 @@ use std::{any::Any, ops::Bound, sync::Arc};
use arrow_array::{RecordBatch, UInt64Array};
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_common::scalar::ScalarValue;

use lance_core::Result;
@@ -156,4 +157,11 @@ pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index {
mapping: &IntMap<u64, Option<u64>>,
dest_store: &dyn IndexStore,
) -> Result<()>;

/// Add the new data into the index, creating an updated version of the index in `dest_store`
async fn update(
&self,
new_data: SendableRecordBatchStream,
dest_store: &dyn IndexStore,
) -> Result<()>;
}
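// --- Caller-side sketch (editorial example, not part of this PR) ---
// The names `index`, `new_data`, and `dest_store` are illustrative.
async fn append_to_index(
    index: &dyn ScalarIndex,
    new_data: SendableRecordBatchStream,
    dest_store: &dyn IndexStore,
) -> Result<()> {
    // The stream's schema is expected to match the data the index was
    // trained on; implementations may re-chunk it (e.g. with
    // chunk_concat_stream) before writing one batch per page.
    index.update(new_data, dest_store).await
}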