
Add Parquet RowFilter API #2335

Merged · 11 commits · Aug 11, 2022
Conversation

@tustvold (Contributor) commented Aug 5, 2022

Draft, as it needs a lot more test coverage and general cleanup

Which issue does this PR close?

Closes #2270

Rationale for this change

What changes are included in this PR?

This adds a RowFilter API and refines the existing RowSelection API. There are a couple of things worth highlighting here:

  • The RowFilter is pushed down to the IO level. @crepererum gave a good use case: this could allow eliminating an entire column chunk from consideration, etc...
  • I think we need a RecordBatchReaderBuilder, as the current API can't really be used for this purpose

Are there any user-facing changes?

@github-actions github-actions bot added the parquet Changes to the parquet crate label Aug 5, 2022
arrow_schema: SchemaRef,
mask: ProjectionMask,
row_groups: Box<dyn RowGroupCollection>,
row_groups: &dyn RowGroupCollection,
tustvold (author):

Drive by cleanup

@@ -110,8 +110,8 @@ pub trait RowGroupCollection {
}

impl RowGroupCollection for Arc<dyn FileReader> {
fn schema(&self) -> Result<SchemaDescPtr> {
Ok(self.metadata().file_metadata().schema_descr_ptr())
fn schema(&self) -> SchemaDescPtr {
tustvold (author):

Drive by cleanup

use arrow::record_batch::RecordBatch;

/// A predicate operating on [`RecordBatch`]
pub trait ArrowPredicate: Send + 'static {
tustvold (author):

This is to make things more extensible in the long run
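To make the extensibility point concrete, here is a minimal stand-in sketch of the trait shape, using plain `Vec`s in place of arrow's `RecordBatch` and `BooleanArray` (the `Predicate` and `FnPredicate` names here are hypothetical simplifications, not the crate's API):

```rust
// Stand-in for ArrowPredicate: a stateful predicate that, given a batch of
// rows, returns one bool per row (true = keep the row).
trait Predicate: Send + 'static {
    fn evaluate(&mut self, batch: &[i64]) -> Vec<bool>;
}

// Adapter lifting any closure into a Predicate, showing why a trait is more
// extensible than a bare function pointer: implementors can carry state.
struct FnPredicate<F: FnMut(&[i64]) -> Vec<bool> + Send + 'static>(F);

impl<F: FnMut(&[i64]) -> Vec<bool> + Send + 'static> Predicate for FnPredicate<F> {
    fn evaluate(&mut self, batch: &[i64]) -> Vec<bool> {
        (self.0)(batch)
    }
}

fn main() {
    let mut gt_ten =
        FnPredicate(|batch: &[i64]| batch.iter().map(|v| *v > 10).collect::<Vec<bool>>());
    let mask = gt_ten.evaluate(&[5, 11, 20]);
    println!("{:?}", mask); // [false, true, true]
}
```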

@@ -349,22 +323,13 @@ impl RecordBatchReader for ParquetRecordBatchReader {
}

impl ParquetRecordBatchReader {
pub fn try_new(
tustvold (author):

This module is not public, and this method was only being used in one place, so we can just remove it

) -> Result<RowSelection> {
let reader =
ParquetRecordBatchReader::new(batch_size, array_reader, selection.clone());
let mut filters = vec![];
tustvold (author):

We could theoretically keep the decoded arrays around, but requires a lot of non-trivial take + concat in order to sync up the yielded batches. It also potentially balloons the memory consumption. I decided it was not worth it

/// Once all predicates have been evaluated, the resulting [`RowSelection`] will be
/// used to return just the desired rows.
///
/// This design has a couple of implications:
tustvold (author):

This is the major change vs #2310, FYI @thinkharderdev

Contributor:

Nice, I like this
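As a rough sketch of how per-row predicate results collapse into the run-length form a RowSelection-style API consumes (the `Selector` type here is a simplified stand-in for `RowSelector`, not the crate's actual code):

```rust
// Simplified stand-in for RowSelector: a run of `row_count` rows that are
// either all skipped or all selected.
#[derive(Debug, PartialEq)]
struct Selector {
    row_count: usize,
    skip: bool,
}

// Collapse a per-row boolean mask (true = keep) into run-length selectors.
fn from_mask(mask: &[bool]) -> Vec<Selector> {
    let mut out: Vec<Selector> = Vec::new();
    for &keep in mask {
        let skip = !keep;
        if let Some(last) = out.last_mut() {
            if last.skip == skip {
                last.row_count += 1; // extend the current run
                continue;
            }
        }
        out.push(Selector { row_count: 1, skip }); // start a new run
    }
    out
}

fn main() {
    let sel = from_mask(&[false, false, true, true, true, false]);
    println!("{:?}", sel); // skip 2 rows, select 3, skip 1
}
```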

self,
selection: impl Into<Vec<RowSelection>>,
) -> Self {
/// TODO: Revisit this API, as [`Self`] is provided before the file metadata is available
tustvold (author):

I intend to revisit this as part of the next (21) arrow release, I suspect we can move to a builder and deprecate the current API which is quite clunky

fn into(self) -> VecDeque<RowSelector> {
self.selectors.into()
}
}
tustvold (author):

This file definitely needs some tests prior to merge. The code is largely lifted from #2201

Contributor:

I agree -- I didn't review the logic in detail either as I figured we are just at the "API feedback" design phase

vec![None; row_group_metadata.columns().len()];

// TODO: Combine consecutive ranges
let fetch_ranges = (0..column_chunks.len())
tustvold (author):

This logic is moved into InMemoryRowGroup::fetch
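The "combine consecutive ranges" idea noted in the TODO can be sketched as follows; `coalesce` is a hypothetical helper, not the actual code in `InMemoryRowGroup::fetch`:

```rust
use std::ops::Range;

// Merge adjacent or overlapping byte ranges so a fetch issues fewer IO
// requests (illustrative sketch only).
fn coalesce(mut ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
    ranges.sort_by_key(|r| r.start);
    let mut out: Vec<Range<u64>> = Vec::new();
    for r in ranges {
        if let Some(last) = out.last_mut() {
            if r.start <= last.end {
                // Contiguous or overlapping: extend the previous range.
                last.end = last.end.max(r.end);
                continue;
            }
        }
        out.push(r);
    }
    out
}

fn main() {
    let merged = coalesce(vec![0..100, 100..200, 300..400]);
    println!("{:?}", merged); // [0..200, 300..400]
}
```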

@tustvold (author) commented Aug 5, 2022

FYI @thinkharderdev @alamb @crepererum @Ted-Jiang I'd appreciate any feedback you might have on this

Comment on lines +402 to +410
row_group
.fetch(&mut self.input, meta, &projection, selection.as_ref())
.await?;

let reader = ParquetRecordBatchReader::new(
batch_size,
build_array_reader(self.schema.clone(), projection, &row_group)?,
selection,
);
Contributor:

This would require decoding the filter columns twice (or multiple times in the case where we have multiple predicates) right?

tustvold (author), Aug 6, 2022:

If a column appears in multiple predicates and/or the final projection, it will need to be decoded multiple times. I don't really see a way around this, keeping the data around and doing take + concat adds significant complexity, and it is unclear that it would necessarily be faster.

Eventually it might be possible to push simple predicates down to operate directly on the encoded data, which would avoid this. But that is a wee ways off 😅
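As a rough illustration of why pushing predicates down to encoded data could pay off: for a dictionary-encoded column, `col == constant` needs one comparison per distinct value rather than per row (hypothetical sketch, not how the crate works today):

```rust
// Evaluate `col == constant` against dictionary-encoded data: test the
// predicate once per distinct dictionary value, then map the result over the
// per-row keys without materializing the decoded column.
fn eval_on_dictionary(dict: &[i64], keys: &[usize], constant: i64) -> Vec<bool> {
    // One comparison per distinct value...
    let hits: Vec<bool> = dict.iter().map(|v| *v == constant).collect();
    // ...then a cheap lookup per row.
    keys.iter().map(|k| hits[*k]).collect()
}

fn main() {
    let dict = vec![10, 20, 30];
    let keys = vec![0, 1, 1, 2, 0];
    println!("{:?}", eval_on_dictionary(&dict, &keys, 20)); // [false, true, true, false, false]
}
```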

thinkharderdev (Contributor), Aug 6, 2022:

Yeah, I think in many cases the difference would be negligible but in the degenerate cases (lots of predicates, filters which don't do much filtering, etc) I think it could potentially add up. The reason I worry about it in general is that we would have to rely on the engine to determine which predicates to apply and in which order. And in a situation where all we have is row group metadata we don't have a ton to go on.

I took a crack at seeing what it might look like preserving the decoded arrays and came up with tustvold#24. It certainly involves a lot of array slicing and dicing but the complexity seems manageable and would help ensure that applying filters doesn't ever come with a significant performance cost.

Contributor:

I think trying to eliminate redundant decoding is a good idea for the reasons that @thinkharderdev mentioned.

Conveniently, it seems like nothing in the API of this PR requires decoding multiple times, so I think we could also potentially implement the 'use take rather than redundant decode' in a follow on PR as well.

In terms of "Eventually it might be possible to push simple predicates down to operate directly on the encoded data, which would avoid this." I agree it is a ways off however, I think it could fit into this API with something like adding a list of ParquetFilters to apply during the decode itself that could be efficiently implemented

/// Filter that is applied during decoding of a single column;
/// semantically takes the form `<col> <op> <constant>`
struct ParquetFilter {
  op: ParquetFilterOp,
  left: ParquetConst,
}
enum ParquetFilterOp { Eq, Neq }
enum ParquetConst {
  Int64(i64),
  Float(f64),
  ...
}

Contributor:

Conveniently, it seems like nothing in the API of this PR requires decoding multiple times, so I think we could also potentially implement the 'use take rather than redundant decode' in a follow on PR as well.

I agree. As @tustvold points out, we need to be careful about memory overhead so maybe the best course is to go with the current approach and tackle avoiding the redundant decoding in a follow up. I prototyped the DataFusion piece (based on my other draft PR but it should be roughly similar) to see how it affected our internal benchmarks and saw a roughly 50% improvement on queries with reasonably selective predicates. We went from being mostly CPU bound with parquet decoding to being mostly IO bound which means I expect there is even more room for improvement once we are using the selection and the page index to avoid IO altogether. That's all to say I'm super excited about this work and think it will be a huge step forward!

tustvold (author):

I will create a follow up ticket to investigate this 👍

Contributor:

We went from being mostly CPU bound with parquet decoding to being mostly IO bound which means I expect there is even more room for improvement once we are using the selection and the page index to avoid IO altogether. That's all to say I'm super excited about this work and think it will be a huge step forward!

That is terrific news 🎉

@alamb alamb changed the title RFC: Add RowFilter API RFC: Add Parquet RowFilter API Aug 6, 2022
alamb (Contributor) left a comment:

I also really like this API -- 👍 @tustvold

thanks for the feedback @thinkharderdev . I think this design will be better due to all the community feedback ❤️

Review threads: parquet/src/arrow/arrow_reader/filter.rs, parquet/src/arrow/arrow_reader/selection.rs

/// Row group filtering is applied prior to this, and rows from skipped
/// row groups should not be included in the [`RowSelection`]
///
/// TODO: Make public once stable (#1792)
Contributor:

Probably also a good idea to link to the docs that describe the order of filter application in decoding (RowSelection followed by RowFilter)

tustvold (author):

The doc links should do this automatically?

async fn read_row_group(
mut self,
row_group_idx: usize,
mut selection: Option<RowSelection>,
Ted-Jiang (Member), Aug 6, 2022:

Is there a situation we need read row groups with previous row group selection? 🤔

Edit: sorry it may come from pageIndex, forgive me

Member:

Maybe we need the selectivity rate of each previous filter 😂 (just an idea)

tustvold (author):

the selectivity rate of each previous filter

I'm not sure what you mean?

projection: &ProjectionMask,
_selection: Option<&RowSelection>,
) -> Result<()> {
// TODO: Use OffsetIndex and selection to prune pages
Member:

👍 this avoids huge IO work in some situations and makes the pageIndex more useful!
I think it will take a lot of testing to decide when to use random skip reads.

@tustvold tustvold changed the title RFC: Add Parquet RowFilter API Add Parquet RowFilter API Aug 9, 2022
@tustvold tustvold marked this pull request as ready for review August 9, 2022 16:47
@tustvold (author) commented Aug 9, 2022

I think this is now good for review. It isn't public (yet) and so doesn't need to be perfect; we can continue to iterate on it. I will file follow-up tickets for follow-on work tomorrow.

/// with `true` values in the returned [`BooleanArray`] indicating rows
/// matching the predicate.
///
/// All row that are `true` in returned [`BooleanArray`] will be returned to the reader.
Contributor:

minor typo: All row is probably meant to be All rows (missing "s")

///
/// All row that are `true` in returned [`BooleanArray`] will be returned to the reader.
/// Any rows that are `false` or `Null` will not be
fn filter(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray>;
yordan-pavlov (Contributor), Aug 9, 2022:

should this be named fn filter_array instead to better indicate that the result would be a Boolean filter array instead of actually filtering the RecordBatch passed in as the batch parameter?

tustvold (author):

I think both names are kind of confusing tbh, I'll rename it to evaluate as I think that should be clear

alamb (Contributor) left a comment:

I think this PR looks good to go to me so we can start hooking it up and getting everything ready.

I definitely have some questions about the and test -- but since this code isn't used yet I don't think it would block merge.

@Ted-Jiang or @thinkharderdev do you have any other comments or suggestions? We can also address any additional changes in a follow on PR as this one is already fairly large

RowSelector::skip(4),
]);

let mut expected = RowSelection::from(vec![
Contributor:

The expected answer doesn't make sense to me. When I worked it out, there seems to be something wrong.

N = Skip
Y = Select

a: NNNNNNNNNNNNYYYYYYYYYYYYYYYYYYYYYYNNNYYYYY
b: YYYYYNNNNYYYYYYYYYYYYYYYNNN

What is here:
e: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYYNNYNNNN

What I think the answer should be:
e: NNNNNNNNNNNNYYYYYYYYYYYYNNNYYYYYYYNNNYYYYY

Though to be honest I am not sure what AND with nulls should produce

I am probably missing something obvious here

Contributor:

That looks right to me. The and should just be giving you the result of applying the filters sequentially. Visually, it makes sense (to me) like

a: NNNNNNNNNNNNYYYYYYYYYYYYYYYYYYYYYYNNNYYYYY
b:             YYYYYNNNNYYYYYYYYYYYYY   YYNNN

   NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYYNNYNNNN

tustvold (author):

Perhaps and is the wrong name for this function? I will incorporate @thinkharderdev 's example as a comment as I think it is helpful 👍

Contributor:

FWIW I think and is a little confusing since we're dealing with a boolean array and it is not clear from the name that this is essentially a composition operator. Maybe and_then?
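For reference, the composition being discussed can be sketched over plain boolean masks, where `b` was evaluated only on the rows `a` selected (`and_then` here is a hypothetical free function over `bool` slices, not the crate's API):

```rust
// Compose two filters: `a` is a mask over all rows; `b` is a mask over only
// the rows `a` kept. Expand `b` back to the full row count, so the result
// keeps a row iff both filters kept it.
fn and_then(a: &[bool], b: &[bool]) -> Vec<bool> {
    // `b` must have exactly one entry per row selected by `a`.
    assert_eq!(b.len(), a.iter().filter(|&&x| x).count());
    let mut b_iter = b.iter();
    a.iter()
        .map(|&sel| if sel { *b_iter.next().unwrap() } else { false })
        .collect()
}

fn main() {
    // `a` selects rows 1..=3 of five; `b` keeps the 1st and 3rd of those.
    let a = [false, true, true, true, false];
    let b = [true, false, true];
    println!("{:?}", and_then(&a, &b)); // [false, true, false, true, false]
}
```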

thinkharderdev (Contributor):

I think this PR looks good to go to me so we can start hooking it up and getting everything ready.

I definitely have some questions about the and test -- but since this code isn't used yet I don't think it would block merge.

@Ted-Jiang or @thinkharderdev do you have any other comments or suggestions? We can also address any additional changes in a follow on PR as this one is already fairly large

I think this is good to merge.

@tustvold tustvold merged commit 21ba02e into apache:master Aug 11, 2022
ursabot commented Aug 11, 2022

Benchmark runs are scheduled for baseline = 4481993 and contender = 21ba02e. 21ba02e is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

alamb (Contributor) left a comment:

nice 👌

}

/// Given a [`RowSelection`] computed under `self`, returns the [`RowSelection`]
/// representing their conjunction
Contributor:

Yeah, I think the wording saying this is a conjunction is misleading; it is more like applying the subsequent filter only to the rows that were selected previously.

Labels: parquet (Changes to the parquet crate)

Successfully merging this pull request may close these issues:

  • Changes to ParquetRecordBatchStream to support row filtering in DataFusion

6 participants