From 43b96aad61f271c330d261770cf2517bc06137ab Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Thu, 4 Aug 2022 22:15:10 +0100
Subject: [PATCH 1/8] Add RowFilter API

---
 parquet/src/arrow/array_reader/builder.rs    |  19 +-
 parquet/src/arrow/array_reader/list_array.rs |  10 +-
 parquet/src/arrow/array_reader/mod.rs        |   6 +-
 parquet/src/arrow/arrow_reader/filter.rs     |  95 +++++
 .../{arrow_reader.rs => arrow_reader/mod.rs} | 122 +++---
 parquet/src/arrow/arrow_reader/selection.rs  | 215 ++++++++++
 parquet/src/arrow/async_reader.rs            | 399 +++++++++++++-----
 7 files changed, 673 insertions(+), 193 deletions(-)
 create mode 100644 parquet/src/arrow/arrow_reader/filter.rs
 rename parquet/src/arrow/{arrow_reader.rs => arrow_reader/mod.rs} (96%)
 create mode 100644 parquet/src/arrow/arrow_reader/selection.rs

diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index d9c1bedb246c..23407ac987c9 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -39,20 +39,18 @@ use crate::data_type::{
     Int64Type, Int96Type,
 };
 use crate::errors::Result;
-use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type};
+use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
 
 /// Create array reader from parquet schema, projection mask, and parquet file reader.
 pub fn build_array_reader(
-    parquet_schema: SchemaDescPtr,
     arrow_schema: SchemaRef,
     mask: ProjectionMask,
-    row_groups: Box<dyn RowGroupCollection>,
+    row_groups: &dyn RowGroupCollection,
 ) -> Result<Box<dyn ArrayReader>> {
-    let field =
-        convert_schema(parquet_schema.as_ref(), mask, Some(arrow_schema.as_ref()))?;
+    let field = convert_schema(&row_groups.schema(), mask, Some(arrow_schema.as_ref()))?;
 
     match &field {
-        Some(field) => build_reader(field, row_groups.as_ref()),
+        Some(field) => build_reader(field, row_groups),
         None => Ok(make_empty_array_reader(row_groups.num_rows())),
     }
 }
@@ -336,13 +334,8 @@ mod tests {
         )
         .unwrap();
 
-        let array_reader = build_array_reader(
-            file_reader.metadata().file_metadata().schema_descr_ptr(),
-            Arc::new(arrow_schema),
-            mask,
-            Box::new(file_reader),
-        )
-        .unwrap();
+        let array_reader =
+            build_array_reader(Arc::new(arrow_schema), mask, &file_reader).unwrap();
 
         // Create arrow types
         let arrow_type = DataType::Struct(vec![Field::new(
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index c245c61312fc..b504f62352ee 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -593,13 +593,9 @@ mod tests {
         let schema = file_metadata.schema_descr_ptr();
         let mask = ProjectionMask::leaves(&schema, vec![0]);
 
-        let mut array_reader = build_array_reader(
-            schema,
-            Arc::new(arrow_schema),
-            mask,
-            Box::new(file_reader),
-        )
-        .unwrap();
+        let mut array_reader =
+            build_array_reader(Arc::new(arrow_schema), mask, &file_reader)
+                .unwrap();
 
         let batch = array_reader.next_batch(100).unwrap();
         assert_eq!(batch.data_type(), array_reader.get_data_type());
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index d7665ef0f6b2..54c45a336a37 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -100,7 +100,7 @@ pub trait ArrayReader: Send {
 /// A collection of row groups
 pub trait RowGroupCollection {
     /// Get schema of parquet file.
-    fn schema(&self) -> Result<SchemaDescPtr>;
+    fn schema(&self) -> SchemaDescPtr;
 
     /// Get the number of rows in this collection
     fn num_rows(&self) -> usize;
@@ -110,8 +110,8 @@ pub trait RowGroupCollection {
 }
 
 impl RowGroupCollection for Arc<dyn FileReader> {
-    fn schema(&self) -> Result<SchemaDescPtr> {
-        Ok(self.metadata().file_metadata().schema_descr_ptr())
+    fn schema(&self) -> SchemaDescPtr {
+        self.metadata().file_metadata().schema_descr_ptr()
     }
 
     fn num_rows(&self) -> usize {
diff --git a/parquet/src/arrow/arrow_reader/filter.rs b/parquet/src/arrow/arrow_reader/filter.rs
new file mode 100644
index 000000000000..a5beba5c971d
--- /dev/null
+++ b/parquet/src/arrow/arrow_reader/filter.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::ProjectionMask;
+use arrow::array::BooleanArray;
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+
+/// A predicate operating on [`RecordBatch`]
+pub trait ArrowPredicate: Send + 'static {
+    /// Returns the projection mask for this predicate
+    fn projection(&self) -> &ProjectionMask;
+
+    /// Called with a [`RecordBatch`] containing the columns identified by [`Self::mask`],
+    /// with `true` values in the returned [`BooleanArray`] indicating rows
+    /// matching the predicate.
+    ///
+    /// The returned [`BooleanArray`] must not contain any nulls
+    fn filter(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray>;
+}
+
+/// An [`ArrowPredicate`] created from an [`FnMut`]
+pub struct ArrowPredicateFn<F> {
+    f: F,
+    projection: ProjectionMask,
+}
+
+impl<F> ArrowPredicateFn<F>
+where
+    F: FnMut(RecordBatch) -> ArrowResult<BooleanArray> + Send + 'static,
+{
+    /// Create a new [`ArrowPredicateFn`]
+    pub fn new(projection: ProjectionMask, f: F) -> Self {
+        Self { f, projection }
+    }
+}
+
+impl<F> ArrowPredicate for ArrowPredicateFn<F>
+where
+    F: FnMut(RecordBatch) -> ArrowResult<BooleanArray> + Send + 'static,
+{
+    fn projection(&self) -> &ProjectionMask {
+        &self.projection
+    }
+
+    fn filter(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+        (self.f)(batch)
+    }
+}
+
+/// A [`RowFilter`] allows pushing down a filter predicate to skip IO and decode
+///
+/// This consists of a list of [`ArrowPredicate`] where only the rows that satisfy all
+/// of the predicates will be returned. Any [`RowSelection`] will be applied prior
+/// to the first predicate, and each predicate in turn will then be used to compute
+/// a more refined [`RowSelection`] to use when evaluating the subsequent predicates.
+///
+/// Once all predicates have been evaluated, the resulting [`RowSelection`] will be
+/// used to return just the desired rows.
+///
+/// This design has a couple of implications:
+///
+/// * [`RowFilter`] can be used to skip fetching IO, in addition to decode overheads
+/// * Columns may be decoded multiple times if they appear in multiple [`ProjectionMask`]
+/// * IO will be deferred until needed by a [`ProjectionMask`]
+///
+/// As such there is a trade-off between a single large predicate and multiple predicates,
+/// that will depend on the shape of the data. Whilst multiple smaller predicates may
+/// minimise the amount of data scanned/decoded, it may not be faster overall.
+///
+pub struct RowFilter {
+    /// A list of [`ArrowPredicate`]
+    pub(crate) predicates: Vec<Box<dyn ArrowPredicate>>,
+}
+
+impl RowFilter {
+    /// Create a new [`RowFilter`] from an array of [`ArrowPredicate`]
+    pub fn new(predicates: Vec<Box<dyn ArrowPredicate>>) -> Self {
+        Self { predicates }
+    }
+}
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader/mod.rs
similarity index 96%
rename from parquet/src/arrow/arrow_reader.rs
rename to parquet/src/arrow/arrow_reader/mod.rs
index 67bd2a619ffe..d1f457763301 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -36,6 +36,15 @@ use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader};
 use crate::file::serialized_reader::ReadOptionsBuilder;
 use crate::schema::types::SchemaDescriptor;
 
+mod filter;
+mod selection;
+
+// TODO: Make these public once stable (#1792)
+#[allow(unused_imports)]
+pub(crate) use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
+#[allow(unused_imports)]
+pub(crate) use selection::{RowSelection, RowSelector};
+
 /// Arrow reader api.
 /// With this api, user can get arrow schema from parquet file, and read parquet data
 /// into arrow arrays.
@@ -72,39 +81,10 @@ pub trait ArrowReader {
     ) -> Result<Self::RecordReader>;
 }
 
-/// [`RowSelection`] allows selecting or skipping a provided number of rows
-/// when scanning the parquet file
-#[derive(Debug, Clone, Copy)]
-pub(crate) struct RowSelection {
-    /// The number of rows
-    pub row_count: usize,
-
-    /// If true, skip `row_count` rows
-    pub skip: bool,
-}
-
-impl RowSelection {
-    /// Select `row_count` rows
-    pub fn select(row_count: usize) -> Self {
-        Self {
-            row_count,
-            skip: false,
-        }
-    }
-
-    /// Skip `row_count` rows
-    pub fn skip(row_count: usize) -> Self {
-        Self {
-            row_count,
-            skip: true,
-        }
-    }
-}
-
 #[derive(Debug, Clone, Default)]
 pub struct ArrowReaderOptions {
     skip_arrow_metadata: bool,
-    selection: Option<Vec<RowSelection>>,
+    selection: Option<RowSelection>,
 }
 
 impl ArrowReaderOptions {
@@ -128,11 +108,8 @@ impl ArrowReaderOptions {
     /// Scan rows from the parquet file according to the provided `selection`
     ///
-    /// TODO: Make public once row selection fully implemented (#1792)
-    pub(crate) fn with_row_selection(
-        self,
-        selection: impl Into<Vec<RowSelection>>,
-    ) -> Self {
+    /// TODO: Revisit this API, as [`Self`] is provided before the file metadata is available
+    pub(crate) fn with_row_selection(self, selection: impl Into<RowSelection>) -> Self {
         Self {
             selection: Some(selection.into()),
             ..self
@@ -140,6 +117,9 @@ impl ArrowReaderOptions {
     }
 }
 
+/// An `ArrowReader` that can be used to synchronously read parquet data as [`RecordBatch`]
+///
+/// See [`crate::arrow::async_reader`] for an asynchronous interface
 pub struct ParquetFileArrowReader {
     file_reader: Arc<dyn FileReader>,
 
@@ -175,21 +155,13 @@ impl ArrowReader for ParquetFileArrowReader {
         mask: ProjectionMask,
         batch_size: usize,
     ) -> Result<ParquetRecordBatchReader> {
-        let array_reader = build_array_reader(
-            self.file_reader
-                .metadata()
-                .file_metadata()
-                .schema_descr_ptr(),
-            Arc::new(self.get_schema()?),
-            mask,
-            Box::new(self.file_reader.clone()),
-        )?;
+        let array_reader =
+            build_array_reader(Arc::new(self.get_schema()?), mask, &self.file_reader)?;
 
-        let selection = self.options.selection.clone().map(Into::into);
         Ok(ParquetRecordBatchReader::new(
             batch_size,
             array_reader,
-            selection,
+            self.options.selection.clone(),
         ))
     }
 }
@@ -276,11 +248,13 @@ impl ParquetFileArrowReader {
     }
 }
 
+/// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
+/// read from a parquet data source
 pub struct ParquetRecordBatchReader {
     batch_size: usize,
     array_reader: Box<dyn ArrayReader>,
     schema: SchemaRef,
-    selection: Option<Vec<RowSelection>>,
+    selection: Option<VecDeque<RowSelector>>,
 }
 
 impl Iterator for ParquetRecordBatchReader {
@@ -312,7 +286,7 @@ impl Iterator for ParquetRecordBatchReader {
                     Some(remaining) if remaining != 0 => {
                         // if page row count less than batch_size we must set batch size to page row count.
                         // add check avoid dead loop
-                        selection.push_front(RowSelection::select(remaining));
+                        selection.push_front(RowSelector::select(remaining));
                         self.batch_size
                     }
                     _ => front.row_count,
@@ -349,22 +323,13 @@ impl RecordBatchReader for ParquetRecordBatchReader {
     }
 }
 
 impl ParquetRecordBatchReader {
-    pub fn try_new(
-        batch_size: usize,
-        array_reader: Box<dyn ArrayReader>,
-    ) -> Result<Self> {
-        Ok(Self::new(batch_size, array_reader, None))
-    }
-
     /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
     /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
     /// all rows will be returned
-    ///
-    /// TODO: Make public once row selection fully implemented (#1792)
     pub(crate) fn new(
         batch_size: usize,
         array_reader: Box<dyn ArrayReader>,
-        selection: Option<Vec<RowSelection>>,
+        selection: Option<RowSelection>,
     ) -> Self {
         let schema = match array_reader.get_data_type() {
             ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
@@ -375,11 +340,36 @@ impl ParquetRecordBatchReader {
             batch_size,
             array_reader,
             schema: Arc::new(schema),
-            selection,
+            selection: selection.map(Into::into),
         }
     }
 }
 
+/// Evaluates an [`ArrowPredicate`] returning the [`RowSelection`]
+///
+/// If this [`ParquetRecordBatchReader`] has a [`RowSelection`], the
+/// returned [`RowSelection`] will be the conjunction of this and
+/// the rows selected by `predicate`
+pub(crate) fn evaluate_predicate(
+    batch_size: usize,
+    array_reader: Box<dyn ArrayReader>,
+    selection: Option<RowSelection>,
+    predicate: &mut dyn ArrowPredicate,
+) -> Result<RowSelection> {
+    let reader =
+        ParquetRecordBatchReader::new(batch_size, array_reader, selection.clone());
+    let mut filters = vec![];
+    for maybe_batch in reader {
+        filters.push(predicate.filter(maybe_batch?)?);
+    }
+
+    let raw = RowSelection::from_filters(&filters);
+    Ok(match selection {
+        Some(selection) => selection.and(&raw),
+        None => raw,
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use bytes::Bytes;
@@ -402,7 +392,7 @@ mod tests {
 
     use crate::arrow::arrow_reader::{
         ArrowReader, ArrowReaderOptions, ParquetFileArrowReader,
-        ParquetRecordBatchReader, RowSelection,
+        ParquetRecordBatchReader, RowSelection, RowSelector,
     };
     use crate::arrow::buffer::converter::{
         BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter,
@@ -1693,12 +1683,12 @@ mod tests {
     /// a `batch_size` and `selection`
     fn get_expected_batches(
         column: &RecordBatch,
-        selection: &[RowSelection],
+        selection: &RowSelection,
         batch_size: usize,
     ) -> Vec<RecordBatch> {
         let mut expected_batches = vec![];
 
-        let mut selection: VecDeque<_> = selection.iter().cloned().collect();
+        let mut selection: VecDeque<_> = selection.clone().into();
         let mut row_offset = 0;
         let mut last_start = None;
         while row_offset < column.num_rows() && !selection.is_empty() {
@@ -1792,7 +1782,7 @@ mod tests {
     fn create_skip_reader(
         test_file: &File,
         batch_size: usize,
-        selections: Vec<RowSelection>,
+        selections: RowSelection,
     ) -> ParquetRecordBatchReader {
         let arrow_reader_options =
             ArrowReaderOptions::new().with_row_selection(selections);
@@ -1809,7 +1799,7 @@ mod tests {
         step_len: usize,
         total_len: usize,
         skip_first: bool,
-    ) -> Vec<RowSelection> {
+    ) -> RowSelection {
         let mut remaining = total_len;
         let mut skip = skip_first;
         let mut vec = vec![];
@@ -1819,14 +1809,14 @@ mod tests {
             } else {
                 remaining
             };
-            vec.push(RowSelection {
+            vec.push(RowSelector {
                 row_count: step,
                 skip,
             });
             remaining -= step;
             skip = !skip;
         }
-        vec
+        vec.into()
     }
 }
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
new file mode 100644
index 000000000000..3a9b69ca9bcc
--- /dev/null
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Array, BooleanArray};
+use arrow::compute::SlicesIterator;
+use std::cmp::Ordering;
+use std::collections::VecDeque;
+use std::ops::Range;
+
+/// [`RowSelection`] is a collection of [`RowSelect`] used to skip rows when
+/// scanning a parquet file
+#[derive(Debug, Clone, Copy)]
+pub struct RowSelector {
+    /// The number of rows
+    pub row_count: usize,
+
+    /// If true, skip `row_count` rows
+    pub skip: bool,
+}
+
+impl RowSelector {
+    /// Select `row_count` rows
+    pub fn select(row_count: usize) -> Self {
+        Self {
+            row_count,
+            skip: false,
+        }
+    }
+
+    /// Skip `row_count` rows
+    pub fn skip(row_count: usize) -> Self {
+        Self {
+            row_count,
+            skip: true,
+        }
+    }
+}
+
+/// [`RowSelection`] allows selecting or skipping a provided number of rows
+/// when scanning the parquet file.
+///
+/// This is applied prior to reading column data, and can therefore
+/// be used to skip IO to fetch data into memory
+///
+/// A typical use-case would be using the [`PageIndex`] to filter out rows
+/// that don't satisfy a predicate
+///
+/// [`PageIndex`]: [crate::file::page_index::index::PageIndex]
+#[derive(Debug, Clone, Default)]
+pub struct RowSelection {
+    selectors: Vec<RowSelector>,
+}
+
+impl RowSelection {
+    /// Creates a [`RowSelection`] from a slice of [`BooleanArray`]
+    ///
+    /// # Panics
+    ///
+    /// Panics if any of the [`BooleanArray`] contain nulls
+    pub fn from_filters(filters: &[BooleanArray]) -> Self {
+        let mut next_offset = 0;
+        let total_rows = filters.iter().map(|x| x.len()).sum();
+
+        let iter = filters.iter().flat_map(|filter| {
+            let offset = next_offset;
+            next_offset += filter.len();
+            assert_eq!(filter.null_count(), 0);
+            SlicesIterator::new(filter)
+                .map(move |(start, end)| start + offset..end + offset)
+        });
+
+        Self::from_consecutive_ranges(iter, total_rows)
+    }
+
+    /// Creates a [`RowSelection`] from an iterator of consecutive ranges to keep
+    fn from_consecutive_ranges<I: Iterator<Item = Range<usize>>>(
+        ranges: I,
+        total_rows: usize,
+    ) -> Self {
+        let mut selectors: Vec<RowSelector> = Vec::with_capacity(ranges.size_hint().0);
+        let mut last_end = 0;
+        for range in ranges {
+            let len = range.end - range.start;
+
+            match range.start.cmp(&last_end) {
+                Ordering::Equal => match selectors.last_mut() {
+                    Some(last) => last.row_count += len,
+                    None => selectors.push(RowSelector::select(len)),
+                },
+                Ordering::Greater => {
+                    selectors.push(RowSelector::skip(range.start - last_end));
+                    selectors.push(RowSelector::select(len))
+                }
+                Ordering::Less => panic!("out of order"),
+            }
+            last_end = range.end;
+        }
+
+        if last_end != total_rows {
+            selectors.push(RowSelector::skip(total_rows - last_end))
+        }
+
+        Self { selectors }
+    }
+
+    /// Splits off `row_count` from this [`RowSelection`]
+    pub fn split_off(&mut self, row_count: usize) -> Self {
+        let mut total_count = 0;
+
+        // Find the index where the selector exceeds the row count
+        let find = self.selectors.iter().enumerate().find(|(_, selector)| {
+            total_count += selector.row_count;
+            total_count >= row_count
+        });
+
+        let split_idx = match find {
+            Some((idx, _)) => idx,
+            None => return Self::default(),
+        };
+
+        let mut remaining = self.selectors.split_off(split_idx);
+        if total_count != row_count {
+            let overflow = total_count - row_count;
+            let rem = remaining.first_mut().unwrap();
+            rem.row_count -= overflow;
+
+            self.selectors.push(RowSelector {
+                row_count,
+                skip: rem.skip,
+            })
+        }
+
+        std::mem::swap(&mut remaining, &mut self.selectors);
+        Self {
+            selectors: remaining,
+        }
+    }
+
+    /// Given a [`RowSelection`] computed under `self` returns the [`RowSelection`]
+    /// representing their conjunction
+    pub fn and(&self, other: &Self) -> Self {
+        let mut selectors = vec![];
+        let mut first = self.selectors.iter().cloned().peekable();
+        let mut second = other.selectors.iter().cloned().peekable();
+
+        let mut to_skip = 0;
+        while let (Some(a), Some(b)) = (first.peek_mut(), second.peek_mut()) {
+            if a.row_count == 0 {
+                first.next().unwrap();
+                continue;
+            }
+
+            if b.row_count == 0 {
+                second.next().unwrap();
+                continue;
+            }
+
+            if a.skip {
+                // Records were skipped when producing second
+                to_skip += a.row_count;
+                first.next().unwrap();
+                continue;
+            }
+
+            let skip = b.skip;
+            let to_process = a.row_count.min(b.row_count);
+
+            a.row_count -= to_process;
+            b.row_count -= to_process;
+
+            match skip {
+                true => to_skip += to_process,
+                false => {
+                    if to_skip != 0 {
+                        selectors.push(RowSelector::skip(to_skip));
+                        to_skip = 0;
+                    }
+                    selectors.push(RowSelector::select(to_process))
+                }
+            }
+        }
+        Self { selectors }
+    }
+
+    /// Returns `true` if this [`RowSelection`] selects any rows
+    pub fn selects_any(&self) -> bool {
+        self.selectors.iter().any(|x| !x.skip)
+    }
+}
+
+impl From<Vec<RowSelector>> for RowSelection {
+    fn from(selectors: Vec<RowSelector>) -> Self {
+        Self { selectors }
+    }
+}
+
+impl Into<VecDeque<RowSelector>> for RowSelection {
+    fn into(self) -> VecDeque<RowSelector> {
+        self.selectors.into()
+    }
+}
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 640d1b81f827..2d3d4ef7000b 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -86,6 +86,7 @@ use std::task::{Context, Poll};
 
 use bytes::Bytes;
 use futures::future::{BoxFuture, FutureExt};
+use futures::ready;
 use futures::stream::Stream;
 use parquet_format::{PageHeader, PageType};
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
@@ -94,7 +95,9 @@ use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 
 use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
-use crate::arrow::arrow_reader::ParquetRecordBatchReader;
+use crate::arrow::arrow_reader::{
+    evaluate_predicate, ParquetRecordBatchReader, RowFilter, RowSelection,
+};
 use crate::arrow::schema::parquet_to_arrow_schema;
 use crate::arrow::ProjectionMask;
 use crate::basic::Compression;
@@ -102,7 +105,7 @@ use crate::column::page::{Page, PageIterator, PageMetadata, PageReader};
 use crate::compression::{create_codec, Codec};
 use crate::errors::{ParquetError, Result};
 use crate::file::footer::{decode_footer, decode_metadata};
-use crate::file::metadata::ParquetMetaData;
+use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
 use crate::file::serialized_reader::{decode_page, read_page_header};
 use crate::file::FOOTER_SIZE;
 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
@@ -195,9 +198,13 @@ pub struct ParquetRecordBatchStreamBuilder<T> {
     row_groups: Option<Vec<usize>>,
 
     projection: ProjectionMask,
+
+    filter: Option<RowFilter>,
+
+    selection: Option<RowSelection>,
 }
 
-impl<T: AsyncFileReader> ParquetRecordBatchStreamBuilder<T> {
+impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
     /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
     pub async fn new(mut input: T) -> Result<Self> {
         let metadata = input.get_metadata().await?;
@@ -214,6 +221,8 @@ impl<T: AsyncFileReader> ParquetRecordBatchStreamBuilder<T> {
             batch_size: 1024,
             row_groups: None,
             projection: ProjectionMask::all(),
+            filter: None,
+            selection: None,
         })
     }
 
@@ -253,6 +262,30 @@ impl<T: AsyncFileReader> ParquetRecordBatchStreamBuilder<T> {
         }
     }
 
+    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
+    /// data into memory
+    ///
+    /// Row group filtering is applied prior to this, and rows from skipped
+    /// row groups should not be included in the [`RowSelection`]
+    ///
+    /// TODO: Make public once stable (#1792)
+    pub(crate) fn with_row_selection(self, selection: RowSelection) -> Self {
+        Self {
+            selection: Some(selection),
+            ..self
+        }
+    }
+
+    /// Provide a [`RowFilter`] to skip decoding rows
+    ///
+    /// TODO: Make public once stable (#1792)
+    pub(crate) fn with_row_filter(self, filter: RowFilter) -> Self {
+        Self {
+            filter: Some(filter),
+            ..self
+        }
+    }
+
     /// Build a new [`ParquetRecordBatchStream`]
     pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
         let num_row_groups = self.metadata.row_groups().len();
@@ -271,25 +304,122 @@ impl<T: AsyncFileReader> ParquetRecordBatchStreamBuilder<T> {
             None => (0..self.metadata.row_groups().len()).collect(),
         };
 
+        let reader = ReaderFactory {
+            input: self.input,
+            filter: self.filter,
+            metadata: self.metadata.clone(),
+            schema: self.schema.clone(),
+        };
+
         Ok(ParquetRecordBatchStream {
+            metadata: self.metadata,
+            batch_size: self.batch_size,
             row_groups,
             projection: self.projection,
-            batch_size: self.batch_size,
-            metadata: self.metadata,
+            selection: self.selection,
             schema: self.schema,
-            input: Some(self.input),
+            reader: Some(reader),
             state: StreamState::Init,
         })
     }
 }
 
+type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
+
+/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
+/// [`ParquetRecordBatchReader`]
+struct ReaderFactory<T> {
+    metadata: Arc<ParquetMetaData>,
+
+    schema: SchemaRef,
+
+    input: T,
+
+    filter: Option<RowFilter>,
+}
+
+impl<T> ReaderFactory<T>
+where
+    T: AsyncFileReader + Send,
+{
+    /// Reads the next row group with the provided `selection`, `projection` and `batch_size`
+    ///
+    /// Note: this captures self so that the resulting future has a static lifetime
+    async fn read_row_group(
+        mut self,
+        row_group_idx: usize,
+        mut selection: Option<RowSelection>,
+        projection: ProjectionMask,
+        batch_size: usize,
+    ) -> ReadResult<T> {
+        // TODO: calling build_array multiple times is wasteful
+        let selects_any = |selection: Option<&RowSelection>| {
+            selection.map(|x| x.selects_any()).unwrap_or(true)
+        };
+
+        let meta = self.metadata.row_group(row_group_idx);
+        let mut row_group = InMemoryRowGroup {
+            schema: meta.schema_descr_ptr(),
+            row_count: meta.num_rows() as usize,
+            column_chunks: vec![None; meta.columns().len()],
+        };
+
+        if let Some(filter) = self.filter.as_mut() {
+            for predicate in filter.predicates.iter_mut() {
+                if !selects_any(selection.as_ref()) {
+                    return Ok((self, None));
+                }
+
+                let predicate_projection = predicate.projection().clone();
+                row_group
+                    .fetch(
+                        &mut self.input,
+                        meta,
+                        &predicate_projection,
+                        selection.as_ref(),
+                    )
+                    .await?;
+
+                let array_reader = build_array_reader(
+                    self.schema.clone(),
+                    predicate_projection,
+                    &row_group,
+                )?;
+
+                selection = Some(evaluate_predicate(
+                    batch_size,
+                    array_reader,
+                    selection,
+                    predicate.as_mut(),
+                )?);
+            }
+        }
+
+        if !selects_any(selection.as_ref()) {
+            return Ok((self, None));
+        }
+
+        row_group
+            .fetch(&mut self.input, meta, &projection, selection.as_ref())
+            .await?;
+
+        let reader = ParquetRecordBatchReader::new(
+            batch_size,
+            build_array_reader(self.schema.clone(), projection, &row_group)?,
+            selection,
+        );
+
+        Ok((self, Some(reader)))
+    }
+}
+
 enum StreamState<T> {
     /// At the start of a new row group, or the end of the parquet stream
     Init,
     /// Decoding a batch
     Decoding(ParquetRecordBatchReader),
     /// Reading data from input
-    Reading(BoxFuture<'static, Result<(T, InMemoryRowGroup)>>),
+    Reading(BoxFuture<'static, ReadResult<T>>),
     /// Error
     Error,
 }
@@ -305,20 +435,23 @@ impl<T> std::fmt::Debug for StreamState<T> {
     }
 }
 
-/// An asynchronous [`Stream`] of [`RecordBatch`] for a parquet file
+/// An asynchronous [`Stream`] of [`RecordBatch`] for a parquet file that can be
+/// constructed using [`ParquetRecordBatchStreamBuilder`]
 pub struct ParquetRecordBatchStream<T> {
     metadata: Arc<ParquetMetaData>,
 
     schema: SchemaRef,
 
-    batch_size: usize,
+    row_groups: VecDeque<usize>,
 
     projection: ProjectionMask,
 
-    row_groups: VecDeque<usize>,
+    batch_size: usize,
+
+    selection: Option<RowSelection>,
 
     /// This is an option so it can be moved into a future
-    input: Option<T>,
+    reader: Option<ReaderFactory<T>>,
 
     state: StreamState<T>,
 }
@@ -370,101 +503,40 @@ where
                         None => return Poll::Ready(None),
                     };
 
-                    let metadata = self.metadata.clone();
-                    let mut input = match self.input.take() {
-                        Some(input) => input,
-                        None => {
-                            self.state = StreamState::Error;
-                            return Poll::Ready(Some(Err(general_err!(
-                                "input stream lost"
-                            ))));
-                        }
-                    };
-
-                    let projection = self.projection.clone();
-                    self.state = StreamState::Reading(
-                        async move {
-                            let row_group_metadata = metadata.row_group(row_group_idx);
-                            let mut column_chunks =
-                                vec![None; row_group_metadata.columns().len()];
-
-                            // TODO: Combine consecutive ranges
-                            let fetch_ranges = (0..column_chunks.len())
-                                .into_iter()
-                                .filter_map(|idx| {
-                                    if !projection.leaf_included(idx) {
-                                        None
-                                    } else {
-                                        let column = row_group_metadata.column(idx);
-                                        let (start, length) = column.byte_range();
-
-                                        Some(start as usize..(start + length) as usize)
-                                    }
-                                })
-                                .collect();
-
-                            let mut chunk_data =
-                                input.get_byte_ranges(fetch_ranges).await?.into_iter();
-
-                            for (idx, chunk) in column_chunks.iter_mut().enumerate() {
-                                if !projection.leaf_included(idx) {
-                                    continue;
-                                }
-
-                                let column = row_group_metadata.column(idx);
-
-                                if let Some(data) = chunk_data.next() {
-                                    *chunk = Some(InMemoryColumnChunk {
-                                        num_values: column.num_values(),
-                                        compression: column.compression(),
-                                        physical_type: column.column_type(),
-                                        data,
-                                    });
-                                }
-                            }
-
-                            Ok((
-                                input,
-                                InMemoryRowGroup {
-                                    schema: metadata.file_metadata().schema_descr_ptr(),
-                                    row_count: row_group_metadata.num_rows() as usize,
-                                    column_chunks,
-                                },
-                            ))
-                        }
-                        .boxed(),
-                    )
-                }
-                StreamState::Reading(f) => {
-                    let result = futures::ready!(f.poll_unpin(cx));
-                    self.state = StreamState::Init;
-
-                    let row_group: Box<dyn RowGroupCollection> = match result {
-                        Ok((input, row_group)) => {
-                            self.input = Some(input);
-                            Box::new(row_group)
-                        }
-                        Err(e) => {
-                            self.state = StreamState::Error;
-                            return Poll::Ready(Some(Err(e)));
-                        }
-                    };
+                    let reader = self.reader.take().expect("lost reader");
 
-                    let parquet_schema = self.metadata.file_metadata().schema_descr_ptr();
+                    let row_count =
+                        self.metadata.row_group(row_group_idx).num_rows() as usize;
 
-                    let array_reader = build_array_reader(
-                        parquet_schema,
-                        self.schema.clone(),
-                        self.projection.clone(),
-                        row_group,
-                    )?;
-
-                    let batch_reader =
-                        ParquetRecordBatchReader::try_new(self.batch_size, array_reader)
-                            .expect("reader");
+                    let selection =
+                        self.selection.as_mut().map(|s| s.split_off(row_count));
 
-                    self.state = StreamState::Decoding(batch_reader)
+                    let fut = reader
+                        .read_row_group(
+                            row_group_idx,
+                            selection,
+                            self.projection.clone(),
+                            self.batch_size,
+                        )
+                        .boxed();
+
+                    self.state = StreamState::Reading(fut)
                 }
+                StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
+                    Ok((reader_factory, maybe_reader)) => {
+                        self.reader = Some(reader_factory);
+                        match maybe_reader {
+                            // Read records from [`ParquetRecordBatchReader`]
+                            Some(reader) => self.state = StreamState::Decoding(reader),
+                            // All rows skipped, read next row group
+                            None => self.state = StreamState::Init,
+                        }
+                    }
+                    Err(e) => {
+                        self.state = StreamState::Error;
+                        return Poll::Ready(Some(Err(e)));
+                    }
+                },
                 StreamState::Error => return Poll::Pending,
             }
         }
@@ -478,9 +550,56 @@ struct InMemoryRowGroup {
     row_count: usize,
 }
 
+impl InMemoryRowGroup {
+    /// Fetches the necessary column data into memory
+    async fn fetch<T: AsyncFileReader + Send>(
+        &mut self,
+        input: &mut T,
+        metadata: &RowGroupMetaData,
+        projection: &ProjectionMask,
+        _selection: Option<&RowSelection>,
+    ) -> Result<()> {
+        // TODO: Use OffsetIndex and selection to prune pages
+
+        let fetch_ranges = self
+            .column_chunks
+            .iter()
+            .enumerate()
+            .into_iter()
+            .filter_map(|(idx, chunk)| {
+                (chunk.is_none() && projection.leaf_included(idx)).then(|| {
+                    let column = metadata.column(idx);
+                    let (start, length) = column.byte_range();
+                    start as usize..(start + length) as usize
+                })
+            })
+            .collect();
+
+        let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+
+        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+            if chunk.is_some() || !projection.leaf_included(idx) {
+                continue;
+            }
+
+            let column = metadata.column(idx);
+
+            if let Some(data) = chunk_data.next() {
+                *chunk = Some(InMemoryColumnChunk {
+                    num_values: column.num_values(),
+                    compression: column.compression(),
+                    physical_type: column.column_type(),
+                    data,
+                });
+            }
+        }
+        Ok(())
+    }
+}
+
 impl RowGroupCollection for InMemoryRowGroup {
-    fn schema(&self) -> Result<SchemaDescPtr> {
-        Ok(self.schema.clone())
+    fn schema(&self) -> SchemaDescPtr {
+        self.schema.clone()
     }
 
     fn num_rows(&self) -> usize {
@@ -671,7 +790,10 @@ impl PageIterator for ColumnChunkIterator {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::arrow::{ArrowReader, ParquetFileArrowReader};
+    use crate::arrow::arrow_reader::ArrowPredicateFn;
+    use crate::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader};
+    use crate::file::footer::parse_metadata;
+    use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
     use arrow::error::Result as ArrowResult;
    use futures::TryStreamExt;
     use std::sync::Mutex;
@@ -844,4 +966,73 @@ mod tests {
         assert_eq!(second_page.page_type(), crate::basic::PageType::DATA_PAGE);
         assert_eq!(second_page.num_values(), 8);
     }
+
+    #[tokio::test]
+    async fn test_row_filter() {
+        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
+        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
+        let c = Int32Array::from_iter(0..6);
+        let data = RecordBatch::try_from_iter([
+            ("a", Arc::new(a) as ArrayRef),
+            ("b", Arc::new(b) as ArrayRef),
+            ("c", Arc::new(c) as ArrayRef),
+        ])
+        .unwrap();
+
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
+        writer.write(&data).unwrap();
+        writer.close().unwrap();
+
+        let data: Bytes = buf.into();
+        let metadata = parse_metadata(&data).unwrap();
+        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
+
+        let test = TestReader {
+            data,
+            metadata: Arc::new(metadata),
+            requests: Default::default(),
+        };
+        let requests = test.requests.clone();
+
+        let a_filter = ArrowPredicateFn::new(
+            ProjectionMask::leaves(&parquet_schema, vec![0]),
+            |batch| arrow::compute::eq_dyn_utf8_scalar(batch.column(0), "b"),
+        );
+
+        let b_filter = ArrowPredicateFn::new(
+            ProjectionMask::leaves(&parquet_schema, vec![1]),
+            |batch| arrow::compute::eq_dyn_utf8_scalar(batch.column(0), "4"),
+        );
+
+        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
+
+        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
+        let stream = ParquetRecordBatchStreamBuilder::new(test)
+            .await
+            .unwrap()
+            .with_projection(mask.clone())
+            .with_batch_size(1024)
+            .with_row_filter(filter)
+            .build()
+            .unwrap();
+
+        let batches: Vec<_> = stream.try_collect().await.unwrap();
+        assert_eq!(batches.len(), 1);
+
+        let batch = &batches[0];
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 2);
+
+        let col = batch.column(0);
+        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
+        assert_eq!(val, "b");
+
+        let col = batch.column(1);
+        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
+        assert_eq!(val, 3);
+
+        // Should only have made 3 requests
+        assert_eq!(requests.lock().unwrap().len(), 3);
+    }
 }

From 7037126957b0de55abdc40a9342a2fa5338329c9 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Mon, 8 Aug 2022 11:37:12 +0100
Subject: [PATCH 2/8] Review feedback

---
 parquet/src/arrow/arrow_reader/filter.rs    | 26 ++++++++++++++++-----
 parquet/src/arrow/arrow_reader/mod.rs       | 11 +++++----
 parquet/src/arrow/arrow_reader/selection.rs |  3 +--
 3 files changed, 28 insertions(+), 12 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/filter.rs b/parquet/src/arrow/arrow_reader/filter.rs
index a5beba5c971d..f21374a21700 100644
--- a/parquet/src/arrow/arrow_reader/filter.rs
+++ b/parquet/src/arrow/arrow_reader/filter.rs
@@ -22,14 +22,17 @@ use arrow::record_batch::RecordBatch;
 
 /// A predicate operating on [`RecordBatch`]
 pub trait ArrowPredicate: Send + 'static {
-    /// Returns the projection mask for this predicate
+    /// Returns the [`ProjectionMask`] that describes the columns required
+    /// to evaluate this predicate. All projected columns will be provided in the `batch`
+    /// passed to [`filter`](Self::filter)
     fn projection(&self) -> &ProjectionMask;
 
     /// Called with a [`RecordBatch`] containing the columns identified by [`Self::mask`],
     /// with `true` values in the returned [`BooleanArray`] indicating rows
     /// matching the predicate.
     ///
-    /// The returned [`BooleanArray`] must not contain any nulls
+    /// All rows that are `true` in the returned [`BooleanArray`] will be returned to the reader.
+    /// Any rows that are `false` or `Null` will not be
     fn filter(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray>;
 }
 
@@ -43,7 +46,10 @@ impl<F> ArrowPredicateFn<F>
 where
     F: FnMut(RecordBatch) -> ArrowResult<BooleanArray> + Send + 'static,
 {
-    /// Create a new [`ArrowPredicateFn`]
+    /// Create a new [`ArrowPredicateFn`]. `f` will be passed batches
+    /// that contain the columns specified in `projection`
+    /// and should return a [`BooleanArray`] that describes which rows
+    /// should be passed along
     pub fn new(projection: ProjectionMask, f: F) -> Self {
         Self { f, projection }
     }
@@ -69,12 +75,12 @@ where
 /// to the first predicate, and each predicate in turn will then be used to compute
 /// a more refined [`RowSelection`] to use when evaluating the subsequent predicates.
 ///
-/// Once all predicates have been evaluated, the resulting [`RowSelection`] will be
-/// used to return just the desired rows.
+/// Once all predicates have been evaluated, the final [`RowSelection`] is applied
+/// to the overall [`ColumnProjection`] to produce the final output [`RecordBatch`].
 ///
 /// This design has a couple of implications:
 ///
-/// * [`RowFilter`] can be used to skip fetching IO, in addition to decode overheads
+/// * [`RowFilter`] can be used to skip entire pages, and thus IO, in addition to CPU decode overheads
 /// * Columns may be decoded multiple times if they appear in multiple [`ProjectionMask`]
 /// * IO will be deferred until needed by a [`ProjectionMask`]
 ///
@@ -82,6 +88,14 @@ where
 /// that will depend on the shape of the data. Whilst multiple smaller predicates may
 /// minimise the amount of data scanned/decoded, it may not be faster overall.
 ///
+/// For example, if a predicate that needs a single column of data filters out all but
+/// 1% of the rows, applying it as one of the early `ArrowPredicateFn` will likely significantly
+/// improve performance.
+///
+/// As a counter example, if a predicate needs several columns of data to evaluate but
+/// leaves 99% of the rows, it may be better to not filter the data from parquet and
+/// apply the filter after the RecordBatch has been fully decoded.
+///
 pub struct RowFilter {
     /// A list of [`ArrowPredicate`]
     pub(crate) predicates: Vec<Box<dyn ArrowPredicate>>,
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index d1f457763301..75c5a16bb23b 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -21,6 +21,7 @@ use std::collections::VecDeque;
 use std::sync::Arc;
 
 use arrow::array::Array;
+use arrow::compute::prep_null_mask_filter;
 use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::{RecordBatch, RecordBatchReader};
@@ -353,18 +354,20 @@ impl ParquetRecordBatchReader {
 pub(crate) fn evaluate_predicate(
     batch_size: usize,
     array_reader: Box<dyn ArrayReader>,
-    selection: Option<RowSelection>,
+    input_selection: Option<RowSelection>,
     predicate: &mut dyn ArrowPredicate,
 ) -> Result<RowSelection> {
     let reader =
-        ParquetRecordBatchReader::new(batch_size, array_reader, selection.clone());
+        ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone());
     let mut filters = vec![];
     for maybe_batch in reader {
-        filters.push(predicate.filter(maybe_batch?)?);
+        let filter = predicate.filter(maybe_batch?)?;
+        let filter = prep_null_mask_filter(&filter)?;
+        filters.push(filter);
     }
 
     let raw = RowSelection::from_filters(&filters);
-    Ok(match selection {
+    Ok(match input_selection {
         Some(selection) => selection.and(&raw),
         None => raw,
     })
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index 3a9b69ca9bcc..7333effc3aa1 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -21,8 +21,7 @@ use std::cmp::Ordering;
 use std::collections::VecDeque;
 use std::ops::Range;
 
-/// [`RowSelection`] is a collection of [`RowSelect`] used to skip rows when
-/// scanning a parquet file
+/// [`RowSelector`] represents a range of rows to scan from a parquet file
 #[derive(Debug, Clone, Copy)]
 pub struct RowSelector {
     /// The number of rows

From ebd85b391ab55cdb55883e4023e9f26f6a2794c8 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Tue, 9 Aug 2022 14:04:53 +0100
Subject: [PATCH 3/8] Fix doc

---
 parquet/src/arrow/arrow_reader/filter.rs | 5 +++--
 parquet/src/arrow/async_reader.rs        | 2 ++
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/filter.rs b/parquet/src/arrow/arrow_reader/filter.rs
index f21374a21700..64cf034a52c9 100644
--- a/parquet/src/arrow/arrow_reader/filter.rs
+++ b/parquet/src/arrow/arrow_reader/filter.rs
@@ -27,7 +27,7 @@ pub trait ArrowPredicate: Send + 'static {
     /// passed to [`filter`](Self::filter)
     fn projection(&self) -> &ProjectionMask;
 
-    /// Called with a [`RecordBatch`] containing the columns identified by [`Self::mask`],
+    /// Called with a [`RecordBatch`] containing the columns identified by [`Self::projection`],
     /// with `true` values in the returned [`BooleanArray`] indicating rows
     /// matching the predicate.
     ///
@@ -76,7 +76,7 @@ where
 /// a more refined [`RowSelection`] to use when evaluating the subsequent predicates.
 ///
 /// Once all predicates have been evaluated, the final [`RowSelection`] is applied
-/// to the overall [`ColumnProjection`] to produce the final output [`RecordBatch`].
+/// to the top-level [`ProjectionMask`] to produce the final output [`RecordBatch`].
 ///
 /// This design has a couple of implications:
 ///
@@ -96,6 +96,7 @@ where
 /// leaves 99% of the rows, it may be better to not filter the data from parquet and
 /// apply the filter after the RecordBatch has been fully decoded.
 ///
+/// [`RowSelection`]: [super::selection::RowSelection]
 pub struct RowFilter {
     /// A list of [`ArrowPredicate`]
     pub(crate) predicates: Vec<Box<dyn ArrowPredicate>>,
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index cb16354a455d..51c4a3c4a8ce 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -266,6 +266,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
     /// row groups should not be included in the [`RowSelection`]
     ///
     /// TODO: Make public once stable (#1792)
+    #[allow(unused)]
     pub(crate) fn with_row_selection(self, selection: RowSelection) -> Self {
         Self {
             selection: Some(selection),
@@ -276,6 +277,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
     /// Provide a [`RowFilter`] to skip decoding rows
     ///
     /// TODO: Make public once stable (#1792)
+    #[allow(unused)]
     pub(crate) fn with_row_filter(self, filter: RowFilter) -> Self {
         Self {
             filter: Some(filter),

From 3f3e20094a209a4f0c16bbcb70d04b12c1208aa1 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Tue, 9 Aug 2022 16:15:07 +0100
Subject: [PATCH 4/8] Fix handling of NULL boolean array

---
 parquet/src/arrow/arrow_reader/mod.rs | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index c14af9f5ae0c..6734dca71463 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -378,7 +378,10 @@ pub(crate) fn evaluate_predicate(
     let mut filters = vec![];
     for maybe_batch in reader {
         let filter = predicate.filter(maybe_batch?)?;
-        filters.push(prep_null_mask_filter(&filter));
+        match filter.null_count() {
+            0 => filters.push(filter),
+            _ => filters.push(prep_null_mask_filter(&filter)),
+        };
     }

From 6f196c0efb410ad4438f098e8f48128907beef08 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Tue, 9 Aug 2022 17:45:33 +0100
Subject: [PATCH 5/8] Add tests, fix bugs

---
 parquet/src/arrow/arrow_reader/selection.rs | 237 ++++++++++++++++++--
 1 file changed, 221 insertions(+), 16 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index b947156c5cd9..992d449a3374 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -22,7 +22,7 @@ use std::collections::VecDeque;
 use std::ops::Range;
 
 /// [`RowSelector`] represents a range of rows to scan from a parquet file
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub struct RowSelector {
     /// The number of rows
@@ -59,7 +59,7 @@ impl RowSelector {
 /// that don't satisfy a predicate
 ///
 /// [`PageIndex`]: [crate::file::page_index::index::PageIndex]
-#[derive(Debug, Clone, Default)]
+#[derive(Debug, Clone, Default, Eq, PartialEq)]
 pub struct RowSelection {
     selectors: Vec<RowSelector>,
 }
@@ -116,32 +116,37 @@ impl RowSelection {
         Self { selectors }
     }
 
-    /// Splits off `row_count` from this [`RowSelection`]
+    /// Splits off the first `row_count` from this [`RowSelection`]
     pub fn split_off(&mut self, row_count: usize) -> Self {
         let mut total_count = 0;
 
         // Find the index where the selector exceeds the row count
         let find = self.selectors.iter().enumerate().find(|(_, selector)| {
             total_count += selector.row_count;
-            total_count >= row_count
+            total_count > row_count
         });
 
         let split_idx = match find {
             Some((idx, _)) => idx,
-            None => return Self::default(),
+            None => {
+                let selectors = std::mem::take(&mut self.selectors);
+                return Self { selectors };
+            }
         };
 
         let mut remaining = self.selectors.split_off(split_idx);
-        if total_count != row_count {
-            let overflow = total_count - row_count;
-            let rem = remaining.first_mut().unwrap();
-            rem.row_count -= overflow;
 
+        // Always present as `split_idx < self.selectors.len`
+        let next = remaining.first_mut().unwrap();
+        let overflow = total_count - row_count;
+
+        if next.row_count != overflow {
             self.selectors.push(RowSelector {
-                row_count,
-                skip: rem.skip,
+                row_count: next.row_count - overflow,
+                skip: next.skip,
             })
         }
+        next.row_count = overflow;
 
         std::mem::swap(&mut remaining, &mut self.selectors);
         Self {
             selectors: remaining,
@@ -157,17 +162,19 @@ impl RowSelection {
         let mut second = other.selectors.iter().cloned().peekable();
 
         let mut to_skip = 0;
-        while let (Some(a), Some(b)) = (first.peek_mut(), second.peek_mut()) {
-            if a.row_count == 0 {
-                first.next().unwrap();
-                continue;
-            }
+        while let Some(b) = second.peek_mut() {
+            let a = first.peek_mut().unwrap();
 
             if b.row_count == 0 {
                 second.next().unwrap();
                 continue;
             }
 
+            if a.row_count == 0 {
+                first.next().unwrap();
+                continue;
+            }
+
             if a.skip {
                 // Records were skipped when producing second
                 to_skip += a.row_count;
@@ -192,6 +199,18 @@ impl RowSelection {
                 }
             }
         }
+
+        for v in first {
+            if v.row_count != 0 {
+                assert!(v.skip);
+                to_skip += v.row_count
+            }
+        }
+
+        if to_skip != 0 {
+            selectors.push(RowSelector::skip(to_skip));
+        }
+
         Self { selectors }
     }
 
@@ -212,3 +231,189 @@ impl From<RowSelection> for VecDeque<RowSelector> {
         r.selectors.into()
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use rand::{thread_rng, Rng};
+
+    #[test]
+    fn test_from_filters() {
+        let filters = vec![
+            BooleanArray::from(vec![false, false, false, true, true, true, true]),
+            BooleanArray::from(vec![true, true, false, false, true, true, true]),
+            BooleanArray::from(vec![false, false, false, false]),
+            BooleanArray::from(Vec::<bool>::new()),
+        ];
+
+        let selection = RowSelection::from_filters(&filters[..1]);
+        assert!(selection.selects_any());
+        assert_eq!(
+            selection.selectors,
+            vec![RowSelector::skip(3), RowSelector::select(4)]
+        );
+
+        let selection = RowSelection::from_filters(&filters[..2]);
+        assert!(selection.selects_any());
+        assert_eq!(
+            selection.selectors,
+            vec![
+                RowSelector::skip(3),
+                RowSelector::select(6),
+                RowSelector::skip(2),
+                RowSelector::select(3)
+            ]
+        );
+
+        let selection = RowSelection::from_filters(&filters);
+        assert!(selection.selects_any());
+        assert_eq!(
+            selection.selectors,
+            vec![
+                RowSelector::skip(3),
+                RowSelector::select(6),
+                RowSelector::skip(2),
+                RowSelector::select(3),
+                RowSelector::skip(4)
+            ]
+        );
+
+        let selection = RowSelection::from_filters(&filters[2..3]);
+        assert!(!selection.selects_any());
+        assert_eq!(selection.selectors, vec![RowSelector::skip(4)]);
+    }
+
+    #[test]
+    fn test_split_off() {
+        let mut selection = RowSelection::from(vec![
+            RowSelector::skip(34),
+            RowSelector::select(12),
+            RowSelector::skip(3),
+            RowSelector::select(35),
+        ]);
+
+        let split = selection.split_off(34);
+        assert_eq!(split.selectors, vec![RowSelector::skip(34)]);
+        assert_eq!(
+            selection.selectors,
+            vec![
+                RowSelector::select(12),
+                RowSelector::skip(3),
+                RowSelector::select(35)
+            ]
+        );
+
+        let split = selection.split_off(5);
+        assert_eq!(split.selectors, vec![RowSelector::select(5)]);
+        assert_eq!(
+            selection.selectors,
+            vec![
+                RowSelector::select(7),
+                RowSelector::skip(3),
+                RowSelector::select(35)
+            ]
+        );
+
+        let split = selection.split_off(8);
+        assert_eq!(
+            split.selectors,
+            vec![RowSelector::select(7), RowSelector::skip(1)]
+        );
+        assert_eq!(
+            selection.selectors,
+            vec![RowSelector::skip(2), RowSelector::select(35)]
+        );
+
+        let split = selection.split_off(200);
+        assert_eq!(
+            split.selectors,
+            vec![RowSelector::skip(2), RowSelector::select(35)]
+        );
+        assert!(selection.selectors.is_empty());
+    }
+
+    #[test]
+    fn test_and() {
+        let mut a = RowSelection::from(vec![
+            RowSelector::skip(12),
+            RowSelector::select(23),
+            RowSelector::skip(3),
+            RowSelector::select(5),
+        ]);
+
+        let b = RowSelection::from(vec![
+            RowSelector::select(5),
+            RowSelector::skip(4),
+            RowSelector::select(15),
+            RowSelector::skip(4),
+        ]);
+
+        let mut expected = RowSelection::from(vec![
+            RowSelector::skip(12),
+            RowSelector::select(5),
+            RowSelector::skip(4),
+            RowSelector::select(14),
+            RowSelector::skip(3),
+            RowSelector::select(1),
+            RowSelector::skip(4),
+        ]);
+
+        assert_eq!(a.and(&b), expected);
+
+        a.split_off(7);
+        expected.split_off(7);
+        assert_eq!(a.and(&b), expected);
+
+        let a = RowSelection::from(vec![RowSelector::select(5), RowSelector::skip(3)]);
+
+        let b = RowSelection::from(vec![
+            RowSelector::select(2),
+            RowSelector::skip(1),
+            RowSelector::select(1),
+            RowSelector::skip(1),
+        ]);
+
+        assert_eq!(
+            a.and(&b).selectors,
+            vec![
+                RowSelector::select(2),
+                RowSelector::skip(1),
+                RowSelector::select(1),
+                RowSelector::skip(4)
+            ]
+        );
+    }
+
+    #[test]
+    fn test_and_fuzz() {
+        let mut rand = thread_rng();
+        for _ in 0..100 {
+            let a_len = rand.gen_range(10..100);
+            let a_bools: Vec<_> = (0..a_len).map(|x| rand.gen_bool(0.2)).collect();
+            let a = RowSelection::from_filters(&[BooleanArray::from(a_bools.clone())]);
+
+            let b_len: usize = a_bools.iter().map(|x| *x as usize).sum();
+            let b_bools: Vec<_> = (0..b_len).map(|x| rand.gen_bool(0.8)).collect();
+            let b = RowSelection::from_filters(&[BooleanArray::from(b_bools.clone())]);
+
+            let mut expected_bools = vec![false; a_len];
+
+            let mut iter_b = b_bools.iter();
+            for (idx, b) in a_bools.iter().enumerate() {
+                if *b {
+                    if *iter_b.next().unwrap() {
+                        expected_bools[idx] = true;
+                    }
+                }
+            }
+
+            let expected =
+                RowSelection::from_filters(&[BooleanArray::from(expected_bools)]);
+
+            let total_rows: usize = expected.selectors.iter().map(|s| s.row_count).sum();
+            assert_eq!(a_len, total_rows);
+
+            assert_eq!(a.and(&b), expected);
+        }
+    }
+}

From 623587df909c6da185188ca20207917f402ef47d Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Tue, 9 Aug 2022 17:51:08 +0100
Subject: [PATCH 6/8] Fix clippy

---
 parquet/src/arrow/arrow_reader/selection.rs | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index 992d449a3374..9d4092af9a9e 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -400,10 +400,8 @@ mod tests {
 
         let mut iter_b = b_bools.iter();
         for (idx, b) in a_bools.iter().enumerate() {
-            if *b {
-                if *iter_b.next().unwrap() {
-                    expected_bools[idx] = true;
-                }
+            if *b && *iter_b.next().unwrap() {
+                expected_bools[idx] = true;
             }
         }

From 87604479520824fdfaa78eb944a4016885bff6a7 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Thu, 11 Aug 2022 20:29:36 +0100
Subject: [PATCH 7/8] Review feedback

---
 parquet/src/arrow/arrow_reader/filter.rs    | 13 ++++++-------
 parquet/src/arrow/arrow_reader/mod.rs       |  4 ++--
 parquet/src/arrow/arrow_reader/selection.rs | 21 +++++++++++++------
 3 files changed, 23 insertions(+), 15 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/filter.rs b/parquet/src/arrow/arrow_reader/filter.rs
index 64cf034a52c9..7dc81c219619 100644
--- a/parquet/src/arrow/arrow_reader/filter.rs
+++ b/parquet/src/arrow/arrow_reader/filter.rs
@@ -27,13 +27,12 @@ pub trait ArrowPredicate: Send + 'static {
     /// passed to [`filter`](Self::filter)
     fn projection(&self) -> &ProjectionMask;
 
-    /// Called with a [`RecordBatch`] containing the columns identified by [`Self::projection`],
-    /// with `true` values in the returned [`BooleanArray`] indicating rows
-    /// matching the predicate.
+    /// Evaluate this predicate for the given [`RecordBatch`] containing the columns
+    /// identified by [`Self::projection`]
     ///
-    /// All rows that are `true` in the returned [`BooleanArray`] will be returned to the reader.
-    /// Any rows that are `false` or `Null` will not be
-    fn filter(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray>;
+    /// Rows that are `true` in the returned [`BooleanArray`] will be returned by the
+    /// parquet reader, whereas rows that are `false` or `Null` will not be
+    fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray>;
 }
 
 /// An [`ArrowPredicate`] created from an [`FnMut`]
@@ -63,7 +62,7 @@ where
         &self.projection
     }
 
-    fn filter(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+    fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
         (self.f)(batch)
     }
 }
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 03da81cffbeb..e363919f6516 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -377,7 +377,7 @@ pub(crate) fn evaluate_predicate(
         ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone());
     let mut filters = vec![];
     for maybe_batch in reader {
-        let filter = predicate.filter(maybe_batch?)?;
+        let filter = predicate.evaluate(maybe_batch?)?;
         match filter.null_count() {
             0 => filters.push(filter),
             _ => filters.push(prep_null_mask_filter(&filter)),
@@ -386,7 +386,7 @@ pub(crate) fn evaluate_predicate(
 
     let raw = RowSelection::from_filters(&filters);
     Ok(match input_selection {
-        Some(selection) => selection.and(&raw),
+        Some(selection) => selection.and_then(&raw),
         None => raw,
     })
 }
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index 9d4092af9a9e..8e129f5667ec 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -154,9 +154,18 @@ impl RowSelection {
         }
     }
 
-    /// Given a [`RowSelection`] computed under `self` returns the [`RowSelection`]
+    /// Given a [`RowSelection`] computed under `self`, returns the [`RowSelection`]
     /// representing their conjunction
-    pub fn and(&self, other: &Self) -> Self {
+    ///
+    /// For example:
+    ///
+    /// self:     NNNNNNNNNNNNYYYYYYYYYYYYYYYYYYYYYYNNNYYYYY
+    /// other:                YYYYYNNNNYYYYYYYYYYYYY        YYNNN
+    ///
+    /// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYYNNYNNNN
+    ///
+    ///
+    pub fn and_then(&self, other: &Self) -> Self {
         let mut selectors = vec![];
         let mut first = self.selectors.iter().cloned().peekable();
         let mut second = other.selectors.iter().cloned().peekable();
@@ -358,11 +367,11 @@ mod tests {
             RowSelector::skip(4),
         ]);
 
-        assert_eq!(a.and(&b), expected);
+        assert_eq!(a.and_then(&b), expected);
 
         a.split_off(7);
         expected.split_off(7);
-        assert_eq!(a.and(&b), expected);
+        assert_eq!(a.and_then(&b), expected);
 
         let a = RowSelection::from(vec![RowSelector::select(5), RowSelector::skip(3)]);
 
@@ -374,7 +383,7 @@ mod tests {
         ]);
 
         assert_eq!(
-            a.and(&b).selectors,
+            a.and_then(&b).selectors,
             vec![
                 RowSelector::select(2),
                 RowSelector::skip(1),
@@ -411,7 +420,7 @@ mod tests {
         let total_rows: usize = expected.selectors.iter().map(|s| s.row_count).sum();
         assert_eq!(a_len, total_rows);
 
-        assert_eq!(a.and(&b), expected);
+        assert_eq!(a.and_then(&b), expected);
     }
 }

From acb44f8977a40712d12fed965b5c33fd052def87 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Thu, 11 Aug 2022 20:35:53 +0100
Subject: [PATCH 8/8] Fix doc

---
 parquet/src/arrow/arrow_reader/filter.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parquet/src/arrow/arrow_reader/filter.rs b/parquet/src/arrow/arrow_reader/filter.rs
index 7dc81c219619..8945ccde4248 100644
--- a/parquet/src/arrow/arrow_reader/filter.rs
+++ b/parquet/src/arrow/arrow_reader/filter.rs
@@ -24,7 +24,7 @@ use arrow::record_batch::RecordBatch;
 pub trait ArrowPredicate: Send + 'static {
     /// Returns the [`ProjectionMask`] that describes the columns required
     /// to evaluate this predicate. All projected columns will be provided in the `batch`
-    /// passed to [`filter`](Self::filter)
+    /// passed to [`evaluate`](Self::evaluate)
     fn projection(&self) -> &ProjectionMask;
 
     /// Evaluate this predicate for the given [`RecordBatch`] containing the columns
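
The series above keeps the new API pub(crate) pending stabilisation (#1792). For reference, below is a minimal sketch of the intended end-state usage once the types are exported publicly, adapted from the test_row_filter test in the first patch. The file name "data.parquet", the use of tokio::fs::File as the AsyncFileReader, the synchronous footer read to build the masks up front, and the leaf column indices are illustrative assumptions, not part of this series.

// A minimal sketch, assuming RowFilter/ArrowPredicateFn/with_row_filter have
// been made public (tracked by #1792); everything named here that is not in
// the patches above is an assumption for illustration only.
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::footer::parse_metadata;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Read the footer synchronously to obtain the parquet schema for the masks
    let metadata = parse_metadata(&std::fs::File::open("data.parquet")?)?;
    let parquet_schema = metadata.file_metadata().schema_descr_ptr();

    // The predicate fetches and decodes only leaf column 0; rows where it
    // equals "b" survive and refine the RowSelection for later stages
    let predicate = ArrowPredicateFn::new(
        ProjectionMask::leaves(&parquet_schema, vec![0]),
        |batch| arrow::compute::eq_dyn_utf8_scalar(batch.column(0), "b"),
    );

    // Data for the final projection is only fetched for the selected rows
    let file = tokio::fs::File::open("data.parquet").await?;
    let stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        .with_projection(ProjectionMask::leaves(&parquet_schema, vec![0, 2]))
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .build()?;

    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} record batches of filtered rows", batches.len());
    Ok(())
}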