Skip to content

Commit

Permalink
Window Linear Mode use smaller buffers (#9597)
Browse files Browse the repository at this point in the history
* Initial commit

* Add better linear mode pruning

* Tmp

* Minor changes

* Old behaviour

* Tmp

* Minor changes

* Tmp

* Tmp

* Minor changes

* Add new test

* Minor changes

* Minor changes

* Add range support

* Simplifications

* Resolve linter errors

* Add range current row, 0 support

* Refactor to use Array ref instead of RecordBatch

* Add range support

* Resolve linter errors

* Minor changes

* Tmp

* Resolve linter errors

* Groups preceding non-zero support

* Groups preceding 0 support

* Groups current row support

* Groups current row support

* Fix multi order linear bug

* Simplifications

* Minor changes

* Minor changes

* Simplifications

* Update comments

* Improve code and comments

* Add comment to window tests

* Update test

* Update test

* Update comments

* Review

* Update failing test

---------

Co-authored-by: metesynnada <[email protected]>
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
Co-authored-by: Mustafa Akur <[email protected]>
  • Loading branch information
4 people authored Mar 17, 2024
1 parent cf0f8ee commit 7d3747c
Show file tree
Hide file tree
Showing 4 changed files with 913 additions and 103 deletions.
165 changes: 100 additions & 65 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::windows::{
create_window_expr, BoundedWindowAggExec, WindowAggExec,
};
use datafusion::physical_plan::{collect, ExecutionPlan, InputOrderMode};
use datafusion::physical_plan::InputOrderMode::{Linear, PartiallySorted, Sorted};
use datafusion::physical_plan::{collect, InputOrderMode};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{Result, ScalarValue};
use datafusion_common_runtime::SpawnedTask;
Expand All @@ -44,8 +45,6 @@ use hashbrown::HashMap;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

use datafusion_physical_plan::InputOrderMode::{Linear, PartiallySorted, Sorted};

#[tokio::test(flavor = "multi_thread", worker_threads = 16)]
async fn window_bounded_window_random_comparison() -> Result<()> {
// make_staggered_batches gives result sorted according to a, b, c
Expand Down Expand Up @@ -515,7 +514,8 @@ fn get_random_window_frame(rng: &mut StdRng, is_linear: bool) -> WindowFrame {
} else {
WindowFrameUnits::Groups
};
match units {

let mut window_frame = match units {
// In range queries window frame boundaries should match column type
WindowFrameUnits::Range => {
let start_bound = if start_bound.is_preceding {
Expand Down Expand Up @@ -566,6 +566,47 @@ fn get_random_window_frame(rng: &mut StdRng, is_linear: bool) -> WindowFrame {
// should work only with WindowAggExec
window_frame
}
};
convert_bound_to_current_row_if_applicable(rng, &mut window_frame.start_bound);
convert_bound_to_current_row_if_applicable(rng, &mut window_frame.end_bound);
window_frame
}

/// Randomly rewrites a zero-offset frame bound as `CURRENT ROW`.
///
/// When `bound` is `PRECEDING(0)` or `FOLLOWING(0)`, it is replaced by
/// `WindowFrameBound::CurrentRow` with 50% probability, so that the fuzz tests
/// also exercise the `CURRENT ROW` code path. Bounds with a non-zero offset,
/// bounds whose data type has no zero value, and bounds that are already
/// `CURRENT ROW` are left untouched.
fn convert_bound_to_current_row_if_applicable(
    rng: &mut StdRng,
    bound: &mut WindowFrameBound,
) {
    // First decide whether the bound carries an offset equal to zero.
    let has_zero_offset = match bound {
        WindowFrameBound::Preceding(offset) | WindowFrameBound::Following(offset) => {
            // `new_zero` fails for types without a zero value; treat that as "not zero".
            ScalarValue::new_zero(&offset.data_type())
                .map_or(false, |zero| *offset == zero)
        }
        _ => false,
    };
    // The random draw happens only for zero offsets, so the RNG stream is
    // consumed exactly when a conversion is possible.
    if has_zero_offset && rng.gen_range(0..2) == 0 {
        *bound = WindowFrameBound::CurrentRow;
    }
}

/// This utility determines whether a given window frame can be executed with
/// multiple ORDER BY expressions. As an example, range frames with offset (such
/// as `RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING`) cannot have ORDER BY clauses
/// of the form `\[ORDER BY a ASC, b ASC, ...]`
fn can_accept_multi_orderby(window_frame: &WindowFrame) -> bool {
match window_frame.units {
WindowFrameUnits::Rows => true,
WindowFrameUnits::Range => {
// Range can only accept multi ORDER BY clauses when bounds are
// CURRENT ROW or UNBOUNDED PRECEDING/FOLLOWING:
(window_frame.start_bound.is_unbounded()
|| window_frame.start_bound == WindowFrameBound::CurrentRow)
&& (window_frame.end_bound.is_unbounded()
|| window_frame.end_bound == WindowFrameBound::CurrentRow)
}
WindowFrameUnits::Groups => true,
}
}

Expand All @@ -588,13 +629,16 @@ async fn run_window_test(
let mut orderby_exprs = vec![];
for column in &orderby_columns {
orderby_exprs.push(PhysicalSortExpr {
expr: col(column, &schema).unwrap(),
expr: col(column, &schema)?,
options: SortOptions::default(),
})
}
if orderby_exprs.len() > 1 && !can_accept_multi_orderby(&window_frame) {
orderby_exprs = orderby_exprs[0..1].to_vec();
}
let mut partitionby_exprs = vec![];
for column in &partition_by_columns {
partitionby_exprs.push(col(column, &schema).unwrap());
partitionby_exprs.push(col(column, &schema)?);
}
let mut sort_keys = vec![];
for partition_by_expr in &partitionby_exprs {
Expand All @@ -609,7 +653,7 @@ async fn run_window_test(
}
}

let concat_input_record = concat_batches(&schema, &input1).unwrap();
let concat_input_record = concat_batches(&schema, &input1)?;
let source_sort_keys = vec![
PhysicalSortExpr {
expr: col("a", &schema)?,
Expand All @@ -624,73 +668,59 @@ async fn run_window_test(
options: Default::default(),
},
];
let memory_exec =
MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None).unwrap();
let memory_exec = memory_exec.with_sort_information(vec![source_sort_keys.clone()]);
let mut exec1 = Arc::new(memory_exec) as Arc<dyn ExecutionPlan>;
let mut exec1 = Arc::new(
MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None)?
.with_sort_information(vec![source_sort_keys.clone()]),
) as _;
// The table is ordered according to ORDER BY a, b, c. In the linear test we use PARTITION BY b, ORDER BY a.
// For WindowAggExec to produce the correct result, it needs the table to be ordered by b, a. Hence, add a sort.
if is_linear {
exec1 = Arc::new(SortExec::new(sort_keys.clone(), exec1)) as _;
exec1 = Arc::new(SortExec::new(sort_keys, exec1)) as _;
}

let usual_window_exec = Arc::new(
WindowAggExec::try_new(
vec![create_window_expr(
&window_fn,
fn_name.clone(),
&args,
&partitionby_exprs,
&orderby_exprs,
Arc::new(window_frame.clone()),
schema.as_ref(),
false,
)
.unwrap()],
exec1,
vec![],
)
.unwrap(),
) as _;
let usual_window_exec = Arc::new(WindowAggExec::try_new(
vec![create_window_expr(
&window_fn,
fn_name.clone(),
&args,
&partitionby_exprs,
&orderby_exprs,
Arc::new(window_frame.clone()),
schema.as_ref(),
false,
)?],
exec1,
vec![],
)?) as _;
let exec2 = Arc::new(
MemoryExec::try_new(&[input1.clone()], schema.clone(), None)
.unwrap()
MemoryExec::try_new(&[input1.clone()], schema.clone(), None)?
.with_sort_information(vec![source_sort_keys.clone()]),
);
let running_window_exec = Arc::new(
BoundedWindowAggExec::try_new(
vec![create_window_expr(
&window_fn,
fn_name,
&args,
&partitionby_exprs,
&orderby_exprs,
Arc::new(window_frame.clone()),
schema.as_ref(),
false,
)
.unwrap()],
exec2,
vec![],
search_mode,
)
.unwrap(),
) as Arc<dyn ExecutionPlan>;
let running_window_exec = Arc::new(BoundedWindowAggExec::try_new(
vec![create_window_expr(
&window_fn,
fn_name,
&args,
&partitionby_exprs,
&orderby_exprs,
Arc::new(window_frame.clone()),
schema.as_ref(),
false,
)?],
exec2,
vec![],
search_mode.clone(),
)?) as _;
let task_ctx = ctx.task_ctx();
let collected_usual = collect(usual_window_exec, task_ctx.clone()).await.unwrap();

let collected_running = collect(running_window_exec, task_ctx.clone())
.await
.unwrap();
let collected_usual = collect(usual_window_exec, task_ctx.clone()).await?;
let collected_running = collect(running_window_exec, task_ctx).await?;

// BoundedWindowAggExec should produce more chunks than the usual WindowAggExec.
// Otherwise, it means that we cannot generate results in running mode.
assert!(collected_running.len() > collected_usual.len());
// compare
let usual_formatted = pretty_format_batches(&collected_usual).unwrap().to_string();
let running_formatted = pretty_format_batches(&collected_running)
.unwrap()
.to_string();
let usual_formatted = pretty_format_batches(&collected_usual)?.to_string();
let running_formatted = pretty_format_batches(&collected_running)?.to_string();

let mut usual_formatted_sorted: Vec<&str> = usual_formatted.trim().lines().collect();
usual_formatted_sorted.sort_unstable();
Expand All @@ -703,11 +733,16 @@ async fn run_window_test(
.zip(&running_formatted_sorted)
.enumerate()
{
assert_eq!(
(i, usual_line),
(i, running_line),
"Inconsistent result for window_frame: {window_frame:?}, window_fn: {window_fn:?}, args:{args:?}"
);
if !usual_line.eq(running_line) {
println!("Inconsistent result for window_frame at line:{i:?}: {window_frame:?}, window_fn: {window_fn:?}, args:{args:?}, pb_cols:{partition_by_columns:?}, ob_cols:{orderby_columns:?}, search_mode:{search_mode:?}");
println!("--------usual_formatted_sorted----------------running_formatted_sorted--------");
for (line1, line2) in
usual_formatted_sorted.iter().zip(running_formatted_sorted)
{
println!("{:?} --- {:?}", line1, line2);
}
unreachable!();
}
}
Ok(())
}
Expand Down
44 changes: 37 additions & 7 deletions datafusion/expr/src/window_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

use std::{collections::VecDeque, ops::Range, sync::Arc};

use crate::{WindowFrame, WindowFrameBound, WindowFrameUnits};

use arrow::{
array::ArrayRef,
compute::{concat, SortOptions},
datatypes::DataType,
compute::{concat, concat_batches, SortOptions},
datatypes::{DataType, SchemaRef},
record_batch::RecordBatch,
};
use datafusion_common::{
Expand All @@ -31,8 +33,6 @@ use datafusion_common::{
DataFusionError, Result, ScalarValue,
};

use crate::{WindowFrame, WindowFrameBound, WindowFrameUnits};

/// Holds the state of evaluating a window function
#[derive(Debug)]
pub struct WindowAggState {
Expand Down Expand Up @@ -246,14 +246,42 @@ impl WindowFrameContext {
/// State for each unique partition determined according to PARTITION BY column(s)
#[derive(Debug)]
pub struct PartitionBatchState {
/// The record batch belonging to the current partition
pub record_batch: RecordBatch,
/// The record batch that contains the most recent row at the input.
/// Please note that this batch doesn't necessarily have the same partitioning
/// as `record_batch`. Keeping track of this batch enables us to prune
/// `record_batch` when the cardinality of the partition is sparse.
pub most_recent_row: Option<RecordBatch>,
/// Flag indicating whether we have received all data for this partition
pub is_end: bool,
/// Number of rows emitted for this partition
pub n_out_row: usize,
}

impl PartitionBatchState {
    /// Creates the initial state for a partition: an empty record batch with
    /// the given `schema`, no most recent row, `is_end` unset and zero
    /// emitted rows.
    pub fn new(schema: SchemaRef) -> Self {
        let record_batch = RecordBatch::new_empty(schema);
        Self {
            record_batch,
            most_recent_row: None,
            is_end: false,
            n_out_row: 0,
        }
    }

    /// Appends the rows of `batch` to the buffered `record_batch` by
    /// concatenating the two batches under the buffered batch's schema.
    ///
    /// # Errors
    ///
    /// Returns an error if the concatenation fails.
    pub fn extend(&mut self, batch: &RecordBatch) -> Result<()> {
        let schema = self.record_batch.schema();
        let merged = concat_batches(&schema, [&self.record_batch, batch])?;
        self.record_batch = merged;
        Ok(())
    }

    /// Records `batch` as the batch holding the most recent input row.
    ///
    /// It is enough for the batch to contain only a single row (the rest
    /// are not necessary).
    pub fn set_most_recent_row(&mut self, batch: RecordBatch) {
        self.most_recent_row.replace(batch);
    }
}

/// This structure encapsulates all the state information we require as we scan
/// ranges of data while processing RANGE frames.
/// Attribute `sort_options` stores the column ordering specified by the ORDER
Expand Down Expand Up @@ -639,12 +667,14 @@ fn check_equality(current: &[ScalarValue], target: &[ScalarValue]) -> Result<boo

#[cfg(test)]
mod tests {
use std::ops::Range;
use std::sync::Arc;

use super::*;
use crate::{WindowFrame, WindowFrameBound, WindowFrameUnits};

use arrow::array::{ArrayRef, Float64Array};
use datafusion_common::{Result, ScalarValue};
use std::ops::Range;
use std::sync::Arc;

fn get_test_data() -> (Vec<ArrayRef>, Vec<SortOptions>) {
let range_columns: Vec<ArrayRef> = vec![Arc::new(Float64Array::from(vec![
Expand Down
Loading

0 comments on commit 7d3747c

Please sign in to comment.