Preserve decoded arrays from filtering #24

Closed · wants to merge 1 commit
8 changes: 4 additions & 4 deletions parquet/src/arrow/arrow_reader/filter.rs
@@ -30,7 +30,7 @@ pub trait ArrowPredicate: Send + 'static {
     /// matching the predicate.
     ///
     /// The returned [`BooleanArray`] must not contain any nulls
-    fn filter(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray>;
+    fn filter(&mut self, batch: &RecordBatch) -> ArrowResult<BooleanArray>;
 }
 
 /// An [`ArrowPredicate`] created from an [`FnMut`]
@@ -41,7 +41,7 @@ pub struct ArrowPredicateFn<F> {
 
 impl<F> ArrowPredicateFn<F>
 where
-    F: FnMut(RecordBatch) -> ArrowResult<BooleanArray> + Send + 'static,
+    F: FnMut(&RecordBatch) -> ArrowResult<BooleanArray> + Send + 'static,
 {
     /// Create a new [`ArrowPredicateFn`]
     pub fn new(projection: ProjectionMask, f: F) -> Self {
@@ -51,13 +51,13 @@ where
 
 impl<F> ArrowPredicate for ArrowPredicateFn<F>
 where
-    F: FnMut(RecordBatch) -> ArrowResult<BooleanArray> + Send + 'static,
+    F: FnMut(&RecordBatch) -> ArrowResult<BooleanArray> + Send + 'static,
 {
     fn projection(&self) -> &ProjectionMask {
         &self.projection
     }
 
-    fn filter(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+    fn filter(&mut self, batch: &RecordBatch) -> ArrowResult<BooleanArray> {
         (self.f)(batch)
     }
 }
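To make the new borrowed-batch contract concrete, here is a minimal sketch of a predicate built on `ArrowPredicateFn` under the changed signature. The schema-descriptor parameter, leaf index `0`, and the `Int64` column type are illustrative assumptions, not part of this diff:

```rust
use arrow::array::{Array, BooleanArray, Int64Array};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::ArrowPredicateFn;
use parquet::arrow::ProjectionMask;
use parquet::schema::types::SchemaDescriptor;

/// Sketch: keep rows where the projected column (assumed Int64) is positive.
fn positive_predicate(
    schema: &SchemaDescriptor,
) -> ArrowPredicateFn<impl FnMut(&RecordBatch) -> ArrowResult<BooleanArray> + Send + 'static>
{
    // Only leaf column 0 needs to be decoded for the filtering pass.
    let mask = ProjectionMask::leaves(schema, [0]);
    ArrowPredicateFn::new(mask, |batch: &RecordBatch| {
        // `batch` is borrowed: the caller keeps ownership, which is what
        // lets `evaluate_predicate` reuse the decoded arrays after filtering.
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("assumed Int64 leaf");
        // The returned BooleanArray must not contain nulls.
        Ok(BooleanArray::from_iter(
            col.iter().map(|v| Some(v.unwrap_or(0) > 0)),
        ))
    })
}
```

Because the closure now receives `&RecordBatch`, the caller retains the decoded batch and can materialize the filtered columns from it instead of decoding them a second time, which is the point of this PR.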
27 changes: 19 additions & 8 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -21,6 +21,7 @@ use std::collections::VecDeque;
 use std::sync::Arc;
 
 use arrow::array::Array;
+use arrow::compute::filter_record_batch;
 use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::{RecordBatch, RecordBatchReader};
@@ -353,21 +354,31 @@ impl ParquetRecordBatchReader {
 pub(crate) fn evaluate_predicate(
     batch_size: usize,
     array_reader: Box<dyn ArrayReader>,
-    selection: Option<RowSelection>,
+    selection: &Option<RowSelection>,
     predicate: &mut dyn ArrowPredicate,
-) -> Result<RowSelection> {
+) -> Result<(RowSelection, RecordBatch)> {
     let reader =
         ParquetRecordBatchReader::new(batch_size, array_reader, selection.clone());
     let mut filters = vec![];
+    let mut batches = vec![];
+
+    let schema = reader.schema();
 
     for maybe_batch in reader {
-        filters.push(predicate.filter(maybe_batch?)?);
+        let batch = maybe_batch?;
+        let filter = predicate.filter(&batch)?;
+        batches.push(filter_record_batch(&batch, &filter)?);
+        filters.push(filter);
     }
 
-    let raw = RowSelection::from_filters(&filters);
-    Ok(match selection {
-        Some(selection) => selection.and(&raw),
-        None => raw,
-    })
+    let batch = RecordBatch::concat(&schema, &batches)?;
Review thread on the `RecordBatch::concat(&schema, &batches)` line:

tustvold (Owner) commented on Aug 6, 2022:

If you have a not very selective predicate, won't this massively balloon your memory consumption? As you will effectively hydrate the entire projected row group into a RecordBatch?

Edit: Perhaps that is fine, and we could just have a caveat that the entire projected row group must fit into memory for pushed down predicates. DataFusion could then make a judgement call based on the size of the column chunks 🤔

Edit Edit: We could even push this into the parquet reader

Author replied:

Not sure I follow. What would get pushed into the parquet reader?

tustvold (Owner) replied:

If we've buffered too much in memory, which is highly likely with any non-selective predicate, the parquet reader could drop the buffer, and fall back to decoding it again

Author replied:

> If we've buffered too much in memory, which is highly likely with any non-selective predicate, the parquet reader could drop the buffer, and fall back to decoding it again

Ah, yeah that makes sense
+
+    Ok((RowSelection::from_filters(&filters), batch))
+    // let raw = RowSelection::from_filters(&filters);
+    // Ok(match selection {
+    //     Some(selection) => selection.and(&raw),
+    //     None => raw,
+    // })
 }
 
 #[cfg(test)]
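To put rough numbers on the buffering concern from the review thread above (all figures are made up for illustration): every surviving row of every predicate column is now held in memory until the stream hands those rows back out. A quick back-of-envelope:

```rust
/// Back-of-envelope for the buffering cost, with hypothetical numbers:
/// a 1M-row row group, 10 buffered Int64 columns, and a predicate that
/// keeps 90% of rows.
fn main() {
    let rows: f64 = 1_000_000.0; // rows in the row group (assumed)
    let selectivity = 0.9; // fraction of rows the predicate keeps (assumed)
    let columns = 10.0; // buffered leaf columns (assumed)
    let bytes_per_value = 8.0; // Int64, ignoring validity bitmaps

    let buffered = rows * selectivity * columns * bytes_per_value;
    // Prints "≈ 69 MB buffered" (72 million bytes) per row group.
    println!("≈ {:.0} MB buffered", buffered / (1024.0 * 1024.0));
}
```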
24 changes: 22 additions & 2 deletions parquet/src/arrow/arrow_reader/selection.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::{Array, BooleanArray};
-use arrow::compute::SlicesIterator;
+use arrow::array::{Array, ArrayRef, BooleanArray};
+use arrow::compute::{concat, SlicesIterator};
 use std::cmp::Ordering;
 use std::collections::VecDeque;
 use std::ops::Range;
@@ -196,6 +196,26 @@ impl RowSelection {
         Self { selectors }
     }
 
+    /// Applies `self` to a single array. Returns `None` if no rows are selected
+    pub fn filter_array(&self, array: &ArrayRef) -> Option<ArrayRef> {
+        if self.selects_any() {
+            let mut offset = 0;
+            let mut slices = vec![];
+            for selector in &self.selectors {
+                if !selector.skip {
+                    slices.push(array.slice(offset, selector.row_count));
+                }
+                offset += selector.row_count;
+            }
+
+            let slice_refs: Vec<&dyn Array> = slices.iter().map(|a| a.as_ref()).collect();
+
+            Some(concat(&slice_refs).unwrap())
+        } else {
+            None
+        }
+    }
+
     /// Returns `true` if this [`RowSelection`] selects any rows
     pub fn selects_any(&self) -> bool {
         self.selectors.iter().any(|x| !x.skip)
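A usage sketch for the new `filter_array`, assuming `RowSelection` and `RowSelector` are publicly constructible as in upstream arrow-rs:

```rust
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int32Array};
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // Keep 2 rows, skip 3, keep 3: selects indices [0, 1, 5, 6, 7].
    let selection = RowSelection::from(vec![
        RowSelector::select(2),
        RowSelector::skip(3),
        RowSelector::select(3),
    ]);

    let array: ArrayRef = Arc::new(Int32Array::from_iter_values(0..8));

    // The selected slices are concatenated into a single new array.
    let filtered = selection.filter_array(&array).expect("selection keeps rows");
    assert_eq!(filtered.len(), 5);

    // A selection that skips everything returns None instead.
    let none = RowSelection::from(vec![RowSelector::skip(8)]);
    assert!(none.filter_array(&array).is_none());
}
```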
95 changes: 81 additions & 14 deletions parquet/src/arrow/async_reader.rs
@@ -84,6 +84,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
+use arrow::array::{Array, ArrayRef};
 use bytes::Bytes;
 use futures::future::{BoxFuture, FutureExt};
 use futures::ready;
@@ -311,20 +312,27 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
             schema: self.schema.clone(),
         };
 
+        let projected_schema = self.projection.project(&self.schema);
+
         Ok(ParquetRecordBatchStream {
             metadata: self.metadata,
             batch_size: self.batch_size,
             row_groups,
             projection: self.projection,
             selection: self.selection,
             schema: self.schema,
+            projected_schema,
             reader: Some(reader),
             state: StreamState::Init,
         })
     }
 }
 
-type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
+type ReadResult<T> = Result<(
+    ReaderFactory<T>,
+    Option<ParquetRecordBatchReader>,
+    Vec<Option<ArrayRef>>,
+)>;
 
 /// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
 /// [`ParquetRecordBatchReader`]
@@ -349,7 +357,7 @@ where
         mut self,
         row_group_idx: usize,
         mut selection: Option<RowSelection>,
-        projection: ProjectionMask,
+        mut projection: ProjectionMask,
         batch_size: usize,
     ) -> ReadResult<T> {
         // TODO: calling build_array multiple times is wasteful
@@ -364,13 +372,17 @@ where
             column_chunks: vec![None; meta.columns().len()],
         };
 
+        let num_columns = projection.num_leaves(meta.num_columns());
+        let mut buffer: Vec<Option<ArrayRef>> = vec![None; num_columns];
+
         if let Some(filter) = self.filter.as_mut() {
             for predicate in filter.predicates.iter_mut() {
                 if !selects_any(selection.as_ref()) {
-                    return Ok((self, None));
+                    return Ok((self, None, vec![]));
                 }
 
                 let predicate_projection = predicate.projection().clone();
+
                 row_group
                     .fetch(
                         &mut self.input,
@@ -380,23 +392,48 @@
                     )
                     .await?;
 
+                let projected_indices =
+                    predicate_projection.leaf_indices(meta.num_columns());
+
                 let array_reader = build_array_reader(
                     self.schema.clone(),
                     predicate_projection,
                     &row_group,
                 )?;
 
-                selection = Some(evaluate_predicate(
+                let (next_selection, batch) = evaluate_predicate(
                     batch_size,
                     array_reader,
-                    selection,
+                    &selection,
                     predicate.as_mut(),
-                )?);
+                )?;
+
+                if !next_selection.selects_any() {
+                    return Ok((self, None, vec![]));
+                }
+
+                for array in buffer.iter_mut().flatten() {
+                    *array = next_selection.filter_array(array).unwrap();
+                }
+
+                for (array, idx) in batch.columns().iter().zip(projected_indices) {
+                    if projection.leaf_included(idx) {
+                        buffer[idx] = Some(array.clone());
+                        projection.exclude_leaf(idx, meta.num_columns());
+                    }
+                }
+
+                selection = Some(
+                    selection
+                        .as_ref()
+                        .map(|current_selection| current_selection.and(&next_selection))
+                        .unwrap_or(next_selection),
+                );
             }
         }
 
         if !selects_any(selection.as_ref()) {
-            return Ok((self, None));
+            return Ok((self, None, vec![]));
         }
 
         row_group
@@ -409,15 +446,15 @@ where
             selection,
         );
 
-        Ok((self, Some(reader)))
+        Ok((self, Some(reader), buffer))
     }
 }
 
 enum StreamState<T> {
     /// At the start of a new row group, or the end of the parquet stream
     Init,
     /// Decoding a batch
-    Decoding(ParquetRecordBatchReader),
+    Decoding(ParquetRecordBatchReader, Vec<Option<ArrayRef>>),
     /// Reading data from input
     Reading(BoxFuture<'static, ReadResult<T>>),
     /// Error
@@ -428,7 +465,7 @@ impl<T> std::fmt::Debug for StreamState<T> {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         match self {
             StreamState::Init => write!(f, "StreamState::Init"),
-            StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
+            StreamState::Decoding(_, _) => write!(f, "StreamState::Decoding"),
             StreamState::Reading(_) => write!(f, "StreamState::Reading"),
             StreamState::Error => write!(f, "StreamState::Error"),
         }
@@ -442,6 +479,8 @@ pub struct ParquetRecordBatchStream<T> {
 
     schema: SchemaRef,
 
+    projected_schema: SchemaRef,
+
     row_groups: VecDeque<usize>,
 
     projection: ProjectionMask,
@@ -487,8 +526,34 @@ where
     ) -> Poll<Option<Self::Item>> {
         loop {
             match &mut self.state {
-                StreamState::Decoding(batch_reader) => match batch_reader.next() {
-                    Some(Ok(batch)) => return Poll::Ready(Some(Ok(batch))),
+                StreamState::Decoding(batch_reader, decode_buffer) => match batch_reader
+                    .next()
+                {
+                    Some(Ok(batch)) => {
+                        let mut next_col_idx = 0;
+                        let num_rows = batch.num_rows();
+
+                        let mut arrays = vec![];
+
+                        for buffered_array in decode_buffer.iter_mut() {
+                            if let Some(array) = buffered_array.take() {
+                                let array_len = array.len();
+                                arrays.push(array.slice(0, num_rows));
+
+                                *buffered_array =
+                                    Some(array.slice(num_rows, array_len - num_rows));
+                            } else {
+                                arrays.push(batch.column(next_col_idx).clone());
+                                next_col_idx += 1;
+                            }
+                        }
+
+                        let final_batch =
+                            RecordBatch::try_new(self.projected_schema.clone(), arrays)
+                                .unwrap();
+
+                        return Poll::Ready(Some(Ok(final_batch)));
+                    }
                     Some(Err(e)) => {
                         self.state = StreamState::Error;
                         return Poll::Ready(Some(Err(ParquetError::ArrowError(
@@ -523,11 +588,13 @@ where
                     self.state = StreamState::Reading(fut)
                 }
                 StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
-                    Ok((reader_factory, maybe_reader)) => {
+                    Ok((reader_factory, maybe_reader, decode_buffer)) => {
                         self.reader = Some(reader_factory);
                         match maybe_reader {
                             // Read records from [`ParquetRecordBatchReader`]
-                            Some(reader) => self.state = StreamState::Decoding(reader),
+                            Some(reader) => {
+                                self.state = StreamState::Decoding(reader, decode_buffer)
+                            }
                             // All rows skipped, read next row group
                             None => self.state = StreamState::Init,
                         }
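The `Decoding` arm above stitches together two sources of columns: arrays already materialized during predicate evaluation (`decode_buffer`) and columns freshly decoded by the reader, slicing each buffered array to the batch boundary. A self-contained sketch of just that bookkeeping, with made-up sizes:

```rust
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int32Array};

fn main() {
    // A buffered, already-filtered column spanning the whole row group.
    let mut buffered: Option<ArrayRef> =
        Some(Arc::new(Int32Array::from_iter_values(0..10)));

    // The reader just produced a 4-row batch, so carve off a matching
    // 4-row head and keep the tail for subsequent batches.
    let num_rows = 4;
    if let Some(array) = buffered.take() {
        let head = array.slice(0, num_rows); // joins this output batch
        buffered = Some(array.slice(num_rows, array.len() - num_rows));

        assert_eq!(head.len(), 4);
        assert_eq!(buffered.as_ref().unwrap().len(), 6);
    }
}
```

Slicing is zero-copy in Arrow, so carving the buffered arrays into batch-sized pieces adds bookkeeping but no data movement.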
48 changes: 48 additions & 0 deletions parquet/src/arrow/mod.rs
@@ -136,6 +136,8 @@ pub use self::arrow_writer::ArrowWriter;
 #[cfg(feature = "async")]
 pub use self::async_reader::ParquetRecordBatchStreamBuilder;
 use crate::schema::types::SchemaDescriptor;
+use arrow::datatypes::SchemaRef;
+use std::sync::Arc;
 
 pub use self::schema::{
     arrow_to_parquet_schema, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns,
@@ -229,4 +231,50 @@ impl ProjectionMask {
     pub fn leaf_included(&self, leaf_idx: usize) -> bool {
         self.mask.as_ref().map(|m| m[leaf_idx]).unwrap_or(true)
     }
+
+    /// Return the indices of leaves selected by this `ProjectionMask`
+    pub fn leaf_indices(&self, num_columns: usize) -> Vec<usize> {
+        match &self.mask {
+            Some(mask) => mask
+                .iter()
+                .enumerate()
+                .filter_map(|(idx, b)| b.then(|| idx))
+                .collect(),
+            None => Vec::from_iter(0..num_columns),
+        }
+    }
+
+    /// Exclude the leaf at `idx` from this mask
+    pub fn exclude_leaf(&mut self, idx: usize, num_columns: usize) {
+        let mut mask = self.mask.take().unwrap_or_else(|| vec![true; num_columns]);
+
+        mask[idx] = false;
+
+        self.mask = Some(mask);
+    }
+
+    /// Exclude all leaves in `indices` from this mask
+    pub fn exclude_leaves(&mut self, indices: &[usize], num_columns: usize) {
+        let mut mask = self.mask.take().unwrap_or_else(|| vec![true; num_columns]);
+
+        for idx in indices {
+            mask[*idx] = false;
+        }
+
+        self.mask = Some(mask);
+    }
+
+    /// Return the number of leaves selected by this mask
+    pub fn num_leaves(&self, num_columns: usize) -> usize {
+        self.mask
+            .as_ref()
+            .map(|mask| mask.iter().map(|b| if *b { 1 } else { 0 }).sum())
+            .unwrap_or(num_columns)
+    }
+
+    /// Project `schema` down to the leaves selected by this mask
+    pub fn project(&self, schema: &SchemaRef) -> SchemaRef {
+        if self.mask.is_some() {
+            let indices = self.leaf_indices(schema.fields().len());
+            Arc::new(schema.project(&indices).unwrap())
+        } else {
+            schema.clone()
+        }
+    }
 }
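A short sketch of how the helpers added above compose during predicate pushdown (the column count is arbitrary):

```rust
use parquet::arrow::ProjectionMask;

fn main() {
    let num_columns = 4;
    let mut projection = ProjectionMask::all();

    // With no explicit mask, every leaf is selected.
    assert_eq!(projection.num_leaves(num_columns), 4);
    assert_eq!(projection.leaf_indices(num_columns), vec![0, 1, 2, 3]);

    // Leaf 2 was already materialized by a predicate, so drop it from the
    // final decode pass; its buffered array gets spliced back in later.
    projection.exclude_leaf(2, num_columns);

    assert!(!projection.leaf_included(2));
    assert_eq!(projection.num_leaves(num_columns), 3);
    assert_eq!(projection.leaf_indices(num_columns), vec![0, 1, 3]);
}
```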