Skip to content

Commit

Permalink
wip: converting rank builtin function to UDWF
Browse files Browse the repository at this point in the history
  • Loading branch information
jatin committed Oct 2, 2024
1 parent ddb4fac commit 05fd8bc
Show file tree
Hide file tree
Showing 17 changed files with 694 additions and 147 deletions.
23 changes: 8 additions & 15 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use test_utils::add_empty_batches;

use datafusion::functions_window::row_number::row_number_udwf;
use datafusion_functions_window::dense_rank::dense_rank_udwf;
use datafusion_functions_window::rank::rank_udwf;
use hashbrown::HashMap;
use rand::distributions::Alphanumeric;
use rand::rngs::StdRng;
Expand Down Expand Up @@ -224,9 +226,9 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
// )
(
// Window function
WindowFunctionDefinition::BuiltInWindowFunction(BuiltInWindowFunction::Rank),
WindowFunctionDefinition::WindowUDF(rank_udwf()),
// its name
"RANK",
"rank",
// no argument
vec![],
// Expected causality, for None cases causality will be determined from window frame boundaries
Expand All @@ -238,11 +240,9 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
// )
(
// Window function
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::DenseRank,
),
WindowFunctionDefinition::WindowUDF(dense_rank_udwf()),
// its name
"DENSE_RANK",
"dense_rank",
// no argument
vec![],
// Expected causality, for None cases causality will be determined from window frame boundaries
Expand Down Expand Up @@ -382,19 +382,12 @@ fn get_random_function(
);
window_fn_map.insert(
"rank",
(
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::Rank,
),
vec![],
),
(WindowFunctionDefinition::WindowUDF(rank_udwf()), vec![]),
);
window_fn_map.insert(
"dense_rank",
(
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::DenseRank,
),
WindowFunctionDefinition::WindowUDF(dense_rank_udwf()),
vec![],
),
);
Expand Down
25 changes: 3 additions & 22 deletions datafusion/expr/src/built_in_window_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,6 @@ impl fmt::Display for BuiltInWindowFunction {
/// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, EnumIter)]
pub enum BuiltInWindowFunction {
/// rank of the current row with gaps; same as row_number of its first peer
Rank,
/// rank of the current row without gaps; this function counts peer groups
DenseRank,
/// relative rank of the current row: (rank - 1) / (total rows - 1)
PercentRank,
/// relative rank of the current row: (number of rows preceding or peer with current row) / (total rows)
CumeDist,
/// integer ranging from 1 to the argument value, dividing the partition as equally as possible
Expand All @@ -72,9 +66,6 @@ impl BuiltInWindowFunction {
pub fn name(&self) -> &str {
use BuiltInWindowFunction::*;
match self {
Rank => "RANK",
DenseRank => "DENSE_RANK",
PercentRank => "PERCENT_RANK",
CumeDist => "CUME_DIST",
Ntile => "NTILE",
Lag => "LAG",
Expand All @@ -90,9 +81,6 @@ impl FromStr for BuiltInWindowFunction {
type Err = DataFusionError;
fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
Ok(match name.to_uppercase().as_str() {
"RANK" => BuiltInWindowFunction::Rank,
"DENSE_RANK" => BuiltInWindowFunction::DenseRank,
"PERCENT_RANK" => BuiltInWindowFunction::PercentRank,
"CUME_DIST" => BuiltInWindowFunction::CumeDist,
"NTILE" => BuiltInWindowFunction::Ntile,
"LAG" => BuiltInWindowFunction::Lag,
Expand Down Expand Up @@ -127,12 +115,8 @@ impl BuiltInWindowFunction {
})?;

match self {
BuiltInWindowFunction::Rank
| BuiltInWindowFunction::DenseRank
| BuiltInWindowFunction::Ntile => Ok(DataType::UInt64),
BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => {
Ok(DataType::Float64)
}
BuiltInWindowFunction::Ntile => Ok(DataType::UInt64),
BuiltInWindowFunction::CumeDist => Ok(DataType::Float64),
BuiltInWindowFunction::Lag
| BuiltInWindowFunction::Lead
| BuiltInWindowFunction::FirstValue
Expand All @@ -145,10 +129,7 @@ impl BuiltInWindowFunction {
pub fn signature(&self) -> Signature {
// note: the physical expression must accept the type returned by this function or the execution panics.
match self {
BuiltInWindowFunction::Rank
| BuiltInWindowFunction::DenseRank
| BuiltInWindowFunction::PercentRank
| BuiltInWindowFunction::CumeDist => Signature::any(0, Volatility::Immutable),
BuiltInWindowFunction::CumeDist => Signature::any(0, Volatility::Immutable),
BuiltInWindowFunction::Lag | BuiltInWindowFunction::Lead => {
Signature::one_of(
vec![
Expand Down
3 changes: 0 additions & 3 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2628,9 +2628,6 @@ mod test {
#[test]
fn test_window_function_case_insensitive() -> Result<()> {
let names = vec![
"rank",
"dense_rank",
"percent_rank",
"cume_dist",
"ntile",
"lag",
Expand Down
21 changes: 0 additions & 21 deletions datafusion/expr/src/window_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,6 @@ use datafusion_common::ScalarValue;

use crate::{expr::WindowFunction, BuiltInWindowFunction, Expr, Literal};

/// Create an expression to represent the `rank` window function
pub fn rank() -> Expr {
Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::Rank, vec![]))
}

/// Create an expression to represent the `dense_rank` window function
pub fn dense_rank() -> Expr {
Expr::WindowFunction(WindowFunction::new(
BuiltInWindowFunction::DenseRank,
vec![],
))
}

/// Create an expression to represent the `percent_rank` window function
pub fn percent_rank() -> Expr {
Expr::WindowFunction(WindowFunction::new(
BuiltInWindowFunction::PercentRank,
vec![],
))
}

/// Create an expression to represent the `cume_dist` window function
pub fn cume_dist() -> Expr {
Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::CumeDist, vec![]))
Expand Down
183 changes: 183 additions & 0 deletions datafusion/functions-window/src/dense_rank.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines physical expression for `dense_rank` that can evaluated at runtime during query execution

use std::any::Any;
use std::fmt::Debug;
use std::ops::Range;

use datafusion_common::arrow::array::ArrayRef;
use datafusion_common::arrow::array::UInt64Array;
use datafusion_common::arrow::compute::SortOptions;
use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::arrow::datatypes::Field;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::expr::WindowFunction;
use datafusion_expr::{Expr, PartitionEvaluator, Signature, Volatility, WindowUDFImpl};
use datafusion_functions_window_common::field;
use field::WindowUDFFieldArgs;

/// Create a [`WindowFunction`](Expr::WindowFunction) expression for
/// `dense_rank` user-defined window function.
pub fn dense_rank() -> Expr {
Expr::WindowFunction(WindowFunction::new(dense_rank_udwf(), vec![]))
}

/// Singleton instance of `dense_rank`, ensures the UDWF is only created once.
#[allow(non_upper_case_globals)]
static STATIC_DenseNumber: std::sync::OnceLock<
std::sync::Arc<datafusion_expr::WindowUDF>,
> = std::sync::OnceLock::new();

/// Returns a [`WindowUDF`](datafusion_expr::WindowUDF) for `dense_rank`
/// user-defined window function.
pub fn dense_rank_udwf() -> std::sync::Arc<datafusion_expr::WindowUDF> {
STATIC_DenseNumber
.get_or_init(|| {
std::sync::Arc::new(datafusion_expr::WindowUDF::from(DenseNumber::default()))
})
.clone()
}

/// dense_rank expression
#[derive(Debug)]
pub struct DenseNumber {
signature: Signature,
}

impl DenseNumber {
/// Create a new `dense_rank` function
pub fn new() -> Self {
Self {
signature: Signature::any(0, Volatility::Immutable),
}
}
}

impl Default for DenseNumber {
fn default() -> Self {
Self::new()
}
}

impl WindowUDFImpl for DenseNumber {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"dense_rank"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::<NumDensesEvaluator>::default())
}

fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
Ok(Field::new(field_args.name(), DataType::UInt64, false))
}

fn sort_options(&self) -> Option<SortOptions> {
Some(SortOptions {
descending: false,
nulls_first: false,
})
}
}

/// State for the `dense_rank` built-in window function.
#[derive(Debug, Default)]
struct NumDensesEvaluator {
n_rows: usize,
}

impl PartitionEvaluator for NumDensesEvaluator {
fn is_causal(&self) -> bool {
// The dense_rank function doesn't need "future" values to emit results:
true
}

fn evaluate_all(
&mut self,
_values: &[ArrayRef],
num_rows: usize,
) -> Result<ArrayRef> {
Ok(std::sync::Arc::new(UInt64Array::from_iter_values(
1..(num_rows as u64) + 1,
)))
}

fn evaluate(
&mut self,
_values: &[ArrayRef],
_range: &Range<usize>,
) -> Result<ScalarValue> {
self.n_rows += 1;
Ok(ScalarValue::UInt64(Some(self.n_rows as u64)))
}

fn supports_bounded_execution(&self) -> bool {
true
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use datafusion_common::arrow::array::{Array, BooleanArray};
use datafusion_common::cast::as_uint64_array;

use super::*;

#[test]
fn dense_rank_all_null() -> Result<()> {
let values: ArrayRef = Arc::new(BooleanArray::from(vec![
None, None, None, None, None, None, None, None,
]));
let num_rows = values.len();

let actual = DenseNumber::default()
.partition_evaluator()?
.evaluate_all(&[values], num_rows)?;
let actual = as_uint64_array(&actual)?;

assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], *actual.values());
Ok(())
}

#[test]
fn dense_rank_all_values() -> Result<()> {
let values: ArrayRef = Arc::new(BooleanArray::from(vec![
true, false, true, false, false, true, false, true,
]));
let num_rows = values.len();

let actual = DenseNumber::default()
.partition_evaluator()?
.evaluate_all(&[values], num_rows)?;
let actual = as_uint64_array(&actual)?;

assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], *actual.values());
Ok(())
}
}
13 changes: 12 additions & 1 deletion datafusion/functions-window/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,27 @@ use log::debug;
use datafusion_expr::registry::FunctionRegistry;
use datafusion_expr::WindowUDF;

pub mod dense_rank;
pub mod percent_rank;
pub mod rank;
pub mod row_number;

/// Fluent-style API for creating `Expr`s
pub mod expr_fn {
pub use super::dense_rank::dense_rank;
pub use super::percent_rank::percent_rank;
pub use super::rank::rank;
pub use super::row_number::row_number;
}

/// Returns all default window functions
pub fn all_default_window_functions() -> Vec<Arc<WindowUDF>> {
vec![row_number::row_number_udwf()]
vec![
row_number::row_number_udwf(),
rank::rank_udwf(),
dense_rank::dense_rank_udwf(),
percent_rank::percent_rank_udwf(),
]
}
/// Registers all enabled packages with a [`FunctionRegistry`]
pub fn register_all(
Expand Down
Loading

0 comments on commit 05fd8bc

Please sign in to comment.