Skip to content

Commit

Permalink
Support min max aggregates in window functions with sliding windows (#…
Browse files Browse the repository at this point in the history
…4675)

* MIN, MAX Aggregate Functions with custom window frames

The functions are run with only float64 columns now as in the test case. Support for all types will be implemented.

* All ScalarValue types are supported now.

* moving_min_max crate dependency is eliminated

* use optimized accumulators when retract is not used

* Where the algorithm was taken from added as a comment

* Change design to use sliding implementation when absolutely necessary

* add fuzzy tests for moving min max, remove unsafe block

* remove duplicated code

* doc comments for example codes

* cargo test error correction

* revert unnecessary changes

Co-authored-by: Mustafa Akur <[email protected]>
  • Loading branch information
Berkay Şahin and mustafasrepo authored Dec 22, 2022
1 parent 77991a3 commit afb1ae2
Show file tree
Hide file tree
Showing 13 changed files with 756 additions and 71 deletions.
29 changes: 22 additions & 7 deletions datafusion/core/src/physical_plan/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ use datafusion_expr::{
window_function::{signature_for_built_in, BuiltInWindowFunction, WindowFunction},
WindowFrame,
};
use datafusion_physical_expr::window::BuiltInWindowFunctionExpr;
use datafusion_physical_expr::window::{
BuiltInWindowFunctionExpr, SlidingAggregateWindowExpr,
};
use std::convert::TryInto;
use std::sync::Arc;

Expand All @@ -55,12 +57,25 @@ pub fn create_window_expr(
input_schema: &Schema,
) -> Result<Arc<dyn WindowExpr>> {
Ok(match fun {
WindowFunction::AggregateFunction(fun) => Arc::new(AggregateWindowExpr::new(
aggregates::create_aggregate_expr(fun, false, args, input_schema, name)?,
partition_by,
order_by,
window_frame,
)),
WindowFunction::AggregateFunction(fun) => {
let aggregate =
aggregates::create_aggregate_expr(fun, false, args, input_schema, name)?;
if !window_frame.start_bound.is_unbounded() {
Arc::new(SlidingAggregateWindowExpr::new(
aggregate,
partition_by,
order_by,
window_frame,
))
} else {
Arc::new(AggregateWindowExpr::new(
aggregate,
partition_by,
order_by,
window_frame,
))
}
}
WindowFunction::BuiltInWindowFunction(fun) => Arc::new(BuiltInWindowExpr::new(
create_built_in_window_expr(fun, args, input_schema, name)?,
partition_by,
Expand Down
54 changes: 54 additions & 0 deletions datafusion/core/tests/sql/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,60 @@ async fn aggregate_grouped_min() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn aggregate_min_max_w_custom_window_frames() -> Result<()> {
let ctx = SessionContext::new();
register_aggregate_csv(&ctx).await?;
let sql =
"SELECT
MIN(c12) OVER (ORDER BY C12 RANGE BETWEEN 0.3 PRECEDING AND 0.2 FOLLOWING) as min1,
MAX(c12) OVER (ORDER BY C11 RANGE BETWEEN 0.1 PRECEDING AND 0.2 FOLLOWING) as max1
FROM aggregate_test_100
ORDER BY C9
LIMIT 5";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+---------------------+--------------------+",
"| min1 | max1 |",
"+---------------------+--------------------+",
"| 0.01479305307777301 | 0.9965400387585364 |",
"| 0.01479305307777301 | 0.9800193410444061 |",
"| 0.01479305307777301 | 0.9706712283358269 |",
"| 0.2667177795079635 | 0.9965400387585364 |",
"| 0.3600766362333053 | 0.9706712283358269 |",
"+---------------------+--------------------+",
];
assert_batches_eq!(expected, &actual);
Ok(())
}

#[tokio::test]
async fn aggregate_min_max_w_custom_window_frames_unbounded_start() -> Result<()> {
let ctx = SessionContext::new();
register_aggregate_csv(&ctx).await?;
let sql =
"SELECT
MIN(c12) OVER (ORDER BY C12 RANGE BETWEEN UNBOUNDED PRECEDING AND 0.2 FOLLOWING) as min1,
MAX(c12) OVER (ORDER BY C11 RANGE BETWEEN UNBOUNDED PRECEDING AND 0.2 FOLLOWING) as max1
FROM aggregate_test_100
ORDER BY C9
LIMIT 5";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+---------------------+--------------------+",
"| min1 | max1 |",
"+---------------------+--------------------+",
"| 0.01479305307777301 | 0.9965400387585364 |",
"| 0.01479305307777301 | 0.9800193410444061 |",
"| 0.01479305307777301 | 0.9800193410444061 |",
"| 0.01479305307777301 | 0.9965400387585364 |",
"| 0.01479305307777301 | 0.9800193410444061 |",
"+---------------------+--------------------+",
];
assert_batches_eq!(expected, &actual);
Ok(())
}

#[tokio::test]
async fn aggregate_avg_add() -> Result<()> {
let results = execute_with_partition(
Expand Down
10 changes: 10 additions & 0 deletions datafusion/expr/src/window_frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,16 @@ pub enum WindowFrameBound {
Following(ScalarValue),
}

impl WindowFrameBound {
pub fn is_unbounded(&self) -> bool {
match self {
WindowFrameBound::Preceding(elem) => elem.is_null(),
WindowFrameBound::CurrentRow => false,
WindowFrameBound::Following(elem) => elem.is_null(),
}
}
}

impl TryFrom<ast::WindowFrameBound> for WindowFrameBound {
type Error = DataFusionError;

Expand Down
18 changes: 4 additions & 14 deletions datafusion/physical-expr/src/aggregate/average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use std::any::Any;
use std::convert::TryFrom;
use std::sync::Arc;

use crate::aggregate::row_accumulator::RowAccumulator;
use crate::aggregate::row_accumulator::{
is_row_accumulator_support_dtype, RowAccumulator,
};
use crate::aggregate::sum;
use crate::expressions::format_state_name;
use crate::{AggregateExpr, PhysicalExpr};
Expand Down Expand Up @@ -105,19 +107,7 @@ impl AggregateExpr for Avg {
}

fn row_accumulator_supported(&self) -> bool {
matches!(
self.data_type,
DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
| DataType::Int8
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::Float32
| DataType::Float64
)
is_row_accumulator_support_dtype(&self.data_type)
}

fn create_row_accumulator(
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/aggregate/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ impl AggregateExpr for Count {
) -> Result<Box<dyn RowAccumulator>> {
Ok(Box::new(CountRowAccumulator::new(start_index)))
}

fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(CountAccumulator::new()))
}
}

#[derive(Debug)]
Expand Down
159 changes: 132 additions & 27 deletions datafusion/physical-expr/src/aggregate/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@ use datafusion_common::ScalarValue;
use datafusion_common::{downcast_value, DataFusionError, Result};
use datafusion_expr::Accumulator;

use crate::aggregate::row_accumulator::RowAccumulator;
use crate::aggregate::row_accumulator::{
is_row_accumulator_support_dtype, RowAccumulator,
};
use crate::expressions::format_state_name;
use arrow::array::Array;
use arrow::array::Decimal128Array;
use datafusion_row::accessor::RowAccessor;

use super::moving_min_max;

// Min/max aggregation can take Dictionary encode input but always produces unpacked
// (aka non Dictionary) output. We need to adjust the output data type to reflect this.
// The reason min/max aggregate produces unpacked output because there is only one
Expand Down Expand Up @@ -117,19 +121,7 @@ impl AggregateExpr for Max {
}

fn row_accumulator_supported(&self) -> bool {
matches!(
self.data_type,
DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
| DataType::Int8
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::Float32
| DataType::Float64
)
is_row_accumulator_support_dtype(&self.data_type)
}

fn create_row_accumulator(
Expand All @@ -141,6 +133,10 @@ impl AggregateExpr for Max {
self.data_type.clone(),
)))
}

fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(SlidingMaxAccumulator::try_new(&self.data_type)?))
}
}

// Statically-typed version of min/max(array) -> ScalarValue for string types.
Expand Down Expand Up @@ -577,6 +573,62 @@ impl Accumulator for MaxAccumulator {
}
}

/// An accumulator to compute the maximum value
#[derive(Debug)]
pub struct SlidingMaxAccumulator {
max: ScalarValue,
moving_max: moving_min_max::MovingMax<ScalarValue>,
}

impl SlidingMaxAccumulator {
/// new max accumulator
pub fn try_new(datatype: &DataType) -> Result<Self> {
Ok(Self {
max: ScalarValue::try_from(datatype)?,
moving_max: moving_min_max::MovingMax::<ScalarValue>::new(),
})
}
}

impl Accumulator for SlidingMaxAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
for idx in 0..values[0].len() {
let val = ScalarValue::try_from_array(&values[0], idx)?;
self.moving_max.push(val);
}
if let Some(res) = self.moving_max.max() {
self.max = res.clone();
}
Ok(())
}

fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
for _idx in 0..values[0].len() {
(self.moving_max).pop();
}
if let Some(res) = self.moving_max.max() {
self.max = res.clone();
}
Ok(())
}

fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
self.update_batch(states)
}

fn state(&self) -> Result<Vec<ScalarValue>> {
Ok(vec![self.max.clone()])
}

fn evaluate(&self) -> Result<ScalarValue> {
Ok(self.max.clone())
}

fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.max) + self.max.size()
}
}

#[derive(Debug)]
struct MaxRowAccumulator {
index: usize,
Expand Down Expand Up @@ -679,19 +731,7 @@ impl AggregateExpr for Min {
}

fn row_accumulator_supported(&self) -> bool {
matches!(
self.data_type,
DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
| DataType::Int8
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::Float32
| DataType::Float64
)
is_row_accumulator_support_dtype(&self.data_type)
}

fn create_row_accumulator(
Expand All @@ -703,6 +743,10 @@ impl AggregateExpr for Min {
self.data_type.clone(),
)))
}

fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(SlidingMinAccumulator::try_new(&self.data_type)?))
}
}

/// An accumulator to compute the minimum value
Expand Down Expand Up @@ -745,6 +789,67 @@ impl Accumulator for MinAccumulator {
}
}

/// An accumulator to compute the minimum value
#[derive(Debug)]
pub struct SlidingMinAccumulator {
min: ScalarValue,
moving_min: moving_min_max::MovingMin<ScalarValue>,
}

impl SlidingMinAccumulator {
/// new min accumulator
pub fn try_new(datatype: &DataType) -> Result<Self> {
Ok(Self {
min: ScalarValue::try_from(datatype)?,
moving_min: moving_min_max::MovingMin::<ScalarValue>::new(),
})
}
}

impl Accumulator for SlidingMinAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>> {
Ok(vec![self.min.clone()])
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
for idx in 0..values[0].len() {
let val = ScalarValue::try_from_array(&values[0], idx)?;
if !val.is_null() {
self.moving_min.push(val);
}
}
if let Some(res) = self.moving_min.min() {
self.min = res.clone();
}
Ok(())
}

fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
for idx in 0..values[0].len() {
let val = ScalarValue::try_from_array(&values[0], idx)?;
if !val.is_null() {
(self.moving_min).pop();
}
}
if let Some(res) = self.moving_min.min() {
self.min = res.clone();
}
Ok(())
}

fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
self.update_batch(states)
}

fn evaluate(&self) -> Result<ScalarValue> {
Ok(self.min.clone())
}

fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.min) + self.min.size()
}
}

#[derive(Debug)]
struct MinRowAccumulator {
index: usize,
Expand Down
9 changes: 9 additions & 0 deletions datafusion/physical-expr/src/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub(crate) mod median;
pub(crate) mod min_max;
pub mod build_in;
mod hyperloglog;
pub mod moving_min_max;
pub mod row_accumulator;
pub(crate) mod stats;
pub(crate) mod stddev;
Expand Down Expand Up @@ -101,4 +102,12 @@ pub trait AggregateExpr: Send + Sync + Debug {
self
)))
}

/// Creates accumulator implementation that supports retract
fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Err(DataFusionError::NotImplemented(format!(
"Retractable Accumulator hasn't been implemented for {:?} yet",
self
)))
}
}
Loading

0 comments on commit afb1ae2

Please sign in to comment.