Skip to content

Commit

Permalink
[task apache#8987]add_to_date_function
Browse files Browse the repository at this point in the history
Signed-off-by: tangruilin <[email protected]>
  • Loading branch information
Tangruilin committed Feb 28, 2024
1 parent 96abac8 commit da8c3b7
Show file tree
Hide file tree
Showing 13 changed files with 361 additions and 3 deletions.
60 changes: 60 additions & 0 deletions datafusion-examples/examples/to_date.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use datafusion::arrow::array::StringArray;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::*;

/// This example demonstrates how to use the to_date series
/// of functions in the DataFrame API as well as via sql.
#[tokio::main]
async fn main() -> Result<()> {
// define a schema.
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));

// define data.
let batch = RecordBatch::try_new(
schema,
vec![Arc::new(StringArray::from(vec![
"2020-09-08T13:42:29Z",
"2020-09-08T13:42:29.190855-05:00",
"2020-08-09 12:13:29",
"2020-01-02",
]))],
)?;

// declare a new context. In spark API, this corresponds to a new spark SQLsession
let ctx = SessionContext::new();

// declare a table in memory. In spark API, this corresponds to createDataFrame(...).
ctx.register_batch("t", batch)?;
let df = ctx.table("t").await?;

// use to_date function to convert col 'a' to timestamp type using the default parsing
let df = df.with_column("a", to_date(vec![col("a")]))?;

let df = df.select_columns(&["a"])?;

// print the results
df.show().await?;

Ok(())
}
8 changes: 7 additions & 1 deletion datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use arrow::{
compute::kernels::cast::{cast_with_options, CastOptions},
datatypes::{
i256, ArrowDictionaryKeyType, ArrowNativeType, ArrowTimestampType, DataType,
Field, Float32Type, Int16Type, Int32Type, Int64Type, Int8Type,
Date32Type, Field, Float32Type, Int16Type, Int32Type, Int64Type, Int8Type,
IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit,
IntervalYearMonthType, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
Expand Down Expand Up @@ -3317,6 +3317,12 @@ impl ScalarType<i64> for TimestampNanosecondType {
}
}

impl ScalarType<i32> for Date32Type {
fn scalar(r: Option<i32>) -> ScalarValue {
ScalarValue::Date32(r)
}
}

#[cfg(test)]
mod tests {
use std::cmp::Ordering;
Expand Down
6 changes: 6 additions & 0 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ pub enum BuiltinScalarFunction {
ToTimestampSeconds,
/// from_unixtime
FromUnixtime,
/// to_date
ToDate,
///now
Now,
///current_date
Expand Down Expand Up @@ -475,6 +477,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Upper => Volatility::Immutable,
BuiltinScalarFunction::Struct => Volatility::Immutable,
BuiltinScalarFunction::FromUnixtime => Volatility::Immutable,
BuiltinScalarFunction::ToDate => Volatility::Immutable,
BuiltinScalarFunction::ArrowTypeof => Volatility::Immutable,
BuiltinScalarFunction::OverLay => Volatility::Immutable,
BuiltinScalarFunction::Levenshtein => Volatility::Immutable,
Expand Down Expand Up @@ -787,6 +790,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ToTimestampMicros => Ok(Timestamp(Microsecond, None)),
BuiltinScalarFunction::ToTimestampSeconds => Ok(Timestamp(Second, None)),
BuiltinScalarFunction::FromUnixtime => Ok(Timestamp(Second, None)),
BuiltinScalarFunction::ToDate => Ok(Date32),
BuiltinScalarFunction::Now => {
Ok(Timestamp(Nanosecond, Some("+00:00".into())))
}
Expand Down Expand Up @@ -1072,6 +1076,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::FromUnixtime => {
Signature::uniform(1, vec![Int64], self.volatility())
}
BuiltinScalarFunction::ToDate => Signature::variadic_any(self.volatility()),
BuiltinScalarFunction::Digest => Signature::one_of(
vec![
Exact(vec![Utf8, Utf8]),
Expand Down Expand Up @@ -1504,6 +1509,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ToTimestampSeconds => &["to_timestamp_seconds"],
BuiltinScalarFunction::ToTimestampNanos => &["to_timestamp_nanos"],
BuiltinScalarFunction::FromUnixtime => &["from_unixtime"],
BuiltinScalarFunction::ToDate => &["to_date"],

// hashing functions
BuiltinScalarFunction::Digest => &["digest"],
Expand Down
5 changes: 5 additions & 0 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,11 @@ scalar_expr!(
datetime format,
"converts a date, time, timestamp or duration to a string based on the provided format"
);
nary_scalar_expr!(
ToDate,
to_date,
"converts string to date according to the given format"
);
nary_scalar_expr!(
ToTimestamp,
to_timestamp,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions/src/core/nvl2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use arrow::datatypes::DataType;
use datafusion_common::{internal_err, plan_datafusion_err, DataFusionError, Result};
use datafusion_common::{internal_err, plan_datafusion_err, Result};
use datafusion_expr::{utils, ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use arrow::compute::kernels::zip::zip;
use arrow::compute::is_not_null;
Expand Down
111 changes: 110 additions & 1 deletion datafusion/physical-expr/src/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ use datafusion_common::cast::{
as_timestamp_nanosecond_array, as_timestamp_second_array,
};
use datafusion_common::{
exec_err, not_impl_err, DataFusionError, Result, ScalarType, ScalarValue,
exec_err, internal_datafusion_err, not_impl_err, DataFusionError, Result, ScalarType,
ScalarValue,
};
use datafusion_expr::ColumnarValue;

Expand Down Expand Up @@ -424,6 +425,84 @@ fn to_timestamp_impl<T: ArrowTimestampType + ScalarType<i64>>(
}
}

/// # Examples
///
/// ```ignore
/// # use std::sync::Arc;

/// # use datafusion::arrow::array::StringArray;
/// # use datafusion::arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion::arrow::record_batch::RecordBatch;
/// # use datafusion::error::Result;
/// # use datafusion::prelude::*;

/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// // define a schema.
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));

/// // define data.
/// let batch = RecordBatch::try_new(
/// schema,
/// vec![Arc::new(StringArray::from(vec![
/// "2020-09-08T13:42:29Z",
/// "2020-09-08T13:42:29.190855-05:00",
/// "2020-08-09 12:13:29",
/// "2020-01-02",
/// ]))],
/// )?;

/// // declare a new context. In spark API, this corresponds to a new spark SQLsession
/// let ctx = SessionContext::new();

/// // declare a table in memory. In spark API, this corresponds to createDataFrame(...).
/// ctx.register_batch("t", batch)?;
/// let df = ctx.table("t").await?;

/// // use to_date function to convert col 'a' to timestamp type using the default parsing
/// let df = df.with_column("a", to_date(vec![col("a")]))?;

/// let df = df.select_columns(&["a"])?;

/// // print the results
/// df.show().await?;

/// # Ok(())
/// # }
/// ```
pub fn to_date(args: &[ColumnarValue]) -> Result<ColumnarValue> {
match args.len() {
1 => handle::<Date32Type, _, Date32Type>(
args,
|s| {
string_to_timestamp_nanos_shim(s)
.map(|n| n / (1_000_000 * 24 * 60 * 60 * 1_000))
.and_then(|v| {
v.try_into().map_err(|_| {
internal_datafusion_err!("Unable to cast to Date32 for converting from i64 to i32 failed")
})
})
},
"to_date",
),
n if n >= 2 => handle_multiple::<Date32Type, _, Date32Type, _>(
args,
|s, format| {
string_to_timestamp_nanos_formatted(s, format)
.map(|n| n / (1_000_000 * 24 * 60 * 60 * 1_000))
.and_then(|v| {
v.try_into().map_err(|_| {
internal_datafusion_err!("Unable to cast to Date32 for converting from i64 to i32 failed")
})
})
},
|n| n,
"to_date",
),
_ => exec_err!("Unsupported 0 argument count for function to_date"),
}
}

/// to_timestamp SQL function
///
/// Note: `to_timestamp` returns `Timestamp(Nanosecond)` though its arguments are interpreted as **seconds**.
Expand Down Expand Up @@ -1567,6 +1646,36 @@ fn validate_to_timestamp_data_types(
None
}

/// to_date SQL function implementation
pub fn to_date_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.is_empty() {
return exec_err!(
"to_date function requires 1 or more arguments, got {}",
args.len()
);
}

// validate that any args after the first one are Utf8
if args.len() > 1 {
if let Some(value) = validate_to_timestamp_data_types(args, "to_date") {
return value;
}
}

match args[0].data_type() {
DataType::Int32
| DataType::Int64
| DataType::Null
| DataType::Float64
| DataType::Date32
| DataType::Date64 => cast_column(&args[0], &DataType::Date32, None),
DataType::Utf8 => to_date(args),
other => {
exec_err!("Unsupported data type {:?} for function to_date", other)
}
}
}

/// to_timestamp() SQL function implementation
pub fn to_timestamp_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.is_empty() {
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ pub fn create_physical_fun(
BuiltinScalarFunction::FromUnixtime => {
Arc::new(datetime_expressions::from_unixtime_invoke)
}
BuiltinScalarFunction::ToDate => Arc::new(datetime_expressions::to_date_invoke),
BuiltinScalarFunction::InitCap => Arc::new(|args| match args[0].data_type() {
DataType::Utf8 => {
make_scalar_function_inner(string_expressions::initcap::<i32>)(args)
Expand Down
2 changes: 2 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -682,6 +683,7 @@ enum ScalarFunction {
ArrayReverse = 134;
RegexpLike = 135;
ToChar = 136;
ToDate = 137;
}

message ScalarFunctionNode {
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions datafusion/proto/src/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
ScalarFunction::Levenshtein => Self::Levenshtein,
ScalarFunction::SubstrIndex => Self::SubstrIndex,
ScalarFunction::FindInSet => Self::FindInSet,
ScalarFunction::ToDate => Self::ToDate,
}
}
}
Expand Down Expand Up @@ -1817,6 +1818,16 @@ pub fn parse_expr(
ScalarFunction::StructFun => {
Ok(struct_fun(parse_expr(&args[0], registry)?))
}
ScalarFunction::ToDate => {
let args: Vec<_> = args
.iter()
.map(|expr| parse_expr(expr, registry))
.collect::<std::result::Result<_, _>>()?;
Ok(Expr::ScalarFunction(expr::ScalarFunction::new(
BuiltinScalarFunction::ToDate,
args,
)))
}
}
}
ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode { fun_name, args }) => {
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1550,6 +1550,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
BuiltinScalarFunction::Levenshtein => Self::Levenshtein,
BuiltinScalarFunction::SubstrIndex => Self::SubstrIndex,
BuiltinScalarFunction::FindInSet => Self::FindInSet,
BuiltinScalarFunction::ToDate => Self::ToDate,
};

Ok(scalar_function)
Expand Down
Loading

0 comments on commit da8c3b7

Please sign in to comment.