From 40628e2eb6464481f1226f25f370401be3cb80a5 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Wed, 1 Nov 2023 19:17:47 +0300 Subject: [PATCH 1/3] NTH_VALUE reverse support --- datafusion/core/tests/window.rs | 146 ++++++++++++++++++ .../physical-expr/src/window/nth_value.rs | 68 ++++++-- .../physical-expr/src/window/window_expr.rs | 2 +- .../proto/src/physical_plan/to_proto.rs | 2 +- datafusion/sqllogictest/test_files/window.slt | 59 +++++++ 5 files changed, 263 insertions(+), 14 deletions(-) create mode 100644 datafusion/core/tests/window.rs diff --git a/datafusion/core/tests/window.rs b/datafusion/core/tests/window.rs new file mode 100644 index 000000000000..45b413247155 --- /dev/null +++ b/datafusion/core/tests/window.rs @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Test Window Queries +use arrow_array::RecordBatch; +use arrow_schema::{DataType, Field, Schema}; +use datafusion::prelude::SessionContext; +use datafusion_common::{assert_batches_eq, Result, ScalarValue}; +use datafusion_execution::config::SessionConfig; +use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits}; +use datafusion_physical_expr::expressions::{col, NthValue}; +use datafusion_physical_expr::window::{ + BuiltInWindowExpr, BuiltInWindowFunctionExpr, WindowExpr, +}; +use datafusion_physical_plan::memory::MemoryExec; +use datafusion_physical_plan::windows::{BoundedWindowAggExec, PartitionSearchMode}; +use datafusion_physical_plan::{collect, displayable, ExecutionPlan}; +use std::sync::Arc; + +/// Utility function yielding a string representation of the given [`ExecutionPlan`]. +pub fn get_plan_string(plan: &Arc) -> Vec { + let formatted = displayable(plan.as_ref()).indent(true).to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + actual.iter().map(|elem| elem.to_string()).collect() +} + +// Tests NTH_VALUE(negative index) with memoize feature. +// To be able to trigger memoize feature for NTH_VALUE we need to +// - feed BoundedWindowAggExec with batch stream data. +// - Window frame should contain UNBOUNDED PRECEDING. +// It hard to ensure these conditions are met, from the sql query. +#[tokio::test] +async fn test_window_nth_value_bounded_memoize() -> Result<()> { + let config = SessionConfig::new().with_target_partitions(1); + let ctx = SessionContext::new_with_config(config); + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + // Create a new batch of data to insert into the table + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))], + )?; + + let memory_exec = Arc::new(MemoryExec::try_new( + &[vec![batch.clone(), batch.clone(), batch.clone()]], + schema.clone(), + None, + )?) as Arc; + let col_a = col("a", &schema)?; + let nth_value_func1 = + NthValue::nth("nth_value(-1)", col_a.clone(), DataType::Int32, 1)? + .reverse_expr() + .unwrap(); + let nth_value_func2 = + NthValue::nth("nth_value(-2)", col_a.clone(), DataType::Int32, 2)? + .reverse_expr() + .unwrap(); + let last_value_func = Arc::new(NthValue::last("last", col_a.clone(), DataType::Int32)) + as Arc; + let window_exprs = vec![ + // LAST_VALUE(a) + Arc::new(BuiltInWindowExpr::new( + last_value_func, + &[], + &[], + Arc::new(WindowFrame { + units: WindowFrameUnits::Rows, + start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)), + end_bound: WindowFrameBound::CurrentRow, + }), + )) as Arc, + // NTH_VALUE(a, -1) + Arc::new(BuiltInWindowExpr::new( + nth_value_func1, + &[], + &[], + Arc::new(WindowFrame { + units: WindowFrameUnits::Rows, + start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)), + end_bound: WindowFrameBound::CurrentRow, + }), + )) as Arc, + // NTH_VALUE(a, -2) + Arc::new(BuiltInWindowExpr::new( + nth_value_func2, + &[], + &[], + Arc::new(WindowFrame { + units: WindowFrameUnits::Rows, + start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)), + end_bound: WindowFrameBound::CurrentRow, + }), + )) as Arc, + ]; + let physical_plan = Arc::new(BoundedWindowAggExec::try_new( + window_exprs, + memory_exec.clone(), + vec![], + PartitionSearchMode::Sorted, + )?) as Arc; + + let batches = collect(physical_plan.clone(), ctx.task_ctx()).await?; + + let expected = vec![ + "BoundedWindowAggExec: wdw=[last: Ok(Field { name: \"last\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }, nth_value(-1): Ok(Field { name: \"nth_value(-1)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }, nth_value(-2): Ok(Field { name: \"nth_value(-2)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]", + " MemoryExec: partitions=1, partition_sizes=[3]", + ]; + // Get string representation of the plan + let actual = get_plan_string(&physical_plan); + assert_eq!( + expected, actual, + "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + let expected = [ + "+---+------+---------------+---------------+", + "| a | last | nth_value(-1) | nth_value(-2) |", + "+---+------+---------------+---------------+", + "| 1 | 1 | 1 | |", + "| 2 | 2 | 2 | 1 |", + "| 3 | 3 | 3 | 2 |", + "| 1 | 1 | 1 | 3 |", + "| 2 | 2 | 2 | 1 |", + "| 3 | 3 | 3 | 2 |", + "| 1 | 1 | 1 | 3 |", + "| 2 | 2 | 2 | 1 |", + "| 3 | 3 | 3 | 2 |", + "+---+------+---------------+---------------+", + ]; + assert_batches_eq!(expected, &batches); + Ok(()) +} diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs index 262a50969b82..16c28c364544 100644 --- a/datafusion/physical-expr/src/window/nth_value.rs +++ b/datafusion/physical-expr/src/window/nth_value.rs @@ -28,6 +28,7 @@ use datafusion_common::{DataFusionError, Result}; use datafusion_expr::window_state::WindowAggState; use datafusion_expr::PartitionEvaluator; use std::any::Any; +use std::cmp::Ordering; use std::ops::Range; use std::sync::Arc; @@ -82,7 +83,7 @@ impl NthValue { name: name.into(), expr, data_type, - kind: NthValueKind::Nth(n), + kind: NthValueKind::Nth(n as i64), }), } } @@ -125,7 +126,7 @@ impl BuiltInWindowFunctionExpr for NthValue { let reversed_kind = match self.kind { NthValueKind::First => NthValueKind::Last, NthValueKind::Last => NthValueKind::First, - NthValueKind::Nth(_) => return None, + NthValueKind::Nth(idx) => NthValueKind::Nth(-idx), }; Some(Arc::new(Self { name: self.name.clone(), @@ -152,7 +153,13 @@ impl PartitionEvaluator for NthValueEvaluator { fn memoize(&mut self, state: &mut WindowAggState) -> Result<()> { let out = &state.out_col; let size = out.len(); - let (is_prunable, is_last) = match self.state.kind { + // Stores how many entries we need to keep track in the buffer to calculate correct result. + // If we can memoize a result (FIRST, NTH_VALUE(positive_index)). It is enough to keep only single row + // For LAST_VALUE also it is enough to keep single row (last row) + // However, For NTH_VALUE(negative_index) we need to keep at least ABS(negative_index) number of values + // in the buffer. + let mut n_buffer_size = 1; + let (is_prunable, is_reverse_direction) = match self.state.kind { NthValueKind::First => { let n_range = state.window_frame_range.end - state.window_frame_range.start; @@ -162,16 +169,30 @@ impl PartitionEvaluator for NthValueEvaluator { NthValueKind::Nth(n) => { let n_range = state.window_frame_range.end - state.window_frame_range.start; - (n_range >= (n as usize) && size >= (n as usize), false) + match n.cmp(&0) { + Ordering::Greater => { + (n_range >= (n as usize) && size > (n as usize), false) + } + Ordering::Less => { + let reverse_index = -n as usize; + n_buffer_size = reverse_index; + // Negative index represents reverse direction. + (n_range >= reverse_index, true) + } + Ordering::Equal => { + // n = 0 is not valid for nth_value (index starts from 0) + unreachable!(); + } + } } }; if is_prunable { - if self.state.finalized_result.is_none() && !is_last { + if self.state.finalized_result.is_none() && !is_reverse_direction { let result = ScalarValue::try_from_array(out, size - 1)?; self.state.finalized_result = Some(result); } state.window_frame_range.start = - state.window_frame_range.end.saturating_sub(1); + state.window_frame_range.end.saturating_sub(n_buffer_size); } Ok(()) } @@ -195,12 +216,35 @@ impl PartitionEvaluator for NthValueEvaluator { NthValueKind::First => ScalarValue::try_from_array(arr, range.start), NthValueKind::Last => ScalarValue::try_from_array(arr, range.end - 1), NthValueKind::Nth(n) => { - // We are certain that n > 0. - let index = (n as usize) - 1; - if index >= n_range { - ScalarValue::try_from(arr.data_type()) - } else { - ScalarValue::try_from_array(arr, range.start + index) + match n.cmp(&0) { + Ordering::Greater => { + // SQL indices are not 0-based. + let index = (n as usize) - 1; + if index >= n_range { + // Outside the range, Return NULL + ScalarValue::try_from(arr.data_type()) + } else { + ScalarValue::try_from_array(arr, range.start + index) + } + } + // n < 0 + Ordering::Less => { + let reverse_index = -n as usize; + if n_range >= reverse_index { + ScalarValue::try_from_array( + arr, + // Calculate proper index using length(`n_range`) and distance(`reverse_index`) from the end + range.start + n_range - reverse_index, + ) + } else { + // Outside the range, Return NULL + ScalarValue::try_from(arr.data_type()) + } + } + Ordering::Equal => { + // n = 0 is not valid for nth_value (index starts from 0) + unreachable!(); + } } } } diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index 9b0a02d329c4..d55addd6f0c6 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -290,7 +290,7 @@ pub struct NumRowsState { pub enum NthValueKind { First, Last, - Nth(u32), + Nth(i64), } #[derive(Debug, Clone)] diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 114baab6ccc4..18d3f0d3dee7 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -167,7 +167,7 @@ impl TryFrom> for protobuf::PhysicalWindowExprNode { args.insert( 1, Arc::new(Literal::new( - datafusion_common::ScalarValue::Int64(Some(n as i64)), + datafusion_common::ScalarValue::Int64(Some(n)), )), ); protobuf::BuiltInWindowFunction::NthValue diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 213f6daaef3e..aee3faf2df46 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3385,3 +3385,62 @@ query II select sum(1) over() x, sum(1) over () y ---- 1 1 + +# Create a table having 3 columns which are ordering equivalent by the source. In the next step, +# we will expect to observe the removed SortExec by propagating the orders across projection. +statement ok +CREATE EXTERNAL TABLE multiple_ordered_table ( + a0 INTEGER, + a INTEGER, + b INTEGER, + c INTEGER, + d INTEGER +) +STORED AS CSV +WITH HEADER ROW +WITH ORDER (a ASC) +WITH ORDER (b ASC) +WITH ORDER (c ASC) +LOCATION '../core/tests/data/window_2.csv'; + +query TT +EXPLAIN SELECT c, NTH_VALUE(c, 2) OVER(order by c DESC) as nv1 + FROM multiple_ordered_table + ORDER BY c ASC + LIMIT 5 +---- +logical_plan +Limit: skip=0, fetch=5 +--Sort: multiple_ordered_table.c ASC NULLS LAST, fetch=5 +----Projection: multiple_ordered_table.c, NTH_VALUE(multiple_ordered_table.c,Int64(2)) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS nv1 +------WindowAggr: windowExpr=[[NTH_VALUE(multiple_ordered_table.c, Int64(2)) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +--------TableScan: multiple_ordered_table projection=[c] +physical_plan +GlobalLimitExec: skip=0, fetch=5 +--ProjectionExec: expr=[c@0 as c, NTH_VALUE(multiple_ordered_table.c,Int64(2)) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as nv1] +----WindowAggExec: wdw=[NTH_VALUE(multiple_ordered_table.c,Int64(2)) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "NTH_VALUE(multiple_ordered_table.c,Int64(2)) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int32(NULL)) }] +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], has_header=true + +query II +SELECT c, NTH_VALUE(c, 2) OVER(order by c DESC) as nv1 + FROM multiple_ordered_table + ORDER BY c ASC + LIMIT 5 +---- +0 98 +1 98 +2 98 +3 98 +4 98 + +query II +SELECT c, NTH_VALUE(c, 2) OVER(order by c DESC) as nv1 + FROM multiple_ordered_table + ORDER BY c DESC + LIMIT 5 +---- +99 NULL +98 98 +97 98 +96 98 +95 98 From 201b6ac26a739fef5697d8914ad591565fd2be1f Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Sat, 25 Nov 2023 17:33:37 +0300 Subject: [PATCH 2/3] Review --- .../enforce_distribution.rs | 6 +- .../src/physical_optimizer/enforce_sorting.rs | 3 +- .../physical_optimizer/projection_pushdown.rs | 3 +- .../replace_with_order_preserving_variants.rs | 9 +- .../core/src/physical_optimizer/utils.rs | 9 +- datafusion/core/tests/window.rs | 40 +++---- .../physical-expr/src/window/nth_value.rs | 112 ++++++++---------- .../physical-expr/src/window/window_expr.rs | 14 ++- datafusion/physical-plan/src/lib.rs | 7 ++ .../proto/src/physical_plan/to_proto.rs | 6 +- 10 files changed, 92 insertions(+), 117 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 4aedc3b0d1a9..95c44362049c 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -28,8 +28,8 @@ use std::sync::Arc; use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_optimizer::utils::{ - add_sort_above, get_children_exectrees, get_plan_string, is_coalesce_partitions, - is_repartition, is_sort_preserving_merge, ExecTree, + add_sort_above, get_children_exectrees, is_coalesce_partitions, is_repartition, + is_sort_preserving_merge, ExecTree, }; use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; @@ -54,8 +54,8 @@ use datafusion_physical_expr::utils::map_columns_before_projection; use datafusion_physical_expr::{ physical_exprs_equal, EquivalenceProperties, PhysicalExpr, }; -use datafusion_physical_plan::unbounded_output; use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec}; +use datafusion_physical_plan::{get_plan_string, unbounded_output}; use itertools::izip; diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 2590948d3b3e..14ed59c8e2af 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -763,9 +763,8 @@ mod tests { repartition_exec, sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec, spr_repartition_exec, union_exec, }; - use crate::physical_optimizer::utils::get_plan_string; use crate::physical_plan::repartition::RepartitionExec; - use crate::physical_plan::{displayable, Partitioning}; + use crate::physical_plan::{displayable, get_plan_string, Partitioning}; use crate::prelude::{SessionConfig, SessionContext}; use crate::test::csv_exec_sorted; diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 74d0de507e4c..e737aa8410b0 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -1089,7 +1089,6 @@ mod tests { use crate::physical_optimizer::projection_pushdown::{ join_table_borders, update_expr, ProjectionPushdown, }; - use crate::physical_optimizer::utils::get_plan_string; use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::filter::FilterExec; @@ -1100,7 +1099,7 @@ mod tests { use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; - use crate::physical_plan::ExecutionPlan; + use crate::physical_plan::{get_plan_string, ExecutionPlan}; use arrow_schema::{DataType, Field, Schema, SortOptions}; use datafusion_common::config::ConfigOptions; diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 7f8c9b852cb1..bce5f417ae45 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -286,7 +286,7 @@ mod tests { use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; - use crate::physical_plan::{displayable, Partitioning}; + use crate::physical_plan::{displayable, get_plan_string, Partitioning}; use crate::prelude::SessionConfig; use arrow::compute::SortOptions; @@ -929,11 +929,4 @@ mod tests { FileCompressionType::UNCOMPRESSED, )) } - - // Util function to get string representation of a physical plan - fn get_plan_string(plan: &Arc) -> Vec { - let formatted = displayable(plan.as_ref()).indent(true).to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - actual.iter().map(|elem| elem.to_string()).collect() - } } diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs index 530df374ca7c..fccc1db0d359 100644 --- a/datafusion/core/src/physical_optimizer/utils.rs +++ b/datafusion/core/src/physical_optimizer/utils.rs @@ -28,7 +28,7 @@ use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::union::UnionExec; use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; -use crate::physical_plan::{displayable, ExecutionPlan}; +use crate::physical_plan::{get_plan_string, ExecutionPlan}; use datafusion_physical_expr::{LexRequirementRef, PhysicalSortRequirement}; @@ -154,10 +154,3 @@ pub fn is_union(plan: &Arc) -> bool { pub fn is_repartition(plan: &Arc) -> bool { plan.as_any().is::() } - -/// Utility function yielding a string representation of the given [`ExecutionPlan`]. -pub fn get_plan_string(plan: &Arc) -> Vec { - let formatted = displayable(plan.as_ref()).indent(true).to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - actual.iter().map(|elem| elem.to_string()).collect() -} diff --git a/datafusion/core/tests/window.rs b/datafusion/core/tests/window.rs index 45b413247155..5a22aa1f7ec7 100644 --- a/datafusion/core/tests/window.rs +++ b/datafusion/core/tests/window.rs @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. -//! Test Window Queries +//! Tests for window queries +use std::sync::Arc; + use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Schema}; use datafusion::prelude::SessionContext; @@ -23,20 +25,10 @@ use datafusion_common::{assert_batches_eq, Result, ScalarValue}; use datafusion_execution::config::SessionConfig; use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits}; use datafusion_physical_expr::expressions::{col, NthValue}; -use datafusion_physical_expr::window::{ - BuiltInWindowExpr, BuiltInWindowFunctionExpr, WindowExpr, -}; +use datafusion_physical_expr::window::{BuiltInWindowExpr, BuiltInWindowFunctionExpr}; use datafusion_physical_plan::memory::MemoryExec; use datafusion_physical_plan::windows::{BoundedWindowAggExec, PartitionSearchMode}; -use datafusion_physical_plan::{collect, displayable, ExecutionPlan}; -use std::sync::Arc; - -/// Utility function yielding a string representation of the given [`ExecutionPlan`]. -pub fn get_plan_string(plan: &Arc) -> Vec { - let formatted = displayable(plan.as_ref()).indent(true).to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - actual.iter().map(|elem| elem.to_string()).collect() -} +use datafusion_physical_plan::{collect, get_plan_string, ExecutionPlan}; // Tests NTH_VALUE(negative index) with memoize feature. // To be able to trigger memoize feature for NTH_VALUE we need to @@ -55,11 +47,12 @@ async fn test_window_nth_value_bounded_memoize() -> Result<()> { vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))], )?; - let memory_exec = Arc::new(MemoryExec::try_new( + let memory_exec = MemoryExec::try_new( &[vec![batch.clone(), batch.clone(), batch.clone()]], schema.clone(), None, - )?) as Arc; + ) + .map(|e| Arc::new(e) as Arc)?; let col_a = col("a", &schema)?; let nth_value_func1 = NthValue::nth("nth_value(-1)", col_a.clone(), DataType::Int32, 1)? @@ -69,8 +62,8 @@ async fn test_window_nth_value_bounded_memoize() -> Result<()> { NthValue::nth("nth_value(-2)", col_a.clone(), DataType::Int32, 2)? .reverse_expr() .unwrap(); - let last_value_func = Arc::new(NthValue::last("last", col_a.clone(), DataType::Int32)) - as Arc; + let last_value_func = + Arc::new(NthValue::last("last", col_a.clone(), DataType::Int32)) as _; let window_exprs = vec![ // LAST_VALUE(a) Arc::new(BuiltInWindowExpr::new( @@ -82,7 +75,7 @@ async fn test_window_nth_value_bounded_memoize() -> Result<()> { start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)), end_bound: WindowFrameBound::CurrentRow, }), - )) as Arc, + )) as _, // NTH_VALUE(a, -1) Arc::new(BuiltInWindowExpr::new( nth_value_func1, @@ -93,7 +86,7 @@ async fn test_window_nth_value_bounded_memoize() -> Result<()> { start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)), end_bound: WindowFrameBound::CurrentRow, }), - )) as Arc, + )) as _, // NTH_VALUE(a, -2) Arc::new(BuiltInWindowExpr::new( nth_value_func2, @@ -104,14 +97,15 @@ async fn test_window_nth_value_bounded_memoize() -> Result<()> { start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)), end_bound: WindowFrameBound::CurrentRow, }), - )) as Arc, + )) as _, ]; - let physical_plan = Arc::new(BoundedWindowAggExec::try_new( + let physical_plan = BoundedWindowAggExec::try_new( window_exprs, - memory_exec.clone(), + memory_exec, vec![], PartitionSearchMode::Sorted, - )?) as Arc; + ) + .map(|e| Arc::new(e) as Arc)?; let batches = collect(physical_plan.clone(), ctx.task_ctx()).await?; diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs index 16c28c364544..bd9bf6fcd641 100644 --- a/datafusion/physical-expr/src/window/nth_value.rs +++ b/datafusion/physical-expr/src/window/nth_value.rs @@ -15,22 +15,23 @@ // specific language governing permissions and limitations // under the License. -//! Defines physical expressions for `first_value`, `last_value`, and `nth_value` -//! that can evaluated at runtime during query execution +//! Defines physical expressions for `FIRST_VALUE`, `LAST_VALUE`, and `NTH_VALUE` +//! functions that can be evaluated at run time during query execution. + +use std::any::Any; +use std::ops::Range; +use std::sync::Arc; use crate::window::window_expr::{NthValueKind, NthValueState}; use crate::window::BuiltInWindowFunctionExpr; use crate::PhysicalExpr; + use arrow::array::{Array, ArrayRef}; use arrow::datatypes::{DataType, Field}; use datafusion_common::{exec_err, ScalarValue}; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::window_state::WindowAggState; use datafusion_expr::PartitionEvaluator; -use std::any::Any; -use std::cmp::Ordering; -use std::ops::Range; -use std::sync::Arc; /// nth_value expression #[derive(Debug)] @@ -78,7 +79,7 @@ impl NthValue { n: u32, ) -> Result { match n { - 0 => exec_err!("nth_value expect n to be > 0"), + 0 => exec_err!("NTH_VALUE expects n to be non-zero"), _ => Ok(Self { name: name.into(), expr, @@ -88,7 +89,7 @@ impl NthValue { } } - /// Get nth_value kind + /// Get the NTH_VALUE kind pub fn get_kind(&self) -> NthValueKind { self.kind } @@ -144,21 +145,16 @@ pub(crate) struct NthValueEvaluator { } impl PartitionEvaluator for NthValueEvaluator { - /// When the window frame has a fixed beginning (e.g UNBOUNDED - /// PRECEDING), for some functions such as FIRST_VALUE, LAST_VALUE and - /// NTH_VALUE we can memoize result. Once result is calculated it - /// will always stay same. Hence, we do not need to keep past data - /// as we process the entire dataset. This feature enables us to - /// prune rows from table. The default implementation does nothing + /// When the window frame has a fixed beginning (e.g UNBOUNDED PRECEDING), + /// for some functions such as FIRST_VALUE, LAST_VALUE and NTH_VALUE, we + /// can memoize the result. Once result is calculated, it will always stay + /// same. Hence, we do not need to keep past data as we process the entire + /// dataset. fn memoize(&mut self, state: &mut WindowAggState) -> Result<()> { let out = &state.out_col; let size = out.len(); - // Stores how many entries we need to keep track in the buffer to calculate correct result. - // If we can memoize a result (FIRST, NTH_VALUE(positive_index)). It is enough to keep only single row - // For LAST_VALUE also it is enough to keep single row (last row) - // However, For NTH_VALUE(negative_index) we need to keep at least ABS(negative_index) number of values - // in the buffer. - let mut n_buffer_size = 1; + let mut buffer_size = 1; + // Decide if we arrived at a final result yet: let (is_prunable, is_reverse_direction) = match self.state.kind { NthValueKind::First => { let n_range = @@ -169,20 +165,17 @@ impl PartitionEvaluator for NthValueEvaluator { NthValueKind::Nth(n) => { let n_range = state.window_frame_range.end - state.window_frame_range.start; - match n.cmp(&0) { - Ordering::Greater => { - (n_range >= (n as usize) && size > (n as usize), false) - } - Ordering::Less => { - let reverse_index = -n as usize; - n_buffer_size = reverse_index; - // Negative index represents reverse direction. - (n_range >= reverse_index, true) - } - Ordering::Equal => { - // n = 0 is not valid for nth_value (index starts from 0) - unreachable!(); - } + #[allow(clippy::comparison_chain)] + if n > 0 { + (n_range >= (n as usize) && size > (n as usize), false) + } else if n < 0 { + let reverse_index = (-n) as usize; + buffer_size = reverse_index; + // Negative index represents reverse direction. + (n_range >= reverse_index, true) + } else { + // The case n = 0 is not valid for the NTH_VALUE function. + unreachable!(); } } }; @@ -192,7 +185,7 @@ impl PartitionEvaluator for NthValueEvaluator { self.state.finalized_result = Some(result); } state.window_frame_range.start = - state.window_frame_range.end.saturating_sub(n_buffer_size); + state.window_frame_range.end.saturating_sub(buffer_size); } Ok(()) } @@ -216,35 +209,30 @@ impl PartitionEvaluator for NthValueEvaluator { NthValueKind::First => ScalarValue::try_from_array(arr, range.start), NthValueKind::Last => ScalarValue::try_from_array(arr, range.end - 1), NthValueKind::Nth(n) => { - match n.cmp(&0) { - Ordering::Greater => { - // SQL indices are not 0-based. - let index = (n as usize) - 1; - if index >= n_range { - // Outside the range, Return NULL - ScalarValue::try_from(arr.data_type()) - } else { - ScalarValue::try_from_array(arr, range.start + index) - } - } - // n < 0 - Ordering::Less => { - let reverse_index = -n as usize; - if n_range >= reverse_index { - ScalarValue::try_from_array( - arr, - // Calculate proper index using length(`n_range`) and distance(`reverse_index`) from the end - range.start + n_range - reverse_index, - ) - } else { - // Outside the range, Return NULL - ScalarValue::try_from(arr.data_type()) - } + #[allow(clippy::comparison_chain)] + if n > 0 { + // SQL indices are not 0-based. + let index = (n as usize) - 1; + if index >= n_range { + // Outside the range, return NULL: + ScalarValue::try_from(arr.data_type()) + } else { + ScalarValue::try_from_array(arr, range.start + index) } - Ordering::Equal => { - // n = 0 is not valid for nth_value (index starts from 0) - unreachable!(); + } else if n < 0 { + let reverse_index = (-n) as usize; + if n_range >= reverse_index { + ScalarValue::try_from_array( + arr, + range.start + n_range - reverse_index, + ) + } else { + // Outside the range, return NULL: + ScalarValue::try_from(arr.data_type()) } + } else { + // The case n = 0 is not valid for the NTH_VALUE function. + unreachable!(); } } } diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index 232dd4b89cf8..4211a616e100 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -15,7 +15,13 @@ // specific language governing permissions and limitations // under the License. +use std::any::Any; +use std::fmt::Debug; +use std::ops::Range; +use std::sync::Arc; + use crate::{PhysicalExpr, PhysicalSortExpr}; + use arrow::array::{new_empty_array, Array, ArrayRef}; use arrow::compute::kernels::sort::SortColumn; use arrow::compute::SortOptions; @@ -25,13 +31,9 @@ use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue}; use datafusion_expr::window_state::{ PartitionBatchState, WindowAggState, WindowFrameContext, }; -use datafusion_expr::PartitionEvaluator; -use datafusion_expr::{Accumulator, WindowFrame}; +use datafusion_expr::{Accumulator, PartitionEvaluator, WindowFrame}; + use indexmap::IndexMap; -use std::any::Any; -use std::fmt::Debug; -use std::ops::Range; -use std::sync::Arc; /// Common trait for [window function] implementations /// diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index e5cd5e674cb1..b2c69b467e9c 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -570,5 +570,12 @@ pub fn unbounded_output(plan: &Arc) -> bool { .unwrap_or(true) } +/// Utility function yielding a string representation of the given [`ExecutionPlan`]. +pub fn get_plan_string(plan: &Arc) -> Vec { + let formatted = displayable(plan.as_ref()).indent(true).to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + actual.iter().map(|elem| elem.to_string()).collect() +} + #[cfg(test)] pub mod test; diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 27e720684b34..ea00b726b9d6 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -27,11 +27,11 @@ use crate::protobuf::{ physical_aggregate_expr_node, PhysicalSortExprNode, PhysicalSortExprNodeCollection, ScalarValue, }; + use datafusion::datasource::{ - file_format::json::JsonSink, physical_plan::FileScanConfig, -}; -use datafusion::datasource::{ + file_format::json::JsonSink, listing::{FileRange, PartitionedFile}, + physical_plan::FileScanConfig, physical_plan::FileSinkConfig, }; use datafusion::logical_expr::BuiltinScalarFunction; From 62b6e3333dcfa64c9656755703490ab31ab57f65 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 28 Nov 2023 17:08:17 +0300 Subject: [PATCH 3/3] Address reviews --- datafusion/core/tests/window.rs | 140 ------------------ .../physical-expr/src/window/nth_value.rs | 73 ++++----- .../src/windows/bounded_window_agg_exec.rs | 128 ++++++++++++++++ datafusion/sqllogictest/test_files/window.slt | 8 + 4 files changed, 176 insertions(+), 173 deletions(-) delete mode 100644 datafusion/core/tests/window.rs diff --git a/datafusion/core/tests/window.rs b/datafusion/core/tests/window.rs deleted file mode 100644 index 5a22aa1f7ec7..000000000000 --- a/datafusion/core/tests/window.rs +++ /dev/null @@ -1,140 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Tests for window queries -use std::sync::Arc; - -use arrow_array::RecordBatch; -use arrow_schema::{DataType, Field, Schema}; -use datafusion::prelude::SessionContext; -use datafusion_common::{assert_batches_eq, Result, ScalarValue}; -use datafusion_execution::config::SessionConfig; -use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits}; -use datafusion_physical_expr::expressions::{col, NthValue}; -use datafusion_physical_expr::window::{BuiltInWindowExpr, BuiltInWindowFunctionExpr}; -use datafusion_physical_plan::memory::MemoryExec; -use datafusion_physical_plan::windows::{BoundedWindowAggExec, PartitionSearchMode}; -use datafusion_physical_plan::{collect, get_plan_string, ExecutionPlan}; - -// Tests NTH_VALUE(negative index) with memoize feature. -// To be able to trigger memoize feature for NTH_VALUE we need to -// - feed BoundedWindowAggExec with batch stream data. -// - Window frame should contain UNBOUNDED PRECEDING. -// It hard to ensure these conditions are met, from the sql query. -#[tokio::test] -async fn test_window_nth_value_bounded_memoize() -> Result<()> { - let config = SessionConfig::new().with_target_partitions(1); - let ctx = SessionContext::new_with_config(config); - - let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); - // Create a new batch of data to insert into the table - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))], - )?; - - let memory_exec = MemoryExec::try_new( - &[vec![batch.clone(), batch.clone(), batch.clone()]], - schema.clone(), - None, - ) - .map(|e| Arc::new(e) as Arc)?; - let col_a = col("a", &schema)?; - let nth_value_func1 = - NthValue::nth("nth_value(-1)", col_a.clone(), DataType::Int32, 1)? - .reverse_expr() - .unwrap(); - let nth_value_func2 = - NthValue::nth("nth_value(-2)", col_a.clone(), DataType::Int32, 2)? - .reverse_expr() - .unwrap(); - let last_value_func = - Arc::new(NthValue::last("last", col_a.clone(), DataType::Int32)) as _; - let window_exprs = vec![ - // LAST_VALUE(a) - Arc::new(BuiltInWindowExpr::new( - last_value_func, - &[], - &[], - Arc::new(WindowFrame { - units: WindowFrameUnits::Rows, - start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)), - end_bound: WindowFrameBound::CurrentRow, - }), - )) as _, - // NTH_VALUE(a, -1) - Arc::new(BuiltInWindowExpr::new( - nth_value_func1, - &[], - &[], - Arc::new(WindowFrame { - units: WindowFrameUnits::Rows, - start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)), - end_bound: WindowFrameBound::CurrentRow, - }), - )) as _, - // NTH_VALUE(a, -2) - Arc::new(BuiltInWindowExpr::new( - nth_value_func2, - &[], - &[], - Arc::new(WindowFrame { - units: WindowFrameUnits::Rows, - start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)), - end_bound: WindowFrameBound::CurrentRow, - }), - )) as _, - ]; - let physical_plan = BoundedWindowAggExec::try_new( - window_exprs, - memory_exec, - vec![], - PartitionSearchMode::Sorted, - ) - .map(|e| Arc::new(e) as Arc)?; - - let batches = collect(physical_plan.clone(), ctx.task_ctx()).await?; - - let expected = vec![ - "BoundedWindowAggExec: wdw=[last: Ok(Field { name: \"last\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }, nth_value(-1): Ok(Field { name: \"nth_value(-1)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }, nth_value(-2): Ok(Field { name: \"nth_value(-2)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]", - " MemoryExec: partitions=1, partition_sizes=[3]", - ]; - // Get string representation of the plan - let actual = get_plan_string(&physical_plan); - assert_eq!( - expected, actual, - "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - - let expected = [ - "+---+------+---------------+---------------+", - "| a | last | nth_value(-1) | nth_value(-2) |", - "+---+------+---------------+---------------+", - "| 1 | 1 | 1 | |", - "| 2 | 2 | 2 | 1 |", - "| 3 | 3 | 3 | 2 |", - "| 1 | 1 | 1 | 3 |", - "| 2 | 2 | 2 | 1 |", - "| 3 | 3 | 3 | 2 |", - "| 1 | 1 | 1 | 3 |", - "| 2 | 2 | 2 | 1 |", - "| 3 | 3 | 3 | 2 |", - "+---+------+---------------+---------------+", - ]; - assert_batches_eq!(expected, &batches); - Ok(()) -} diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs index bd9bf6fcd641..b3c89122ebad 100644 --- a/datafusion/physical-expr/src/window/nth_value.rs +++ b/datafusion/physical-expr/src/window/nth_value.rs @@ -19,6 +19,7 @@ //! functions that can be evaluated at run time during query execution. use std::any::Any; +use std::cmp::Ordering; use std::ops::Range; use std::sync::Arc; @@ -165,17 +166,20 @@ impl PartitionEvaluator for NthValueEvaluator { NthValueKind::Nth(n) => { let n_range = state.window_frame_range.end - state.window_frame_range.start; - #[allow(clippy::comparison_chain)] - if n > 0 { - (n_range >= (n as usize) && size > (n as usize), false) - } else if n < 0 { - let reverse_index = (-n) as usize; - buffer_size = reverse_index; - // Negative index represents reverse direction. - (n_range >= reverse_index, true) - } else { - // The case n = 0 is not valid for the NTH_VALUE function. - unreachable!(); + match n.cmp(&0) { + Ordering::Greater => { + (n_range >= (n as usize) && size > (n as usize), false) + } + Ordering::Less => { + let reverse_index = (-n) as usize; + buffer_size = reverse_index; + // Negative index represents reverse direction. + (n_range >= reverse_index, true) + } + Ordering::Equal => { + // The case n = 0 is not valid for the NTH_VALUE function. + unreachable!(); + } } } }; @@ -209,30 +213,33 @@ impl PartitionEvaluator for NthValueEvaluator { NthValueKind::First => ScalarValue::try_from_array(arr, range.start), NthValueKind::Last => ScalarValue::try_from_array(arr, range.end - 1), NthValueKind::Nth(n) => { - #[allow(clippy::comparison_chain)] - if n > 0 { - // SQL indices are not 0-based. - let index = (n as usize) - 1; - if index >= n_range { - // Outside the range, return NULL: - ScalarValue::try_from(arr.data_type()) - } else { - ScalarValue::try_from_array(arr, range.start + index) + match n.cmp(&0) { + Ordering::Greater => { + // SQL indices are not 0-based. + let index = (n as usize) - 1; + if index >= n_range { + // Outside the range, return NULL: + ScalarValue::try_from(arr.data_type()) + } else { + ScalarValue::try_from_array(arr, range.start + index) + } } - } else if n < 0 { - let reverse_index = (-n) as usize; - if n_range >= reverse_index { - ScalarValue::try_from_array( - arr, - range.start + n_range - reverse_index, - ) - } else { - // Outside the range, return NULL: - ScalarValue::try_from(arr.data_type()) + Ordering::Less => { + let reverse_index = (-n) as usize; + if n_range >= reverse_index { + ScalarValue::try_from_array( + arr, + range.start + n_range - reverse_index, + ) + } else { + // Outside the range, return NULL: + ScalarValue::try_from(arr.data_type()) + } + } + Ordering::Equal => { + // The case n = 0 is not valid for the NTH_VALUE function. + unreachable!(); } - } else { - // The case n = 0 is not valid for the NTH_VALUE function. - unreachable!(); } } } diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index fb679b013863..8156ab1fa31b 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -1109,3 +1109,131 @@ fn get_aggregate_result_out_column( result .ok_or_else(|| DataFusionError::Execution("Should contain something".to_string())) } + +#[cfg(test)] +mod tests { + use crate::common::collect; + use crate::memory::MemoryExec; + use crate::windows::{BoundedWindowAggExec, PartitionSearchMode}; + use crate::{get_plan_string, ExecutionPlan}; + use arrow_array::RecordBatch; + use arrow_schema::{DataType, Field, Schema}; + use datafusion_common::{assert_batches_eq, Result, ScalarValue}; + use datafusion_execution::config::SessionConfig; + use datafusion_execution::TaskContext; + use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits}; + use datafusion_physical_expr::expressions::col; + use datafusion_physical_expr::expressions::NthValue; + use datafusion_physical_expr::window::BuiltInWindowExpr; + use datafusion_physical_expr::window::BuiltInWindowFunctionExpr; + use std::sync::Arc; + + // Tests NTH_VALUE(negative index) with memoize feature. + // To be able to trigger memoize feature for NTH_VALUE we need to + // - feed BoundedWindowAggExec with batch stream data. + // - Window frame should contain UNBOUNDED PRECEDING. + // It hard to ensure these conditions are met, from the sql query. + #[tokio::test] + async fn test_window_nth_value_bounded_memoize() -> Result<()> { + let config = SessionConfig::new().with_target_partitions(1); + let task_ctx = Arc::new(TaskContext::default().with_session_config(config)); + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + // Create a new batch of data to insert into the table + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))], + )?; + + let memory_exec = MemoryExec::try_new( + &[vec![batch.clone(), batch.clone(), batch.clone()]], + schema.clone(), + None, + ) + .map(|e| Arc::new(e) as Arc)?; + let col_a = col("a", &schema)?; + let nth_value_func1 = + NthValue::nth("nth_value(-1)", col_a.clone(), DataType::Int32, 1)? + .reverse_expr() + .unwrap(); + let nth_value_func2 = + NthValue::nth("nth_value(-2)", col_a.clone(), DataType::Int32, 2)? + .reverse_expr() + .unwrap(); + let last_value_func = + Arc::new(NthValue::last("last", col_a.clone(), DataType::Int32)) as _; + let window_exprs = vec![ + // LAST_VALUE(a) + Arc::new(BuiltInWindowExpr::new( + last_value_func, + &[], + &[], + Arc::new(WindowFrame { + units: WindowFrameUnits::Rows, + start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)), + end_bound: WindowFrameBound::CurrentRow, + }), + )) as _, + // NTH_VALUE(a, -1) + Arc::new(BuiltInWindowExpr::new( + nth_value_func1, + &[], + &[], + Arc::new(WindowFrame { + units: WindowFrameUnits::Rows, + start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)), + end_bound: WindowFrameBound::CurrentRow, + }), + )) as _, + // NTH_VALUE(a, -2) + Arc::new(BuiltInWindowExpr::new( + nth_value_func2, + &[], + &[], + Arc::new(WindowFrame { + units: WindowFrameUnits::Rows, + start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)), + end_bound: WindowFrameBound::CurrentRow, + }), + )) as _, + ]; + let physical_plan = BoundedWindowAggExec::try_new( + window_exprs, + memory_exec, + vec![], + PartitionSearchMode::Sorted, + ) + .map(|e| Arc::new(e) as Arc)?; + + let batches = collect(physical_plan.execute(0, task_ctx)?).await?; + + let expected = vec![ + "BoundedWindowAggExec: wdw=[last: Ok(Field { name: \"last\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }, nth_value(-1): Ok(Field { name: \"nth_value(-1)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }, nth_value(-2): Ok(Field { name: \"nth_value(-2)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]", + " MemoryExec: partitions=1, partition_sizes=[3]", + ]; + // Get string representation of the plan + let actual = get_plan_string(&physical_plan); + assert_eq!( + expected, actual, + "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + let expected = [ + "+---+------+---------------+---------------+", + "| a | last | nth_value(-1) | nth_value(-2) |", + "+---+------+---------------+---------------+", + "| 1 | 1 | 1 | |", + "| 2 | 2 | 2 | 1 |", + "| 3 | 3 | 3 | 2 |", + "| 1 | 1 | 1 | 3 |", + "| 2 | 2 | 2 | 1 |", + "| 3 | 3 | 3 | 2 |", + "| 1 | 1 | 1 | 3 |", + "| 2 | 2 | 2 | 1 |", + "| 3 | 3 | 3 | 2 |", + "+---+------+---------------+---------------+", + ]; + assert_batches_eq!(expected, &batches); + Ok(()) + } +} diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index efbf325ae6a9..8ff6aeab5159 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3493,6 +3493,14 @@ select sum(1) over() x, sum(1) over () y ---- 1 1 +# NTH_VALUE requirement is c DESC, However existing ordering is c ASC +# if we reverse window expression: "NTH_VALUE(c, 2) OVER(order by c DESC ) as nv1" +# as "NTH_VALUE(c, -2) OVER(order by c ASC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) as nv1" +# Please note that: "NTH_VALUE(c, 2) OVER(order by c DESC ) as nv1" is same with +# "NTH_VALUE(c, 2) OVER(order by c DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as nv1" " +# we can produce same result without re-sorting the table. +# Unfortunately since window expression names are string, this change is not seen the plan (we do not do string manipulation). +# TODO: Reflect window expression reversal in the plans. query TT EXPLAIN SELECT c, NTH_VALUE(c, 2) OVER(order by c DESC) as nv1 FROM multiple_ordered_table