feat: push all possible filters down to parquet exec (#1839)
* feat: push all possible filters down to parquet exec

* fix: project

* test: add ut for DatafusionArrowPredicate

* fix: according to CR comments
v0y4g3r committed Jun 28, 2023
1 parent bc33fdc commit 559d1f7
Showing 12 changed files with 658 additions and 342 deletions.
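For context on the commit title and the `DatafusionArrowPredicate` unit test mentioned above: pushing filters down to the parquet reader hinges on parquet's `ArrowPredicate` row-filter hook. The sketch below is illustrative only; the commit's actual adapter lives in files not rendered on this page, it assumes the mid-2023 `parquet` and `datafusion` crate APIs, and `DfPredicate` is a hypothetical name.

```rust
use std::sync::Arc;

use datafusion::arrow::array::BooleanArray;
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::PhysicalExpr;
use parquet::arrow::arrow_reader::ArrowPredicate;
use parquet::arrow::ProjectionMask;

/// Hypothetical adapter: wraps a DataFusion physical expression so the
/// parquet reader can evaluate it on each decoded batch and keep only
/// the rows where the predicate is true.
struct DfPredicate {
    projection: ProjectionMask,
    expr: Arc<dyn PhysicalExpr>,
}

impl ArrowPredicate for DfPredicate {
    fn projection(&self) -> &ProjectionMask {
        // Columns the predicate needs; the reader decodes only these
        // while evaluating the filter.
        &self.projection
    }

    fn evaluate(&mut self, batch: RecordBatch) -> Result<BooleanArray, ArrowError> {
        let value = self
            .expr
            .evaluate(&batch)
            .map_err(|e| ArrowError::ComputeError(e.to_string()))?;
        // Materialize the ColumnarValue, then downcast to the boolean
        // selection mask the reader expects.
        value
            .into_array(batch.num_rows())
            .as_any()
            .downcast_ref::<BooleanArray>()
            .cloned()
            .ok_or_else(|| {
                ArrowError::ComputeError("predicate did not evaluate to booleans".to_string())
            })
    }
}
```

The payoff of this hook is that rows failing the predicate are skipped before they ever reach the query engine, which is what "push all possible filters down to parquet exec" buys.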
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion src/datatypes/src/value.rs
@@ -294,7 +294,7 @@ fn new_item_field(data_type: ArrowDataType) -> Field {
     Field::new("item", data_type, false)
 }
 
-fn timestamp_to_scalar_value(unit: TimeUnit, val: Option<i64>) -> ScalarValue {
+pub fn timestamp_to_scalar_value(unit: TimeUnit, val: Option<i64>) -> ScalarValue {
     match unit {
         TimeUnit::Second => ScalarValue::TimestampSecond(val, None),
         TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(val, None),
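
The only change here is visibility: the helper becomes `pub` so the storage crate can build typed timestamp literals from a column's `TimeUnit`. A minimal usage sketch, using only the types shown in this diff:

```rust
use common_time::timestamp::TimeUnit;
use datafusion_common::ScalarValue;
use datatypes::value::timestamp_to_scalar_value;

// 20_000 ms since the epoch, carried as a millisecond-typed scalar
// rather than a bare integer.
let v = timestamp_to_scalar_value(TimeUnit::Millisecond, Some(20_000));
assert_eq!(v, ScalarValue::TimestampMillisecond(Some(20_000), None));
```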
2 changes: 1 addition & 1 deletion src/query/src/tests/time_range_filter_test.rs
@@ -140,7 +140,7 @@ impl TimeRangeTester {
         let _ = exec_selection(self.engine.clone(), sql).await;
         let filters = self.table.get_filters().await;
 
-        let range = TimeRangePredicateBuilder::new("ts", &filters).build();
+        let range = TimeRangePredicateBuilder::new("ts", TimeUnit::Millisecond, &filters).build();
         assert_eq!(expect, range);
     }
 }
2 changes: 2 additions & 0 deletions src/storage/Cargo.toml
@@ -23,6 +23,8 @@ common-time = { path = "../common/time" }
 datatypes = { path = "../datatypes" }
 datafusion-common.workspace = true
 datafusion-expr.workspace = true
+datafusion-physical-expr.workspace = true
+datafusion.workspace = true
 futures.workspace = true
 futures-util.workspace = true
 itertools.workspace = true
15 changes: 13 additions & 2 deletions src/storage/src/chunk.rs
@@ -226,10 +226,16 @@ impl ChunkReaderBuilder {
             reader_builder = reader_builder.push_batch_iter(iter);
         }
 
+        let predicate = Predicate::try_new(
+            self.filters.clone(),
+            self.schema.store_schema().schema().clone(),
+        )
+        .context(error::BuildPredicateSnafu)?;
+
         let read_opts = ReadOptions {
             batch_size: self.iter_ctx.batch_size,
             projected_schema: schema.clone(),
-            predicate: Predicate::new(self.filters.clone()),
+            predicate,
             time_range: *time_range,
         };
         for file in &self.files_to_read {
@@ -270,7 +276,12 @@ impl ChunkReaderBuilder {
     /// Build time range predicate from schema and filters.
    pub fn build_time_range_predicate(&self) -> TimestampRange {
        let Some(ts_col) = self.schema.user_schema().timestamp_column() else { return TimestampRange::min_to_max() };
-        TimeRangePredicateBuilder::new(&ts_col.name, &self.filters).build()
+        let unit = ts_col
+            .data_type
+            .as_timestamp()
+            .expect("Timestamp column must have timestamp-compatible type")
+            .unit();
+        TimeRangePredicateBuilder::new(&ts_col.name, unit, &self.filters).build()
     }
 
     /// Check if SST file's time range matches predicate.
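
`Predicate::new(filters)` was infallible; `try_new` additionally takes the store schema, presumably so the logical filters can be planned into physical expressions once, up front, and so a malformed filter surfaces as `BuildPredicate` here instead of failing mid-scan. The real constructor lives in the `table` crate and is not rendered on this page; the sketch below is an inferred shape under the mid-2023 DataFusion API, not the actual code:

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;
use datafusion::physical_expr::execution_props::ExecutionProps;
use datafusion::physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion_common::{DFSchema, Result};
use datafusion_expr::Expr;

/// Inferred shape: eagerly plan each pushed-down logical filter against
/// the SST's arrow schema, failing fast on filters that don't
/// type-check against the file's columns.
fn plan_filters(filters: &[Expr], schema: Arc<Schema>) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
    let df_schema = DFSchema::try_from(schema.as_ref().clone())?;
    let props = ExecutionProps::new();
    filters
        .iter()
        .map(|f| create_physical_expr(f, &df_schema, &schema, &props))
        .collect()
}
```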
147 changes: 105 additions & 42 deletions src/storage/src/compaction/writer.rs
@@ -13,8 +13,9 @@
 // limitations under the License.
 
 use common_query::logical_plan::{DfExpr, Expr};
-use datafusion_common::ScalarValue;
-use datafusion_expr::{BinaryExpr, Operator};
+use common_time::timestamp::TimeUnit;
+use datafusion_expr::Operator;
+use datatypes::value::timestamp_to_scalar_value;
 
 use crate::chunk::{ChunkReaderBuilder, ChunkReaderImpl};
 use crate::error;
@@ -31,53 +32,84 @@ pub(crate) async fn build_sst_reader(
 ) -> error::Result<ChunkReaderImpl> {
     // TODO(hl): Schemas in different SSTs may differ, thus we should infer
     // timestamp column name from Parquet metadata.
-    let ts_col_name = schema
-        .user_schema()
-        .timestamp_column()
-        .unwrap()
-        .name
-        .clone();
+    // Safety: Region schema's timestamp column must be present.
+    let ts_col = schema.user_schema().timestamp_column().unwrap();
+    let ts_col_unit = ts_col.data_type.as_timestamp().unwrap().unit();
+    let ts_col_name = ts_col.name.clone();
 
     ChunkReaderBuilder::new(schema, sst_layer)
         .pick_ssts(files)
-        .filters(vec![build_time_range_filter(
-            lower_sec_inclusive,
-            upper_sec_exclusive,
-            &ts_col_name,
-        )])
+        .filters(
+            build_time_range_filter(
+                lower_sec_inclusive,
+                upper_sec_exclusive,
+                &ts_col_name,
+                ts_col_unit,
+            )
+            .into_iter()
+            .collect(),
+        )
         .build()
         .await
 }
 
-fn build_time_range_filter(low_sec: i64, high_sec: i64, ts_col_name: &str) -> Expr {
-    let ts_col = Box::new(DfExpr::Column(datafusion_common::Column::from_name(
-        ts_col_name,
-    )));
-    let lower_bound_expr = Box::new(DfExpr::Literal(ScalarValue::TimestampSecond(
-        Some(low_sec),
-        None,
-    )));
-
-    let upper_bound_expr = Box::new(DfExpr::Literal(ScalarValue::TimestampSecond(
-        Some(high_sec),
-        None,
-    )));
-
-    let expr = DfExpr::BinaryExpr(BinaryExpr {
-        left: Box::new(DfExpr::BinaryExpr(BinaryExpr {
-            left: ts_col.clone(),
-            op: Operator::GtEq,
-            right: lower_bound_expr,
-        })),
-        op: Operator::And,
-        right: Box::new(DfExpr::BinaryExpr(BinaryExpr {
-            left: ts_col,
-            op: Operator::Lt,
-            right: upper_bound_expr,
-        })),
-    });
-
-    Expr::from(expr)
+/// Build a time range filter expr from the lower bound (inclusive) and
+/// upper bound (exclusive). Returns `None` if the time range overflows.
+fn build_time_range_filter(
+    low_sec: i64,
+    high_sec: i64,
+    ts_col_name: &str,
+    ts_col_unit: TimeUnit,
+) -> Option<Expr> {
+    debug_assert!(low_sec <= high_sec);
+    let ts_col = DfExpr::Column(datafusion_common::Column::from_name(ts_col_name));
+
+    // Converting seconds to any finer unit never loses precision, so
+    // only overflow needs handling here.
+    let low_ts = common_time::Timestamp::new_second(low_sec)
+        .convert_to(ts_col_unit)
+        .map(|ts| ts.value());
+    let high_ts = common_time::Timestamp::new_second(high_sec)
+        .convert_to(ts_col_unit)
+        .map(|ts| ts.value());
+
+    let expr = match (low_ts, high_ts) {
+        (Some(low), Some(high)) => {
+            let lower_bound_expr =
+                DfExpr::Literal(timestamp_to_scalar_value(ts_col_unit, Some(low)));
+            let upper_bound_expr =
+                DfExpr::Literal(timestamp_to_scalar_value(ts_col_unit, Some(high)));
+            Some(datafusion_expr::and(
+                datafusion_expr::binary_expr(ts_col.clone(), Operator::GtEq, lower_bound_expr),
+                datafusion_expr::binary_expr(ts_col, Operator::Lt, upper_bound_expr),
+            ))
+        }
+
+        (Some(low), None) => {
+            let lower_bound_expr =
+                datafusion_expr::lit(timestamp_to_scalar_value(ts_col_unit, Some(low)));
+            Some(datafusion_expr::binary_expr(
+                ts_col,
+                Operator::GtEq,
+                lower_bound_expr,
+            ))
+        }
+
+        (None, Some(high)) => {
+            let upper_bound_expr =
+                datafusion_expr::lit(timestamp_to_scalar_value(ts_col_unit, Some(high)));
+            Some(datafusion_expr::binary_expr(
+                ts_col,
+                Operator::Lt,
+                upper_bound_expr,
+            ))
+        }
+
+        (None, None) => None,
+    };
+
+    expr.map(Expr::from)
 }
 
 #[cfg(test)]
@@ -490,4 +522,35 @@ mod tests {
 
         assert_eq!(timestamps_in_outputs, timestamps_in_inputs);
     }
+
+    #[test]
+    fn test_build_time_range_filter() {
+        assert!(build_time_range_filter(i64::MIN, i64::MAX, "ts", TimeUnit::Nanosecond).is_none());
+
+        assert_eq!(
+            Expr::from(datafusion_expr::binary_expr(
+                datafusion_expr::col("ts"),
+                Operator::Lt,
+                datafusion_expr::lit(timestamp_to_scalar_value(
+                    TimeUnit::Nanosecond,
+                    Some(TimeUnit::Second.factor() as i64 / TimeUnit::Nanosecond.factor() as i64)
+                ))
+            )),
+            build_time_range_filter(i64::MIN, 1, "ts", TimeUnit::Nanosecond).unwrap()
+        );
+
+        assert_eq!(
+            Expr::from(datafusion_expr::binary_expr(
+                datafusion_expr::col("ts"),
+                Operator::GtEq,
+                datafusion_expr::lit(timestamp_to_scalar_value(
+                    TimeUnit::Nanosecond,
+                    Some(
+                        2 * TimeUnit::Second.factor() as i64 / TimeUnit::Nanosecond.factor() as i64
+                    )
+                ))
+            )),
+            build_time_range_filter(2, i64::MAX, "ts", TimeUnit::Nanosecond).unwrap()
+        );
+    }
 }
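
The `Option` plumbing above exists because converting seconds to a finer unit multiplies the value (by 10^9 for nanoseconds) and can overflow `i64`; when that happens the corresponding bound is simply dropped from the filter, which is why `(i64::MIN, i64::MAX)` yields no filter at all. A small illustration using the same `common_time` calls the hunk relies on:

```rust
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;

// 1 second converts losslessly: 1 s == 1_000_000_000 ns.
let ns = Timestamp::new_second(1)
    .convert_to(TimeUnit::Nanosecond)
    .unwrap();
assert_eq!(1_000_000_000, ns.value());

// i64::MAX seconds cannot be represented in nanoseconds, so the
// conversion yields None and that bound disappears from the predicate.
assert!(Timestamp::new_second(i64::MAX)
    .convert_to(TimeUnit::Nanosecond)
    .is_none());
```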
7 changes: 7 additions & 0 deletions src/storage/src/error.rs
@@ -522,6 +522,12 @@ pub enum Error {
         source: ArrowError,
         location: Location,
     },
+
+    #[snafu(display("Failed to build scan predicate, source: {}", source))]
+    BuildPredicate {
+        source: table::error::Error,
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -621,6 +627,7 @@ impl ErrorExt for Error {
 
             TtlCalculation { source, .. } => source.status_code(),
             ConvertColumnsToRows { .. } | SortArrays { .. } => StatusCode::Unexpected,
+            BuildPredicate { source, .. } => source.status_code(),
         }
     }
 
7 changes: 6 additions & 1 deletion src/storage/src/region/tests/flush.rs
@@ -21,7 +21,9 @@ use arrow::compute::SortOptions;
 use common_query::prelude::Expr;
 use common_recordbatch::OrderOption;
 use common_test_util::temp_dir::create_temp_dir;
+use common_time::timestamp::TimeUnit;
 use datafusion_common::Column;
+use datatypes::value::timestamp_to_scalar_value;
 use log_store::raft_engine::log_store::RaftEngineLogStore;
 use store_api::storage::{FlushContext, FlushReason, OpenOptions, Region, ScanRequest};
 
@@ -404,7 +406,10 @@ async fn test_flush_and_query_empty() {
         filters: vec![Expr::from(datafusion_expr::binary_expr(
             DfExpr::Column(Column::from("timestamp")),
             datafusion_expr::Operator::GtEq,
-            datafusion_expr::lit(20000),
+            datafusion_expr::lit(timestamp_to_scalar_value(
+                TimeUnit::Millisecond,
+                Some(20000),
+            )),
         ))],
         output_ordering: Some(vec![OrderOption {
             name: "timestamp".to_string(),
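
Worth noting why this test changed at all: `lit(20000)` produces an untyped `Int64` literal, and once filters are evaluated against the parquet file's schema, a comparison between a millisecond timestamp column and an `Int64` presumably no longer type-checks or prunes; the literal has to carry the column's concrete timestamp type. Side by side (illustrative only, not part of the diff):

```rust
use common_time::timestamp::TimeUnit;
use datatypes::value::timestamp_to_scalar_value;

// Before: a bare Int64 literal, type-incompatible with a timestamp
// column once the filter is planned against the parquet schema.
let _old = datafusion_expr::lit(20000);

// After: a literal with the column's concrete timestamp type.
let _new = datafusion_expr::lit(timestamp_to_scalar_value(
    TimeUnit::Millisecond,
    Some(20000),
));
```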
1 change: 1 addition & 0 deletions src/storage/src/sst.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 pub(crate) mod parquet;
+mod pruning;
 mod stream_writer;
 
 use std::collections::HashMap;
(Diffs for the remaining three changed files are not rendered.)