Refactor datafusion utilities into a separate crate, rebase
westonpace committed Nov 14, 2023
1 parent 11435c6 commit 532c782
Showing 20 changed files with 248 additions and 202 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.toml
@@ -39,6 +39,7 @@ rust-version = "1.70"
lance = { version = "=0.8.14", path = "./lance" }
lance-arrow = { version = "=0.8.14", path = "./lance-arrow" }
lance-core = { version = "=0.8.14", path = "./lance-core" }
lance-datafusion = { version = "=0.8.14", path = "./lance-datafusion" }
lance-datagen = { version = "=0.8.14", path = "./lance-datagen" }
lance-index = { version = "=0.8.14", path = "./lance-index" }
lance-linalg = { version = "=0.8.14", path = "./lance-linalg" }
22 changes: 22 additions & 0 deletions rust/lance-datafusion/Cargo.toml
@@ -0,0 +1,22 @@
[package]
name = "lance-datafusion"
version.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
readme.workspace = true
keywords.workspace = true
categories.workspace = true
description = "Internal utilities used by other lance modules to simplify working with datafusion"

[dependencies]
arrow.workspace = true
arrow-array.workspace = true
arrow-schema.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr = "32.0.0"
datafusion-physical-expr = { version = "32.0.0", default-features = false }
futures.workspace = true
lance-arrow.workspace = true
lance-core.workspace = true
rust/lance-datafusion/src/chunker.rs
@@ -21,7 +21,7 @@ use datafusion::physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
use datafusion_common::DataFusionError;
use futures::{Stream, StreamExt, TryStreamExt};

use crate::Result;
use lance_core::Result;

/// Wraps a [`SendableRecordBatchStream`] into a stream of RecordBatch chunks of
/// a given size. This slices but does not copy any buffers.
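
The hunk above shows only the chunker's doc comment and the import that moved from crate::Result to lance_core::Result; the module's public entry point is not visible in this diff. A minimal usage sketch, assuming a chunk_stream(stream, chunk_size) function (a hypothetical name, not confirmed by this commit):

use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

// `chunk_stream` is an assumed entry point; only the module doc comment
// above is from the diff.
async fn rechunk(input: SendableRecordBatchStream) -> datafusion_common::Result<()> {
    // Re-slice incoming batches into chunks of at most 1024 rows; slicing
    // an Arrow RecordBatch shares buffers, so nothing is copied.
    let mut chunked = lance_datafusion::chunker::chunk_stream(input, 1024);
    while let Some(batch) = chunked.next().await {
        assert!(batch?.num_rows() <= 1024);
    }
    Ok(())
}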
185 changes: 185 additions & 0 deletions rust/lance-datafusion/src/exec.rs
@@ -0,0 +1,185 @@
// Copyright 2023 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities for working with datafusion execution plans

use std::{
cell::RefCell,
sync::{Arc, Mutex},
};

use arrow_array::RecordBatchReader;
use arrow_schema::Schema as ArrowSchema;
use datafusion::{
execution::{
context::{SessionConfig, SessionState},
runtime_env::{RuntimeConfig, RuntimeEnv},
},
physical_plan::{
stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionPlan,
SendableRecordBatchStream,
},
};
use datafusion_common::DataFusionError;
use datafusion_physical_expr::Partitioning;
use futures::TryStreamExt;

use lance_arrow::SchemaExt;
use lance_core::{datatypes::Schema, Error, Result};

pub struct OneShotExec {
stream: Mutex<RefCell<Option<SendableRecordBatchStream>>>,
// We save off a copy of the schema to speed up formatting and so ExecutionPlan::schema & display_as
// can still function after the stream is exhausted
schema: Arc<ArrowSchema>,
}

impl OneShotExec {
pub fn new(stream: SendableRecordBatchStream) -> Self {
let schema = stream.schema().clone();
Self {
stream: Mutex::new(RefCell::new(Some(stream))),
schema,
}
}
}

impl std::fmt::Debug for OneShotExec {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let val_guard = self.stream.lock().unwrap();
let stream = val_guard.borrow();
f.debug_struct("OneShotExec")
.field("exhausted", &stream.is_none())
.field("schema", self.schema.as_ref())
.finish()
}
}

impl DisplayAs for OneShotExec {
fn fmt_as(
&self,
t: datafusion::physical_plan::DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
let val_guard = self.stream.lock().unwrap();
let stream = val_guard.borrow();
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
let exhausted = if stream.is_some() { "" } else { "EXHAUSTED" };
let columns = self
.schema
.field_names()
.iter()
.map(|s| s.to_string())
.collect::<Vec<_>>();
write!(
f,
"OneShotStream: {} columns=[{}]",
exhausted,
columns.join(",")
)
}
}
}
}

/// Convert reader to a stream and a schema.
///
/// Will peek the first batch to get the dictionaries for dictionary columns.
///
/// NOTE: this does not validate the schema. For example, for appends the schema
/// should be checked to make sure it matches the existing dataset schema before
/// writing.
pub fn reader_to_stream(
batches: Box<dyn RecordBatchReader + Send>,
) -> Result<(SendableRecordBatchStream, Schema)> {
let arrow_schema = batches.schema();
let mut schema: Schema = Schema::try_from(batches.schema().as_ref())?;
let mut peekable = batches.peekable();
if let Some(batch) = peekable.peek() {
if let Ok(b) = batch {
schema.set_dictionary(b)?;
} else {
return Err(Error::from(batch.as_ref().unwrap_err()));
}
}
schema.validate()?;

let stream = RecordBatchStreamAdapter::new(
arrow_schema,
futures::stream::iter(peekable).map_err(DataFusionError::from),
);
let stream = Box::pin(stream) as SendableRecordBatchStream;

Ok((stream, schema))
}

impl ExecutionPlan for OneShotExec {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn schema(&self) -> arrow_schema::SchemaRef {
self.schema.clone()
}

fn output_partitioning(&self) -> datafusion_physical_expr::Partitioning {
Partitioning::RoundRobinBatch(1)
}

fn output_ordering(&self) -> Option<&[datafusion_physical_expr::PhysicalSortExpr]> {
None
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}

fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
todo!()
}

fn execute(
&self,
_partition: usize,
_context: Arc<datafusion::execution::TaskContext>,
) -> datafusion_common::Result<SendableRecordBatchStream> {
let mut val_guard = self.stream.lock().unwrap();
let stream = val_guard.get_mut();
let stream = stream.take();
if let Some(stream) = stream {
Ok(stream)
} else {
panic!("Attempt to use OneShotExec more than once");
}
}

fn statistics(&self) -> datafusion_common::Statistics {
todo!()
}
}

pub fn execute_plan(plan: Arc<dyn ExecutionPlan>) -> Result<SendableRecordBatchStream> {
let session_config = SessionConfig::new();
let runtime_config = RuntimeConfig::new();
let runtime_env = Arc::new(RuntimeEnv::new(runtime_config)?);
let session_state = SessionState::new_with_config_rt(session_config, runtime_env);
// NOTE: we are only executing the first partition here. Therefore, if
// the plan has more than one partition, we will be missing data.
assert_eq!(plan.output_partitioning().partition_count(), 1);
Ok(plan.execute(0, session_state.task_ctx())?)
}
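
The three public pieces of the new exec.rs compose into a small pipeline: wrap a reader into a stream, wrap the stream into a one-shot plan, execute the plan. A sketch of that round trip using the signatures above; the in-memory reader setup is illustrative, and the final ? assumes lance_core::Error converts from DataFusionError (the same conversion execute_plan itself relies on):

use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator};
use arrow_schema::{ArrowError, DataType, Field, Schema as ArrowSchema};
use futures::TryStreamExt;
use lance_datafusion::exec::{execute_plan, reader_to_stream, OneShotExec};

async fn round_trip() -> lance_core::Result<()> {
    // An in-memory reader standing in for caller-provided data.
    let schema = Arc::new(ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )
    .unwrap();
    let source: Vec<Result<RecordBatch, ArrowError>> = vec![Ok(batch)];
    let reader = RecordBatchIterator::new(source, schema);

    // reader_to_stream peeks the first batch so dictionary columns have
    // their dictionaries recorded in the returned lance schema.
    let (stream, _lance_schema) = reader_to_stream(Box::new(reader))?;

    // OneShotExec is single-use: a second execute() call panics, and
    // execute_plan asserts the plan has exactly one partition.
    let plan = Arc::new(OneShotExec::new(stream));
    let batches: Vec<RecordBatch> = execute_plan(plan)?.try_collect().await?;
    assert_eq!(batches[0].num_rows(), 3);
    Ok(())
}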
rust/lance-datafusion/src/expr.rs
@@ -12,30 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{
cell::RefCell,
sync::{Arc, Mutex},
};
//! Utilities for working with datafusion expressions

use arrow_array::RecordBatchReader;
use arrow_schema::{DataType, Schema};
use datafusion::{
execution::{
context::SessionState,
runtime_env::{RuntimeConfig, RuntimeEnv},
},
physical_plan::{
stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionPlan,
SendableRecordBatchStream,
},
prelude::SessionConfig,
};
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_physical_expr::Partitioning;

use futures::{stream, TryStreamExt};
use lance_arrow::SchemaExt;
use lance_core::Result;
use arrow_schema::DataType;
use datafusion_common::ScalarValue;

// This is slightly tedious but when we convert expressions from SQL strings to logical
// datafusion expressions there is no type coercion that happens. In other words "x = 7"
@@ -264,125 +244,3 @@ pub fn safe_coerce_scalar(value: &ScalarValue, ty: &DataType) -> Option<ScalarValue>
_ => None,
}
}

pub struct OneShotExec {
stream: Mutex<RefCell<Option<SendableRecordBatchStream>>>,
// We save off a copy of the schema to speed up formatting and so ExecutionPlan::schema & display_as
// can still function after the stream is exhausted
schema: Arc<Schema>,
}

impl OneShotExec {
pub fn new(stream: SendableRecordBatchStream) -> Self {
let schema = stream.schema().clone();
Self {
stream: Mutex::new(RefCell::new(Some(stream))),
schema,
}
}
}

impl std::fmt::Debug for OneShotExec {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let val_guard = self.stream.lock().unwrap();
let stream = val_guard.borrow();
f.debug_struct("OneShotExec")
.field("exhausted", &stream.is_none())
.field("schema", self.schema.as_ref())
.finish()
}
}

impl DisplayAs for OneShotExec {
fn fmt_as(
&self,
t: datafusion::physical_plan::DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
let val_guard = self.stream.lock().unwrap();
let stream = val_guard.borrow();
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
let exhausted = if stream.is_some() { "" } else { "EXHAUSTED" };
let columns = self
.schema
.field_names()
.iter()
.map(|s| s.to_string())
.collect::<Vec<_>>();
write!(
f,
"OneShotStream: {} columns=[{}]",
exhausted,
columns.join(",")
)
}
}
}
}

/// Convert a sendable RecordBatchReader to a sendable RecordBatchStream
pub fn reader_to_stream(reader: Box<dyn RecordBatchReader + Send>) -> SendableRecordBatchStream {
let schema = reader.schema().clone();
let stream = stream::iter(reader).map_err(DataFusionError::from);
Box::pin(RecordBatchStreamAdapter::new(schema, stream))
}

impl ExecutionPlan for OneShotExec {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn schema(&self) -> arrow_schema::SchemaRef {
self.schema.clone()
}

fn output_partitioning(&self) -> datafusion_physical_expr::Partitioning {
Partitioning::RoundRobinBatch(1)
}

fn output_ordering(&self) -> Option<&[datafusion_physical_expr::PhysicalSortExpr]> {
None
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}

fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
todo!()
}

fn execute(
&self,
_partition: usize,
_context: Arc<datafusion::execution::TaskContext>,
) -> datafusion_common::Result<SendableRecordBatchStream> {
let mut val_guard = self.stream.lock().unwrap();
let stream = val_guard.get_mut();
let stream = stream.take();
if let Some(stream) = stream {
Ok(stream)
} else {
panic!("Attempt to use OneShotExec more than once");
}
}

fn statistics(&self) -> datafusion_common::Statistics {
todo!()
}
}

pub fn execute_plan(plan: Arc<dyn ExecutionPlan>) -> Result<SendableRecordBatchStream> {
let session_config = SessionConfig::new();
let runtime_config = RuntimeConfig::new();
let runtime_env = Arc::new(RuntimeEnv::new(runtime_config)?);
let session_state = SessionState::new_with_config_rt(session_config, runtime_env);
// NOTE: we are only executing the first partition here. Therefore, if
// the plan has more than one partition, we will be missing data.
assert_eq!(plan.output_partitioning().partition_count(), 1);
Ok(plan.execute(0, session_state.task_ctx())?)
}
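
Everything removed above now lives in exec.rs; what expr.rs keeps is the scalar coercion helper. A minimal sketch of the scenario its leading comment describes, narrowing the Int64 literal produced by parsing "x = 7" to an Int32 column's type; the behavior for out-of-range values is not visible in this hunk, so it is noted but not asserted:

use arrow_schema::DataType;
use datafusion_common::ScalarValue;
use lance_datafusion::expr::safe_coerce_scalar;

fn coerce_literal() {
    // SQL parsing yields an Int64 literal for "x = 7" even when the
    // column x is Int32, so the literal must be narrowed by hand.
    let parsed = ScalarValue::Int64(Some(7));
    assert_eq!(
        safe_coerce_scalar(&parsed, &DataType::Int32),
        Some(ScalarValue::Int32(Some(7)))
    );

    // Unsupported conversions fall through the `_ => None` arm shown
    // above; whether out-of-range values also return None is an
    // assumption, not something this hunk shows.
}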
3 changes: 3 additions & 0 deletions rust/lance-datafusion/src/lib.rs
@@ -0,0 +1,3 @@
pub mod chunker;
pub mod exec;
pub mod expr;
1 change: 1 addition & 0 deletions rust/lance-index/Cargo.toml
@@ -28,6 +28,7 @@ futures.workspace = true
half.workspace = true
lance-arrow.workspace = true
lance-core.workspace = true
lance-datafusion.workspace = true
lance-linalg.workspace = true
log.workspace = true
nohash-hasher.workspace = true