Skip to content

Commit

Permalink
feat: respect time range when building parquet reader (#3947)
Browse files Browse the repository at this point in the history
* feat: convert timestamp range filters to predicates

* chore: rebase main

* fix: remove prediactes once they have been added to timestamp filters to avoid duplicate filtering

* fix: some comments

* fix: resolve conflicts
  • Loading branch information
v0y4g3r committed May 21, 2024
1 parent 43bf7bf commit e070ba3
Show file tree
Hide file tree
Showing 7 changed files with 340 additions and 235 deletions.
24 changes: 23 additions & 1 deletion src/common/recordbatch/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

//! Util record batch stream wrapper that can perform precise filter.

use datafusion::logical_expr::{Expr, Operator};
use datafusion::logical_expr::{Expr, Literal, Operator};
use datafusion_common::arrow::array::{ArrayRef, Datum, Scalar};
use datafusion_common::arrow::buffer::BooleanBuffer;
use datafusion_common::arrow::compute::kernels::cmp;
Expand Down Expand Up @@ -43,6 +43,28 @@ pub struct SimpleFilterEvaluator {
}

impl SimpleFilterEvaluator {
pub fn new<T: Literal>(column_name: String, lit: T, op: Operator) -> Option<Self> {
match op {
Operator::Eq
| Operator::NotEq
| Operator::Lt
| Operator::LtEq
| Operator::Gt
| Operator::GtEq => {}
_ => return None,
}

let Expr::Literal(val) = lit.lit() else {
return None;
};

Some(Self {
column_name,
literal: val.to_scalar().ok()?,
op,
})
}

pub fn try_new(predicate: &Expr) -> Option<Self> {
match predicate {
Expr::BinaryExpr(binary) => {
Expand Down
19 changes: 10 additions & 9 deletions src/mito2/src/engine/prune_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::test_util::{
build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv,
};

async fn check_prune_row_groups(expr: Expr, expected: &str) {
async fn check_prune_row_groups(exprs: Vec<Expr>, expected: &str) {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

Expand Down Expand Up @@ -55,7 +55,7 @@ async fn check_prune_row_groups(expr: Expr, expected: &str) {
.scan_to_stream(
region_id,
ScanRequest {
filters: vec![expr],
filters: exprs,
..Default::default()
},
)
Expand All @@ -70,7 +70,9 @@ async fn test_read_parquet_stats() {
common_telemetry::init_default_ut_logging();

check_prune_row_groups(
datafusion_expr::col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(4000), None))),
vec![
datafusion_expr::col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(4000), None)))
],
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
Expand All @@ -94,7 +96,7 @@ async fn test_read_parquet_stats() {
async fn test_prune_tag() {
// prune result: only row group 1&2
check_prune_row_groups(
datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))),
vec![datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string()))))],
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
Expand All @@ -114,18 +116,17 @@ async fn test_prune_tag_and_field() {
common_telemetry::init_default_ut_logging();
// prune result: only row group 1
check_prune_row_groups(
col("tag_0")
.gt(lit(ScalarValue::Utf8(Some("4".to_string()))))
.and(col("field_0").lt(lit(8.0))),
vec![
col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))),
col("field_0").lt(lit(8.0)),
],
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |
| 8 | 8.0 | 1970-01-01T00:00:08 |
| 9 | 9.0 | 1970-01-01T00:00:09 |
+-------+---------+---------------------+",
)
.await;
Expand Down
9 changes: 9 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_runtime::JoinError;
use common_time::Timestamp;
use datatypes::arrow::error::ArrowError;
use datatypes::prelude::ConcreteDataType;
use object_store::ErrorKind;
Expand Down Expand Up @@ -693,6 +694,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to build time range filters for value: {:?}", timestamp))]
BuildTimeRangeFilter {
timestamp: Timestamp,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -802,6 +810,7 @@ impl ErrorExt for Error {
EncodeMemtable { .. } | ReadDataPart { .. } => StatusCode::Internal,
ChecksumMismatch { .. } => StatusCode::Unexpected,
RegionStopped { .. } => StatusCode::RegionNotReady,
BuildTimeRangeFilter { .. } => StatusCode::Unexpected,
}
}

Expand Down
13 changes: 8 additions & 5 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use common_telemetry::{debug, error, warn};
use common_time::range::TimestampRange;
use store_api::region_engine::{RegionScannerRef, SinglePartitionScanner};
use store_api::storage::ScanRequest;
use table::predicate::{Predicate, TimeRangePredicateBuilder};
use table::predicate::{build_time_range_predicate, Predicate};
use tokio::sync::{mpsc, Semaphore};
use tokio_stream::wrappers::ReceiverStream;

Expand Down Expand Up @@ -235,7 +235,7 @@ impl ScanRegion {
}

/// Creates a scan input.
fn scan_input(self, filter_deleted: bool) -> Result<ScanInput> {
fn scan_input(mut self, filter_deleted: bool) -> Result<ScanInput> {
let time_range = self.build_time_range_predicate();

let ssts = &self.version.ssts;
Expand Down Expand Up @@ -300,16 +300,19 @@ impl ScanRegion {
}

/// Build time range predicate from filters.
fn build_time_range_predicate(&self) -> TimestampRange {
fn build_time_range_predicate(&mut self) -> TimestampRange {
let time_index = self.version.metadata.time_index_column();
let unit = time_index
.column_schema
.data_type
.as_timestamp()
.expect("Time index must have timestamp-compatible type")
.unit();
TimeRangePredicateBuilder::new(&time_index.column_schema.name, unit, &self.request.filters)
.build()
build_time_range_predicate(
&time_index.column_schema.name,
unit,
&mut self.request.filters,
)
}

/// Use the latest schema to build the index applier.
Expand Down
80 changes: 78 additions & 2 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use common_time::range::TimestampRange;
use datafusion_expr::Expr;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datafusion_expr::{Expr, Operator};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
Expand All @@ -38,6 +41,7 @@ use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::cache::CacheManagerRef;
use crate::error;
use crate::error::{
ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadParquetSnafu, Result,
};
Expand Down Expand Up @@ -225,7 +229,7 @@ impl ParquetReaderBuilder {

metrics.build_cost = start.elapsed();

let filters = if let Some(predicate) = &self.predicate {
let mut filters = if let Some(predicate) = &self.predicate {
predicate
.exprs()
.iter()
Expand All @@ -240,6 +244,11 @@ impl ParquetReaderBuilder {
} else {
vec![]
};

if let Some(time_range) = &self.time_range {
filters.extend(time_range_to_predicate(*time_range, &region_meta)?);
}

let codec = McmpRowCodec::new(
read_format
.metadata()
Expand Down Expand Up @@ -449,6 +458,59 @@ impl ParquetReaderBuilder {
}
}

/// Transforms time range into [SimpleFilterEvaluator].
fn time_range_to_predicate(
time_range: TimestampRange,
metadata: &RegionMetadataRef,
) -> Result<Vec<SimpleFilterContext>> {
let ts_col = metadata.time_index_column();
let ts_col_id = ts_col.column_id;

let ts_to_filter = |op: Operator, timestamp: &Timestamp| {
let value = match timestamp.unit() {
TimeUnit::Second => ScalarValue::TimestampSecond(Some(timestamp.value()), None),
TimeUnit::Millisecond => {
ScalarValue::TimestampMillisecond(Some(timestamp.value()), None)
}
TimeUnit::Microsecond => {
ScalarValue::TimestampMicrosecond(Some(timestamp.value()), None)
}
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(timestamp.value()), None),
};
let evaluator = SimpleFilterEvaluator::new(ts_col.column_schema.name.clone(), value, op)
.context(error::BuildTimeRangeFilterSnafu {
timestamp: *timestamp,
})?;
Ok(SimpleFilterContext::new(
evaluator,
ts_col_id,
SemanticType::Timestamp,
ts_col.column_schema.data_type.clone(),
))
};

let predicates = match (time_range.start(), time_range.end()) {
(Some(start), Some(end)) => {
vec![
ts_to_filter(Operator::GtEq, start)?,
ts_to_filter(Operator::Lt, end)?,
]
}

(Some(start), None) => {
vec![ts_to_filter(Operator::GtEq, start)?]
}

(None, Some(end)) => {
vec![ts_to_filter(Operator::Lt, end)?]
}
(None, None) => {
vec![]
}
};
Ok(predicates)
}

/// Parquet reader metrics.
#[derive(Debug, Default)]
struct Metrics {
Expand Down Expand Up @@ -570,6 +632,20 @@ pub(crate) struct SimpleFilterContext {
}

impl SimpleFilterContext {
fn new(
filter: SimpleFilterEvaluator,
column_id: ColumnId,
semantic_type: SemanticType,
data_type: ConcreteDataType,
) -> Self {
Self {
filter,
column_id,
semantic_type,
data_type,
}
}

/// Creates a context for the `expr`.
///
/// Returns None if the column to filter doesn't exist in the SST metadata or the
Expand Down
10 changes: 5 additions & 5 deletions src/query/src/tests/time_range_filter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};
use store_api::data_source::{DataSource, DataSourceRef};
use store_api::storage::ScanRequest;
use table::metadata::FilterPushDownType;
use table::predicate::TimeRangePredicateBuilder;
use table::predicate::build_time_range_predicate;
use table::test_util::MemTable;
use table::{Table, TableRef};

Expand Down Expand Up @@ -114,14 +114,14 @@ struct TimeRangeTester {
impl TimeRangeTester {
async fn check(&self, sql: &str, expect: TimestampRange) {
let _ = exec_selection(self.engine.clone(), sql).await;
let filters = self.get_filters();
let mut filters = self.take_filters();

let range = TimeRangePredicateBuilder::new("ts", TimeUnit::Millisecond, &filters).build();
let range = build_time_range_predicate("ts", TimeUnit::Millisecond, &mut filters);
assert_eq!(expect, range);
}

fn get_filters(&self) -> Vec<Expr> {
self.filter.write().unwrap().drain(..).collect()
fn take_filters(&self) -> Vec<Expr> {
std::mem::take(&mut self.filter.write().unwrap())
}
}

Expand Down
Loading

0 comments on commit e070ba3

Please sign in to comment.