feat: Update Parquet row filtering to handle type coercion (#10716)
* test: Add a failing test to show the lack of type coercion in row filters

* feat: update parquet row filter to handle type coercion

* chore: lint/fmt

* chore: test improvements and cleanup
jeffreyssmith2nd authored Jun 5, 2024
1 parent df5dab7 commit 70256ba
Showing 3 changed files with 156 additions and 11 deletions.
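
As background for the change, here is a minimal self-contained sketch of the coercion the row filter now has to perform. It is illustrative only, not part of the diff, and assumes just the arrow crate: the file stores timestamps at one resolution while the table schema, and therefore the pushed-down predicate literal, uses another, so the column must be cast before the predicate is evaluated.

use std::sync::Arc;

use arrow::array::{ArrayRef, TimestampMillisecondArray, TimestampNanosecondArray};
use arrow::compute::cast;
use arrow::compute::kernels::cmp::gt;
use arrow::datatypes::{DataType, TimeUnit};

fn main() -> arrow::error::Result<()> {
    // Column as stored in the file: millisecond timestamps, no timezone.
    let file_col: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![0, 1, 2]));

    // The table schema declares nanosecond timestamps in UTC, so the column is
    // cast to that type before the filter expression sees it.
    let coerced = cast(
        &file_col,
        &DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
    )?;

    // The predicate literal (0 ns, UTC) is now directly comparable.
    let literal: ArrayRef =
        Arc::new(TimestampNanosecondArray::from(vec![0i64; 3]).with_timezone("UTC"));
    let keep = gt(&coerced, &literal)?; // keeps rows 1 and 2 (1 ms and 2 ms are both > 0 ns)
    println!("{keep:?}");
    Ok(())
}

Before this commit the pushed-down predicate was evaluated directly against the file's native types, which is the gap the new failing test in row_filter.rs exercises.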
@@ -114,6 +114,7 @@ impl FileOpener for ParquetOpener {
builder.metadata(),
reorder_predicates,
&file_metrics,
Arc::clone(&schema_mapping),
);

match row_filter {
120 changes: 110 additions & 10 deletions datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
@@ -18,13 +18,15 @@
use std::collections::BTreeSet;
use std::sync::Arc;

use super::ParquetFileMetrics;
use crate::physical_plan::metrics;

use arrow::array::BooleanArray;
use arrow::datatypes::{DataType, Schema};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
use parquet::arrow::ProjectionMask;
use parquet::file::metadata::ParquetMetaData;

use crate::datasource::schema_adapter::SchemaMapper;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
@@ -34,9 +36,9 @@ use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::utils::reassign_predicate_columns;
use datafusion_physical_expr::{split_conjunction, PhysicalExpr};

use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
use parquet::arrow::ProjectionMask;
use parquet::file::metadata::ParquetMetaData;
use crate::physical_plan::metrics;

use super::ParquetFileMetrics;

/// This module contains utilities for enabling the pushdown of DataFusion filter predicates (which
/// can be any DataFusion `Expr` that evaluates to a `BooleanArray`) to the parquet decoder level in `arrow-rs`.
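
For orientation, the sketch below shows how arrow-rs consumes such pushed-down predicates: each one is wrapped in an ArrowPredicate and handed to the reader as a RowFilter, so rows are discarded during decoding rather than after the batch is materialized. This is a hedged, standalone example; the file name data.parquet and the Int64 column are hypothetical and not taken from this PR.

use std::fs::File;

use arrow::array::{Int64Array, Scalar};
use arrow::compute::kernels::cmp::gt;
use parquet::arrow::arrow_reader::{
    ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter,
};
use parquet::arrow::ProjectionMask;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?; // hypothetical input file
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // Only decode the first leaf column while evaluating the predicate.
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let predicate = ArrowPredicateFn::new(mask, |batch| {
        // Keep rows where column 0 > 10 (assumes an Int64 column).
        gt(batch.column(0), &Scalar::new(Int64Array::from(vec![10])))
    });

    let reader = builder
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .build()?;
    for batch in reader {
        println!("{} rows passed the filter", batch?.num_rows());
    }
    Ok(())
}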
@@ -78,6 +80,8 @@ pub(crate) struct DatafusionArrowPredicate {
rows_filtered: metrics::Count,
/// how long was spent evaluating this predicate
time: metrics::Time,
/// used to perform type coercion while filtering rows
schema_mapping: Arc<dyn SchemaMapper>,
}

impl DatafusionArrowPredicate {
@@ -87,6 +91,7 @@ impl DatafusionArrowPredicate {
metadata: &ParquetMetaData,
rows_filtered: metrics::Count,
time: metrics::Time,
schema_mapping: Arc<dyn SchemaMapper>,
) -> Result<Self> {
let schema = Arc::new(schema.project(&candidate.projection)?);
let physical_expr = reassign_predicate_columns(candidate.expr, &schema, true)?;
@@ -108,6 +113,7 @@
),
rows_filtered,
time,
schema_mapping,
})
}
}
@@ -123,6 +129,8 @@ impl ArrowPredicate for DatafusionArrowPredicate {
false => batch.project(&self.projection)?,
};

let batch = self.schema_mapping.map_partial_batch(batch)?;

// scoped timer updates on drop
let mut timer = self.time.timer();
match self
@@ -323,6 +331,7 @@ pub fn build_row_filter(
metadata: &ParquetMetaData,
reorder_predicates: bool,
file_metrics: &ParquetFileMetrics,
schema_mapping: Arc<dyn SchemaMapper>,
) -> Result<Option<RowFilter>> {
let rows_filtered = &file_metrics.pushdown_rows_filtered;
let time = &file_metrics.pushdown_eval_time;
@@ -360,6 +369,7 @@
metadata,
rows_filtered.clone(),
time.clone(),
Arc::clone(&schema_mapping),
)?;

filters.push(Box::new(filter));
@@ -372,6 +382,7 @@
metadata,
rows_filtered.clone(),
time.clone(),
Arc::clone(&schema_mapping),
)?;

filters.push(Box::new(filter));
@@ -387,6 +398,7 @@
metadata,
rows_filtered.clone(),
time.clone(),
Arc::clone(&schema_mapping),
)?;

filters.push(Box::new(filter));
@@ -398,15 +410,23 @@

#[cfg(test)]
mod test {
use super::*;
use arrow::datatypes::Field;
use arrow_schema::TimeUnit::Nanosecond;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::reader::{FileReader, SerializedFileReader};
use rand::prelude::*;

use crate::datasource::schema_adapter::DefaultSchemaAdapterFactory;
use crate::datasource::schema_adapter::SchemaAdapterFactory;

use datafusion_common::ToDFSchema;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{cast, col, lit, Expr};
use datafusion_physical_expr::create_physical_expr;
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::reader::{FileReader, SerializedFileReader};
use rand::prelude::*;
use datafusion_physical_plan::metrics::{Count, Time};

use super::*;

// We should ignore predicates that read non-primitive columns
#[test]
@@ -473,6 +493,86 @@ mod test {
);
}

#[test]
fn test_filter_type_coercion() {
let testdata = crate::test_util::parquet_test_data();
let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
.expect("opening file");

let parquet_reader_builder =
ParquetRecordBatchReaderBuilder::try_new(file).expect("creating reader");
let metadata = parquet_reader_builder.metadata().clone();
let file_schema = parquet_reader_builder.schema().clone();

// This is the schema we would like to coerce to,
// which is different from the physical schema of the file.
let table_schema = Schema::new(vec![Field::new(
"timestamp_col",
DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))),
false,
)]);

let schema_adapter =
DefaultSchemaAdapterFactory {}.create(Arc::new(table_schema.clone()));
let (schema_mapping, _) = schema_adapter
.map_schema(&file_schema)
.expect("creating schema mapping");

let mut parquet_reader = parquet_reader_builder.build().expect("building reader");

// The Parquet file is small, so we only need one record batch
let first_rb = parquet_reader
.next()
.expect("expected record batch")
.expect("expected error free record batch");

// Test: all rows should fail the predicate
let expr = col("timestamp_col").lt(Expr::Literal(
ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
));
let expr = logical2physical(&expr, &table_schema);
let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema)
.build(&metadata)
.expect("building candidate")
.expect("candidate expected");

let mut row_filter = DatafusionArrowPredicate::try_new(
candidate,
&file_schema,
&metadata,
Count::new(),
Time::new(),
Arc::clone(&schema_mapping),
)
.expect("creating filter predicate");

let filtered = row_filter.evaluate(first_rb.clone());
assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![false; 8])));

// Test: all rows should pass the predicate
let expr = col("timestamp_col").gt(Expr::Literal(
ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))),
));
let expr = logical2physical(&expr, &table_schema);
let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema)
.build(&metadata)
.expect("building candidate")
.expect("candidate expected");

let mut row_filter = DatafusionArrowPredicate::try_new(
candidate,
&file_schema,
&metadata,
Count::new(),
Time::new(),
schema_mapping,
)
.expect("creating filter predicate");

let filtered = row_filter.evaluate(first_rb);
assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![true; 8])));
}

#[test]
fn test_remap_projection() {
let mut rng = thread_rng();
46 changes: 45 additions & 1 deletion datafusion/core/src/datasource/schema_adapter.rs
@@ -75,9 +75,21 @@ pub trait SchemaAdapter: Send + Sync {

/// Creates a `SchemaMapping` that can be used to cast or map the columns
/// from the file schema to the table schema.
pub trait SchemaMapper: Send + Sync {
pub trait SchemaMapper: Debug + Send + Sync {
/// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;

/// Adapts a [`RecordBatch`] that does not have all the columns from the
/// file schema.
///
/// This method is used when applying a filter to a subset of the columns during
/// an `ArrowPredicate`.
///
/// This method is slower than `map_batch` as it looks up columns by name.
fn map_partial_batch(
&self,
batch: RecordBatch,
) -> datafusion_common::Result<RecordBatch>;
}
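
A rough usage sketch of the two methods follows. Assumptions: the schema_adapter module is reachable as datafusion::datasource::schema_adapter, arrow is a direct dependency, and the column names are invented. The point is that map_partial_batch maps only the columns actually present in the batch, matching them to the table schema by name, which is exactly the shape of batch an ArrowPredicate sees.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};

fn main() -> datafusion::error::Result<()> {
    // Table schema: `a` is Int64 in the table but Int32 in the file.
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Utf8, true),
    ]));
    let file_schema = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
    ]);

    let adapter = DefaultSchemaAdapterFactory {}.create(Arc::clone(&table_schema));
    let (mapper, _projection) = adapter.map_schema(&file_schema)?;

    // A predicate only needed column `a`, so the batch carries just that column.
    let partial = RecordBatch::try_new(
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // `a` is cast to Int64 to match the table schema; the missing `b` is ignored.
    let mapped = mapper.map_partial_batch(partial)?;
    assert_eq!(mapped.schema().field(0).data_type(), &DataType::Int64);
    Ok(())
}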

#[derive(Clone, Debug, Default)]
@@ -185,6 +197,31 @@ impl SchemaMapper for SchemaMapping {
let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
Ok(record_batch)
}

fn map_partial_batch(
&self,
batch: RecordBatch,
) -> datafusion_common::Result<RecordBatch> {
let batch_cols = batch.columns().to_vec();
let schema = batch.schema();

let mut cols = vec![];
let mut fields = vec![];
for (i, f) in schema.fields().iter().enumerate() {
let table_field = self.table_schema.field_with_name(f.name());
if let Ok(tf) = table_field {
cols.push(cast(&batch_cols[i], tf.data_type())?);
fields.push(tf.clone());
}
}

// Necessary to handle empty batches
let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));

let schema = Arc::new(Schema::new(fields));
let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
Ok(record_batch)
}
}

#[cfg(test)]
@@ -337,5 +374,12 @@ mod tests {

Ok(RecordBatch::try_new(schema, new_columns).unwrap())
}

fn map_partial_batch(
&self,
batch: RecordBatch,
) -> datafusion_common::Result<RecordBatch> {
self.map_batch(batch)
}
}
}
