Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create unicode module in datafusion/functions/src/unicode and unicode_expressions feature flag, move char_length function #9825

Merged
merged 14 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ unicode_expressions = [
"datafusion-physical-expr/unicode_expressions",
"datafusion-optimizer/unicode_expressions",
"datafusion-sql/unicode_expressions",
"datafusion-functions/unicode_expressions",
]

[dependencies]
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/dataframe/dataframe_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use datafusion::assert_batches_eq;
use datafusion_common::DFSchema;
use datafusion_expr::expr::Alias;
use datafusion_expr::{approx_median, cast, ExprSchemable};
use datafusion_functions::unicode::expr_fn::character_length;

fn test_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Expand Down
14 changes: 1 addition & 13 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ pub enum BuiltinScalarFunction {
Cot,

// string functions
/// character_length
CharacterLength,
/// concat
Concat,
/// concat_ws
Expand Down Expand Up @@ -218,7 +216,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Cbrt => Volatility::Immutable,
BuiltinScalarFunction::Cot => Volatility::Immutable,
BuiltinScalarFunction::Trunc => Volatility::Immutable,
BuiltinScalarFunction::CharacterLength => Volatility::Immutable,
BuiltinScalarFunction::Concat => Volatility::Immutable,
BuiltinScalarFunction::ConcatWithSeparator => Volatility::Immutable,
BuiltinScalarFunction::EndsWith => Volatility::Immutable,
Expand Down Expand Up @@ -257,9 +254,6 @@ impl BuiltinScalarFunction {
// the return type of the built in function.
// Some built-in functions' return type depends on the incoming type.
match self {
BuiltinScalarFunction::CharacterLength => {
utf8_to_int_type(&input_expr_types[0], "character_length")
}
BuiltinScalarFunction::Coalesce => {
// COALESCE has multiple args and they might get coerced, get a preview of this
let coerced_types = data_types(input_expr_types, &self.signature());
Expand Down Expand Up @@ -367,9 +361,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Coalesce => {
Signature::variadic_equal(self.volatility())
}
BuiltinScalarFunction::CharacterLength
| BuiltinScalarFunction::InitCap
| BuiltinScalarFunction::Reverse => {
BuiltinScalarFunction::InitCap | BuiltinScalarFunction::Reverse => {
Signature::uniform(1, vec![Utf8, LargeUtf8], self.volatility())
}
BuiltinScalarFunction::Lpad | BuiltinScalarFunction::Rpad => {
Expand Down Expand Up @@ -584,10 +576,6 @@ impl BuiltinScalarFunction {
// conditional functions
BuiltinScalarFunction::Coalesce => &["coalesce"],

// string functions
BuiltinScalarFunction::CharacterLength => {
&["character_length", "char_length", "length"]
}
BuiltinScalarFunction::Concat => &["concat"],
BuiltinScalarFunction::ConcatWithSeparator => &["concat_ws"],
BuiltinScalarFunction::EndsWith => &["ends_with"],
Expand Down
8 changes: 0 additions & 8 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,13 +577,6 @@ scalar_expr!(Power, power, base exponent, "`base` raised to the power of `expone
scalar_expr!(Atan2, atan2, y x, "inverse tangent of a division given in the argument");
scalar_expr!(Log, log, base x, "logarithm of a `x` for a particular `base`");

// string functions
scalar_expr!(
CharacterLength,
character_length,
string,
"the number of characters in the `string`"
);
scalar_expr!(InitCap, initcap, string, "converts the first letter of each word in `string` in uppercase and the remaining characters in lowercase");
scalar_expr!(Left, left, string n, "returns the first `n` characters in the `string`");
scalar_expr!(Reverse, reverse, string, "reverses the `string`");
Expand Down Expand Up @@ -1032,7 +1025,6 @@ mod test {
test_scalar_expr!(Nanvl, nanvl, x, y);
test_scalar_expr!(Iszero, iszero, input);

test_scalar_expr!(CharacterLength, character_length, string);
test_scalar_expr!(Gcd, gcd, arg_1, arg_2);
test_scalar_expr!(Lcm, lcm, arg_1, arg_2);
test_scalar_expr!(InitCap, initcap, string);
Expand Down
4 changes: 4 additions & 0 deletions datafusion/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ default = [
"regex_expressions",
"crypto_expressions",
"string_expressions",
"unicode_expressions",
]
# enable encode/decode functions
encoding_expressions = ["base64", "hex"]
Expand All @@ -52,6 +53,8 @@ math_expressions = []
regex_expressions = ["regex"]
# enable string functions
string_expressions = []
# enable unicode functions
unicode_expressions = ["unicode-segmentation"]

[lib]
name = "datafusion_functions"
Expand All @@ -75,6 +78,7 @@ log = { workspace = true }
md-5 = { version = "^0.10.0", optional = true }
regex = { version = "1.8", optional = true }
sha2 = { version = "^0.10.1", optional = true }
unicode-segmentation = { version = "^1.7.1", optional = true }
uuid = { version = "1.7", features = ["v4"] }

[dev-dependencies]
Expand Down
9 changes: 9 additions & 0 deletions datafusion/functions/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ make_stub_package!(regex, "regex_expressions");
pub mod crypto;
make_stub_package!(crypto, "crypto_expressions");

#[cfg(feature = "unicode_expressions")]
pub mod unicode;
make_stub_package!(unicode, "unicode_expressions");

mod utils;

/// Fluent-style API for creating `Expr`s
pub mod expr_fn {
#[cfg(feature = "core_expressions")]
Expand All @@ -140,6 +146,8 @@ pub mod expr_fn {
pub use super::regex::expr_fn::*;
#[cfg(feature = "string_expressions")]
pub use super::string::expr_fn::*;
#[cfg(feature = "unicode_expressions")]
pub use super::unicode::expr_fn::*;
}

/// Registers all enabled packages with a [`FunctionRegistry`]
Expand All @@ -151,6 +159,7 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> {
.chain(math::functions())
.chain(regex::functions())
.chain(crypto::functions())
.chain(unicode::functions())
.chain(string::functions());

all_functions.try_for_each(|udf| {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions/src/string/ascii.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::string::common::make_scalar_function;
use crate::utils::make_scalar_function;
use arrow::array::Int32Array;
use arrow::array::{ArrayRef, OffsetSizeTrait};
use arrow::datatypes::DataType;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/functions/src/string/bit_length.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
// specific language governing permissions and limitations
// under the License.

use arrow::compute::kernels::length::bit_length;
use std::any::Any;

use arrow::compute::kernels::length::bit_length;
use arrow::datatypes::DataType;

use datafusion_common::{exec_err, Result, ScalarValue};
use datafusion_expr::{ColumnarValue, Volatility};
use datafusion_expr::{ScalarUDFImpl, Signature};

use crate::string::common::*;
use crate::utils::utf8_to_int_type;

#[derive(Debug)]
pub(super) struct BitLengthFunc {
Expand Down
1 change: 1 addition & 0 deletions datafusion/functions/src/string/btrim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use datafusion_expr::{ColumnarValue, Volatility};
use datafusion_expr::{ScalarUDFImpl, Signature};

use crate::string::common::*;
use crate::utils::{make_scalar_function, utf8_to_str_type};

/// Returns the longest string with leading and trailing characters removed. If the characters are not specified, whitespace is removed.
/// btrim('xyxtrimyyx', 'xyz') = 'trim'
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions/src/string/chr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use datafusion_common::{exec_err, Result};
use datafusion_expr::{ColumnarValue, Volatility};
use datafusion_expr::{ScalarUDFImpl, Signature};

use crate::string::common::*;
use crate::utils::make_scalar_function;

/// Returns the character with the given code. chr(0) is disallowed because text data types cannot store that character.
/// chr(65) = 'A'
Expand Down
158 changes: 1 addition & 157 deletions datafusion/functions/src/string/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ use arrow::datatypes::DataType;
use datafusion_common::cast::as_generic_string_array;
use datafusion_common::Result;
use datafusion_common::{exec_err, ScalarValue};
use datafusion_expr::{ColumnarValue, ScalarFunctionImplementation};
use datafusion_physical_expr::functions::Hint;
use datafusion_expr::ColumnarValue;

pub(crate) enum TrimType {
Left,
Expand Down Expand Up @@ -98,52 +97,6 @@ pub(crate) fn general_trim<T: OffsetSizeTrait>(
}
}

/// Generates a `pub(crate)` function named `$FUNC` that maps the type of a
/// string function's first argument to that function's return type.
///
/// The generated function takes the argument type and the function name
/// (used only in the error message) and resolves as follows:
///
/// * `LargeUtf8` / `LargeBinary` => `$largeUtf8Type`
/// * `Utf8` / `Binary` => `$utf8Type`
/// * `Null` => `Null`
/// * `Dictionary` => the same rules applied to the dictionary's value type
///
/// # Errors
/// Any other type yields an execution error that names the upper-cased
/// function and the offending type.
macro_rules! get_optimal_return_type {
    ($FUNC:ident, $largeUtf8Type:expr, $utf8Type:expr) => {
        pub(crate) fn $FUNC(arg_type: &DataType, name: &str) -> Result<DataType> {
            // A dictionary is classified by its value type; every other
            // type is classified as-is.
            let string_type = match arg_type {
                DataType::Dictionary(_, dict_values) => &**dict_values,
                plain => plain,
            };

            match string_type {
                // Binary inputs share the result type of their string counterpart
                DataType::LargeUtf8 | DataType::LargeBinary => Ok($largeUtf8Type),
                DataType::Utf8 | DataType::Binary => Ok($utf8Type),
                DataType::Null => Ok(DataType::Null),
                unsupported => datafusion_common::exec_err!(
                    "The {} function can only accept strings, but got {:?}.",
                    name.to_uppercase(),
                    unsupported
                ),
            }
        }
    };
}

// `utf8_to_str_type`: returns either a Utf8 or LargeUtf8 based on the input type size.
get_optimal_return_type!(utf8_to_str_type, DataType::LargeUtf8, DataType::Utf8);

// `utf8_to_int_type`: returns either an Int32 or Int64 based on the input type size.
get_optimal_return_type!(utf8_to_int_type, DataType::Int64, DataType::Int32);

/// applies a unary expression to `args[0]` that is expected to be downcastable to
/// a `GenericStringArray` and returns a `GenericStringArray` (which may have a different offset)
/// # Errors
Expand Down Expand Up @@ -221,112 +174,3 @@ where
},
}
}

/// Wraps an array-based kernel (`inner`) into a [`ScalarFunctionImplementation`]
/// that accepts [`ColumnarValue`] arguments.
///
/// Every argument is converted with `into_array` before `inner` is called.
/// The target length depends on the positional entry in `hints`
/// (arguments without a hint are treated as [`Hint::Pad`]):
///
/// * [`Hint::Pad`]: the length of the last array argument, or 1 if all
///   arguments are scalars
/// * [`Hint::AcceptsSingular`]: always 1
///
/// When all arguments are scalars, element 0 of the kernel output is
/// returned as a scalar; otherwise the output array is returned as is.
pub(super) fn make_scalar_function<F>(
    inner: F,
    hints: Vec<Hint>,
) -> ScalarFunctionImplementation
where
    F: Fn(&[ArrayRef]) -> Result<ArrayRef> + Sync + Send + 'static,
{
    Arc::new(move |args: &[ColumnarValue]| {
        // Length of the last array argument; `None` means every argument
        // is a scalar.
        let array_len = args.iter().rev().find_map(|value| match value {
            ColumnarValue::Array(array) => Some(array.len()),
            ColumnarValue::Scalar(_) => None,
        });
        let padded_len = array_len.unwrap_or(1);

        let mut arrays = Vec::with_capacity(args.len());
        for (position, value) in args.iter().enumerate() {
            // Arguments beyond the supplied hints default to padding.
            let target_len = match hints.get(position).unwrap_or(&Hint::Pad) {
                Hint::AcceptsSingular => 1,
                Hint::Pad => padded_len,
            };
            arrays.push(value.clone().into_array(target_len)?);
        }

        let output = inner(&arrays)?;
        match array_len {
            // All inputs were scalar, so the output stays scalar as well.
            None => ScalarValue::try_from_array(&output, 0).map(ColumnarValue::Scalar),
            Some(_) => Ok(ColumnarValue::Array(output)),
        }
    })
}

#[cfg(test)]
pub mod test {
    /// Checks a scalar UDF against an expected outcome.
    ///
    /// * `$FUNC`: the `ScalarUDFImpl` to test
    /// * `$ARGS`: arguments (slice of `ColumnarValue`) to pass to the function;
    ///   the expression is evaluated more than once
    /// * `$EXPECTED`: a `Result<Option<$EXPECTED_TYPE>>` describing the outcome
    /// * `$EXPECTED_TYPE`: the expected value type
    /// * `$EXPECTED_DATA_TYPE`: the expected return `DataType`
    /// * `$ARRAY_TYPE`: the array type the result is downcast to
    ///
    /// With `Ok(..)`, both `return_type` and `invoke` must succeed and element 0
    /// of the result must equal the expected value (or be null for `None`).
    /// With `Err(..)`, the error must come from `return_type` or, if that
    /// succeeds, from `invoke`; backtraces are stripped before comparing.
    ///
    /// The macro expands to statements (not a block), so its `let` bindings
    /// are visible in the calling scope.
    macro_rules! test_function {
        ($FUNC:expr, $ARGS:expr, $EXPECTED:expr, $EXPECTED_TYPE:ty, $EXPECTED_DATA_TYPE:expr, $ARRAY_TYPE:ident) => {
            let expected: Result<Option<$EXPECTED_TYPE>> = $EXPECTED;
            let func = $FUNC;

            let type_array = $ARGS.iter().map(|arg| arg.data_type()).collect::<Vec<_>>();
            let return_type = func.return_type(&type_array);

            match expected {
                Ok(expected) => {
                    assert!(return_type.is_ok());
                    assert_eq!(return_type.unwrap(), $EXPECTED_DATA_TYPE);

                    let result = func.invoke($ARGS);
                    assert!(result.is_ok());

                    // Expand a scalar result to the length of the last array
                    // argument (or 1 when every argument is a scalar).
                    let len = $ARGS
                        .iter()
                        .fold(Option::<usize>::None, |acc, arg| match arg {
                            ColumnarValue::Scalar(_) => acc,
                            ColumnarValue::Array(a) => Some(a.len()),
                        });
                    let inferred_length = len.unwrap_or(1);
                    let result = result.unwrap().into_array(inferred_length).expect("Failed to convert to array");
                    let result = result.as_any().downcast_ref::<$ARRAY_TYPE>().expect("Failed to convert to type");

                    // value is correct
                    match expected {
                        Some(v) => assert_eq!(result.value(0), v),
                        None => assert!(result.is_null(0)),
                    };
                }
                Err(expected_error) => match return_type {
                    // the error was raised while resolving the return type
                    Err(error) => {
                        datafusion_common::assert_contains!(expected_error.strip_backtrace(), error.strip_backtrace());
                    }
                    // invoke is expected error - cannot use .expect_err() due to Debug not being implemented
                    Ok(_) => match func.invoke($ARGS) {
                        Ok(_) => panic!("expected error"),
                        Err(error) => {
                            assert!(expected_error.strip_backtrace().starts_with(&error.strip_backtrace()));
                        }
                    },
                },
            };
        };
    }

    pub(crate) use test_function;
}
3 changes: 1 addition & 2 deletions datafusion/functions/src/string/levenshtein.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array, Int64Array, OffsetSizeTrait};
use arrow::datatypes::DataType;

use crate::utils::{make_scalar_function, utf8_to_int_type};
use datafusion_common::cast::as_generic_string_array;
use datafusion_common::utils::datafusion_strsim;
use datafusion_common::{exec_err, Result};
use datafusion_expr::ColumnarValue;
use datafusion_expr::TypeSignature::*;
use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};

use crate::string::common::{make_scalar_function, utf8_to_int_type};

#[derive(Debug)]
pub(super) struct LevenshteinFunc {
signature: Signature,
Expand Down
Loading
Loading