diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs index a6c2cf700cc4..b9881c9f23cf 100644 --- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs @@ -45,6 +45,8 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; use test_utils::add_empty_batches; use datafusion::functions_window::row_number::row_number_udwf; +use datafusion_functions_window::dense_rank::dense_rank_udwf; +use datafusion_functions_window::rank::rank_udwf; use hashbrown::HashMap; use rand::distributions::Alphanumeric; use rand::rngs::StdRng; @@ -224,9 +226,9 @@ async fn bounded_window_causal_non_causal() -> Result<()> { // ) ( // Window function - WindowFunctionDefinition::BuiltInWindowFunction(BuiltInWindowFunction::Rank), + WindowFunctionDefinition::WindowUDF(rank_udwf()), // its name - "RANK", + "rank", // no argument vec![], // Expected causality, for None cases causality will be determined from window frame boundaries @@ -238,11 +240,9 @@ async fn bounded_window_causal_non_causal() -> Result<()> { // ) ( // Window function - WindowFunctionDefinition::BuiltInWindowFunction( - BuiltInWindowFunction::DenseRank, - ), + WindowFunctionDefinition::WindowUDF(dense_rank_udwf()), // its name - "DENSE_RANK", + "dense_rank", // no argument vec![], // Expected causality, for None cases causality will be determined from window frame boundaries @@ -382,19 +382,12 @@ fn get_random_function( ); window_fn_map.insert( "rank", - ( - WindowFunctionDefinition::BuiltInWindowFunction( - BuiltInWindowFunction::Rank, - ), - vec![], - ), + (WindowFunctionDefinition::WindowUDF(rank_udwf()), vec![]), ); window_fn_map.insert( "dense_rank", ( - WindowFunctionDefinition::BuiltInWindowFunction( - BuiltInWindowFunction::DenseRank, - ), + WindowFunctionDefinition::WindowUDF(dense_rank_udwf()), vec![], ), ); diff --git a/datafusion/expr/src/built_in_window_function.rs b/datafusion/expr/src/built_in_window_function.rs index b136d6cacec8..117ff08253b6 100644 --- a/datafusion/expr/src/built_in_window_function.rs +++ b/datafusion/expr/src/built_in_window_function.rs @@ -40,12 +40,6 @@ impl fmt::Display for BuiltInWindowFunction { /// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL) #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, EnumIter)] pub enum BuiltInWindowFunction { - /// rank of the current row with gaps; same as row_number of its first peer - Rank, - /// rank of the current row without gaps; this function counts peer groups - DenseRank, - /// relative rank of the current row: (rank - 1) / (total rows - 1) - PercentRank, /// relative rank of the current row: (number of rows preceding or peer with current row) / (total rows) CumeDist, /// integer ranging from 1 to the argument value, dividing the partition as equally as possible @@ -72,9 +66,6 @@ impl BuiltInWindowFunction { pub fn name(&self) -> &str { use BuiltInWindowFunction::*; match self { - Rank => "RANK", - DenseRank => "DENSE_RANK", - PercentRank => "PERCENT_RANK", CumeDist => "CUME_DIST", Ntile => "NTILE", Lag => "LAG", @@ -90,9 +81,6 @@ impl FromStr for BuiltInWindowFunction { type Err = DataFusionError; fn from_str(name: &str) -> Result { Ok(match name.to_uppercase().as_str() { - "RANK" => BuiltInWindowFunction::Rank, - "DENSE_RANK" => BuiltInWindowFunction::DenseRank, - "PERCENT_RANK" => BuiltInWindowFunction::PercentRank, "CUME_DIST" => BuiltInWindowFunction::CumeDist, "NTILE" => BuiltInWindowFunction::Ntile, "LAG" => BuiltInWindowFunction::Lag, @@ -127,12 +115,8 @@ impl BuiltInWindowFunction { })?; match self { - BuiltInWindowFunction::Rank - | BuiltInWindowFunction::DenseRank - | BuiltInWindowFunction::Ntile => Ok(DataType::UInt64), - BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => { - Ok(DataType::Float64) - } + BuiltInWindowFunction::Ntile => Ok(DataType::UInt64), + BuiltInWindowFunction::CumeDist => Ok(DataType::Float64), BuiltInWindowFunction::Lag | BuiltInWindowFunction::Lead | BuiltInWindowFunction::FirstValue @@ -145,10 +129,7 @@ impl BuiltInWindowFunction { pub fn signature(&self) -> Signature { // note: the physical expression must accept the type returned by this function or the execution panics. match self { - BuiltInWindowFunction::Rank - | BuiltInWindowFunction::DenseRank - | BuiltInWindowFunction::PercentRank - | BuiltInWindowFunction::CumeDist => Signature::any(0, Volatility::Immutable), + BuiltInWindowFunction::CumeDist => Signature::any(0, Volatility::Immutable), BuiltInWindowFunction::Lag | BuiltInWindowFunction::Lead => { Signature::one_of( vec![ diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 02a2edb98016..85a843042974 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -2628,9 +2628,6 @@ mod test { #[test] fn test_window_function_case_insensitive() -> Result<()> { let names = vec![ - "rank", - "dense_rank", - "percent_rank", "cume_dist", "ntile", "lag", diff --git a/datafusion/expr/src/window_function.rs b/datafusion/expr/src/window_function.rs index a80718147c3a..7ac6fb7d167c 100644 --- a/datafusion/expr/src/window_function.rs +++ b/datafusion/expr/src/window_function.rs @@ -19,27 +19,6 @@ use datafusion_common::ScalarValue; use crate::{expr::WindowFunction, BuiltInWindowFunction, Expr, Literal}; -/// Create an expression to represent the `rank` window function -pub fn rank() -> Expr { - Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::Rank, vec![])) -} - -/// Create an expression to represent the `dense_rank` window function -pub fn dense_rank() -> Expr { - Expr::WindowFunction(WindowFunction::new( - BuiltInWindowFunction::DenseRank, - vec![], - )) -} - -/// Create an expression to represent the `percent_rank` window function -pub fn percent_rank() -> Expr { - Expr::WindowFunction(WindowFunction::new( - BuiltInWindowFunction::PercentRank, - vec![], - )) -} - /// Create an expression to represent the `cume_dist` window function pub fn cume_dist() -> Expr { Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::CumeDist, vec![])) diff --git a/datafusion/functions-window/src/dense_rank.rs b/datafusion/functions-window/src/dense_rank.rs new file mode 100644 index 000000000000..a0a020cf1287 --- /dev/null +++ b/datafusion/functions-window/src/dense_rank.rs @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines physical expression for `dense_rank` that can evaluated at runtime during query execution + +use std::any::Any; +use std::fmt::Debug; +use std::ops::Range; + +use datafusion_common::arrow::array::ArrayRef; +use datafusion_common::arrow::array::UInt64Array; +use datafusion_common::arrow::compute::SortOptions; +use datafusion_common::arrow::datatypes::DataType; +use datafusion_common::arrow::datatypes::Field; +use datafusion_common::{Result, ScalarValue}; +use datafusion_expr::expr::WindowFunction; +use datafusion_expr::{Expr, PartitionEvaluator, Signature, Volatility, WindowUDFImpl}; +use datafusion_functions_window_common::field; +use field::WindowUDFFieldArgs; + +/// Create a [`WindowFunction`](Expr::WindowFunction) expression for +/// `dense_rank` user-defined window function. +pub fn dense_rank() -> Expr { + Expr::WindowFunction(WindowFunction::new(dense_rank_udwf(), vec![])) +} + +/// Singleton instance of `dense_rank`, ensures the UDWF is only created once. +#[allow(non_upper_case_globals)] +static STATIC_DenseNumber: std::sync::OnceLock< + std::sync::Arc, +> = std::sync::OnceLock::new(); + +/// Returns a [`WindowUDF`](datafusion_expr::WindowUDF) for `dense_rank` +/// user-defined window function. +pub fn dense_rank_udwf() -> std::sync::Arc { + STATIC_DenseNumber + .get_or_init(|| { + std::sync::Arc::new(datafusion_expr::WindowUDF::from(DenseNumber::default())) + }) + .clone() +} + +/// dense_rank expression +#[derive(Debug)] +pub struct DenseNumber { + signature: Signature, +} + +impl DenseNumber { + /// Create a new `dense_rank` function + pub fn new() -> Self { + Self { + signature: Signature::any(0, Volatility::Immutable), + } + } +} + +impl Default for DenseNumber { + fn default() -> Self { + Self::new() + } +} + +impl WindowUDFImpl for DenseNumber { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "dense_rank" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn partition_evaluator(&self) -> Result> { + Ok(Box::::default()) + } + + fn field(&self, field_args: WindowUDFFieldArgs) -> Result { + Ok(Field::new(field_args.name(), DataType::UInt64, false)) + } + + fn sort_options(&self) -> Option { + Some(SortOptions { + descending: false, + nulls_first: false, + }) + } +} + +/// State for the `dense_rank` built-in window function. +#[derive(Debug, Default)] +struct NumDensesEvaluator { + n_rows: usize, +} + +impl PartitionEvaluator for NumDensesEvaluator { + fn is_causal(&self) -> bool { + // The dense_rank function doesn't need "future" values to emit results: + true + } + + fn evaluate_all( + &mut self, + _values: &[ArrayRef], + num_rows: usize, + ) -> Result { + Ok(std::sync::Arc::new(UInt64Array::from_iter_values( + 1..(num_rows as u64) + 1, + ))) + } + + fn evaluate( + &mut self, + _values: &[ArrayRef], + _range: &Range, + ) -> Result { + self.n_rows += 1; + Ok(ScalarValue::UInt64(Some(self.n_rows as u64))) + } + + fn supports_bounded_execution(&self) -> bool { + true + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datafusion_common::arrow::array::{Array, BooleanArray}; + use datafusion_common::cast::as_uint64_array; + + use super::*; + + #[test] + fn dense_rank_all_null() -> Result<()> { + let values: ArrayRef = Arc::new(BooleanArray::from(vec![ + None, None, None, None, None, None, None, None, + ])); + let num_rows = values.len(); + + let actual = DenseNumber::default() + .partition_evaluator()? + .evaluate_all(&[values], num_rows)?; + let actual = as_uint64_array(&actual)?; + + assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], *actual.values()); + Ok(()) + } + + #[test] + fn dense_rank_all_values() -> Result<()> { + let values: ArrayRef = Arc::new(BooleanArray::from(vec![ + true, false, true, false, false, true, false, true, + ])); + let num_rows = values.len(); + + let actual = DenseNumber::default() + .partition_evaluator()? + .evaluate_all(&[values], num_rows)?; + let actual = as_uint64_array(&actual)?; + + assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], *actual.values()); + Ok(()) + } +} diff --git a/datafusion/functions-window/src/lib.rs b/datafusion/functions-window/src/lib.rs index 790a500f1f3f..2f5e2729de6b 100644 --- a/datafusion/functions-window/src/lib.rs +++ b/datafusion/functions-window/src/lib.rs @@ -29,16 +29,27 @@ use log::debug; use datafusion_expr::registry::FunctionRegistry; use datafusion_expr::WindowUDF; +pub mod dense_rank; +pub mod percent_rank; +pub mod rank; pub mod row_number; /// Fluent-style API for creating `Expr`s pub mod expr_fn { + pub use super::dense_rank::dense_rank; + pub use super::percent_rank::percent_rank; + pub use super::rank::rank; pub use super::row_number::row_number; } /// Returns all default window functions pub fn all_default_window_functions() -> Vec> { - vec![row_number::row_number_udwf()] + vec![ + row_number::row_number_udwf(), + rank::rank_udwf(), + dense_rank::dense_rank_udwf(), + percent_rank::percent_rank_udwf(), + ] } /// Registers all enabled packages with a [`FunctionRegistry`] pub fn register_all( diff --git a/datafusion/functions-window/src/percent_rank.rs b/datafusion/functions-window/src/percent_rank.rs new file mode 100644 index 000000000000..745bac267abd --- /dev/null +++ b/datafusion/functions-window/src/percent_rank.rs @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines physical expression for `percent_rank` that can evaluated at runtime during query execution + +use std::any::Any; +use std::fmt::Debug; +use std::ops::Range; + +use datafusion_common::arrow::array::ArrayRef; +use datafusion_common::arrow::array::UInt64Array; +use datafusion_common::arrow::compute::SortOptions; +use datafusion_common::arrow::datatypes::DataType; +use datafusion_common::arrow::datatypes::Field; +use datafusion_common::{Result, ScalarValue}; +use datafusion_expr::expr::WindowFunction; +use datafusion_expr::{Expr, PartitionEvaluator, Signature, Volatility, WindowUDFImpl}; +use datafusion_functions_window_common::field; +use field::WindowUDFFieldArgs; + +/// Create a [`WindowFunction`](Expr::WindowFunction) expression for +/// `percent_rank` user-defined window function. +pub fn percent_rank() -> Expr { + Expr::WindowFunction(WindowFunction::new(percent_rank_udwf(), vec![])) +} + +/// Singleton instance of `percent_rank`, ensures the UDWF is only created once. +#[allow(non_upper_case_globals)] +static STATIC_RowNumber: std::sync::OnceLock> = + std::sync::OnceLock::new(); + +/// Returns a [`WindowUDF`](datafusion_expr::WindowUDF) for `percent_rank` +/// user-defined window function. +pub fn percent_rank_udwf() -> std::sync::Arc { + STATIC_RowNumber + .get_or_init(|| { + std::sync::Arc::new(datafusion_expr::WindowUDF::from(RowNumber::default())) + }) + .clone() +} + +/// percent_rank expression +#[derive(Debug)] +pub struct RowNumber { + signature: Signature, +} + +impl RowNumber { + /// Create a new `percent_rank` function + pub fn new() -> Self { + Self { + signature: Signature::any(0, Volatility::Immutable), + } + } +} + +impl Default for RowNumber { + fn default() -> Self { + Self::new() + } +} + +impl WindowUDFImpl for RowNumber { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "percent_rank" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn partition_evaluator(&self) -> Result> { + Ok(Box::::default()) + } + + fn field(&self, field_args: WindowUDFFieldArgs) -> Result { + Ok(Field::new(field_args.name(), DataType::UInt64, false)) + } + + fn sort_options(&self) -> Option { + Some(SortOptions { + descending: false, + nulls_first: false, + }) + } +} + +/// State for the `percent_rank` built-in window function. +#[derive(Debug, Default)] +struct NumRowsEvaluator { + n_rows: usize, +} + +impl PartitionEvaluator for NumRowsEvaluator { + fn is_causal(&self) -> bool { + // The percent_rank function doesn't need "future" values to emit results: + true + } + + fn evaluate_all( + &mut self, + _values: &[ArrayRef], + num_rows: usize, + ) -> Result { + Ok(std::sync::Arc::new(UInt64Array::from_iter_values( + 1..(num_rows as u64) + 1, + ))) + } + + fn evaluate( + &mut self, + _values: &[ArrayRef], + _range: &Range, + ) -> Result { + self.n_rows += 1; + Ok(ScalarValue::UInt64(Some(self.n_rows as u64))) + } + + fn supports_bounded_execution(&self) -> bool { + false + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datafusion_common::arrow::array::{Array, BooleanArray}; + use datafusion_common::cast::as_uint64_array; + + use super::*; + + #[test] + fn percent_rank_all_null() -> Result<()> { + let values: ArrayRef = Arc::new(BooleanArray::from(vec![ + None, None, None, None, None, None, None, None, + ])); + let num_rows = values.len(); + + let actual = RowNumber::default() + .partition_evaluator()? + .evaluate_all(&[values], num_rows)?; + let actual = as_uint64_array(&actual)?; + + assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], *actual.values()); + Ok(()) + } + + #[test] + fn percent_rank_all_values() -> Result<()> { + let values: ArrayRef = Arc::new(BooleanArray::from(vec![ + true, false, true, false, false, true, false, true, + ])); + let num_rows = values.len(); + + let actual = RowNumber::default() + .partition_evaluator()? + .evaluate_all(&[values], num_rows)?; + let actual = as_uint64_array(&actual)?; + + assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], *actual.values()); + Ok(()) + } +} diff --git a/datafusion/functions-window/src/rank.rs b/datafusion/functions-window/src/rank.rs new file mode 100644 index 000000000000..feebe170d2ea --- /dev/null +++ b/datafusion/functions-window/src/rank.rs @@ -0,0 +1,252 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines physical expression for `rank` that can evaluated at runtime during query execution + +use std::any::Any; +use std::fmt::Debug; +use std::iter; +use std::ops::Range; +use std::sync::Arc; + +use datafusion_common::arrow::array::ArrayRef; +use datafusion_common::arrow::array::UInt64Array; +use datafusion_common::arrow::compute::SortOptions; +use datafusion_common::arrow::datatypes::DataType; +use datafusion_common::arrow::datatypes::Field; +use datafusion_common::utils::get_row_at_idx; +use datafusion_common::{Result, ScalarValue}; +use datafusion_expr::expr::WindowFunction; +use datafusion_expr::{Expr, PartitionEvaluator, Signature, Volatility, WindowUDFImpl}; +use datafusion_functions_window_common::field; +use field::WindowUDFFieldArgs; + +/// Create a [`WindowFunction`](Expr::WindowFunction) expression for +/// `rank` user-defined window function. +pub fn rank() -> Expr { + Expr::WindowFunction(WindowFunction::new(rank_udwf(), vec![])) +} + +/// Singleton instance of `rank`, ensures the UDWF is only created once. +#[allow(non_upper_case_globals)] +static STATIC_Rank: std::sync::OnceLock> = + std::sync::OnceLock::new(); + +/// Returns a [`WindowUDF`](datafusion_expr::WindowUDF) for `rank` +/// user-defined window function. +pub fn rank_udwf() -> std::sync::Arc { + STATIC_Rank + .get_or_init(|| { + std::sync::Arc::new(datafusion_expr::WindowUDF::from(Rank::default())) + }) + .clone() +} + +/// rank expression +#[derive(Debug)] +pub struct Rank { + signature: Signature, +} + +impl Rank { + /// Create a new `rank` function + pub fn new() -> Self { + Self { + signature: Signature::any(0, Volatility::Immutable), + } + } +} + +impl Default for Rank { + fn default() -> Self { + Self::new() + } +} + +impl WindowUDFImpl for Rank { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "rank" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn partition_evaluator(&self) -> Result> { + Ok(Box::::default()) + } + + fn field(&self, field_args: WindowUDFFieldArgs) -> Result { + Ok(Field::new(field_args.name(), DataType::UInt64, false)) + } + + fn sort_options(&self) -> Option { + Some(SortOptions { + descending: false, + nulls_first: false, + }) + } +} + +// only use the default implementation for the partition evaluator +// todo!(); +/// State for the RANK(rank) built-in window function. +#[derive(Debug, Clone, Default)] +pub struct RankState { + /// The last values for rank as these values change, we increase n_rank + pub last_rank_data: Option>, + /// The index where last_rank_boundary is started + pub last_rank_boundary: usize, + /// Keep the number of entries in current rank + pub current_group_count: usize, + /// Rank number kept from the start + pub n_rank: usize, +} + +#[derive(Debug, Copy, Clone)] +pub enum RankType { + Basic, + Dense, + Percent, +} + +/// State for the `rank` built-in window function. +#[derive(Debug, Default)] +struct RankEvaluator { + state: RankState, +} + +impl PartitionEvaluator for RankEvaluator { + fn is_causal(&self) -> bool { + // The rank function doesn't need "future" values to emit results: + true + } + + fn evaluate( + &mut self, + values: &[ArrayRef], + range: &Range, + ) -> Result { + let row_idx = range.start; + // There is no argument, values are order by column values (where rank is calculated) + let range_columns = values; + let last_rank_data = get_row_at_idx(range_columns, row_idx)?; + let new_rank_encountered = + if let Some(state_last_rank_data) = &self.state.last_rank_data { + // if rank data changes, new rank is encountered + state_last_rank_data != &last_rank_data + } else { + // First rank seen + true + }; + if new_rank_encountered { + self.state.last_rank_data = Some(last_rank_data); + self.state.last_rank_boundary += self.state.current_group_count; + self.state.current_group_count = 1; + self.state.n_rank += 1; + } else { + // data is still in the same rank + self.state.current_group_count += 1; + } + + Ok(ScalarValue::UInt64(Some( + self.state.last_rank_boundary as u64 + 1, + ))) + } + + fn evaluate_all_with_rank( + &self, + _num_rows: usize, + ranks_in_partition: &[Range], + ) -> Result { + let result = Arc::new(UInt64Array::from_iter_values( + ranks_in_partition + .iter() + .scan(1_u64, |acc, range| { + let len = range.end - range.start; + let result = iter::repeat(*acc).take(len); + *acc += len as u64; + Some(result) + }) + .flatten(), + )); + + Ok(result) + } + + fn supports_bounded_execution(&self) -> bool { + true + } +} + +#[cfg(test)] +mod tests { + use super::*; + use datafusion_common::cast::{as_float64_array, as_uint64_array}; + + use super::*; + + fn test_with_rank(expr: &Rank, expected: Vec) -> Result<()> { + test_i32_result(expr, vec![0..2, 2..3, 3..6, 6..7, 7..8], expected) + } + + #[allow(clippy::single_range_in_vec_init)] + fn test_without_rank(expr: &Rank, expected: Vec) -> Result<()> { + test_i32_result(expr, vec![0..8], expected) + } + + fn test_f64_result( + expr: &Rank, + num_rows: usize, + ranks: Vec>, + expected: Vec, + ) -> Result<()> { + let result = expr + .partition_evaluator()? + .evaluate_all_with_rank(num_rows, &ranks)?; + let result = as_float64_array(&result)?; + let result = result.values(); + assert_eq!(expected, *result); + Ok(()) + } + + fn test_i32_result( + expr: &Rank, + ranks: Vec>, + expected: Vec, + ) -> Result<()> { + let result = expr + .partition_evaluator()? + .evaluate_all_with_rank(8, &ranks)?; + let result = as_uint64_array(&result)?; + let result = result.values(); + assert_eq!(expected, *result); + Ok(()) + } + + #[test] + fn test_rank() -> Result<()> { + let r = Rank::default(); + test_with_rank(&r, vec![1; 8])?; + // test_without_rank(&r, vec![1, 1, 3, 4, 4, 4, 7, 9])?; + Ok(()) + } +} diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 177fd799ae79..e07e11e43199 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -39,7 +39,6 @@ pub use crate::window::cume_dist::{cume_dist, CumeDist}; pub use crate::window::lead_lag::{lag, lead, WindowShift}; pub use crate::window::nth_value::NthValue; pub use crate::window::ntile::Ntile; -pub use crate::window::rank::{dense_rank, percent_rank, rank, Rank, RankType}; pub use crate::PhysicalSortExpr; pub use binary::{binary, similar_to, BinaryExpr}; diff --git a/datafusion/physical-expr/src/window/mod.rs b/datafusion/physical-expr/src/window/mod.rs index 2aeb05333102..938bdac50f97 100644 --- a/datafusion/physical-expr/src/window/mod.rs +++ b/datafusion/physical-expr/src/window/mod.rs @@ -22,7 +22,6 @@ pub(crate) mod cume_dist; pub(crate) mod lead_lag; pub(crate) mod nth_value; pub(crate) mod ntile; -pub(crate) mod rank; mod sliding_aggregate; mod window_expr; diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index 8f6f78df8cb8..46c46fab68c5 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -530,19 +530,6 @@ pub enum WindowFn { Aggregate(Box), } -/// State for the RANK(percent_rank, rank, dense_rank) built-in window function. -#[derive(Debug, Clone, Default)] -pub struct RankState { - /// The last values for rank as these values change, we increase n_rank - pub last_rank_data: Option>, - /// The index where last_rank_boundary is started - pub last_rank_boundary: usize, - /// Keep the number of entries in current rank - pub current_group_count: usize, - /// Rank number kept from the start - pub n_rank: usize, -} - /// Tag to differentiate special use cases of the NTH_VALUE built-in window function. #[derive(Debug, Copy, Clone)] pub enum NthValueKind { diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index b6f34ec69f68..7c86b4e9cb55 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -21,10 +21,7 @@ use std::borrow::Borrow; use std::sync::Arc; use crate::{ - expressions::{ - cume_dist, dense_rank, lag, lead, percent_rank, rank, Literal, NthValue, Ntile, - PhysicalSortExpr, - }, + expressions::{cume_dist, lag, lead, Literal, NthValue, Ntile, PhysicalSortExpr}, ExecutionPlan, ExecutionPlanProperties, InputOrderMode, PhysicalExpr, }; @@ -224,9 +221,6 @@ fn create_built_in_window_expr( let out_data_type: &DataType = input_schema.field_with_name(&name)?.data_type(); Ok(match fun { - BuiltInWindowFunction::Rank => Arc::new(rank(name, out_data_type)), - BuiltInWindowFunction::DenseRank => Arc::new(dense_rank(name, out_data_type)), - BuiltInWindowFunction::PercentRank => Arc::new(percent_rank(name, out_data_type)), BuiltInWindowFunction::CumeDist => Arc::new(cume_dist(name, out_data_type)), BuiltInWindowFunction::Ntile => { let n = get_scalar_value_from_args(args, 0)?.ok_or_else(|| { diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index e36c91e7d004..65b32d337b15 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -508,6 +508,9 @@ message ScalarUDFExprNode { enum BuiltInWindowFunction { UNSPECIFIED = 0; // https://protobuf.dev/programming-guides/dos-donts/#unspecified-enum // ROW_NUMBER = 0; + // TODO + // if i will remove this + // should i renumber all variables? RANK = 1; DENSE_RANK = 2; PERCENT_RANK = 3; @@ -739,7 +742,7 @@ message FileSinkConfig { datafusion_common.Schema output_schema = 4; repeated PartitionColumn table_partition_cols = 5; bool keep_partition_by_columns = 9; - InsertOp insert_op = 10; + InsertOp insert_op = 10; } enum InsertOp { diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 436347330d92..49d33a62dc06 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -117,10 +117,11 @@ pub struct ListingTableScanNode { pub target_partitions: u32, #[prost(message, repeated, tag = "13")] pub file_sort_order: ::prost::alloc::vec::Vec, - #[prost(oneof = "listing_table_scan_node::FileFormatType", tags = "10, 11, 12, 15")] - pub file_format_type: ::core::option::Option< - listing_table_scan_node::FileFormatType, - >, + #[prost( + oneof = "listing_table_scan_node::FileFormatType", + tags = "10, 11, 12, 15" + )] + pub file_format_type: ::core::option::Option, } /// Nested message and enum types in `ListingTableScanNode`. pub mod listing_table_scan_node { @@ -254,10 +255,8 @@ pub struct CreateExternalTableNode { #[prost(message, optional, tag = "12")] pub constraints: ::core::option::Option, #[prost(map = "string, message", tag = "13")] - pub column_defaults: ::std::collections::HashMap< - ::prost::alloc::string::String, - LogicalExprNode, - >, + pub column_defaults: + ::std::collections::HashMap<::prost::alloc::string::String, LogicalExprNode>, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct PrepareNode { @@ -957,9 +956,7 @@ pub struct FullTableReference { #[derive(Clone, PartialEq, ::prost::Message)] pub struct TableReference { #[prost(oneof = "table_reference::TableReferenceEnum", tags = "1, 2, 3")] - pub table_reference_enum: ::core::option::Option< - table_reference::TableReferenceEnum, - >, + pub table_reference_enum: ::core::option::Option, } /// Nested message and enum types in `TableReference`. pub mod table_reference { @@ -1077,9 +1074,8 @@ pub struct JsonSink { #[prost(message, optional, tag = "1")] pub config: ::core::option::Option, #[prost(message, optional, tag = "2")] - pub writer_options: ::core::option::Option< - super::datafusion_common::JsonWriterOptions, - >, + pub writer_options: + ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct JsonSinkExecNode { @@ -1097,9 +1093,8 @@ pub struct CsvSink { #[prost(message, optional, tag = "1")] pub config: ::core::option::Option, #[prost(message, optional, tag = "2")] - pub writer_options: ::core::option::Option< - super::datafusion_common::CsvWriterOptions, - >, + pub writer_options: + ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct CsvSinkExecNode { @@ -1117,9 +1112,8 @@ pub struct ParquetSink { #[prost(message, optional, tag = "1")] pub config: ::core::option::Option, #[prost(message, optional, tag = "2")] - pub parquet_options: ::core::option::Option< - super::datafusion_common::TableParquetOptions, - >, + pub parquet_options: + ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct ParquetSinkExecNode { @@ -1239,9 +1233,8 @@ pub struct PhysicalAggregateExprNode { #[prost(bytes = "vec", optional, tag = "7")] pub fun_definition: ::core::option::Option<::prost::alloc::vec::Vec>, #[prost(oneof = "physical_aggregate_expr_node::AggregateFunction", tags = "4")] - pub aggregate_function: ::core::option::Option< - physical_aggregate_expr_node::AggregateFunction, - >, + pub aggregate_function: + ::core::option::Option, } /// Nested message and enum types in `PhysicalAggregateExprNode`. pub mod physical_aggregate_expr_node { @@ -1266,9 +1259,8 @@ pub struct PhysicalWindowExprNode { #[prost(bytes = "vec", optional, tag = "9")] pub fun_definition: ::core::option::Option<::prost::alloc::vec::Vec>, #[prost(oneof = "physical_window_expr_node::WindowFunction", tags = "2, 3")] - pub window_function: ::core::option::Option< - physical_window_expr_node::WindowFunction, - >, + pub window_function: + ::core::option::Option, } /// Nested message and enum types in `PhysicalWindowExprNode`. pub mod physical_window_expr_node { @@ -1787,9 +1779,7 @@ pub struct PartitionedFile { #[prost(uint64, tag = "3")] pub last_modified_ns: u64, #[prost(message, repeated, tag = "4")] - pub partition_values: ::prost::alloc::vec::Vec< - super::datafusion_common::ScalarValue, - >, + pub partition_values: ::prost::alloc::vec::Vec, #[prost(message, optional, tag = "5")] pub range: ::core::option::Option, #[prost(message, optional, tag = "6")] @@ -1813,15 +1803,17 @@ pub struct PartitionStats { #[prost(message, repeated, tag = "4")] pub column_stats: ::prost::alloc::vec::Vec, } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum BuiltInWindowFunction { /// Unspecified = 0, /// ROW_NUMBER = 0; - Rank = 1, - DenseRank = 2, - PercentRank = 3, + // Rank = 1, + // DenseRank = 2, + // PercentRank = 3, CumeDist = 4, Ntile = 5, Lag = 6, @@ -1838,9 +1830,6 @@ impl BuiltInWindowFunction { pub fn as_str_name(&self) -> &'static str { match self { Self::Unspecified => "UNSPECIFIED", - Self::Rank => "RANK", - Self::DenseRank => "DENSE_RANK", - Self::PercentRank => "PERCENT_RANK", Self::CumeDist => "CUME_DIST", Self::Ntile => "NTILE", Self::Lag => "LAG", @@ -1854,9 +1843,6 @@ impl BuiltInWindowFunction { pub fn from_str_name(value: &str) -> ::core::option::Option { match value { "UNSPECIFIED" => Some(Self::Unspecified), - "RANK" => Some(Self::Rank), - "DENSE_RANK" => Some(Self::DenseRank), - "PERCENT_RANK" => Some(Self::PercentRank), "CUME_DIST" => Some(Self::CumeDist), "NTILE" => Some(Self::Ntile), "LAG" => Some(Self::Lag), @@ -1868,7 +1854,9 @@ impl BuiltInWindowFunction { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum WindowFrameUnits { Rows = 0, @@ -1897,7 +1885,9 @@ impl WindowFrameUnits { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum WindowFrameBoundType { CurrentRow = 0, @@ -1926,7 +1916,9 @@ impl WindowFrameBoundType { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum DateUnit { Day = 0, @@ -1952,7 +1944,9 @@ impl DateUnit { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum InsertOp { Append = 0, @@ -1981,7 +1975,9 @@ impl InsertOp { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum PartitionMode { CollectLeft = 0, @@ -2010,7 +2006,9 @@ impl PartitionMode { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum StreamPartitionMode { SinglePartition = 0, @@ -2036,7 +2034,9 @@ impl StreamPartitionMode { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum AggregateMode { Partial = 0, diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 893255ccc8ce..32e1b68203ce 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -142,9 +142,6 @@ impl From for BuiltInWindowFunction { fn from(built_in_function: protobuf::BuiltInWindowFunction) -> Self { match built_in_function { protobuf::BuiltInWindowFunction::Unspecified => todo!(), - protobuf::BuiltInWindowFunction::Rank => Self::Rank, - protobuf::BuiltInWindowFunction::PercentRank => Self::PercentRank, - protobuf::BuiltInWindowFunction::DenseRank => Self::DenseRank, protobuf::BuiltInWindowFunction::Lag => Self::Lag, protobuf::BuiltInWindowFunction::Lead => Self::Lead, protobuf::BuiltInWindowFunction::FirstValue => Self::FirstValue, diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 63d1a007c1e5..07823b422f71 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -119,11 +119,8 @@ impl From<&BuiltInWindowFunction> for protobuf::BuiltInWindowFunction { BuiltInWindowFunction::NthValue => Self::NthValue, BuiltInWindowFunction::Ntile => Self::Ntile, BuiltInWindowFunction::CumeDist => Self::CumeDist, - BuiltInWindowFunction::PercentRank => Self::PercentRank, - BuiltInWindowFunction::Rank => Self::Rank, BuiltInWindowFunction::Lag => Self::Lag, BuiltInWindowFunction::Lead => Self::Lead, - BuiltInWindowFunction::DenseRank => Self::DenseRank, } } } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 6f6065a1c284..85d4fe8a16d0 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -24,8 +24,8 @@ use datafusion::physical_expr::window::{NthValueKind, SlidingAggregateWindowExpr use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr}; use datafusion::physical_plan::expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, CumeDist, InListExpr, IsNotNullExpr, - IsNullExpr, Literal, NegativeExpr, NotExpr, NthValue, Ntile, Rank, RankType, - TryCastExpr, WindowShift, + IsNullExpr, Literal, NegativeExpr, NotExpr, NthValue, Ntile, TryCastExpr, + WindowShift, }; use datafusion::physical_plan::udaf::AggregateFunctionExpr; use datafusion::physical_plan::windows::{BuiltInWindowExpr, PlainAggregateWindowExpr}; @@ -109,14 +109,7 @@ pub fn serialize_physical_window_expr( let expr = built_in_window_expr.get_built_in_func_expr(); let built_in_fn_expr = expr.as_any(); - let builtin_fn = if let Some(rank_expr) = built_in_fn_expr.downcast_ref::() - { - match rank_expr.get_type() { - RankType::Basic => protobuf::BuiltInWindowFunction::Rank, - RankType::Dense => protobuf::BuiltInWindowFunction::DenseRank, - RankType::Percent => protobuf::BuiltInWindowFunction::PercentRank, - } - } else if built_in_fn_expr.downcast_ref::().is_some() { + let builtin_fn = if built_in_fn_expr.downcast_ref::().is_some() { protobuf::BuiltInWindowFunction::CumeDist } else if let Some(ntile_expr) = built_in_fn_expr.downcast_ref::() { args.insert(