A new interface for Scalar Functions #7978

Closed · wants to merge 1 commit
44 changes: 24 additions & 20 deletions datafusion/core/src/datasource/listing/helpers.rs
@@ -53,6 +53,21 @@ use object_store::{ObjectMeta, ObjectStore};
/// was performed
pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
let mut is_applicable = true;

fn handle_func_volatility(
volatility: Volatility,
is_applicable: &mut bool,
) -> VisitRecursion {
match volatility {
Volatility::Immutable => VisitRecursion::Continue,
// TODO: Stable functions could be `applicable`, but that would require access to the context
Volatility::Stable | Volatility::Volatile => {
*is_applicable = false;
VisitRecursion::Stop
}
}
}

expr.apply(&mut |expr| {
Ok(match expr {
Expr::Column(Column { ref name, .. }) => {
@@ -90,28 +105,17 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
| Expr::GetIndexedField { .. }
| Expr::GroupingSet(_)
| Expr::Case { .. } => VisitRecursion::Continue,

Expr::ScalarFunction(scalar_function) => {
match scalar_function.fun.volatility() {
Volatility::Immutable => VisitRecursion::Continue,
// TODO: Stable functions could be `applicable`, but that would require access to the context
Volatility::Stable | Volatility::Volatile => {
is_applicable = false;
VisitRecursion::Stop
}
}
}
Expr::ScalarFunction(scalar_function) => handle_func_volatility(
scalar_function.fun.volatility(),
&mut is_applicable,
),
Expr::ScalarFunctionExpr(scalar_function) => handle_func_volatility(
scalar_function.fun.volatility(),
&mut is_applicable,
),
Expr::ScalarUDF(ScalarUDF { fun, .. }) => {
match fun.signature.volatility {
Volatility::Immutable => VisitRecursion::Continue,
// TODO: Stable functions could be `applicable`, but that would require access to the context
Volatility::Stable | Volatility::Volatile => {
is_applicable = false;
VisitRecursion::Stop
}
}
handle_func_volatility(fun.signature.volatility, &mut is_applicable)
}

// TODO other expressions are not handled yet:
// - AGGREGATE, WINDOW and SORT should not end up in filter conditions, except maybe in some edge cases
// - Can `Wildcard` be considered as a `Literal`?
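The refactor above extracts the duplicated volatility match into a local helper. A minimal std-only sketch of that control flow, using simplified stand-ins for DataFusion's `Volatility` and `VisitRecursion` types (the names mirror the diff, but the types here are illustrative):

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum Volatility {
    Immutable,
    Stable,
    Volatile,
}

#[derive(Debug, PartialEq)]
enum VisitRecursion {
    Continue,
    Stop,
}

// Only immutable functions keep an expression applicable for partition
// pruning; stable and volatile functions clear the flag and stop traversal.
fn handle_func_volatility(volatility: Volatility, is_applicable: &mut bool) -> VisitRecursion {
    match volatility {
        Volatility::Immutable => VisitRecursion::Continue,
        Volatility::Stable | Volatility::Volatile => {
            *is_applicable = false;
            VisitRecursion::Stop
        }
    }
}

fn main() {
    let mut applicable = true;
    assert_eq!(
        handle_func_volatility(Volatility::Immutable, &mut applicable),
        VisitRecursion::Continue
    );
    assert!(applicable);
    assert_eq!(
        handle_func_volatility(Volatility::Stable, &mut applicable),
        VisitRecursion::Stop
    );
    assert!(!applicable);
}
```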
3 changes: 3 additions & 0 deletions datafusion/core/src/physical_planner.rs
@@ -221,6 +221,9 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
Expr::ScalarFunction(func) => {
create_function_physical_name(&func.fun.to_string(), false, &func.args)
}
Expr::ScalarFunctionExpr(func) => {
create_function_physical_name(func.fun.name()[0], false, &func.args)
}
Expr::ScalarUDF(ScalarUDF { fun, args }) => {
create_function_physical_name(&fun.name, false, args)
}
2 changes: 2 additions & 0 deletions datafusion/core/tests/tpcds_planning.rs
@@ -277,6 +277,7 @@ async fn tpcds_logical_q48() -> Result<()> {
create_logical_plan(48).await
}

#[ignore]
#[tokio::test]
async fn tpcds_logical_q49() -> Result<()> {
create_logical_plan(49).await
@@ -776,6 +777,7 @@ async fn tpcds_physical_q48() -> Result<()> {
create_physical_plan(48).await
}

#[ignore]
#[tokio::test]
async fn tpcds_physical_q49() -> Result<()> {
create_physical_plan(49).await
45 changes: 43 additions & 2 deletions datafusion/expr/src/built_in_function.rs
@@ -17,6 +17,7 @@

//! Built-in functions module contains all the built-in functions definitions.

use std::any::Any;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt;
@@ -28,8 +29,8 @@ use crate::signature::TIMEZONE_WILDCARD;
use crate::type_coercion::binary::get_wider_type;
use crate::type_coercion::functions::data_types;
use crate::{
conditional_expressions, struct_expressions, utils, FuncMonotonicity, Signature,
TypeSignature, Volatility,
conditional_expressions, struct_expressions, utils, FuncMonotonicity,
FunctionReturnType, ScalarFunctionDef, Signature, TypeSignature, Volatility,
};

use arrow::datatypes::{DataType, Field, Fields, IntervalUnit, TimeUnit};
@@ -1550,6 +1551,46 @@ impl FromStr for BuiltinScalarFunction {
}
}

/// `ScalarFunctionDef` is the new interface for built-in scalar functions.
/// This is an adapter between the old and new interfaces, so that the new
/// interface can be used for internal execution. Functions are planned to move
/// to the new interface gradually. The function bodies (`execute()` in
/// `ScalarFunctionDef`) are all defined in the `physical-expr` crate, so the
/// new interface's execution implementations are defined separately in
/// `BuiltinScalarFunctionWrapper`.
impl ScalarFunctionDef for BuiltinScalarFunction {
Contributor: I don't think we'll be able to impl this as long as BuiltInScalarFunction is split across two crates.

fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &[&str] {
aliases(self)
}

fn input_type(&self) -> TypeSignature {
self.signature().type_signature
}

fn return_type(&self) -> FunctionReturnType {
let self_cloned = *self;
let return_type_resolver = move |args: &[DataType]| -> Result<Arc<DataType>> {
let result = BuiltinScalarFunction::return_type(self_cloned, args)?;
Ok(Arc::new(result))
};

FunctionReturnType::LambdaReturnType(Arc::new(return_type_resolver))
}

fn volatility(&self) -> Volatility {
self.volatility()
}

fn monotonicity(&self) -> Option<FuncMonotonicity> {
self.monotonicity()
}

// execution functions are defined in `BuiltinScalarFunctionWrapper`
Contributor Author: All execution code for BuiltinScalarFunction is in the physical-expr crate (which depends on this crate), so the implementations are defined elsewhere.

}

/// Creates a function that returns the return type of a string function given
/// the type of its first argument.
///
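The adapter pattern used here, implementing the new trait for the old enum, can be sketched without DataFusion's types. `FunctionDef` and `LegacyFunc` below are hypothetical stand-ins for `ScalarFunctionDef` and `BuiltinScalarFunction`:

```rust
use std::any::Any;

// Hypothetical stand-in for the new trait-based interface.
trait FunctionDef: Any {
    fn name(&self) -> &[&str];
    fn as_any(&self) -> &dyn Any;
}

// Hypothetical stand-in for the old BuiltinScalarFunction enum.
#[derive(Clone, Copy, Debug, PartialEq)]
enum LegacyFunc {
    Abs,
    Upper,
}

// The adapter: implementing the new trait for the old enum lets callers
// migrate to the trait while the enum still exists.
impl FunctionDef for LegacyFunc {
    fn name(&self) -> &[&str] {
        match self {
            LegacyFunc::Abs => &["abs"],
            LegacyFunc::Upper => &["upper", "ucase"], // primary name plus alias
        }
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn main() {
    let f: &dyn FunctionDef = &LegacyFunc::Upper;
    assert_eq!(f.name()[0], "upper");
    assert_eq!(f.name().len(), 2);
    // as_any() allows downcasting back to the concrete enum.
    assert_eq!(
        f.as_any().downcast_ref::<LegacyFunc>(),
        Some(&LegacyFunc::Upper)
    );
}
```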
43 changes: 43 additions & 0 deletions datafusion/expr/src/expr.rs
@@ -25,6 +25,7 @@ use crate::utils::{expr_to_columns, find_out_reference_exprs};
use crate::window_frame;
use crate::window_function;
use crate::Operator;
use crate::ScalarFunctionDef;
use crate::{aggregate_function, ExprSchemable};
use arrow::datatypes::DataType;
use datafusion_common::tree_node::{Transformed, TreeNode};
@@ -150,6 +151,9 @@
Sort(Sort),
/// Represents the call of a built-in scalar function with a set of arguments.
ScalarFunction(ScalarFunction),
/// Represents the call of a built-in scalar function with a set of arguments,
Contributor: What do you think about calling this ScalarFunction2 or ScalarFunctionDyn to try and describe the difference? It might be confusing to see ScalarFunction and ScalarFunctionDef.

Contributor: Rather than add a parallel implementation I would love to just change

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ScalarFunction {
    /// The function
    pub fun: built_in_function::BuiltinScalarFunction,
    /// List of expressions to feed to the functions as arguments
    pub args: Vec<Expr>,
}

to something like

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ScalarFunction {
    /// The function
    pub fun: Arc<dyn ScalarFunctionDef>,
    /// List of expressions to feed to the functions as arguments
    pub args: Vec<Expr>,
}

/// with the new `ScalarFunctionDef` interface
ScalarFunctionExpr(ScalarFunctionExpr),
/// Represents the call of a user-defined scalar function with arguments.
ScalarUDF(ScalarUDF),
/// Represents the call of an aggregate built-in function with arguments.
@@ -351,6 +355,38 @@ impl ScalarFunction {
}
}

/// Scalar function expression for the new `ScalarFunctionDef` interface
#[derive(Clone, Debug)]
pub struct ScalarFunctionExpr {
/// The function
pub fun: Arc<dyn ScalarFunctionDef>,
/// List of expressions to feed to the functions as arguments
pub args: Vec<Expr>,
}

impl Hash for ScalarFunctionExpr {
fn hash<H: Hasher>(&self, state: &mut H) {
self.fun.name().hash(state);
self.fun.input_type().hash(state);
}
}

impl Eq for ScalarFunctionExpr {}

impl PartialEq for ScalarFunctionExpr {
fn eq(&self, other: &Self) -> bool {
self.fun.name() == other.fun.name()
&& self.fun.input_type() == other.fun.input_type()
}
}

impl ScalarFunctionExpr {
/// Create a new ScalarFunctionExpr expression
pub fn new(fun: Arc<dyn ScalarFunctionDef>, args: Vec<Expr>) -> Self {
Self { fun, args }
}
}

/// ScalarUDF expression
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ScalarUDF {
@@ -731,6 +767,7 @@ impl Expr {
Expr::Placeholder(_) => "Placeholder",
Expr::QualifiedWildcard { .. } => "QualifiedWildcard",
Expr::ScalarFunction(..) => "ScalarFunction",
Expr::ScalarFunctionExpr(..) => "ScalarFunctionExpr",
Expr::ScalarSubquery { .. } => "ScalarSubquery",
Expr::ScalarUDF(..) => "ScalarUDF",
Expr::ScalarVariable(..) => "ScalarVariable",
@@ -1177,6 +1214,9 @@ impl fmt::Display for Expr {
Expr::ScalarFunction(func) => {
fmt_function(f, &func.fun.to_string(), false, &func.args, true)
}
Expr::ScalarFunctionExpr(func) => {
fmt_function(f, func.fun.name()[0], false, &func.args, true)
}
Expr::ScalarUDF(ScalarUDF { fun, args }) => {
fmt_function(f, &fun.name, false, args, true)
}
@@ -1511,6 +1551,9 @@ fn create_name(e: &Expr) -> Result<String> {
Expr::ScalarFunction(func) => {
create_function_name(&func.fun.to_string(), false, &func.args)
}
Expr::ScalarFunctionExpr(func) => {
create_function_name(func.fun.name()[0], false, &func.args)
}
Expr::ScalarUDF(ScalarUDF { fun, args }) => {
create_function_name(&fun.name, false, args)
}
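Because `Arc<dyn ScalarFunctionDef>` has no derivable `PartialEq` or `Hash`, the new expression variant defines identity by the function's name and signature. The same pattern reduced to std types; the `Named` trait and `FuncExpr` struct below are hypothetical stand-ins:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

// Hypothetical stand-in for ScalarFunctionDef: just enough identity info.
trait Named {
    fn name(&self) -> &str;
}

struct Sqrt;
impl Named for Sqrt {
    fn name(&self) -> &str {
        "sqrt"
    }
}

struct FuncExpr {
    fun: Arc<dyn Named>,
}

// Trait objects cannot derive PartialEq/Eq/Hash, so identity is delegated
// to the function's name, mirroring ScalarFunctionExpr's manual impls.
impl PartialEq for FuncExpr {
    fn eq(&self, other: &Self) -> bool {
        self.fun.name() == other.fun.name()
    }
}
impl Eq for FuncExpr {}
impl Hash for FuncExpr {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.fun.name().hash(state);
    }
}

fn hash_of<T: Hash>(t: &T) -> u64 {
    let mut h = DefaultHasher::new();
    t.hash(&mut h);
    h.finish()
}

fn main() {
    let a = FuncExpr { fun: Arc::new(Sqrt) };
    let b = FuncExpr { fun: Arc::new(Sqrt) };
    assert!(a == b); // equal because the names match
    assert_eq!(hash_of(&a), hash_of(&b)); // Hash stays consistent with Eq
}
```

Keeping `Hash` consistent with `Eq` (equal values hash equally) is what lets these expressions be stored in hash-based collections.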
22 changes: 20 additions & 2 deletions datafusion/expr/src/expr_schema.rs
@@ -18,11 +18,12 @@
use super::{Between, Expr, Like};
use crate::expr::{
AggregateFunction, AggregateUDF, Alias, BinaryExpr, Cast, GetFieldAccess,
GetIndexedField, InList, InSubquery, Placeholder, ScalarFunction, ScalarUDF, Sort,
TryCast, WindowFunction,
GetIndexedField, InList, InSubquery, Placeholder, ScalarFunction, ScalarFunctionExpr,
ScalarUDF, Sort, TryCast, WindowFunction,
};
use crate::field_util::GetFieldAccessSchema;
use crate::type_coercion::binary::get_result_type;
use crate::FunctionReturnType;
use crate::{LogicalPlan, Projection, Subquery};
use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, Field};
@@ -96,6 +97,22 @@ impl ExprSchemable for Expr {

fun.return_type(&data_types)
}
Expr::ScalarFunctionExpr(ScalarFunctionExpr { fun, args }) => {
let data_types = args
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;

match fun.return_type() {
FunctionReturnType::LambdaReturnType(return_type_resolver) => {
Ok((return_type_resolver)(&data_types)?.as_ref().clone())
}
FunctionReturnType::SameAsFirstArg
| FunctionReturnType::FixedType(_) => {
unimplemented!()
}
}
}
Expr::WindowFunction(WindowFunction { fun, args, .. }) => {
let data_types = args
.iter()
@@ -230,6 +247,7 @@
Expr::ScalarVariable(_, _)
| Expr::TryCast { .. }
| Expr::ScalarFunction(..)
| Expr::ScalarFunctionExpr(..)
| Expr::ScalarUDF(..)
| Expr::WindowFunction { .. }
| Expr::AggregateFunction { .. }
2 changes: 1 addition & 1 deletion datafusion/expr/src/lib.rs
@@ -79,7 +79,7 @@ pub use signature::{
};
pub use table_source::{TableProviderFilterPushDown, TableSource, TableType};
pub use udaf::AggregateUDF;
pub use udf::ScalarUDF;
pub use udf::{FunctionReturnType, ScalarFunctionDef, ScalarUDF};
pub use udwf::WindowUDF;
pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
pub use window_function::{BuiltInWindowFunction, WindowFunction};
10 changes: 8 additions & 2 deletions datafusion/expr/src/tree_node/expr.rs
@@ -20,7 +20,7 @@
use crate::expr::{
AggregateFunction, AggregateUDF, Alias, Between, BinaryExpr, Case, Cast,
GetIndexedField, GroupingSet, InList, InSubquery, Like, Placeholder, ScalarFunction,
ScalarUDF, Sort, TryCast, WindowFunction,
ScalarFunctionExpr, ScalarUDF, Sort, TryCast, WindowFunction,
};
use crate::{Expr, GetFieldAccess};

@@ -64,7 +64,7 @@ impl TreeNode for Expr {
}
Expr::GroupingSet(GroupingSet::Rollup(exprs))
| Expr::GroupingSet(GroupingSet::Cube(exprs)) => exprs.clone(),
Expr::ScalarFunction (ScalarFunction{ args, .. } )| Expr::ScalarUDF(ScalarUDF { args, .. }) => {
Expr::ScalarFunction (ScalarFunction{ args, .. } )| Expr::ScalarFunctionExpr(ScalarFunctionExpr{args, ..})| Expr::ScalarUDF(ScalarUDF { args, .. }) => {
args.clone()
}
Expr::GroupingSet(GroupingSet::GroupingSets(lists_of_exprs)) => {
@@ -278,6 +278,12 @@
Expr::ScalarFunction(ScalarFunction { args, fun }) => Expr::ScalarFunction(
ScalarFunction::new(fun, transform_vec(args, &mut transform)?),
),
Expr::ScalarFunctionExpr(ScalarFunctionExpr { args, fun }) => {
Expr::ScalarFunctionExpr(ScalarFunctionExpr::new(
fun,
transform_vec(args, &mut transform)?,
))
}
Expr::ScalarUDF(ScalarUDF { args, fun }) => {
Expr::ScalarUDF(ScalarUDF::new(fun, transform_vec(args, &mut transform)?))
}
57 changes: 56 additions & 1 deletion datafusion/expr/src/udf.rs
@@ -17,12 +17,67 @@

//! Udf module contains foundational types that are used to represent UDFs in DataFusion.

use crate::{Expr, ReturnTypeFunction, ScalarFunctionImplementation, Signature};
use crate::{
ColumnarValue, Expr, FuncMonotonicity, ReturnTypeFunction,
ScalarFunctionImplementation, Signature, TypeSignature, Volatility,
};
use arrow::array::ArrayRef;
use arrow::datatypes::DataType;
use datafusion_common::{internal_err, DataFusionError, Result};
use std::any::Any;
use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

// TODO(PR): add doc comments
pub trait ScalarFunctionDef: Any + Sync + Send + std::fmt::Debug {
Contributor: What do you think about calling this trait ScalarFunction rather than ScalarFunctionDef? I know there are already several other things called ScalarFunction, but that would also keep it in line with things like WindowFunction: https://docs.rs/datafusion/latest/datafusion/index.html?search=WindowFunction

/// Return as [`Any`] so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

// May return one or more names (aliases)
fn name(&self) -> &[&str];
Contributor: I recommend `name(&self) -> &str` that returns one name, and then a second API that returns optional aliases:

/// returns any alias names this function is known by. Defaults to empty list
fn aliases(&self) -> &[&str] { &[] }

Contributor Author: This approach looks better 👍🏼


fn input_type(&self) -> TypeSignature;

fn return_type(&self) -> FunctionReturnType;

fn execute(&self, _args: &[ArrayRef]) -> Result<ArrayRef> {
Contributor: I think this should be

Suggested change:
-fn execute(&self, _args: &[ArrayRef]) -> Result<ArrayRef> {
+fn execute(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
internal_err!("This method should be implemented if `use_execute_raw_instead()` returns `false`")
}

fn volatility(&self) -> Volatility;

fn monotonicity(&self) -> Option<FuncMonotonicity>;

// ===============================
// OPTIONAL METHODS START BELOW
Contributor Author: This trait consists of mandatory and optional methods, so it can get a bit lengthy. An alternative implementation is trait inheritance (e.g. `pub trait ScalarFunctionExtended: ScalarFunctionDef`), but it seems it wouldn't be much clearer than the current one. We can add more docs/examples to make it more straightforward later.

// ===============================

/// `execute()` and `execute_raw()` are two alternative ways to define a function:
/// if this returns `false`, `execute()` will be used for execution;
/// if it returns `true`, `execute_raw()` will be called.
fn use_execute_raw_instead(&self) -> bool {
Contributor Author: Rationale for this: built-in functions now have two kinds of implementation:

  1. execute() -- a more common and easy-to-implement interface for UDFs, which can be converted to the more general execute_raw() case using make_scalar_function()
  2. execute_raw() -- fewer existing functions are directly implemented using this interface

Though a single execute_raw() can cover all existing cases, this design makes the common case easier to implement for UDFs.

Contributor: I think we should have a single execute method that takes ColumnarValues and put in the documentation how to go from ColumnarValue --> ArrayRef for a simple, initial implementation.

Contributor Author: I agree we should make the initial implementation concise.

false
}

/// An alternative function definition to `execute()`
fn execute_raw(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
internal_err!("This method should be implemented if `use_execute_raw_instead()` returns `true`")
}
}

/// Defines the return type behavior of a function.
pub enum FunctionReturnType {
Contributor Author: Now ~99% of built-in functions are either SameAsFirstArg or FixedType; only very rare array functions can only be defined using a lambda. This can make the new interface a little easier to use. (It is also possible to extend this to address #7657.)

Contributor: What do you think about a signature like this:

pub trait ScalarFunctionDef: Any + Sync + Send + std::fmt::Debug {
...
    /// What type will this function return, given arguments of the specified input types?
    /// By default, returns the same type as the first argument
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
      arg_types.get(0)
        .ok_or_else(Error ("Implementation of Function {} did not specify a return type, and there are no arguments"))
  }
...
}

Then I think most function implementations can be left as the default or as

impl ScalarFunctionDef for Foo {
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
      Ok(DataType::Utf8)
    }
}

Contributor Author: The returning-enum FunctionReturnType approach's advantage is being able to extend to solve #7657; otherwise we would have to extend the function interface to address that issue (though I'm not sure how common that requirement is; should we consider that case?). Its limitation is that it is harder to use when the return type is actually some complex lambda, but that affects only very few array functions.
/// Matches the first argument's type.
SameAsFirstArg,
/// A predetermined type.
FixedType(Arc<DataType>),
/// Decided by a custom lambda function.
LambdaReturnType(ReturnTypeFunction),
}

/// Logical representation of a UDF.
#[derive(Clone)]
pub struct ScalarUDF {
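Taken together, the trait and the return-type enum let a planner resolve a concrete return type from argument types. A std-only sketch under simplified, illustrative types (`DataType`, `ReturnType`, `FuncDef`, and the function structs here are stand-ins, not DataFusion's actual API):

```rust
use std::sync::Arc;

#[derive(Clone, Debug, PartialEq)]
enum DataType {
    Int64,
    Utf8,
}

// Simplified mirror of the FunctionReturnType enum from the diff.
enum ReturnType {
    SameAsFirstArg,
    FixedType(Arc<DataType>),
    Lambda(Arc<dyn Fn(&[DataType]) -> DataType>),
}

trait FuncDef {
    fn name(&self) -> &str;
    fn return_type(&self) -> ReturnType;
}

struct Upper; // always returns a string
impl FuncDef for Upper {
    fn name(&self) -> &str {
        "upper"
    }
    fn return_type(&self) -> ReturnType {
        ReturnType::FixedType(Arc::new(DataType::Utf8))
    }
}

struct Abs; // returns whatever type it was given
impl FuncDef for Abs {
    fn name(&self) -> &str {
        "abs"
    }
    fn return_type(&self) -> ReturnType {
        ReturnType::SameAsFirstArg
    }
}

struct Coalesce; // rare case: type depends on all argument types
impl FuncDef for Coalesce {
    fn name(&self) -> &str {
        "coalesce"
    }
    fn return_type(&self) -> ReturnType {
        // illustrative lambda: picks the last argument's type
        ReturnType::Lambda(Arc::new(|args: &[DataType]| args[args.len() - 1].clone()))
    }
}

// How a planner might resolve the concrete return type for given argument types.
fn resolve(f: &dyn FuncDef, arg_types: &[DataType]) -> DataType {
    match f.return_type() {
        ReturnType::SameAsFirstArg => arg_types[0].clone(),
        ReturnType::FixedType(t) => (*t).clone(),
        ReturnType::Lambda(l) => l(arg_types),
    }
}

fn main() {
    assert_eq!(resolve(&Upper, &[DataType::Int64]), DataType::Utf8);
    assert_eq!(resolve(&Abs, &[DataType::Int64]), DataType::Int64);
    assert_eq!(
        resolve(&Coalesce, &[DataType::Int64, DataType::Utf8]),
        DataType::Utf8
    );
}
```

The enum variants cover the common cases directly, and the lambda variant is the escape hatch for functions whose return type genuinely depends on the full argument list.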
1 change: 1 addition & 0 deletions datafusion/expr/src/utils.rs
@@ -283,6 +283,7 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> {
| Expr::TryCast { .. }
| Expr::Sort { .. }
| Expr::ScalarFunction(..)
| Expr::ScalarFunctionExpr(..)
| Expr::ScalarUDF(..)
| Expr::WindowFunction { .. }
| Expr::AggregateFunction { .. }