v0 Datafusion with late materialization (#414)
This PR augments the original Vortex connector for DataFusion with an
implementation of filter pushdown that allows us to perform late
materialization on as many columns as possible.

Pushdown support can be toggled on and off so we can run
benchmarks comparing the different strategies.
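
For illustration, a minimal sketch of how the toggle is exercised. The names VortexMemTable, VortexMemTableOptions, and with_disable_pushdown come from the benchmark code added below; the helper name vortex_dataframe and its shape are hypothetical.

use std::sync::Arc;

use datafusion::prelude::{DataFrame, SessionContext};
use vortex::Array;
use vortex_datafusion::{VortexMemTable, VortexMemTableOptions};

// Expose an in-memory Vortex array to DataFusion as a DataFrame, choosing
// whether filter pushdown (and therefore late materialization) is enabled.
fn vortex_dataframe(session: &SessionContext, array: Array, disable_pushdown: bool) -> DataFrame {
    let table = VortexMemTable::try_new(
        array,
        VortexMemTableOptions::default().with_disable_pushdown(disable_pushdown),
    )
    .expect("failed to build VortexMemTable");

    session
        .read_table(Arc::new(table))
        .expect("failed to create DataFrame over the Vortex table")
}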
a10y authored Jun 26, 2024
1 parent 3d915fb commit 40616b1
Showing 17 changed files with 1,111 additions and 102 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

8 changes: 8 additions & 0 deletions bench-vortex/Cargo.toml
@@ -16,10 +16,12 @@ workspace = true

[dependencies]
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
bytes = { workspace = true }
bzip2 = { workspace = true }
csv = { workspace = true }
datafusion = { workspace = true }
enum-iterator = { workspace = true }
flexbuffers = { workspace = true }
futures = { workspace = true }
@@ -29,6 +31,7 @@ lazy_static = { workspace = true }
log = { workspace = true }
mimalloc = { workspace = true }
parquet = { workspace = true, features = [] }
rand = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
simplelog = { workspace = true }
@@ -37,6 +40,7 @@ uuid = { workspace = true, features = ["v4"] }
vortex-alp = { path = "../encodings/alp" }
vortex-array = { path = "../vortex-array" }
vortex-buffer = { path = "../vortex-buffer" }
vortex-datafusion = { path = "../vortex-datafusion" }
vortex-datetime-parts = { path = "../encodings/datetime-parts" }
vortex-dict = { path = "../encodings/dict" }
vortex-dtype = { path = "../vortex-dtype" }
@@ -56,3 +60,7 @@ harness = false
[[bench]]
name = "random_access"
harness = false

[[bench]]
name = "datafusion_benchmark"
harness = false
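
With this [[bench]] target registered, the new suite can presumably be run from the bench-vortex crate with cargo bench --bench datafusion_benchmark.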
195 changes: 195 additions & 0 deletions bench-vortex/benches/datafusion_benchmark.rs
@@ -0,0 +1,195 @@
use std::sync::Arc;

use arrow_array::builder::{StringBuilder, UInt32Builder};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
use criterion::measurement::Measurement;
use criterion::{black_box, criterion_group, criterion_main, BenchmarkGroup, Criterion};
use datafusion::common::Result as DFResult;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::execution::memory_pool::human_readable_size;
use datafusion::logical_expr::lit;
use datafusion::prelude::{col, count_distinct, DataFrame, SessionContext};
use lazy_static::lazy_static;
use vortex::compress::Compressor;
use vortex::encoding::EncodingRef;
use vortex::{Array, Context, IntoArray, ToArrayData};
use vortex_datafusion::{VortexMemTable, VortexMemTableOptions};
use vortex_dict::DictEncoding;
use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding};

lazy_static! {
pub static ref CTX: Context = Context::default().with_encodings([
&BitPackedEncoding as EncodingRef,
&DictEncoding,
&FoREncoding,
&DeltaEncoding,
]);
}

fn toy_dataset_arrow() -> RecordBatch {
// 640,000 rows of string and numeric data.
// 80,000 instances of each of the eight names, cycling in order.

let names = [
"Alexander",
"Anastasia",
"Archibald",
"Bartholomew",
"Benjamin",
"Christopher",
"Elizabeth",
"Gabriella",
];

let mut col1 = StringBuilder::with_capacity(640_000, 64_000_000);
let mut col2 = UInt32Builder::with_capacity(640_000);
for i in 0..640_000 {
col1.append_value(names[i % 8]);
col2.append_value(u32::try_from(i).unwrap());
}

let col1 = col1.finish();
let col2 = col2.finish();

RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("names", DataType::Utf8, false),
Field::new("scores", DataType::UInt32, false),
])),
vec![Arc::new(col1), Arc::new(col2)],
)
.unwrap()
}

fn toy_dataset_vortex(compress: bool) -> Array {
let uncompressed = toy_dataset_arrow().to_array_data().into_array();

if !compress {
return uncompressed;
}

println!(
"uncompressed size: {:?}",
human_readable_size(uncompressed.nbytes())
);
let compressor = Compressor::new(&CTX);
let compressed = compressor.compress(&uncompressed, None).unwrap();
println!(
"vortex compressed size: {:?}",
human_readable_size(compressed.nbytes())
);
compressed
}

fn filter_agg_query(df: DataFrame) -> DFResult<DataFrame> {
// SELECT COUNT(DISTINCT names) FROM table WHERE scores >= 3000 AND scores <= 4000
df.filter(col("scores").gt_eq(lit(3_000)))?
.filter(col("scores").lt_eq(lit(4_000)))?
.aggregate(vec![], vec![count_distinct(col("names"))])
}

fn measure_provider<M: Measurement>(
group: &mut BenchmarkGroup<M>,
session: &SessionContext,
table: Arc<dyn TableProvider>,
) {
group.bench_function("planning", |b| {
b.to_async(
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap(),
)
.iter(|| async {
// Force physical planner to execute on our TableProvider.
filter_agg_query(black_box(session).read_table(table.clone()).unwrap())
.unwrap()
.create_physical_plan()
.await
.unwrap();
});
});

group.bench_function("exec", |b| {
b.to_async(
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap(),
)
.iter(|| async {
// Force full query execution with .collect()
filter_agg_query(black_box(session).read_table(table.clone()).unwrap())
.unwrap()
.collect()
.await
.unwrap();
});
});
}

fn bench_arrow<M: Measurement>(mut group: BenchmarkGroup<M>, session: &SessionContext) {
let arrow_dataset = toy_dataset_arrow();
let arrow_table =
Arc::new(MemTable::try_new(arrow_dataset.schema(), vec![vec![arrow_dataset]]).unwrap());

measure_provider(&mut group, session, arrow_table);
}

fn bench_vortex<M: Measurement>(
mut group: BenchmarkGroup<M>,
session: &SessionContext,
disable_pushdown: bool,
compress: bool,
) {
let vortex_dataset = toy_dataset_vortex(compress);
let vortex_table = Arc::new(
VortexMemTable::try_new(
vortex_dataset,
VortexMemTableOptions::default().with_disable_pushdown(disable_pushdown),
)
.unwrap(),
);

measure_provider(&mut group, session, vortex_table);
}

fn bench_datafusion(c: &mut Criterion) {
bench_arrow(c.benchmark_group("arrow"), &SessionContext::new());

// compress=true, pushdown enabled
bench_vortex(
c.benchmark_group("vortex-pushdown-compressed"),
&SessionContext::new(),
false,
true,
);

// compress=false, pushdown enabled
bench_vortex(
c.benchmark_group("vortex-pushdown-uncompressed"),
&SessionContext::new(),
false,
false,
);

// compress=true, pushdown disabled
bench_vortex(
c.benchmark_group("vortex-nopushdown-compressed"),
&SessionContext::new(),
true,
true,
);

// compress=false, pushdown disabled
bench_vortex(
c.benchmark_group("vortex-nopushdown-uncompressed"),
&SessionContext::new(),
true,
false,
);
}

criterion_group!(benches, bench_datafusion);
criterion_main!(benches);
3 changes: 1 addition & 2 deletions pyvortex/test/test_array.py
@@ -18,10 +18,9 @@ def test_varbin_array_round_trip():

def test_varbin_array_take():
a = vortex.encode(pa.array(["a", "b", "c", "d"]))
# TODO(ngates): ensure we correctly round-trip to a string and not large_string
assert a.take(vortex.encode(pa.array([0, 2]))).to_pyarrow().combine_chunks() == pa.array(
["a", "c"],
type=pa.large_utf8(),
type=pa.utf8(),
)


1 change: 1 addition & 0 deletions requirements-dev.lock
@@ -6,6 +6,7 @@
# features: []
# all-features: false
# with-sources: false
# generate-hashes: false

-e file:.
-e file:pyvortex
1 change: 1 addition & 0 deletions requirements.lock
@@ -6,6 +6,7 @@
# features: []
# all-features: false
# with-sources: false
# generate-hashes: false

-e file:.
-e file:pyvortex
2 changes: 0 additions & 2 deletions vortex-array/src/array/bool/accessors.rs
@@ -1,4 +1,3 @@
use itertools::Itertools;
use vortex_error::VortexResult;

use crate::accessor::ArrayAccessor;
@@ -22,7 +21,6 @@ impl ArrayAccessor<bool> for BoolArray {
Validity::AllInvalid => Ok(f(&mut (0..self.len()).map(|_| None))),
Validity::Array(valid) => {
let valids = valid.into_bool()?.boolean_buffer();
println!("nulls: {:?}", valids.iter().collect_vec());
let mut iter = valids.iter().zip(bools.iter()).map(|(is_valid, value)| {
if is_valid {
Some(if value { &TRUE } else { &FALSE })
1 change: 1 addition & 0 deletions vortex-array/src/array/bool/compute/compare.rs
@@ -8,6 +8,7 @@ use crate::compute::compare::CompareFn
use crate::{Array, ArrayTrait, IntoArray, IntoArrayVariant};

impl CompareFn for BoolArray {
// TODO(aduffy): replace these with Arrow compute kernels.
fn compare(&self, other: &Array, op: Operator) -> VortexResult<Array> {
let flattened = other.clone().into_bool()?;
let lhs = self.boolean_buffer();
2 changes: 2 additions & 0 deletions vortex-array/src/array/constant/mod.rs
@@ -25,6 +25,8 @@ impl ConstantArray {
Scalar: From<S>,
{
let scalar: Scalar = scalar.into();
// TODO(aduffy): add stats for bools, ideally there should be a
// StatsSet::constant(Scalar) constructor that does this for us, like StatsSet::nulls.
let stats = StatsSet::from(HashMap::from([
(Stat::Max, scalar.clone()),
(Stat::Min, scalar.clone()),
21 changes: 17 additions & 4 deletions vortex-array/src/array/struct_/mod.rs
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
use vortex_dtype::{FieldName, FieldNames, Nullability, StructDType};
use vortex_error::{vortex_bail, vortex_err};
use vortex_error::vortex_bail;

use crate::stats::ArrayStatisticsCompute;
use crate::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata};
@@ -27,6 +27,15 @@ impl StructArray {
self.array().child(idx, dtype)
}

pub fn field_by_name(&self, name: &str) -> Option<Array> {
let field_idx = self
.names()
.iter()
.position(|field_name| field_name.as_ref() == name);

field_idx.and_then(|field_idx| self.field(field_idx))
}

pub fn names(&self) -> &FieldNames {
let DType::Struct(st, _) = self.dtype() else {
unreachable!()
@@ -111,22 +120,26 @@ impl StructArray {
}

impl StructArray {
// TODO(aduffy): Add equivalent function to support field masks for nested column access.

/// Return a new StructArray with the given projection applied.
///
/// Projection does not copy data arrays. Projection is defined by an ordinal array slice
/// which specifies the new ordering of columns in the struct. The projection can be used to
/// perform column re-ordering, deletion, or duplication at a logical level, without any data
/// copying.
///
/// This function will return an error if the projection includes invalid column IDs.
pub fn project(self, projection: &[usize]) -> VortexResult<Self> {
/// # Panics
/// This function will panic if the projection references columns outside the bounds of the schema.
pub fn project(&self, projection: &[usize]) -> VortexResult<Self> {
let mut children = Vec::with_capacity(projection.len());
let mut names = Vec::with_capacity(projection.len());

for column_idx in projection {
children.push(
self.field(*column_idx)
.ok_or_else(|| vortex_err!(InvalidArgument: "column index out of bounds"))?,
.expect("column must not exceed bounds"),
);
names.push(self.names()[*column_idx].clone());
}
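
A hypothetical usage sketch of the new field_by_name helper together with the reworked project. The import path for StructArray is assumed from the crate layout, and extract_columns is a made-up name; "names" is just an example field.

use vortex::array::struct_::StructArray; // path assumed from vortex-array/src/array/struct_/mod.rs
use vortex::Array;
use vortex_error::VortexResult;

// Look up a child column by name, then logically project the struct down to
// its first two columns. Neither operation copies the underlying data buffers.
fn extract_columns(array: &StructArray) -> VortexResult<(Option<Array>, StructArray)> {
    // Ordinal lookup by field name; None if no such field exists.
    let names_column = array.field_by_name("names");

    // Panics if an index falls outside the schema bounds (see doc comment above).
    let first_two = array.project(&[0, 1])?;

    Ok((names_column, first_two))
}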
2 changes: 1 addition & 1 deletion vortex-array/src/array/varbin/compute/take.rs
@@ -49,7 +49,7 @@ fn take<I: NativePType, O: NativePType>(
return Ok(take_nullable(dtype, offsets, data, indices, v));
}

let mut builder = VarBinBuilder::<I>::with_capacity(indices.len());
let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
for &idx in indices {
let idx = idx.to_usize().unwrap();
let start = offsets[idx].to_usize().unwrap();