Skip to content

Commit

Permalink
Implement user defined planner for position (apache#11243)
Browse files Browse the repository at this point in the history
* Implement user defined planner for position

* Fix format

* Move planner to session_state

* Extract function
  • Loading branch information
xinlifoobar authored and findepi committed Jul 16, 2024
1 parent a5d3098 commit ca6d146
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 19 deletions.
2 changes: 2 additions & 0 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ impl SessionState {
Arc::new(functions_array::planner::FieldAccessPlanner),
#[cfg(feature = "datetime_expressions")]
Arc::new(functions::datetime::planner::ExtractPlanner),
#[cfg(feature = "unicode_expressions")]
Arc::new(functions::unicode::planner::PositionPlanner),
];

let mut new_self = SessionState {
Expand Down
6 changes: 6 additions & 0 deletions datafusion/expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ pub trait UserDefinedSQLPlanner: Send + Sync {
Ok(PlannerResult::Original(exprs))
}

// Plan the POSITION expression, e.g., POSITION(<expr> in <expr>)
// returns origin expression arguments if not possible
fn plan_position(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(args))
}

/// Plan the dictionary literal `{ key: value, ...}`
///
/// Returns origin expression arguments if not possible
Expand Down
1 change: 1 addition & 0 deletions datafusion/functions/src/unicode/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub mod character_length;
pub mod find_in_set;
pub mod left;
pub mod lpad;
pub mod planner;
pub mod reverse;
pub mod right;
pub mod rpad;
Expand Down
36 changes: 36 additions & 0 deletions datafusion/functions/src/unicode/planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! SQL planning extensions like [`PositionPlanner`]

use datafusion_common::Result;
use datafusion_expr::{
expr::ScalarFunction,
planner::{PlannerResult, UserDefinedSQLPlanner},
Expr,
};

#[derive(Default)]
pub struct PositionPlanner;

impl UserDefinedSQLPlanner for PositionPlanner {
fn plan_position(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Planned(Expr::ScalarFunction(
ScalarFunction::new_udf(crate::unicode::strpos(), args),
)))
}
}
44 changes: 25 additions & 19 deletions datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,31 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
}

fn sql_position_to_expr(
&self,
substr_expr: SQLExpr,
str_expr: SQLExpr,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let substr =
self.sql_expr_to_logical_expr(substr_expr, schema, planner_context)?;
let fullstr = self.sql_expr_to_logical_expr(str_expr, schema, planner_context)?;
let mut extract_args = vec![fullstr, substr];
for planner in self.planners.iter() {
match planner.plan_position(extract_args)? {
PlannerResult::Planned(expr) => return Ok(expr),
PlannerResult::Original(args) => {
extract_args = args;
}
}
}

not_impl_err!(
"Position not supported by UserDefinedExtensionPlanners: {extract_args:?}"
)
}

fn try_plan_dictionary_literal(
&self,
fields: Vec<DictionaryField>,
Expand Down Expand Up @@ -924,25 +949,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
};
Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fun, args)))
}
fn sql_position_to_expr(
&self,
substr_expr: SQLExpr,
str_expr: SQLExpr,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let fun = self
.context_provider
.get_function_meta("strpos")
.ok_or_else(|| {
internal_datafusion_err!("Unable to find expected 'strpos' function")
})?;
let substr =
self.sql_expr_to_logical_expr(substr_expr, schema, planner_context)?;
let fullstr = self.sql_expr_to_logical_expr(str_expr, schema, planner_context)?;
let args = vec![fullstr, substr];
Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fun, args)))
}
}

#[cfg(test)]
Expand Down

0 comments on commit ca6d146

Please sign in to comment.