[task apache#5568] add_to_unixtime_function
Signed-off-by: tangruilin <[email protected]>
Tangruilin committed Mar 7, 2024
1 parent f3836a5 commit eff4770
Showing 6 changed files with 226 additions and 1 deletion.
60 changes: 60 additions & 0 deletions datafusion-examples/examples/to_unixtime.rs
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use datafusion::arrow::array::StringArray;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::*;

/// This example demonstrates how to use the `to_unixtime` function
/// in the DataFrame API as well as via SQL.
#[tokio::main]
async fn main() -> Result<()> {
    // define a schema.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));

    // define data.
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(StringArray::from(vec![
            "2020-09-08T13:42:29Z",
            "2020-09-08T13:42:29.190855-05:00",
            "2020-08-09 12:13:29",
            "2020-01-02",
        ]))],
    )?;

    // declare a new context. In the Spark API, this corresponds to a new SparkSession.
    let ctx = SessionContext::new();

    // declare a table in memory. In the Spark API, this corresponds to createDataFrame(...).
    ctx.register_batch("t", batch)?;
    let df = ctx.table("t").await?;

    // use the to_unixtime function to convert col 'a' to a Unix timestamp (seconds) using the default parsing
    let df = df.with_column("a", to_unixtime(vec![col("a")]))?;

    let df = df.select_columns(&["a"])?;

    // print the results
    df.show().await?;

    Ok(())
}
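
The example above only exercises the DataFrame API; a minimal sketch of the SQL path mentioned in the doc comment (not part of this commit, assuming the same `t` table registered above) could look like this:

// hypothetical follow-on inside main(): run the same conversion via SQL
let results = ctx.sql("SELECT to_unixtime(a) AS a FROM t").await?;
results.show().await?;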
10 changes: 9 additions & 1 deletion datafusion/functions/src/datetime/mod.rs
@@ -24,9 +24,11 @@ use datafusion_expr::ScalarUDF;
mod common;
mod to_date;
mod to_timestamp;
mod to_unixtime;

// create UDFs
make_udf_function!(to_date::ToDateFunc, TO_DATE, to_date);
make_udf_function!(to_unixtime::ToUnixtimeFunc, TO_UNIXTIME, to_unixtime);
make_udf_function!(to_timestamp::ToTimestampFunc, TO_TIMESTAMP, to_timestamp);
make_udf_function!(
    to_timestamp::ToTimestampSecondsFunc,
@@ -68,7 +70,7 @@ pub mod expr_fn {
    /// # use datafusion_expr::col;
    /// # use datafusion::prelude::*;
    /// # use datafusion_functions::expr_fn::to_date;
    ///
    ///
    /// // define a schema.
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
    ///
@@ -105,6 +107,11 @@ pub mod expr_fn {
        super::to_date().call(args)
    }

#[doc = "converts a string and optional formats to a Unixtime"]
pub fn to_unixtime(args: Vec<Expr>) -> Expr {
super::to_unixtime().call(args)
}

#[doc = "converts a string and optional formats to a `Timestamp(Nanoseconds, None)`"]
pub fn to_timestamp(args: Vec<Expr>) -> Expr {
super::to_timestamp().call(args)
@@ -135,6 +142,7 @@ pub mod expr_fn {
pub fn functions() -> Vec<Arc<ScalarUDF>> {
    vec![
        to_date(),
        to_unixtime(),
        to_timestamp(),
        to_timestamp_seconds(),
        to_timestamp_millis(),
85 changes: 85 additions & 0 deletions datafusion/functions/src/datetime/to_unixtime.rs
@@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;

use arrow::datatypes::{DataType, TimeUnit};

use crate::datetime::common::*;
use datafusion_common::{exec_err, Result};
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};

use super::to_timestamp::ToTimestampSecondsFunc;

#[derive(Debug)]
pub(super) struct ToUnixtimeFunc {
    signature: Signature,
}

impl ToUnixtimeFunc {
    pub fn new() -> Self {
        Self {
            signature: Signature::variadic_any(Volatility::Immutable),
        }
    }
}

impl ScalarUDFImpl for ToUnixtimeFunc {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "to_unixtime"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Int64)
    }

    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
        if args.is_empty() {
            return exec_err!("to_unixtime function requires 1 or more arguments, got 0");
        }

        // validate that any args after the first one are Utf8
        if args.len() > 1 {
            if let Some(value) = validate_data_types(args, "to_unixtime") {
                return value;
            }
        }

        match args[0].data_type() {
            // numeric and null inputs are cast directly to Int64
            DataType::Int32 | DataType::Int64 | DataType::Null | DataType::Float64 => {
                args[0].cast_to(&DataType::Int64, None)
            }
            // dates and timestamps go through a second-precision timestamp first
            DataType::Date64 | DataType::Date32 | DataType::Timestamp(_, None) => args[0]
                .cast_to(&DataType::Timestamp(TimeUnit::Second, None), None)?
                .cast_to(&DataType::Int64, None),
            // strings are parsed by to_timestamp_seconds, then cast to Int64
            DataType::Utf8 => ToTimestampSecondsFunc::new()
                .invoke(args)?
                .cast_to(&DataType::Int64, None),
            other => {
                exec_err!("Unsupported data type {:?} for function to_unixtime", other)
            }
        }
    }
}
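
As a usage sketch (not part of this commit), the main dispatch arms of invoke can be exercised through SessionContext::sql; the expected values below mirror the sqllogictest cases added later in this commit:

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// illustrative only: each query exercises one arm of the match in invoke()
async fn to_unixtime_examples() -> Result<()> {
    let ctx = SessionContext::new();
    // Utf8: parsed by to_timestamp_seconds, then cast to Int64 -> 1599566400
    ctx.sql("SELECT to_unixtime('2020-09-08T12:00:00+00:00')")
        .await?
        .show()
        .await?;
    // Date32: cast to Timestamp(Second, None), then Int64 -> 1599523200
    ctx.sql("SELECT to_unixtime(arrow_cast('2020-09-08', 'Date32'))")
        .await?
        .show()
        .await?;
    // Float64: cast directly to Int64 (fraction dropped) -> 1599523200
    ctx.sql("SELECT to_unixtime(arrow_cast(1599523200.414, 'Float64'))")
        .await?
        .show()
        .await?;
    Ok(())
}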
2 changes: 2 additions & 0 deletions datafusion/proto/proto/datafusion.proto
@@ -1,5 +1,6 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -684,6 +685,7 @@ enum ScalarFunction {
/// 135 is RegexpLike
ToChar = 136;
/// 137 was ToDate
/// 138 was ToUnixtime
}

message ScalarFunctionNode {
1 change: 1 addition & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default.

69 changes: 69 additions & 0 deletions datafusion/sqllogictest/test_files/timestamps.slt
@@ -2682,3 +2682,72 @@ SELECT to_char(null, '%d-%m-%Y');

statement ok
drop table formats;

##########
## to_unixtime tests
##########

query I
select to_unixtime('2020-09-08T12:00:00+00:00');
----
1599566400

query I
select to_unixtime('01-14-2023 01:01:30+05:30', '%q', '%d-%m-%Y %H/%M/%S', '%+', '%m-%d-%Y %H:%M:%S%#z');
----
1673638290

query I
select to_unixtime('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y');
----
1684295940

query I
select to_unixtime(arrow_cast('2020-09-08T12:00:00+00:00', 'Date64'));
----
1599566400

query I
select to_unixtime(arrow_cast('2020-09-08', 'Date32'));
----
1599523200

query I
select to_unixtime(to_timestamp('2020-09-08'));
----
1599523200

query I
select to_unixtime(to_timestamp_seconds('2020-09-08'));
----
1599523200

query I
select to_unixtime(to_timestamp_millis('2020-09-08'));
----
1599523200

query I
select to_unixtime(to_timestamp_micros('2020-09-08'));
----
1599523200

query I
select to_unixtime(to_timestamp_nanos('2020-09-08'));
----
1599523200

query I
select to_unixtime(arrow_cast(1599523200, 'Int32'));
----
1599523200

query I
select to_unixtime(arrow_cast(1599523200, 'Int64'));
----
1599523200

query I
select to_unixtime(arrow_cast(1599523200.414, 'Float64'));
----
1599523200
