Skip to content

Commit

Permalink
Add support for filter pushdown rule (#924)
Browse files Browse the repository at this point in the history
* Enable inexact filters for predicate pushdown, add helper to get fitlers from TableScan struct

* Update table scan logic to add filters

* Update PyTableScan to include input schema

* Update DaskTableSource to allow filtering on all expr's

* Change order to apply filters before projections

* Clean up filter conjuction application

* use filter_pushdown_rule from datafusion

* Update predicate pushdown tests

* Update predicate pushdown tests

* unxfail q21

* Update DaskTableSource filterPushDown comments

* Reenable clippy check for supports_filter_pushdown

* Simplify apply_filter conditional check

* Un-xfail q40

* Rerun tests

Co-authored-by: Charles Blackmon-Luca <[email protected]>
  • Loading branch information
ayushdg and charlesbluca authored Dec 1, 2022
1 parent 6ae69a8 commit cf719c5
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 676 deletions.
35 changes: 30 additions & 5 deletions dask_planner/src/sql/logical/table_scan.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
use datafusion_expr::logical_plan::TableScan;
use std::sync::Arc;

use datafusion_common::DFSchema;
use datafusion_expr::{logical_plan::TableScan, LogicalPlan};
use pyo3::prelude::*;

use crate::sql::{exceptions::py_type_err, logical};
use crate::{
expression::{py_expr_list, PyExpr},
sql::exceptions::py_type_err,
};

#[pyclass(name = "TableScan", module = "dask_planner", subclass)]
#[derive(Clone)]
pub struct PyTableScan {
pub(crate) table_scan: TableScan,
input: Arc<LogicalPlan>,
}

#[pymethods]
Expand All @@ -31,14 +38,32 @@ impl PyTableScan {
fn contains_projections(&self) -> bool {
self.table_scan.projection.is_some()
}

#[pyo3(name = "getFilters")]
fn scan_filters(&self) -> PyResult<Vec<PyExpr>> {
py_expr_list(&self.input, &self.table_scan.filters)
}
}

impl TryFrom<logical::LogicalPlan> for PyTableScan {
impl TryFrom<LogicalPlan> for PyTableScan {
type Error = PyErr;

fn try_from(logical_plan: logical::LogicalPlan) -> Result<Self, Self::Error> {
fn try_from(logical_plan: LogicalPlan) -> Result<Self, Self::Error> {
match logical_plan {
logical::LogicalPlan::TableScan(table_scan) => Ok(PyTableScan { table_scan }),
LogicalPlan::TableScan(table_scan) => {
// Create an input logical plan that's identical to the table scan with schema from the table source
let mut input = table_scan.clone();
input.projected_schema = DFSchema::try_from_qualified_schema(
&table_scan.table_name,
&table_scan.source.schema(),
)
.map_or(input.projected_schema, Arc::new);

Ok(PyTableScan {
table_scan,
input: Arc::new(LogicalPlan::TableScan(input)),
})
}
_ => Err(py_type_err("unexpected plan")),
}
}
Expand Down
4 changes: 1 addition & 3 deletions dask_planner/src/sql/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use datafusion_optimizer::{
// eliminate_filter::EliminateFilter,
eliminate_limit::EliminateLimit,
filter_null_join_keys::FilterNullJoinKeys,
filter_push_down::FilterPushDown,
inline_table_scan::InlineTableScan,
limit_push_down::LimitPushDown,
optimizer::{Optimizer, OptimizerRule},
Expand All @@ -29,9 +30,6 @@ use log::trace;
mod eliminate_agg_distinct;
use eliminate_agg_distinct::EliminateAggDistinct;

mod filter_push_down;
use filter_push_down::FilterPushDown;

/// Houses the optimization logic for Dask-SQL. This optimization controls the optimizations
/// and their ordering in regards to their impact on the underlying `LogicalPlan` instance
pub struct DaskSqlOptimizer {
Expand Down
Loading

0 comments on commit cf719c5

Please sign in to comment.