Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat (datafusion integration): convert datafusion expr filters to Iceberg Predicate #588

Merged
merged 14 commits into from
Sep 23, 2024
7 changes: 5 additions & 2 deletions crates/iceberg/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use arrow_array::types::{
validate_decimal_precision_and_scale, Decimal128Type, TimestampMicrosecondType,
};
use arrow_array::{
BooleanArray, Datum as ArrowDatum, Float32Array, Float64Array, Int32Array, Int64Array,
PrimitiveArray, Scalar, StringArray, TimestampMicrosecondArray,
BooleanArray, Date32Array, Datum as ArrowDatum, Float32Array, Float64Array, Int32Array,
Int64Array, PrimitiveArray, Scalar, StringArray, TimestampMicrosecondArray,
};
use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
use bitvec::macros::internal::funty::Fundamental;
Expand Down Expand Up @@ -634,6 +634,9 @@ pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Box<dyn ArrowDatum + Send
(PrimitiveType::String, PrimitiveLiteral::String(value)) => {
Ok(Box::new(StringArray::new_scalar(value.as_str())))
}
(PrimitiveType::Date, PrimitiveLiteral::Int(value)) => {
Ok(Box::new(Date32Array::new_scalar(*value)))
}
(PrimitiveType::Timestamp, PrimitiveLiteral::Long(value)) => {
Ok(Box::new(TimestampMicrosecondArray::new_scalar(*value)))
}
Expand Down
1 change: 1 addition & 0 deletions crates/integrations/datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@
// specific language governing permissions and limitations
// under the License.

pub(crate) mod predicate_converter;
pub(crate) mod scan;
a-agmon marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::{BinaryExpr, Cast, Expr, Operator};
use datafusion::scalar::ScalarValue;
use iceberg::expr::{Predicate, Reference};
use iceberg::spec::Datum;
/// Converts DataFusion filter expressions ([`Expr`]) into iceberg
/// [`Predicate`]s for scan pushdown. Stateless; see [`PredicateConverter::visit_many`].
#[derive(Default)]
pub struct PredicateConverter;
a-agmon marked this conversation as resolved.
Show resolved Hide resolved

impl PredicateConverter {
    /// Convert a list of DataFusion expressions to an iceberg predicate.
    ///
    /// Expressions that cannot be converted are skipped; the remaining
    /// predicates are combined with AND. Returns `None` when no expression
    /// could be converted.
    pub fn visit_many(&self, exprs: &[Expr]) -> Option<Predicate> {
        exprs
            .iter()
            .filter_map(|expr| self.visit(expr))
            .reduce(Predicate::and)
    }

    /// Convert a single DataFusion expression to an iceberg predicate.
    /// Currently only supports binary (simple) expressions.
    pub fn visit(&self, expr: &Expr) -> Option<Predicate> {
        match expr {
            Expr::BinaryExpr(binary) => self.visit_binary_expr(binary),
            _ => None,
        }
    }

    /// Convert a binary expression to an iceberg predicate.
    ///
    /// Currently supports:
    /// - column, basic op, and literal, e.g. `a = 1`
    /// - column and casted literal, e.g. `a = cast(1 as bigint)`
    /// - binary conditional (and, or), e.g. `a = 1 and b = 2`
    fn visit_binary_expr(&self, binary: &BinaryExpr) -> Option<Predicate> {
        match (&*binary.left, &binary.op, &*binary.right) {
            // column, op, literal
            (Expr::Column(col), op, Expr::Literal(lit)) => self.visit_column_literal(col, op, lit),
            // column, op, casted literal
            (Expr::Column(col), op, Expr::Cast(Cast { expr, data_type })) => {
                self.visit_column_cast(col, op, expr, data_type)
            }
            // binary conditional (and, or)
            (left, op, right) if matches!(op, Operator::And | Operator::Or) => {
                self.visit_binary_conditional(left, op, right)
            }
            _ => None,
        }
    }

    /// Convert a column and casted literal to an iceberg predicate.
    ///
    /// Handles the common case of a filter on a casted literal — currently
    /// only `Utf8` literals cast to `Date32`. These kinds of expressions are
    /// often not pushed down by query engines, though it's an important case
    /// to handle for iceberg scan pushdown.
    fn visit_column_cast(
        &self,
        col: &datafusion::common::Column,
        op: &Operator,
        expr: &Expr,
        data_type: &DataType,
    ) -> Option<Predicate> {
        if let (Expr::Literal(ScalarValue::Utf8(lit)), DataType::Date32) = (expr, data_type) {
            let reference = Reference::new(col.name.clone());
            // Borrow the literal (`as_deref`) instead of cloning the whole
            // Option<String>; bail out (None) if the string is absent or
            // is not a parseable date.
            let datum = lit
                .as_deref()
                .and_then(|date_str| Datum::date_from_str(date_str).ok())?;
            return Some(binary_op_to_predicate(reference, op, datum));
        }
        None
    }

    /// Convert a binary conditional expression, i.e., (and, or), to an iceberg predicate.
    ///
    /// When processing an AND expression:
    /// - if both sides convert, an AND predicate is returned
    /// - if only one side converts, that side alone is returned (a superset
    ///   filter is safe for pushdown)
    ///
    /// When processing an OR expression:
    /// - an OR predicate is returned only when BOTH sides convert; dropping
    ///   one leg of an OR would wrongly narrow the result
    fn visit_binary_conditional(
        &self,
        left: &Expr,
        op: &Operator,
        right: &Expr,
    ) -> Option<Predicate> {
        // Match on the operator together with the converted sub-predicates;
        // this avoids the Vec allocation and clones of collecting first.
        match (op, self.visit(left), self.visit(right)) {
            (Operator::And, Some(l), Some(r)) => Some(Predicate::and(l, r)),
            (Operator::And, Some(p), None) | (Operator::And, None, Some(p)) => Some(p),
            (Operator::Or, Some(l), Some(r)) => Some(Predicate::or(l, r)),
            _ => None,
        }
    }

    /// Convert a simple expression based on column and literal (x > 1) to an
    /// iceberg predicate. Returns `None` for unsupported literal types.
    fn visit_column_literal(
        &self,
        col: &datafusion::common::Column,
        op: &Operator,
        lit: &ScalarValue,
    ) -> Option<Predicate> {
        let reference = Reference::new(col.name.clone());
        let datum = scalar_value_to_datum(lit)?;
        Some(binary_op_to_predicate(reference, op, datum))
    }
}

/// Milliseconds in one day; used to turn `Date64` (ms since epoch) into days.
const MILLIS_PER_DAY: i64 = 24 * 60 * 60 * 1000;

/// Convert a DataFusion scalar value to an iceberg datum.
/// Returns `None` for null values and unsupported scalar types.
fn scalar_value_to_datum(value: &ScalarValue) -> Option<Datum> {
    let datum = match value {
        ScalarValue::Int8(Some(v)) => Datum::int(i32::from(*v)),
        ScalarValue::Int16(Some(v)) => Datum::int(i32::from(*v)),
        ScalarValue::Int32(Some(v)) => Datum::int(*v),
        ScalarValue::Int64(Some(v)) => Datum::long(*v),
        ScalarValue::Float32(Some(v)) => Datum::double(f64::from(*v)),
        ScalarValue::Float64(Some(v)) => Datum::double(*v),
        // Both string widths map to the same iceberg string datum.
        ScalarValue::Utf8(Some(v)) | ScalarValue::LargeUtf8(Some(v)) => Datum::string(v.clone()),
        ScalarValue::Date32(Some(v)) => Datum::date(*v),
        ScalarValue::Date64(Some(v)) => Datum::date((*v / MILLIS_PER_DAY) as i32),
        _ => return None,
    };
    Some(datum)
}

/// convert the data fusion Exp to an iceberg [`Predicate`]
fn binary_op_to_predicate(reference: Reference, op: &Operator, datum: Datum) -> Predicate {
match op {
Operator::Eq => reference.equal_to(datum),
Operator::NotEq => reference.not_equal_to(datum),
Operator::Lt => reference.less_than(datum),
Operator::LtEq => reference.less_than_or_equal_to(datum),
Operator::Gt => reference.greater_than(datum),
Operator::GtEq => reference.greater_than_or_equal_to(datum),
_ => Predicate::AlwaysTrue,
}
}
140 changes: 134 additions & 6 deletions crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayAs, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
};
use datafusion::prelude::Expr;
use futures::{Stream, TryStreamExt};
use iceberg::expr::Predicate;
use iceberg::table::Table;

use super::predicate_converter::PredicateConverter;
use crate::to_datafusion_error;

/// Manages the scanning process of an Iceberg [`Table`], encapsulating the
Expand All @@ -44,17 +47,19 @@ pub(crate) struct IcebergTableScan {
/// Stores certain, often expensive to compute,
/// plan properties used in query optimization.
plan_properties: PlanProperties,
predicates: Option<Predicate>,
}

impl IcebergTableScan {
/// Creates a new [`IcebergTableScan`] object.
pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
pub(crate) fn new(table: Table, schema: ArrowSchemaRef, filters: &[Expr]) -> Self {
let plan_properties = Self::compute_properties(schema.clone());

let predicates = convert_filters_to_predicate(filters);
Self {
table,
schema,
plan_properties,
predicates,
}
}

Expand Down Expand Up @@ -100,7 +105,7 @@ impl ExecutionPlan for IcebergTableScan {
_partition: usize,
_context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
let fut = get_batch_stream(self.table.clone());
let fut = get_batch_stream(self.table.clone(), self.predicates.clone());
let stream = futures::stream::once(fut).try_flatten();

Ok(Box::pin(RecordBatchStreamAdapter::new(
Expand All @@ -127,14 +132,137 @@ impl DisplayAs for IcebergTableScan {
/// and then converts it into a stream of Arrow [`RecordBatch`]es.
async fn get_batch_stream(
    table: Table,
    predicates: Option<Predicate>,
) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
    // Push the converted predicate (if any) into the iceberg scan so that
    // file/row-group pruning happens on the iceberg side.
    let mut scan_builder = table.scan();
    if let Some(pred) = predicates {
        scan_builder = scan_builder.with_filter(pred);
    }
    let table_scan = scan_builder.build().map_err(to_datafusion_error)?;

    let stream = table_scan
        .to_arrow()
        .await
        .map_err(to_datafusion_error)?
        .map_err(to_datafusion_error);

    Ok(Box::pin(stream))
}

/// Converts DataFusion filters ([`Expr`]) to an iceberg [`Predicate`].
/// If none of the filters could be converted, return `None` which adds no predicates to the scan operation.
/// If the conversion was successful, return the converted predicates combined with an AND operator.
fn convert_filters_to_predicate(filters: &[Expr]) -> Option<Predicate> {
PredicateConverter.visit_many(filters)
}

#[cfg(test)]
mod tests {
a-agmon marked this conversation as resolved.
Show resolved Hide resolved
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::DFSchema;
use datafusion::prelude::SessionContext;
use iceberg::expr::Reference;
use iceberg::spec::Datum;

use super::*;

/// Builds the two-column (`foo: Int32`, `bar: Utf8`) qualified DataFusion
/// schema shared by the predicate-conversion tests below.
fn create_test_schema() -> DFSchema {
    let fields = vec![
        Field::new("foo", DataType::Int32, false),
        Field::new("bar", DataType::Utf8, false),
    ];
    DFSchema::try_from_qualified_schema("my_table", &Schema::new(fields)).unwrap()
}

#[test]
fn test_predicate_conversion_with_single_condition() {
    // `foo > 1` should become a greater-than predicate on column `foo`.
    let df_schema = create_test_schema();
    let expr = SessionContext::new()
        .parse_sql_expr("foo > 1", &df_schema)
        .unwrap();
    let converted = convert_filters_to_predicate(&[expr]).unwrap();
    let expected = Reference::new("foo").greater_than(Datum::long(1));
    assert_eq!(converted, expected);
}
#[test]
fn test_predicate_conversion_with_single_unsupported_condition() {
    // `IS NULL` is not a supported expression, so nothing is converted.
    let df_schema = create_test_schema();
    let expr = SessionContext::new()
        .parse_sql_expr("foo is null", &df_schema)
        .unwrap();
    assert_eq!(convert_filters_to_predicate(&[expr]), None);
}

#[test]
fn test_predicate_conversion_with_and_condition() {
    // Both sides convert, so the result is an AND of both predicates.
    let df_schema = create_test_schema();
    let expr = SessionContext::new()
        .parse_sql_expr("foo > 1 and bar = 'test'", &df_schema)
        .unwrap();
    let converted = convert_filters_to_predicate(&[expr]).unwrap();
    let expected = Predicate::and(
        Reference::new("foo").greater_than(Datum::long(1)),
        Reference::new("bar").equal_to(Datum::string("test")),
    );
    assert_eq!(converted, expected);
}

#[test]
fn test_predicate_conversion_with_and_condition_unsupported() {
    // Only the left leg converts; AND keeps the supported leg.
    let df_schema = create_test_schema();
    let expr = SessionContext::new()
        .parse_sql_expr("foo > 1 and bar is not null", &df_schema)
        .unwrap();
    let converted = convert_filters_to_predicate(&[expr]).unwrap();
    assert_eq!(converted, Reference::new("foo").greater_than(Datum::long(1)));
}

#[test]
fn test_predicate_conversion_with_or_condition_unsupported() {
    // OR requires both legs to convert; with one unsupported leg, the whole
    // expression is dropped.
    let df_schema = create_test_schema();
    let expr = SessionContext::new()
        .parse_sql_expr("foo > 1 or bar is not null", &df_schema)
        .unwrap();
    assert_eq!(convert_filters_to_predicate(&[expr]), None);
}

#[test]
fn test_predicate_conversion_with_complex_binary_expr() {
    // A nested AND inside an OR converts fully when all legs are supported.
    let df_schema = create_test_schema();
    let expr = SessionContext::new()
        .parse_sql_expr("(foo > 1 and bar = 'test') or foo < 0 ", &df_schema)
        .unwrap();
    let converted = convert_filters_to_predicate(&[expr]).unwrap();
    let expected = Predicate::or(
        Predicate::and(
            Reference::new("foo").greater_than(Datum::long(1)),
            Reference::new("bar").equal_to(Datum::string("test")),
        ),
        Reference::new("foo").less_than(Datum::long(0)),
    );
    assert_eq!(converted, expected);
}

#[test]
fn test_predicate_conversion_with_complex_binary_expr_unsupported() {
    // `IN` is unsupported, which drops the whole OR leg; the outer AND then
    // keeps only the convertible side.
    let df_schema = create_test_schema();
    let expr = SessionContext::new()
        .parse_sql_expr("(foo > 1 or bar in ('test', 'test2')) and foo < 0 ", &df_schema)
        .unwrap();
    let converted = convert_filters_to_predicate(&[expr]).unwrap();
    assert_eq!(converted, Reference::new("foo").less_than(Datum::long(0)));
}
}
Loading
Loading