From 35a37a6d6060ef7ac502393f491d712ddb40bcea Mon Sep 17 00:00:00 2001 From: berkaycpp Date: Thu, 15 Dec 2022 01:22:56 +0300 Subject: [PATCH 01/11] MIN, MAX Aggregate Functions with custom window frames The functions are run with only float64 columns now as in the test case. Support for all types will be implemented. --- datafusion/core/tests/sql/aggregates.rs | 27 +++++++++++++ datafusion/physical-expr/Cargo.toml | 1 + .../physical-expr/src/aggregate/min_max.rs | 39 ++++++++++++++++--- 3 files changed, 61 insertions(+), 6 deletions(-) diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index 2dd3a8dec8d3..646f52a51b92 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -682,6 +682,33 @@ async fn aggregate_grouped_min() -> Result<()> { Ok(()) } +#[tokio::test] +async fn aggregate_min_max_w_custom_window_frames() -> Result<()> { + let ctx = SessionContext::new(); + register_aggregate_csv(&ctx).await?; + let sql = + "SELECT + MIN(c12) OVER (ORDER BY C12 RANGE BETWEEN 0.2 PRECEDING AND 0.2 FOLLOWING) as min1, + MAX(c12) OVER (ORDER BY C12 RANGE BETWEEN 0.2 PRECEDING AND 0.2 FOLLOWING) as max1 + FROM aggregate_test_100 + ORDER BY C9 + LIMIT 5"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+---------------------+---------------------+", + "| min1 | max1 |", + "+---------------------+---------------------+", + "| 0.01479305307777301 | 0.21535402343780985 |", + "| 0.09465635123783445 | 0.4830878559436823 |", + "| 0.01479305307777301 | 0.21535402343780985 |", + "| 0.36936304600612724 | 0.7631239070049998 |", + "| 0.4830878559436823 | 0.819715865079681 |", + "+---------------------+---------------------+", + ]; + assert_batches_eq!(expected, &actual); + Ok(()) +} + #[tokio::test] async fn aggregate_avg_add() -> Result<()> { let results = execute_with_partition( diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 094d233a9001..1009dde975f1 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -61,6 +61,7 @@ regex = { version = "^1.4.3", optional = true } sha2 = { version = "^0.10.1", optional = true } unicode-segmentation = { version = "^1.7.1", optional = true } uuid = { version = "^1.2", features = ["v4"] } +moving_min_max = "1.3.0" [dev-dependencies] criterion = "0.4" diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs index 73f898c426b1..c7c9268de177 100644 --- a/datafusion/physical-expr/src/aggregate/min_max.rs +++ b/datafusion/physical-expr/src/aggregate/min_max.rs @@ -38,6 +38,7 @@ use arrow::{ use datafusion_common::ScalarValue; use datafusion_common::{downcast_value, DataFusionError, Result}; use datafusion_expr::{Accumulator, AggregateState}; +use moving_min_max::MovingMin; use crate::aggregate::row_accumulator::RowAccumulator; use crate::expressions::format_state_name; @@ -541,6 +542,7 @@ pub fn max_row(index: usize, accessor: &mut RowAccessor, s: &ScalarValue) -> Res #[derive(Debug)] pub struct MaxAccumulator { max: ScalarValue, + moving_max: Box>, } impl MaxAccumulator { @@ -548,15 +550,27 @@ impl MaxAccumulator { pub fn try_new(datatype: &DataType) -> Result { Ok(Self { max: ScalarValue::try_from(datatype)?, + moving_max: Box::new(moving_min_max::MovingMax::::new()), }) } } impl Accumulator for MaxAccumulator { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let values = &values[0]; - let delta = &max_batch(values)?; - self.max = max(&self.max, delta)?; + let values = downcast_value!(values[0], Float64Array); + for i in 0..values.len() { + (self.moving_max).push(values.value(i)); + } + self.max = ScalarValue::from(*self.moving_max.max().unwrap()); + Ok(()) + } + + fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let values = downcast_value!(values[0], Float64Array); + for i in 0..values.len() { + (self.moving_max).pop(); + } + self.max = ScalarValue::from(*self.moving_max.max().unwrap()); Ok(()) } @@ -709,6 +723,7 @@ impl AggregateExpr for Min { #[derive(Debug)] pub struct MinAccumulator { min: ScalarValue, + moving_min: Box>, } impl MinAccumulator { @@ -716,6 +731,7 @@ impl MinAccumulator { pub fn try_new(datatype: &DataType) -> Result { Ok(Self { min: ScalarValue::try_from(datatype)?, + moving_min: Box::new(moving_min_max::MovingMin::::new()), }) } } @@ -726,9 +742,20 @@ impl Accumulator for MinAccumulator { } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let values = &values[0]; - let delta = &min_batch(values)?; - self.min = min(&self.min, delta)?; + let values = downcast_value!(values[0], Float64Array); + for i in 0..values.len() { + (self.moving_min).push(values.value(i)); + } + self.min = ScalarValue::from(*self.moving_min.min().unwrap()); + Ok(()) + } + + fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let values = downcast_value!(values[0], Float64Array); + for i in 0..values.len() { + (self.moving_min).pop(); + } + self.min = ScalarValue::from(*self.moving_min.min().unwrap()); Ok(()) } From 951bd0a9df305dc466b394082fc248972783d66c Mon Sep 17 00:00:00 2001 From: berkaycpp Date: Fri, 16 Dec 2022 20:05:20 +0300 Subject: [PATCH 02/11] All ScalarValue types are supported now. --- .../physical-expr/src/aggregate/min_max.rs | 42 +++++++++++-------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs index c7c9268de177..ae88e0c156a5 100644 --- a/datafusion/physical-expr/src/aggregate/min_max.rs +++ b/datafusion/physical-expr/src/aggregate/min_max.rs @@ -542,7 +542,7 @@ pub fn max_row(index: usize, accessor: &mut RowAccessor, s: &ScalarValue) -> Res #[derive(Debug)] pub struct MaxAccumulator { max: ScalarValue, - moving_max: Box>, + moving_max: moving_min_max::MovingMax, } impl MaxAccumulator { @@ -550,27 +550,30 @@ impl MaxAccumulator { pub fn try_new(datatype: &DataType) -> Result { Ok(Self { max: ScalarValue::try_from(datatype)?, - moving_max: Box::new(moving_min_max::MovingMax::::new()), + moving_max: moving_min_max::MovingMax::::new(), }) } } impl Accumulator for MaxAccumulator { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let values = downcast_value!(values[0], Float64Array); - for i in 0..values.len() { - (self.moving_max).push(values.value(i)); + for idx in 0..values[0].len() { + let val = ScalarValue::try_from_array(&values[0], idx)?; + self.moving_max.push(val); + } + if let Some(res) = self.moving_max.max() { + self.max = res.clone(); } - self.max = ScalarValue::from(*self.moving_max.max().unwrap()); Ok(()) } fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let values = downcast_value!(values[0], Float64Array); - for i in 0..values.len() { + for i in 0..values[0].len() { (self.moving_max).pop(); } - self.max = ScalarValue::from(*self.moving_max.max().unwrap()); + if let Some(res) = self.moving_max.max() { + self.max = res.clone(); + } Ok(()) } @@ -723,7 +726,7 @@ impl AggregateExpr for Min { #[derive(Debug)] pub struct MinAccumulator { min: ScalarValue, - moving_min: Box>, + moving_min: moving_min_max::MovingMin, } impl MinAccumulator { @@ -731,7 +734,7 @@ impl MinAccumulator { pub fn try_new(datatype: &DataType) -> Result { Ok(Self { min: ScalarValue::try_from(datatype)?, - moving_min: Box::new(moving_min_max::MovingMin::::new()), + moving_min: moving_min_max::MovingMin::::new(), }) } } @@ -742,20 +745,23 @@ impl Accumulator for MinAccumulator { } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let values = downcast_value!(values[0], Float64Array); - for i in 0..values.len() { - (self.moving_min).push(values.value(i)); + for idx in 0..values[0].len() { + let val = ScalarValue::try_from_array(&values[0], idx)?; + self.moving_min.push(val); + } + if let Some(res) = self.moving_min.min() { + self.min = res.clone(); } - self.min = ScalarValue::from(*self.moving_min.min().unwrap()); Ok(()) } fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let values = downcast_value!(values[0], Float64Array); - for i in 0..values.len() { + for i in 0..values[0].len() { (self.moving_min).pop(); } - self.min = ScalarValue::from(*self.moving_min.min().unwrap()); + if let Some(res) = self.moving_min.min() { + self.min = res.clone(); + } Ok(()) } From 6b2461732c8feb14820b4e1559aca08c084109ba Mon Sep 17 00:00:00 2001 From: berkaycpp Date: Sun, 18 Dec 2022 00:54:43 +0300 Subject: [PATCH 03/11] moving_min_max crate dependency is eliminated --- datafusion/core/tests/sql/aggregates.rs | 22 +- datafusion/physical-expr/Cargo.toml | 1 - .../physical-expr/src/aggregate/min_max.rs | 3 +- datafusion/physical-expr/src/aggregate/mod.rs | 1 + .../src/aggregate/moving_min_max.rs | 245 ++++++++++++++++++ 5 files changed, 259 insertions(+), 13 deletions(-) create mode 100644 datafusion/physical-expr/src/aggregate/moving_min_max.rs diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index 646f52a51b92..0650d46337e8 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -688,22 +688,22 @@ async fn aggregate_min_max_w_custom_window_frames() -> Result<()> { register_aggregate_csv(&ctx).await?; let sql = "SELECT - MIN(c12) OVER (ORDER BY C12 RANGE BETWEEN 0.2 PRECEDING AND 0.2 FOLLOWING) as min1, - MAX(c12) OVER (ORDER BY C12 RANGE BETWEEN 0.2 PRECEDING AND 0.2 FOLLOWING) as max1 + MIN(c12) OVER (ORDER BY C12 RANGE BETWEEN 0.3 PRECEDING AND 0.2 FOLLOWING) as min1, + MAX(c12) OVER (ORDER BY C11 RANGE BETWEEN 0.1 PRECEDING AND 0.2 FOLLOWING) as max1 FROM aggregate_test_100 ORDER BY C9 LIMIT 5"; let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ - "+---------------------+---------------------+", - "| min1 | max1 |", - "+---------------------+---------------------+", - "| 0.01479305307777301 | 0.21535402343780985 |", - "| 0.09465635123783445 | 0.4830878559436823 |", - "| 0.01479305307777301 | 0.21535402343780985 |", - "| 0.36936304600612724 | 0.7631239070049998 |", - "| 0.4830878559436823 | 0.819715865079681 |", - "+---------------------+---------------------+", + "+---------------------+--------------------+", + "| min1 | max1 |", + "+---------------------+--------------------+", + "| 0.01479305307777301 | 0.9965400387585364 |", + "| 0.01479305307777301 | 0.9800193410444061 |", + "| 0.01479305307777301 | 0.9706712283358269 |", + "| 0.2667177795079635 | 0.9965400387585364 |", + "| 0.3600766362333053 | 0.9706712283358269 |", + "+---------------------+--------------------+", ]; assert_batches_eq!(expected, &actual); Ok(()) diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 1009dde975f1..094d233a9001 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -61,7 +61,6 @@ regex = { version = "^1.4.3", optional = true } sha2 = { version = "^0.10.1", optional = true } unicode-segmentation = { version = "^1.7.1", optional = true } uuid = { version = "^1.2", features = ["v4"] } -moving_min_max = "1.3.0" [dev-dependencies] criterion = "0.4" diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs index ae88e0c156a5..a5dc15f8504c 100644 --- a/datafusion/physical-expr/src/aggregate/min_max.rs +++ b/datafusion/physical-expr/src/aggregate/min_max.rs @@ -38,7 +38,6 @@ use arrow::{ use datafusion_common::ScalarValue; use datafusion_common::{downcast_value, DataFusionError, Result}; use datafusion_expr::{Accumulator, AggregateState}; -use moving_min_max::MovingMin; use crate::aggregate::row_accumulator::RowAccumulator; use crate::expressions::format_state_name; @@ -46,6 +45,8 @@ use arrow::array::Array; use arrow::array::Decimal128Array; use datafusion_row::accessor::RowAccessor; +use super::moving_min_max; + // Min/max aggregation can take Dictionary encode input but always produces unpacked // (aka non Dictionary) output. We need to adjust the output data type to reflect this. // The reason min/max aggregate produces unpacked output because there is only one diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs index f6374687403e..8b7cc0a395b3 100644 --- a/datafusion/physical-expr/src/aggregate/mod.rs +++ b/datafusion/physical-expr/src/aggregate/mod.rs @@ -41,6 +41,7 @@ pub(crate) mod median; pub(crate) mod min_max; pub mod build_in; mod hyperloglog; +pub mod moving_min_max; pub mod row_accumulator; pub(crate) mod stats; pub(crate) mod stddev; diff --git a/datafusion/physical-expr/src/aggregate/moving_min_max.rs b/datafusion/physical-expr/src/aggregate/moving_min_max.rs new file mode 100644 index 000000000000..7d7c9c30f75c --- /dev/null +++ b/datafusion/physical-expr/src/aggregate/moving_min_max.rs @@ -0,0 +1,245 @@ +//! Keep track of the minimum or maximum value in a sliding window. +//! +//! `moving min max` provides one data structure for keeping track of the +//! minimum value and one for keeping track of the maximum value in a sliding +//! window. +//! +//! Each element is stored with the current min/max. One stack to push and another one for pop. If pop stack is empty, +//! push to this stack all elements popped from first stack while updating their current min/max. Now pop from +//! the second stack (MovingMin/Max struct works as a queue). To find the minimum element of the queue, +//! look at the smallest/largest two elements of the individual stacks, then take the minimum of those two values. +//! +//! The complexity of the operations are +//! - O(1) for getting the minimum/maximum +//! - O(1) for push +//! - amortized O(1) for pop + +/// let mut moving_min = MovingMin::::new(); +/// moving_min.push(2); +/// moving_min.push(1); +/// moving_min.push(3); +/// +/// assert_eq!(moving_min.min(), Some(&1)); +/// assert_eq!(moving_min.pop(), Some(2)); +/// +/// assert_eq!(moving_min.min(), Some(&1)); +/// assert_eq!(moving_min.pop(), Some(1)); +/// +/// assert_eq!(moving_min.min(), Some(&3)); +/// assert_eq!(moving_min.pop(), Some(3)); +/// +/// assert_eq!(moving_min.min(), None); +/// assert_eq!(moving_min.pop(), None); +#[derive(Debug)] +pub struct MovingMin { + push_stack: Vec<(T, T)>, + pop_stack: Vec<(T, T)>, +} + +impl Default for MovingMin { + fn default() -> Self { + Self { + push_stack: Vec::new(), + pop_stack: Vec::new(), + } + } +} + +impl MovingMin { + /// Creates a new `MovingMin` to keep track of the minimum in a sliding + /// window. + #[inline] + pub fn new() -> Self { + Self::default() + } + + /// Creates a new `MovingMin` to keep track of the minimum in a sliding + /// window with `capacity` allocated slots. + #[inline] + pub fn with_capacity(capacity: usize) -> Self { + Self { + push_stack: Vec::with_capacity(capacity), + pop_stack: Vec::with_capacity(capacity), + } + } + + /// Returns the minimum of the sliding window or `None` if the window is + /// empty. + #[inline] + pub fn min(&self) -> Option<&T> { + match (self.push_stack.last(), self.pop_stack.last()) { + (None, None) => None, + (Some((_, min)), None) => Some(min), + (None, Some((_, min))) => Some(min), + (Some((_, a)), Some((_, b))) => Some(if a < b { a } else { b }), + } + } + + /// Pushes a new element into the sliding window. + #[inline] + pub fn push(&mut self, val: T) { + self.push_stack.push(match self.push_stack.last() { + Some((_, min)) => { + if val > *min { + (val, min.clone()) + } else { + (val.clone(), val) + } + } + None => (val.clone(), val), + }); + } + + /// Removes and returns the last value of the sliding window. + #[inline] + pub fn pop(&mut self) -> Option { + if self.pop_stack.is_empty() { + match self.push_stack.pop() { + Some((val, _)) => { + self.pop_stack.push((val.clone(), val)); + while let Some((val, _)) = self.push_stack.pop() { + // This is safe, because we just pushed one element onto + // pop_stack and therefore it cannot be empty. + let last = unsafe { + self.pop_stack.get_unchecked(self.pop_stack.len() - 1) + }; + let min = if last.1 < val { + last.1.clone() + } else { + val.clone() + }; + self.pop_stack.push((val.clone(), min)); + } + } + None => return None, + } + } + self.pop_stack.pop().map(|(val, _)| val) + } + + /// Returns the number of elements stored in the sliding window. + #[inline] + pub fn len(&self) -> usize { + self.push_stack.len() + self.pop_stack.len() + } + + /// Returns `true` if the moving window contains no elements. + #[inline] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +/// let mut moving_max = MovingMax::::new(); +/// moving_max.push(2); +/// moving_max.push(3); +/// moving_max.push(1); +/// +/// assert_eq!(moving_max.max(), Some(&3)); +/// assert_eq!(moving_max.pop(), Some(2)); +/// +/// assert_eq!(moving_max.max(), Some(&3)); +/// assert_eq!(moving_max.pop(), Some(3)); +/// +/// assert_eq!(moving_max.max(), Some(&1)); +/// assert_eq!(moving_max.pop(), Some(1)); +/// +/// assert_eq!(moving_max.max(), None); +/// assert_eq!(moving_max.pop(), None); +#[derive(Debug)] +pub struct MovingMax { + push_stack: Vec<(T, T)>, + pop_stack: Vec<(T, T)>, +} + +impl Default for MovingMax { + fn default() -> Self { + Self { + push_stack: Vec::new(), + pop_stack: Vec::new(), + } + } +} + +impl MovingMax { + /// Creates a new `MovingMax` to keep track of the maximum in a sliding window. + #[inline] + pub fn new() -> Self { + Self::default() + } + + /// Creates a new `MovingMax` to keep track of the maximum in a sliding window with + /// `capacity` allocated slots. + #[inline] + pub fn with_capacity(capacity: usize) -> Self { + Self { + push_stack: Vec::with_capacity(capacity), + pop_stack: Vec::with_capacity(capacity), + } + } + + /// Returns the maximum of the sliding window or `None` if the window is empty. + #[inline] + pub fn max(&self) -> Option<&T> { + match (self.push_stack.last(), self.pop_stack.last()) { + (None, None) => None, + (Some((_, max)), None) => Some(max), + (None, Some((_, max))) => Some(max), + (Some((_, a)), Some((_, b))) => Some(if a > b { a } else { b }), + } + } + + /// Pushes a new element into the sliding window. + #[inline] + pub fn push(&mut self, val: T) { + self.push_stack.push(match self.push_stack.last() { + Some((_, max)) => { + if val < *max { + (val, max.clone()) + } else { + (val.clone(), val) + } + } + None => (val.clone(), val), + }); + } + + /// Removes and returns the last value of the sliding window. + #[inline] + pub fn pop(&mut self) -> Option { + if self.pop_stack.is_empty() { + match self.push_stack.pop() { + Some((val, _)) => { + self.pop_stack.push((val.clone(), val)); + while let Some((val, _)) = self.push_stack.pop() { + // This is safe, because we just pushed one element onto + // pop_stack and therefore it cannot be empty. + let last = unsafe { + self.pop_stack.get_unchecked(self.pop_stack.len() - 1) + }; + let max = if last.1 > val { + last.1.clone() + } else { + val.clone() + }; + self.pop_stack.push((val.clone(), max)); + } + } + None => return None, + } + } + self.pop_stack.pop().map(|(val, _)| val) + } + + /// Returns the number of elements stored in the sliding window. + #[inline] + pub fn len(&self) -> usize { + self.push_stack.len() + self.pop_stack.len() + } + + /// Returns `true` if the moving window contains no elements. + #[inline] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} From 5c0ce53b7f900a8f3c7566f486dc7bc2bbaf73b8 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 19 Dec 2022 09:11:07 +0300 Subject: [PATCH 04/11] use optimized accumulators when retract is not used --- .../core/src/physical_plan/windows/mod.rs | 31 +++- datafusion/core/tests/sql/aggregates.rs | 27 +++ datafusion/expr/src/window_frame.rs | 10 ++ .../physical-expr/src/aggregate/min_max.rs | 13 +- .../src/aggregate/moving_min_max.rs | 17 ++ .../src/window/forward_aggregate.rs | 159 ++++++++++++++++++ datafusion/physical-expr/src/window/mod.rs | 2 + 7 files changed, 248 insertions(+), 11 deletions(-) create mode 100644 datafusion/physical-expr/src/window/forward_aggregate.rs diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs index 76d39a199245..21296cb07081 100644 --- a/datafusion/core/src/physical_plan/windows/mod.rs +++ b/datafusion/core/src/physical_plan/windows/mod.rs @@ -33,7 +33,9 @@ use datafusion_expr::{ window_function::{signature_for_built_in, BuiltInWindowFunction, WindowFunction}, WindowFrame, }; -use datafusion_physical_expr::window::BuiltInWindowFunctionExpr; +use datafusion_physical_expr::window::{ + BuiltInWindowFunctionExpr, ForwardAggregateWindowExpr, +}; use std::convert::TryInto; use std::sync::Arc; @@ -55,12 +57,27 @@ pub fn create_window_expr( input_schema: &Schema, ) -> Result> { Ok(match fun { - WindowFunction::AggregateFunction(fun) => Arc::new(AggregateWindowExpr::new( - aggregates::create_aggregate_expr(fun, false, args, input_schema, name)?, - partition_by, - order_by, - window_frame, - )), + WindowFunction::AggregateFunction(fun) => { + let aggregate = + aggregates::create_aggregate_expr(fun, false, args, input_schema, name)?; + if aggregate.row_accumulator_supported() + && window_frame.start_bound.is_unbounded() + { + Arc::new(ForwardAggregateWindowExpr::new( + aggregate, + partition_by, + order_by, + window_frame, + )) + } else { + Arc::new(AggregateWindowExpr::new( + aggregate, + partition_by, + order_by, + window_frame, + )) + } + } WindowFunction::BuiltInWindowFunction(fun) => Arc::new(BuiltInWindowExpr::new( create_built_in_window_expr(fun, args, input_schema, name)?, partition_by, diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index 0650d46337e8..d405fe365a37 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -709,6 +709,33 @@ async fn aggregate_min_max_w_custom_window_frames() -> Result<()> { Ok(()) } +#[tokio::test] +async fn aggregate_min_max_w_custom_window_frames_unbounded_start() -> Result<()> { + let ctx = SessionContext::new(); + register_aggregate_csv(&ctx).await?; + let sql = + "SELECT + MIN(c12) OVER (ORDER BY C12 RANGE BETWEEN UNBOUNDED PRECEDING AND 0.2 FOLLOWING) as min1, + MAX(c12) OVER (ORDER BY C11 RANGE BETWEEN UNBOUNDED PRECEDING AND 0.2 FOLLOWING) as max1 + FROM aggregate_test_100 + ORDER BY C9 + LIMIT 5"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+---------------------+--------------------+", + "| min1 | max1 |", + "+---------------------+--------------------+", + "| 0.01479305307777301 | 0.9965400387585364 |", + "| 0.01479305307777301 | 0.9800193410444061 |", + "| 0.01479305307777301 | 0.9800193410444061 |", + "| 0.01479305307777301 | 0.9965400387585364 |", + "| 0.01479305307777301 | 0.9800193410444061 |", + "+---------------------+--------------------+", + ]; + assert_batches_eq!(expected, &actual); + Ok(()) +} + #[tokio::test] async fn aggregate_avg_add() -> Result<()> { let results = execute_with_partition( diff --git a/datafusion/expr/src/window_frame.rs b/datafusion/expr/src/window_frame.rs index 35790885e02f..62c7c57d47ba 100644 --- a/datafusion/expr/src/window_frame.rs +++ b/datafusion/expr/src/window_frame.rs @@ -147,6 +147,16 @@ pub enum WindowFrameBound { Following(ScalarValue), } +impl WindowFrameBound { + pub fn is_unbounded(&self) -> bool { + match self { + WindowFrameBound::Preceding(elem) => elem.is_null(), + WindowFrameBound::CurrentRow => false, + WindowFrameBound::Following(elem) => elem.is_null(), + } + } +} + impl TryFrom for WindowFrameBound { type Error = DataFusionError; diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs index 2c963a1efc08..bb1ff84f1f8b 100644 --- a/datafusion/physical-expr/src/aggregate/min_max.rs +++ b/datafusion/physical-expr/src/aggregate/min_max.rs @@ -569,7 +569,7 @@ impl Accumulator for MaxAccumulator { } fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - for i in 0..values[0].len() { + for _idx in 0..values[0].len() { (self.moving_max).pop(); } if let Some(res) = self.moving_max.max() { @@ -748,7 +748,9 @@ impl Accumulator for MinAccumulator { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { for idx in 0..values[0].len() { let val = ScalarValue::try_from_array(&values[0], idx)?; - self.moving_min.push(val); + if !val.is_null() { + self.moving_min.push(val); + } } if let Some(res) = self.moving_min.min() { self.min = res.clone(); @@ -757,8 +759,11 @@ impl Accumulator for MinAccumulator { } fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - for i in 0..values[0].len() { - (self.moving_min).pop(); + for idx in 0..values[0].len() { + let val = ScalarValue::try_from_array(&values[0], idx)?; + if !val.is_null() { + (self.moving_min).pop(); + } } if let Some(res) = self.moving_min.min() { self.min = res.clone(); diff --git a/datafusion/physical-expr/src/aggregate/moving_min_max.rs b/datafusion/physical-expr/src/aggregate/moving_min_max.rs index 7d7c9c30f75c..a9888ab49a6a 100644 --- a/datafusion/physical-expr/src/aggregate/moving_min_max.rs +++ b/datafusion/physical-expr/src/aggregate/moving_min_max.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + //! Keep track of the minimum or maximum value in a sliding window. //! //! `moving min max` provides one data structure for keeping track of the diff --git a/datafusion/physical-expr/src/window/forward_aggregate.rs b/datafusion/physical-expr/src/window/forward_aggregate.rs new file mode 100644 index 000000000000..0c498b31d075 --- /dev/null +++ b/datafusion/physical-expr/src/window/forward_aggregate.rs @@ -0,0 +1,159 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Physical exec for aggregate window function expressions. + +use std::any::Any; +use std::iter::IntoIterator; +use std::sync::Arc; + +use arrow::array::Array; +use arrow::compute::SortOptions; +use arrow::record_batch::RecordBatch; +use arrow::{array::ArrayRef, datatypes::Field}; +use arrow_schema::Schema; + +use datafusion_common::Result; +use datafusion_common::ScalarValue; +use datafusion_expr::WindowFrame; +use datafusion_row::accessor::RowAccessor; +use datafusion_row::layout::RowLayout; +use datafusion_row::RowType; + +use crate::{expressions::PhysicalSortExpr, PhysicalExpr}; +use crate::{window::WindowExpr, AggregateExpr}; + +use super::window_frame_state::WindowFrameContext; + +/// A window expr that takes the form of an aggregate function +#[derive(Debug)] +pub struct ForwardAggregateWindowExpr { + aggregate: Arc, + partition_by: Vec>, + order_by: Vec, + window_frame: Arc, +} + +impl ForwardAggregateWindowExpr { + /// create a new aggregate window function expression + pub fn new( + aggregate: Arc, + partition_by: &[Arc], + order_by: &[PhysicalSortExpr], + window_frame: Arc, + ) -> Self { + Self { + aggregate, + partition_by: partition_by.to_vec(), + order_by: order_by.to_vec(), + window_frame, + } + } + + /// Get aggregate expr of AggregateWindowExpr + pub fn get_aggregate_expr(&self) -> &Arc { + &self.aggregate + } +} + +/// peer based evaluation based on the fact that batch is pre-sorted given the sort columns +/// and then per partition point we'll evaluate the peer group (e.g. SUM or MAX gives the same +/// results for peers) and concatenate the results. + +impl WindowExpr for ForwardAggregateWindowExpr { + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + fn field(&self) -> Result { + self.aggregate.field() + } + + fn name(&self) -> &str { + self.aggregate.name() + } + + fn expressions(&self) -> Vec> { + self.aggregate.expressions() + } + + fn evaluate(&self, batch: &RecordBatch) -> Result { + let partition_columns = self.partition_columns(batch)?; + let partition_points = + self.evaluate_partition_points(batch.num_rows(), &partition_columns)?; + let sort_options: Vec = + self.order_by.iter().map(|o| o.options).collect(); + let mut row_wise_results: Vec = vec![]; + for partition_range in &partition_points { + let fields = self.aggregate.state_fields().unwrap(); + let aggr_schema = Arc::new(Schema::new(fields)); + let layout = Arc::new(RowLayout::new(&aggr_schema, RowType::WordAligned)); + let mut buffer: Vec = vec![0; layout.fixed_part_width()]; + let mut accessor = RowAccessor::new_from_layout(layout); + accessor.point_to(0, &mut buffer); + let mut accumulator = self.aggregate.create_row_accumulator(0)?; + let length = partition_range.end - partition_range.start; + let (values, order_bys) = + self.get_values_orderbys(&batch.slice(partition_range.start, length))?; + + let mut window_frame_ctx = WindowFrameContext::new(&self.window_frame); + let mut last_range: (usize, usize) = (0, 0); + + // We iterate on each row to perform a running calculation. + // First, cur_range is calculated, then it is compared with last_range. + for i in 0..length { + let cur_range = window_frame_ctx.calculate_range( + &order_bys, + &sort_options, + length, + i, + )?; + let value = if cur_range.0 == cur_range.1 { + // We produce None if the window is empty. + ScalarValue::try_from(self.aggregate.field()?.data_type())? + } else { + // Accumulate any new rows that have entered the window: + let update_bound = cur_range.1 - last_range.1; + if update_bound > 0 { + let update: Vec = values + .iter() + .map(|v| v.slice(last_range.1, update_bound)) + .collect(); + accumulator.update_batch(&update, &mut accessor)?; + } + accumulator.evaluate(&accessor)? + }; + row_wise_results.push(value); + last_range = cur_range; + } + } + ScalarValue::iter_to_array(row_wise_results.into_iter()) + } + + fn partition_by(&self) -> &[Arc] { + &self.partition_by + } + + fn order_by(&self) -> &[PhysicalSortExpr] { + &self.order_by + } + + fn get_window_frame(&self) -> &Arc { + &self.window_frame + } +} diff --git a/datafusion/physical-expr/src/window/mod.rs b/datafusion/physical-expr/src/window/mod.rs index 40ed658ee38a..636c659a447d 100644 --- a/datafusion/physical-expr/src/window/mod.rs +++ b/datafusion/physical-expr/src/window/mod.rs @@ -19,6 +19,7 @@ mod aggregate; mod built_in; mod built_in_window_function_expr; pub(crate) mod cume_dist; +mod forward_aggregate; pub(crate) mod lead_lag; pub(crate) mod nth_value; pub(crate) mod partition_evaluator; @@ -30,4 +31,5 @@ mod window_frame_state; pub use aggregate::AggregateWindowExpr; pub use built_in::BuiltInWindowExpr; pub use built_in_window_function_expr::BuiltInWindowFunctionExpr; +pub use forward_aggregate::ForwardAggregateWindowExpr; pub use window_expr::WindowExpr; From f287f7568c143f7d27cb20ac4e7dde93c88340f2 Mon Sep 17 00:00:00 2001 From: berkaycpp Date: Tue, 20 Dec 2022 21:38:15 +0300 Subject: [PATCH 05/11] Where the algorithm was taken from added as a comment --- datafusion/physical-expr/src/aggregate/moving_min_max.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/physical-expr/src/aggregate/moving_min_max.rs b/datafusion/physical-expr/src/aggregate/moving_min_max.rs index a9888ab49a6a..8817bdf9dcb7 100644 --- a/datafusion/physical-expr/src/aggregate/moving_min_max.rs +++ b/datafusion/physical-expr/src/aggregate/moving_min_max.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +// The implementation is taken from https://github.com/spebern/moving_min_max/blob/master/src/lib.rs. + //! Keep track of the minimum or maximum value in a sliding window. //! //! `moving min max` provides one data structure for keeping track of the From abe2dc0ec0d69ae98d37d68c9b262f246586e947 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Wed, 21 Dec 2022 10:49:55 +0300 Subject: [PATCH 06/11] Change design to use sliding implementation when absolutely necessary --- .../src/datasource/file_format/parquet.rs | 4 +- datafusion/core/src/datasource/mod.rs | 13 ++- .../core/src/physical_plan/windows/mod.rs | 8 +- .../physical-expr/src/aggregate/count.rs | 4 + .../physical-expr/src/aggregate/min_max.rs | 96 ++++++++++++++++++- datafusion/physical-expr/src/aggregate/mod.rs | 8 ++ datafusion/physical-expr/src/aggregate/sum.rs | 4 + .../physical-expr/src/expressions/mod.rs | 2 +- .../physical-expr/src/window/aggregate.rs | 9 -- datafusion/physical-expr/src/window/mod.rs | 4 +- ...ward_aggregate.rs => sliding_aggregate.rs} | 31 +++--- 11 files changed, 139 insertions(+), 44 deletions(-) rename datafusion/physical-expr/src/window/{forward_aggregate.rs => sliding_aggregate.rs} (85%) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 6e1fa17824fc..fe2ede270896 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -47,7 +47,7 @@ use crate::config::OPT_PARQUET_SKIP_METADATA; use crate::datasource::{create_max_min_accs, get_col_stats}; use crate::error::Result; use crate::logical_expr::Expr; -use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; +use crate::physical_plan::expressions::{MinAccumulator, SlidingMaxAccumulator}; use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter}; use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics}; @@ -220,7 +220,7 @@ impl FileFormat for ParquetFormat { } fn summarize_min_max( - max_values: &mut [Option], + max_values: &mut [Option], min_values: &mut [Option], fields: &[Field], i: usize, diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index a0d5121090ee..8a5c7313065c 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -41,7 +41,7 @@ pub use self::view::ViewTable; use crate::arrow::datatypes::{Schema, SchemaRef}; use crate::error::Result; pub use crate::logical_expr::TableType; -use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; +use crate::physical_plan::expressions::{MinAccumulator, SlidingMaxAccumulator}; use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics}; use futures::StreamExt; @@ -155,11 +155,14 @@ pub async fn get_statistics_with_limit( fn create_max_min_accs( schema: &Schema, -) -> (Vec>, Vec>) { - let max_values: Vec> = schema +) -> ( + Vec>, + Vec>, +) { + let max_values: Vec> = schema .fields() .iter() - .map(|field| MaxAccumulator::try_new(field.data_type()).ok()) + .map(|field| SlidingMaxAccumulator::try_new(field.data_type()).ok()) .collect::>(); let min_values: Vec> = schema .fields() @@ -172,7 +175,7 @@ fn create_max_min_accs( fn get_col_stats( schema: &Schema, null_counts: Vec, - max_values: &mut [Option], + max_values: &mut [Option], min_values: &mut [Option], ) -> Vec { (0..schema.fields().len()) diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs index 21296cb07081..b226f3413722 100644 --- a/datafusion/core/src/physical_plan/windows/mod.rs +++ b/datafusion/core/src/physical_plan/windows/mod.rs @@ -34,7 +34,7 @@ use datafusion_expr::{ WindowFrame, }; use datafusion_physical_expr::window::{ - BuiltInWindowFunctionExpr, ForwardAggregateWindowExpr, + BuiltInWindowFunctionExpr, SlidingAggregateWindowExpr, }; use std::convert::TryInto; use std::sync::Arc; @@ -60,10 +60,8 @@ pub fn create_window_expr( WindowFunction::AggregateFunction(fun) => { let aggregate = aggregates::create_aggregate_expr(fun, false, args, input_schema, name)?; - if aggregate.row_accumulator_supported() - && window_frame.start_bound.is_unbounded() - { - Arc::new(ForwardAggregateWindowExpr::new( + if !window_frame.start_bound.is_unbounded() { + Arc::new(SlidingAggregateWindowExpr::new( aggregate, partition_by, order_by, diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs index d8dd6b9b30f1..6c43344db97a 100644 --- a/datafusion/physical-expr/src/aggregate/count.rs +++ b/datafusion/physical-expr/src/aggregate/count.rs @@ -104,6 +104,10 @@ impl AggregateExpr for Count { ) -> Result> { Ok(Box::new(CountRowAccumulator::new(start_index))) } + + fn create_sliding_accumulator(&self) -> Result> { + Ok(Box::new(CountAccumulator::new())) + } } #[derive(Debug)] diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs index bb1ff84f1f8b..9cb59b9c9162 100644 --- a/datafusion/physical-expr/src/aggregate/min_max.rs +++ b/datafusion/physical-expr/src/aggregate/min_max.rs @@ -143,6 +143,10 @@ impl AggregateExpr for Max { self.data_type.clone(), ))) } + + fn create_sliding_accumulator(&self) -> Result> { + Ok(Box::new(SlidingMaxAccumulator::try_new(&self.data_type)?)) + } } // Statically-typed version of min/max(array) -> ScalarValue for string types. @@ -543,7 +547,6 @@ pub fn max_row(index: usize, accessor: &mut RowAccessor, s: &ScalarValue) -> Res #[derive(Debug)] pub struct MaxAccumulator { max: ScalarValue, - moving_max: moving_min_max::MovingMax, } impl MaxAccumulator { @@ -551,12 +554,53 @@ impl MaxAccumulator { pub fn try_new(datatype: &DataType) -> Result { Ok(Self { max: ScalarValue::try_from(datatype)?, - moving_max: moving_min_max::MovingMax::::new(), }) } } impl Accumulator for MaxAccumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let values = &values[0]; + let delta = &max_batch(values)?; + self.max = max(&self.max, delta)?; + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.update_batch(states) + } + + fn state(&self) -> Result> { + Ok(vec![self.max.clone()]) + } + + fn evaluate(&self) -> Result { + Ok(self.max.clone()) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) - std::mem::size_of_val(&self.max) + self.max.size() + } +} + +/// An accumulator to compute the maximum value +#[derive(Debug)] +pub struct SlidingMaxAccumulator { + max: ScalarValue, + moving_max: moving_min_max::MovingMax, +} + +impl SlidingMaxAccumulator { + /// new max accumulator + pub fn try_new(datatype: &DataType) -> Result { + Ok(Self { + max: ScalarValue::try_from(datatype)?, + moving_max: moving_min_max::MovingMax::::new(), + }) + } +} + +impl Accumulator for SlidingMaxAccumulator { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { for idx in 0..values[0].len() { let val = ScalarValue::try_from_array(&values[0], idx)?; @@ -721,13 +765,16 @@ impl AggregateExpr for Min { self.data_type.clone(), ))) } + + fn create_sliding_accumulator(&self) -> Result> { + Ok(Box::new(SlidingMinAccumulator::try_new(&self.data_type)?)) + } } /// An accumulator to compute the minimum value #[derive(Debug)] pub struct MinAccumulator { min: ScalarValue, - moving_min: moving_min_max::MovingMin, } impl MinAccumulator { @@ -735,7 +782,6 @@ impl MinAccumulator { pub fn try_new(datatype: &DataType) -> Result { Ok(Self { min: ScalarValue::try_from(datatype)?, - moving_min: moving_min_max::MovingMin::::new(), }) } } @@ -745,6 +791,48 @@ impl Accumulator for MinAccumulator { Ok(vec![self.min.clone()]) } + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let values = &values[0]; + let delta = &min_batch(values)?; + self.min = min(&self.min, delta)?; + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.update_batch(states) + } + + fn evaluate(&self) -> Result { + Ok(self.min.clone()) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) - std::mem::size_of_val(&self.min) + self.min.size() + } +} + +/// An accumulator to compute the minimum value +#[derive(Debug)] +pub struct SlidingMinAccumulator { + min: ScalarValue, + moving_min: moving_min_max::MovingMin, +} + +impl SlidingMinAccumulator { + /// new min accumulator + pub fn try_new(datatype: &DataType) -> Result { + Ok(Self { + min: ScalarValue::try_from(datatype)?, + moving_min: moving_min_max::MovingMin::::new(), + }) + } +} + +impl Accumulator for SlidingMinAccumulator { + fn state(&self) -> Result> { + Ok(vec![self.min.clone()]) + } + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { for idx in 0..values[0].len() { let val = ScalarValue::try_from_array(&values[0], idx)?; diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs index 8b7cc0a395b3..436a2339663f 100644 --- a/datafusion/physical-expr/src/aggregate/mod.rs +++ b/datafusion/physical-expr/src/aggregate/mod.rs @@ -102,4 +102,12 @@ pub trait AggregateExpr: Send + Sync + Debug { self ))) } + + /// Creates accumulator implementation that supports retract + fn create_sliding_accumulator(&self) -> Result> { + Err(DataFusionError::NotImplemented(format!( + "Retractable Accumulator hasn't been implemented for {:?} yet", + self + ))) + } } diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs index f40c85a39f27..f5f59ef5e504 100644 --- a/datafusion/physical-expr/src/aggregate/sum.rs +++ b/datafusion/physical-expr/src/aggregate/sum.rs @@ -132,6 +132,10 @@ impl AggregateExpr for Sum { self.data_type.clone(), ))) } + + fn create_sliding_accumulator(&self) -> Result> { + Ok(Box::new(SumAccumulator::try_new(&self.data_type)?)) + } } #[derive(Debug)] diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 8222fb6648ee..aeb58c0a07f6 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -54,7 +54,7 @@ pub use crate::aggregate::covariance::{Covariance, CovariancePop}; pub use crate::aggregate::grouping::Grouping; pub use crate::aggregate::median::Median; pub use crate::aggregate::min_max::{Max, Min}; -pub use crate::aggregate::min_max::{MaxAccumulator, MinAccumulator}; +pub use crate::aggregate::min_max::{MinAccumulator, SlidingMaxAccumulator}; pub use crate::aggregate::stats::StatsType; pub use crate::aggregate::stddev::{Stddev, StddevPop}; pub use crate::aggregate::sum::Sum; diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index 52a43050b1cc..c42f7ff55a36 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -126,15 +126,6 @@ impl WindowExpr for AggregateWindowExpr { .collect(); accumulator.update_batch(&update)? } - // Remove rows that have now left the window: - let retract_bound = cur_range.0 - last_range.0; - if retract_bound > 0 { - let retract: Vec = values - .iter() - .map(|v| v.slice(last_range.0, retract_bound)) - .collect(); - accumulator.retract_batch(&retract)? - } accumulator.evaluate()? }; row_wise_results.push(value); diff --git a/datafusion/physical-expr/src/window/mod.rs b/datafusion/physical-expr/src/window/mod.rs index 636c659a447d..c8501c0f333b 100644 --- a/datafusion/physical-expr/src/window/mod.rs +++ b/datafusion/physical-expr/src/window/mod.rs @@ -19,17 +19,17 @@ mod aggregate; mod built_in; mod built_in_window_function_expr; pub(crate) mod cume_dist; -mod forward_aggregate; pub(crate) mod lead_lag; pub(crate) mod nth_value; pub(crate) mod partition_evaluator; pub(crate) mod rank; pub(crate) mod row_number; +mod sliding_aggregate; mod window_expr; mod window_frame_state; pub use aggregate::AggregateWindowExpr; pub use built_in::BuiltInWindowExpr; pub use built_in_window_function_expr::BuiltInWindowFunctionExpr; -pub use forward_aggregate::ForwardAggregateWindowExpr; +pub use sliding_aggregate::SlidingAggregateWindowExpr; pub use window_expr::WindowExpr; diff --git a/datafusion/physical-expr/src/window/forward_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs similarity index 85% rename from datafusion/physical-expr/src/window/forward_aggregate.rs rename to datafusion/physical-expr/src/window/sliding_aggregate.rs index 0c498b31d075..9dbaca76e689 100644 --- a/datafusion/physical-expr/src/window/forward_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -25,14 +25,10 @@ use arrow::array::Array; use arrow::compute::SortOptions; use arrow::record_batch::RecordBatch; use arrow::{array::ArrayRef, datatypes::Field}; -use arrow_schema::Schema; use datafusion_common::Result; use datafusion_common::ScalarValue; use datafusion_expr::WindowFrame; -use datafusion_row::accessor::RowAccessor; -use datafusion_row::layout::RowLayout; -use datafusion_row::RowType; use crate::{expressions::PhysicalSortExpr, PhysicalExpr}; use crate::{window::WindowExpr, AggregateExpr}; @@ -41,14 +37,14 @@ use super::window_frame_state::WindowFrameContext; /// A window expr that takes the form of an aggregate function #[derive(Debug)] -pub struct ForwardAggregateWindowExpr { +pub struct SlidingAggregateWindowExpr { aggregate: Arc, partition_by: Vec>, order_by: Vec, window_frame: Arc, } -impl ForwardAggregateWindowExpr { +impl SlidingAggregateWindowExpr { /// create a new aggregate window function expression pub fn new( aggregate: Arc, @@ -74,7 +70,7 @@ impl ForwardAggregateWindowExpr { /// and then per partition point we'll evaluate the peer group (e.g. SUM or MAX gives the same /// results for peers) and concatenate the results. -impl WindowExpr for ForwardAggregateWindowExpr { +impl WindowExpr for SlidingAggregateWindowExpr { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { self @@ -100,13 +96,7 @@ impl WindowExpr for ForwardAggregateWindowExpr { self.order_by.iter().map(|o| o.options).collect(); let mut row_wise_results: Vec = vec![]; for partition_range in &partition_points { - let fields = self.aggregate.state_fields().unwrap(); - let aggr_schema = Arc::new(Schema::new(fields)); - let layout = Arc::new(RowLayout::new(&aggr_schema, RowType::WordAligned)); - let mut buffer: Vec = vec![0; layout.fixed_part_width()]; - let mut accessor = RowAccessor::new_from_layout(layout); - accessor.point_to(0, &mut buffer); - let mut accumulator = self.aggregate.create_row_accumulator(0)?; + let mut accumulator = self.aggregate.create_sliding_accumulator()?; let length = partition_range.end - partition_range.start; let (values, order_bys) = self.get_values_orderbys(&batch.slice(partition_range.start, length))?; @@ -134,9 +124,18 @@ impl WindowExpr for ForwardAggregateWindowExpr { .iter() .map(|v| v.slice(last_range.1, update_bound)) .collect(); - accumulator.update_batch(&update, &mut accessor)?; + accumulator.update_batch(&update)? } - accumulator.evaluate(&accessor)? + // Remove rows that have now left the window: + let retract_bound = cur_range.0 - last_range.0; + if retract_bound > 0 { + let retract: Vec = values + .iter() + .map(|v| v.slice(last_range.0, retract_bound)) + .collect(); + accumulator.retract_batch(&retract)? + } + accumulator.evaluate()? }; row_wise_results.push(value); last_range = cur_range; From 0ed62f3c2e89e6f898f141b0282951945aa544be Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Wed, 21 Dec 2022 11:42:48 +0300 Subject: [PATCH 07/11] add fuzzy tests for moving min max, remove unsafe block --- .../src/aggregate/moving_min_max.rs | 94 ++++++++++++++++--- 1 file changed, 80 insertions(+), 14 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/moving_min_max.rs b/datafusion/physical-expr/src/aggregate/moving_min_max.rs index 8817bdf9dcb7..87e8277c5391 100644 --- a/datafusion/physical-expr/src/aggregate/moving_min_max.rs +++ b/datafusion/physical-expr/src/aggregate/moving_min_max.rs @@ -115,19 +115,16 @@ impl MovingMin { if self.pop_stack.is_empty() { match self.push_stack.pop() { Some((val, _)) => { - self.pop_stack.push((val.clone(), val)); + let mut last = (val.clone(), val); + self.pop_stack.push(last.clone()); while let Some((val, _)) = self.push_stack.pop() { - // This is safe, because we just pushed one element onto - // pop_stack and therefore it cannot be empty. - let last = unsafe { - self.pop_stack.get_unchecked(self.pop_stack.len() - 1) - }; let min = if last.1 < val { last.1.clone() } else { val.clone() }; - self.pop_stack.push((val.clone(), min)); + last = (val.clone(), min); + self.pop_stack.push(last.clone()); } } None => return None, @@ -229,19 +226,16 @@ impl MovingMax { if self.pop_stack.is_empty() { match self.push_stack.pop() { Some((val, _)) => { - self.pop_stack.push((val.clone(), val)); + let mut last = (val.clone(), val); + self.pop_stack.push(last.clone()); while let Some((val, _)) = self.push_stack.pop() { - // This is safe, because we just pushed one element onto - // pop_stack and therefore it cannot be empty. - let last = unsafe { - self.pop_stack.get_unchecked(self.pop_stack.len() - 1) - }; let max = if last.1 > val { last.1.clone() } else { val.clone() }; - self.pop_stack.push((val.clone(), max)); + last = (val.clone(), max); + self.pop_stack.push(last.clone()); } } None => return None, @@ -262,3 +256,75 @@ impl MovingMax { self.len() == 0 } } + +#[cfg(test)] +mod tests { + use super::*; + use datafusion_common::Result; + use rand::Rng; + + fn get_random_vec_i32(len: usize) -> Vec { + let mut rng = rand::thread_rng(); + let mut input = Vec::with_capacity(len); + for _i in 0..len { + input.push(rng.gen_range(0..100)); + } + input + } + + fn moving_min_i32(len: usize, n_sliding_window: usize) -> Result<()> { + let data = get_random_vec_i32(len); + let mut expected = Vec::with_capacity(len); + let mut moving_min = MovingMin::::new(); + let mut res = Vec::with_capacity(len); + for i in 0..len { + let start = i.saturating_sub(n_sliding_window); + expected.push(*data[start..i + 1].iter().min().unwrap()); + + moving_min.push(data[i]); + if i > n_sliding_window { + moving_min.pop(); + } + res.push(*moving_min.min().unwrap()); + } + assert_eq!(res, expected); + Ok(()) + } + + fn moving_max_i32(len: usize, n_sliding_window: usize) -> Result<()> { + let data = get_random_vec_i32(len); + let mut expected = Vec::with_capacity(len); + let mut moving_max = MovingMax::::new(); + let mut res = Vec::with_capacity(len); + for i in 0..len { + let start = i.saturating_sub(n_sliding_window); + expected.push(*data[start..i + 1].iter().max().unwrap()); + + moving_max.push(data[i]); + if i > n_sliding_window { + moving_max.pop(); + } + res.push(*moving_max.max().unwrap()); + } + assert_eq!(res, expected); + Ok(()) + } + + #[test] + fn moving_min_tests() -> Result<()> { + moving_min_i32(100, 10)?; + moving_min_i32(100, 20)?; + moving_min_i32(100, 50)?; + moving_min_i32(100, 100)?; + Ok(()) + } + + #[test] + fn moving_max_tests() -> Result<()> { + moving_max_i32(100, 10)?; + moving_max_i32(100, 20)?; + moving_max_i32(100, 50)?; + moving_max_i32(100, 100)?; + Ok(()) + } +} From a40a1df5553797abba3a9d184b62cf85bebe472d Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Wed, 21 Dec 2022 16:15:32 +0300 Subject: [PATCH 08/11] remove duplicated code --- .../physical-expr/src/aggregate/average.rs | 18 +++-------- .../physical-expr/src/aggregate/min_max.rs | 32 +++---------------- .../src/aggregate/row_accumulator.rs | 18 +++++++++++ datafusion/physical-expr/src/aggregate/sum.rs | 18 +++-------- 4 files changed, 31 insertions(+), 55 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs index afb0791f213c..12f84ca1f798 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/physical-expr/src/aggregate/average.rs @@ -21,7 +21,9 @@ use std::any::Any; use std::convert::TryFrom; use std::sync::Arc; -use crate::aggregate::row_accumulator::RowAccumulator; +use crate::aggregate::row_accumulator::{ + is_row_accumulator_support_dtype, RowAccumulator, +}; use crate::aggregate::sum; use crate::expressions::format_state_name; use crate::{AggregateExpr, PhysicalExpr}; @@ -105,19 +107,7 @@ impl AggregateExpr for Avg { } fn row_accumulator_supported(&self) -> bool { - matches!( - self.data_type, - DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 - | DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::Float32 - | DataType::Float64 - ) + is_row_accumulator_support_dtype(&self.data_type) } fn create_row_accumulator( diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs index 9cb59b9c9162..a7bd6c360a90 100644 --- a/datafusion/physical-expr/src/aggregate/min_max.rs +++ b/datafusion/physical-expr/src/aggregate/min_max.rs @@ -39,7 +39,9 @@ use datafusion_common::ScalarValue; use datafusion_common::{downcast_value, DataFusionError, Result}; use datafusion_expr::Accumulator; -use crate::aggregate::row_accumulator::RowAccumulator; +use crate::aggregate::row_accumulator::{ + is_row_accumulator_support_dtype, RowAccumulator, +}; use crate::expressions::format_state_name; use arrow::array::Array; use arrow::array::Decimal128Array; @@ -119,19 +121,7 @@ impl AggregateExpr for Max { } fn row_accumulator_supported(&self) -> bool { - matches!( - self.data_type, - DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 - | DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::Float32 - | DataType::Float64 - ) + is_row_accumulator_support_dtype(&self.data_type) } fn create_row_accumulator( @@ -741,19 +731,7 @@ impl AggregateExpr for Min { } fn row_accumulator_supported(&self) -> bool { - matches!( - self.data_type, - DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 - | DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::Float32 - | DataType::Float64 - ) + is_row_accumulator_support_dtype(&self.data_type) } fn create_row_accumulator( diff --git a/datafusion/physical-expr/src/aggregate/row_accumulator.rs b/datafusion/physical-expr/src/aggregate/row_accumulator.rs index 386787454f85..d26da8f4cec9 100644 --- a/datafusion/physical-expr/src/aggregate/row_accumulator.rs +++ b/datafusion/physical-expr/src/aggregate/row_accumulator.rs @@ -18,6 +18,7 @@ //! Accumulator over row format use arrow::array::ArrayRef; +use arrow_schema::DataType; use datafusion_common::{Result, ScalarValue}; use datafusion_row::accessor::RowAccessor; use std::fmt::Debug; @@ -63,3 +64,20 @@ pub trait RowAccumulator: Send + Sync + Debug { /// State's starting field index in the row. fn state_index(&self) -> usize; } + +/// Returns if `data_type` is supported with `RowAccumulator` +pub fn is_row_accumulator_support_dtype(data_type: &DataType) -> bool { + matches!( + data_type, + DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::Float32 + | DataType::Float64 + ) +} diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs index f5f59ef5e504..8d2620296c2e 100644 --- a/datafusion/physical-expr/src/aggregate/sum.rs +++ b/datafusion/physical-expr/src/aggregate/sum.rs @@ -34,7 +34,9 @@ use arrow::{ use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue}; use datafusion_expr::Accumulator; -use crate::aggregate::row_accumulator::RowAccumulator; +use crate::aggregate::row_accumulator::{ + is_row_accumulator_support_dtype, RowAccumulator, +}; use crate::expressions::format_state_name; use arrow::array::Array; use arrow::array::Decimal128Array; @@ -108,19 +110,7 @@ impl AggregateExpr for Sum { } fn row_accumulator_supported(&self) -> bool { - matches!( - self.data_type, - DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 - | DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::Float32 - | DataType::Float64 - ) + is_row_accumulator_support_dtype(&self.data_type) } fn create_row_accumulator( From b28b22cf0cb0058da03e794a2411ea13ff5fb720 Mon Sep 17 00:00:00 2001 From: berkaycpp Date: Thu, 22 Dec 2022 01:36:11 +0300 Subject: [PATCH 09/11] doc comments for example codes --- datafusion/physical-expr/src/aggregate/moving_min_max.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/aggregate/moving_min_max.rs b/datafusion/physical-expr/src/aggregate/moving_min_max.rs index 87e8277c5391..46f7f2e4ca6f 100644 --- a/datafusion/physical-expr/src/aggregate/moving_min_max.rs +++ b/datafusion/physical-expr/src/aggregate/moving_min_max.rs @@ -33,6 +33,8 @@ //! - O(1) for push //! - amortized O(1) for pop +/// ``` +/// /// let mut moving_min = MovingMin::::new(); /// moving_min.push(2); /// moving_min.push(1); @@ -49,6 +51,7 @@ /// /// assert_eq!(moving_min.min(), None); /// assert_eq!(moving_min.pop(), None); +/// ``` #[derive(Debug)] pub struct MovingMin { push_stack: Vec<(T, T)>, @@ -145,7 +148,8 @@ impl MovingMin { self.len() == 0 } } - +/// ``` +/// /// let mut moving_max = MovingMax::::new(); /// moving_max.push(2); /// moving_max.push(3); @@ -162,6 +166,7 @@ impl MovingMin { /// /// assert_eq!(moving_max.max(), None); /// assert_eq!(moving_max.pop(), None); +/// ``` #[derive(Debug)] pub struct MovingMax { push_stack: Vec<(T, T)>, From bc3172dc07b97cb0ed8fa82bfd90714e73172cf8 Mon Sep 17 00:00:00 2001 From: berkaycpp Date: Thu, 22 Dec 2022 03:39:01 +0300 Subject: [PATCH 10/11] cargo test error correction --- datafusion/physical-expr/src/aggregate/moving_min_max.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/moving_min_max.rs b/datafusion/physical-expr/src/aggregate/moving_min_max.rs index 46f7f2e4ca6f..c4fb07679747 100644 --- a/datafusion/physical-expr/src/aggregate/moving_min_max.rs +++ b/datafusion/physical-expr/src/aggregate/moving_min_max.rs @@ -34,7 +34,7 @@ //! - amortized O(1) for pop /// ``` -/// +/// # use datafusion_physical_expr::aggregate::moving_min_max::MovingMin; /// let mut moving_min = MovingMin::::new(); /// moving_min.push(2); /// moving_min.push(1); @@ -149,7 +149,7 @@ impl MovingMin { } } /// ``` -/// +/// # use datafusion_physical_expr::aggregate::moving_min_max::MovingMax; /// let mut moving_max = MovingMax::::new(); /// moving_max.push(2); /// moving_max.push(3); From 870511a8c84976f023a8b096b052ff3fafd1df92 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Thu, 22 Dec 2022 10:01:28 +0300 Subject: [PATCH 11/11] revert unnecessary changes --- .../core/src/datasource/file_format/parquet.rs | 4 ++-- datafusion/core/src/datasource/mod.rs | 13 +++++-------- datafusion/physical-expr/src/expressions/mod.rs | 2 +- 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index fe2ede270896..6e1fa17824fc 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -47,7 +47,7 @@ use crate::config::OPT_PARQUET_SKIP_METADATA; use crate::datasource::{create_max_min_accs, get_col_stats}; use crate::error::Result; use crate::logical_expr::Expr; -use crate::physical_plan::expressions::{MinAccumulator, SlidingMaxAccumulator}; +use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter}; use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics}; @@ -220,7 +220,7 @@ impl FileFormat for ParquetFormat { } fn summarize_min_max( - max_values: &mut [Option], + max_values: &mut [Option], min_values: &mut [Option], fields: &[Field], i: usize, diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 8a5c7313065c..a0d5121090ee 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -41,7 +41,7 @@ pub use self::view::ViewTable; use crate::arrow::datatypes::{Schema, SchemaRef}; use crate::error::Result; pub use crate::logical_expr::TableType; -use crate::physical_plan::expressions::{MinAccumulator, SlidingMaxAccumulator}; +use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics}; use futures::StreamExt; @@ -155,14 +155,11 @@ pub async fn get_statistics_with_limit( fn create_max_min_accs( schema: &Schema, -) -> ( - Vec>, - Vec>, -) { - let max_values: Vec> = schema +) -> (Vec>, Vec>) { + let max_values: Vec> = schema .fields() .iter() - .map(|field| SlidingMaxAccumulator::try_new(field.data_type()).ok()) + .map(|field| MaxAccumulator::try_new(field.data_type()).ok()) .collect::>(); let min_values: Vec> = schema .fields() @@ -175,7 +172,7 @@ fn create_max_min_accs( fn get_col_stats( schema: &Schema, null_counts: Vec, - max_values: &mut [Option], + max_values: &mut [Option], min_values: &mut [Option], ) -> Vec { (0..schema.fields().len()) diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index aeb58c0a07f6..8222fb6648ee 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -54,7 +54,7 @@ pub use crate::aggregate::covariance::{Covariance, CovariancePop}; pub use crate::aggregate::grouping::Grouping; pub use crate::aggregate::median::Median; pub use crate::aggregate::min_max::{Max, Min}; -pub use crate::aggregate::min_max::{MinAccumulator, SlidingMaxAccumulator}; +pub use crate::aggregate::min_max::{MaxAccumulator, MinAccumulator}; pub use crate::aggregate::stats::StatsType; pub use crate::aggregate::stddev::{Stddev, StddevPop}; pub use crate::aggregate::sum::Sum;