From a028ee6e3d110bb0aded722d92e99d2d0cf39df9 Mon Sep 17 00:00:00 2001 From: Chunchun <14298407+appletreeisyellow@users.noreply.github.com> Date: Tue, 28 May 2024 18:06:39 -0500 Subject: [PATCH 1/6] feat: add `to_local_time` scalar function --- datafusion/functions/src/datetime/mod.rs | 1 + .../functions/src/datetime/to_local_time.rs | 181 ++++++++++++++++++ 2 files changed, 182 insertions(+) create mode 100644 datafusion/functions/src/datetime/to_local_time.rs diff --git a/datafusion/functions/src/datetime/mod.rs b/datafusion/functions/src/datetime/mod.rs index 9c2f80856bf8..93277b6aaed2 100644 --- a/datafusion/functions/src/datetime/mod.rs +++ b/datafusion/functions/src/datetime/mod.rs @@ -32,6 +32,7 @@ pub mod make_date; pub mod now; pub mod to_char; pub mod to_date; +pub mod to_local_time; pub mod to_timestamp; pub mod to_unixtime; diff --git a/datafusion/functions/src/datetime/to_local_time.rs b/datafusion/functions/src/datetime/to_local_time.rs new file mode 100644 index 000000000000..8c855402d56c --- /dev/null +++ b/datafusion/functions/src/datetime/to_local_time.rs @@ -0,0 +1,181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; + +use arrow::datatypes::DataType; +use arrow::datatypes::DataType::Timestamp; +use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second}; + +use datafusion_common::{exec_err, Result, ScalarValue}; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; + +#[derive(Debug)] +pub struct ToLocalTimeFunc { + signature: Signature, +} + +impl Default for ToLocalTimeFunc { + fn default() -> Self { + Self::new() + } +} + +impl ToLocalTimeFunc { + pub fn new() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + } + } + + fn to_local_time(&self, args: &[ColumnarValue]) -> Result { + if args.len() != 1 { + return exec_err!( + "to_local_time function requires 1 argument, got {}", + args.len() + ); + } + + let time_value = args[0].clone(); + let arg_type = time_value.data_type(); + match arg_type { + DataType::Timestamp(_, None) => { + // if no timezone specificed, just return the input + Ok(time_value.clone()) + } + // if has timezone, then remove the timezone in return type, keep the time value the same + DataType::Timestamp(_, Some(_)) => match time_value { + ColumnarValue::Scalar(ScalarValue::TimestampSecond( + Some(ts), + Some(_), + )) => Ok(ColumnarValue::Scalar(ScalarValue::TimestampSecond( + Some(ts), + None, + ))), + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( + Some(ts), + Some(_), + )) => Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( + Some(ts), + None, + ))), + ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( + Some(ts), + Some(_), + )) => Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( + Some(ts), + None, + ))), + ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( + Some(ts), + Some(_), + )) => Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( + Some(ts), + None, + ))), + _ => { + exec_err!( + "to_local_time function requires timestamp argument, got {:?}", + arg_type + ) + } + }, + _ => { + exec_err!( + "to_local_time function requires timestamp argument, got {:?}", + arg_type + ) + } + } + } +} + +impl ScalarUDFImpl for ToLocalTimeFunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "to_local_time" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + if arg_types.is_empty() { + return exec_err!("to_local_time function requires 1 arguments, got 0"); + } + + match &arg_types[0] { + Timestamp(Nanosecond, _) => Ok(Timestamp(Nanosecond, None)), + Timestamp(Microsecond, _) => Ok(Timestamp(Microsecond, None)), + Timestamp(Millisecond, _) => Ok(Timestamp(Millisecond, None)), + Timestamp(Second, _) => Ok(Timestamp(Second, None)), + _ => exec_err!( + "The date_bin function can only accept timestamp as the second arg." + ), + } + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + if args.is_empty() { + return exec_err!( + "to_local_time function requires 1 or more arguments, got 0" + ); + } + + self.to_local_time(args) + } +} + +#[cfg(test)] +mod tests { + use datafusion_common::ScalarValue; + use datafusion_expr::{ColumnarValue, ScalarUDFImpl}; + + use super::ToLocalTimeFunc; + + #[test] + fn test_to_local_time() { + // TODO chunchun: update test cases to assert the result + let res = ToLocalTimeFunc::new().invoke(&[ColumnarValue::Scalar( + ScalarValue::TimestampSecond(Some(1), None), + )]); + assert!(res.is_ok()); + + let res = ToLocalTimeFunc::new().invoke(&[ColumnarValue::Scalar( + ScalarValue::TimestampSecond(Some(1), Some("+01:00".into())), + )]); + assert!(res.is_ok()); + + let res = ToLocalTimeFunc::new().invoke(&[ColumnarValue::Scalar( + ScalarValue::TimestampNanosecond(Some(1), Some("America/New_York".into())), + )]); + assert!(res.is_ok()); + + let res = ToLocalTimeFunc::new().invoke(&[ColumnarValue::Scalar( + // 2021-03-28T02:30:00-04:00 + ScalarValue::TimestampNanosecond( + Some(1616916600), + Some("America/New_York".into()), + ), + )]); + assert!(res.is_ok()); + } +} From d642b493108263defcbf797ab90bf5de91beac7e Mon Sep 17 00:00:00 2001 From: Chunchun <14298407+appletreeisyellow@users.noreply.github.com> Date: Tue, 28 May 2024 18:26:25 -0500 Subject: [PATCH 2/6] chore: wire up the `to_local_time` as a UDF func --- datafusion/functions/src/datetime/mod.rs | 11 ++++++++++- datafusion/functions/src/datetime/to_local_time.rs | 3 ++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/datafusion/functions/src/datetime/mod.rs b/datafusion/functions/src/datetime/mod.rs index 93277b6aaed2..c47efb322a7f 100644 --- a/datafusion/functions/src/datetime/mod.rs +++ b/datafusion/functions/src/datetime/mod.rs @@ -51,6 +51,7 @@ make_udf_function!( make_udf_function!(now::NowFunc, NOW, now); make_udf_function!(to_char::ToCharFunc, TO_CHAR, to_char); make_udf_function!(to_date::ToDateFunc, TO_DATE, to_date); +make_udf_function!(to_local_time::ToLocalTimeFunc, TO_LOCAL_TIME, to_local_time); make_udf_function!(to_unixtime::ToUnixtimeFunc, TO_UNIXTIME, to_unixtime); make_udf_function!(to_timestamp::ToTimestampFunc, TO_TIMESTAMP, to_timestamp); make_udf_function!( @@ -109,7 +110,14 @@ pub mod expr_fn { ),( now, "returns the current timestamp in nanoseconds, using the same value for all instances of now() in same statement", - ),( + ), + // TODO chunchun: add more doc examples + ( + to_local_time, + "converts a timestamp with a timezone to a local time, returns a timestamp without timezone", + args, + ), + ( to_unixtime, "converts a string and optional formats to a Unixtime", args, @@ -278,6 +286,7 @@ pub fn functions() -> Vec> { now(), to_char(), to_date(), + to_local_time(), to_unixtime(), to_timestamp(), to_timestamp_seconds(), diff --git a/datafusion/functions/src/datetime/to_local_time.rs b/datafusion/functions/src/datetime/to_local_time.rs index 8c855402d56c..d873f9035c3f 100644 --- a/datafusion/functions/src/datetime/to_local_time.rs +++ b/datafusion/functions/src/datetime/to_local_time.rs @@ -128,7 +128,7 @@ impl ScalarUDFImpl for ToLocalTimeFunc { Timestamp(Millisecond, _) => Ok(Timestamp(Millisecond, None)), Timestamp(Second, _) => Ok(Timestamp(Second, None)), _ => exec_err!( - "The date_bin function can only accept timestamp as the second arg." + "The to_local_time function can only accept timestamp as the second arg." ), } } @@ -140,6 +140,7 @@ impl ScalarUDFImpl for ToLocalTimeFunc { ); } + // TODO chunchun: support more input data types self.to_local_time(args) } } From 1e8bc26c0821271c7713d0bdf69f91912d64ae9d Mon Sep 17 00:00:00 2001 From: Chunchun <14298407+appletreeisyellow@users.noreply.github.com> Date: Thu, 6 Jun 2024 16:57:33 -0500 Subject: [PATCH 3/6] chore: make `to_local_time` adjust underlying time value --- .../functions/src/datetime/to_local_time.rs | 101 +++++++++++++----- 1 file changed, 74 insertions(+), 27 deletions(-) diff --git a/datafusion/functions/src/datetime/to_local_time.rs b/datafusion/functions/src/datetime/to_local_time.rs index d873f9035c3f..d073c0a2c61e 100644 --- a/datafusion/functions/src/datetime/to_local_time.rs +++ b/datafusion/functions/src/datetime/to_local_time.rs @@ -16,11 +16,14 @@ // under the License. use std::any::Any; +use std::ops::Add; +use arrow::array::timezone::Tz; use arrow::datatypes::DataType; use arrow::datatypes::DataType::Timestamp; use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second}; +use chrono::{DateTime, Offset, TimeDelta, TimeZone}; use datafusion_common::{exec_err, Result, ScalarValue}; use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; @@ -57,36 +60,53 @@ impl ToLocalTimeFunc { // if no timezone specificed, just return the input Ok(time_value.clone()) } - // if has timezone, then remove the timezone in return type, keep the time value the same + // if has timezone, adjust the underlying time value. the current time value + // is stored as i64 in UTC, even though the timezone may not be in UTC, so + // we need to adjust the time value to the local time. see [`adjust_to_local_time`] + // for more details. + // + // Then remove the timezone in return type, i.e. return None DataType::Timestamp(_, Some(_)) => match time_value { ColumnarValue::Scalar(ScalarValue::TimestampSecond( Some(ts), - Some(_), - )) => Ok(ColumnarValue::Scalar(ScalarValue::TimestampSecond( - Some(ts), - None, - ))), + Some(tz), + )) => { + let adjusted_ts = adjust_to_local_time(ts, &*tz); + Ok(ColumnarValue::Scalar(ScalarValue::TimestampSecond( + Some(adjusted_ts), + None, + ))) + } ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( Some(ts), - Some(_), - )) => Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( - Some(ts), - None, - ))), + Some(tz), + )) => { + let adjusted_ts = adjust_to_local_time(ts, &*tz); + Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( + Some(adjusted_ts), + None, + ))) + } ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( Some(ts), - Some(_), - )) => Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( - Some(ts), - None, - ))), + Some(tz), + )) => { + let adjusted_ts = adjust_to_local_time(ts, &*tz); + Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( + Some(adjusted_ts), + None, + ))) + } ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( Some(ts), - Some(_), - )) => Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( - Some(ts), - None, - ))), + Some(tz), + )) => { + let adjusted_ts = adjust_to_local_time(ts, &*tz); + Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( + Some(adjusted_ts), + None, + ))) + } _ => { exec_err!( "to_local_time function requires timestamp argument, got {:?}", @@ -104,6 +124,27 @@ impl ToLocalTimeFunc { } } +fn adjust_to_local_time(ts: i64, timezone: &str) -> i64 { + let tz: Tz = timezone.parse().unwrap(); + + let date_time = DateTime::from_timestamp_nanos(ts).naive_utc(); + + let offset_seconds: i64 = tz + .offset_from_utc_datetime(&date_time) + .fix() + .local_minus_utc() as i64; + + let adjusted_date_time = + date_time.add(TimeDelta::try_seconds(offset_seconds).unwrap()); + let adjusted_ts = adjusted_date_time.and_utc().timestamp_nanos_opt().unwrap(); + + // println!( + // "chunchun - adjust_to_local_time:\ninput timestamp: {:?}\nin NavieDateTime: {:?}\noffset: {:?}\nadjusted_date_time: {:?}\nadjusted_ts: {:?}\n", + // ts, date_time, offset_seconds, adjusted_date_time, adjusted_ts + // ); + adjusted_ts +} + impl ScalarUDFImpl for ToLocalTimeFunc { fn as_any(&self) -> &dyn Any { self @@ -128,7 +169,7 @@ impl ScalarUDFImpl for ToLocalTimeFunc { Timestamp(Millisecond, _) => Ok(Timestamp(Millisecond, None)), Timestamp(Second, _) => Ok(Timestamp(Second, None)), _ => exec_err!( - "The to_local_time function can only accept timestamp as the second arg." + "The to_local_time function can only accept timestamp as the arg." ), } } @@ -140,7 +181,6 @@ impl ScalarUDFImpl for ToLocalTimeFunc { ); } - // TODO chunchun: support more input data types self.to_local_time(args) } } @@ -166,15 +206,22 @@ mod tests { assert!(res.is_ok()); let res = ToLocalTimeFunc::new().invoke(&[ColumnarValue::Scalar( - ScalarValue::TimestampNanosecond(Some(1), Some("America/New_York".into())), + ScalarValue::TimestampNanosecond(Some(1), Some("Europe/Brussels".into())), + )]); + assert!(res.is_ok()); + + let res = ToLocalTimeFunc::new().invoke(&[ColumnarValue::Scalar( + ScalarValue::TimestampSecond( + Some(2_000_000_000), // 2033-05-18T03:33:20Z + Some("Europe/Brussels".into()), + ), )]); assert!(res.is_ok()); let res = ToLocalTimeFunc::new().invoke(&[ColumnarValue::Scalar( - // 2021-03-28T02:30:00-04:00 ScalarValue::TimestampNanosecond( - Some(1616916600), - Some("America/New_York".into()), + Some(1711922401000000000), // 2024-03-31T22:00:01Z + Some("Europe/Brussels".into()), // 2024-04-01T00:00:01+02:00 ), )]); assert!(res.is_ok()); From ce4e560928f880d199e9b9dbc2e8fa22b2c370bc Mon Sep 17 00:00:00 2001 From: Chunchun <14298407+appletreeisyellow@users.noreply.github.com> Date: Fri, 7 Jun 2024 08:43:15 -0500 Subject: [PATCH 4/6] chore: lint --- datafusion/functions/src/datetime/to_local_time.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/datafusion/functions/src/datetime/to_local_time.rs b/datafusion/functions/src/datetime/to_local_time.rs index d073c0a2c61e..7cb1931b48a0 100644 --- a/datafusion/functions/src/datetime/to_local_time.rs +++ b/datafusion/functions/src/datetime/to_local_time.rs @@ -71,7 +71,7 @@ impl ToLocalTimeFunc { Some(ts), Some(tz), )) => { - let adjusted_ts = adjust_to_local_time(ts, &*tz); + let adjusted_ts = adjust_to_local_time(ts, &tz); Ok(ColumnarValue::Scalar(ScalarValue::TimestampSecond( Some(adjusted_ts), None, @@ -81,7 +81,7 @@ impl ToLocalTimeFunc { Some(ts), Some(tz), )) => { - let adjusted_ts = adjust_to_local_time(ts, &*tz); + let adjusted_ts = adjust_to_local_time(ts, &tz); Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( Some(adjusted_ts), None, @@ -91,7 +91,7 @@ impl ToLocalTimeFunc { Some(ts), Some(tz), )) => { - let adjusted_ts = adjust_to_local_time(ts, &*tz); + let adjusted_ts = adjust_to_local_time(ts, &tz); Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( Some(adjusted_ts), None, @@ -101,7 +101,7 @@ impl ToLocalTimeFunc { Some(ts), Some(tz), )) => { - let adjusted_ts = adjust_to_local_time(ts, &*tz); + let adjusted_ts = adjust_to_local_time(ts, &tz); Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( Some(adjusted_ts), None, @@ -124,7 +124,7 @@ impl ToLocalTimeFunc { } } -fn adjust_to_local_time(ts: i64, timezone: &str) -> i64 { +pub fn adjust_to_local_time(ts: i64, timezone: &str) -> i64 { let tz: Tz = timezone.parse().unwrap(); let date_time = DateTime::from_timestamp_nanos(ts).naive_utc(); @@ -136,13 +136,13 @@ fn adjust_to_local_time(ts: i64, timezone: &str) -> i64 { let adjusted_date_time = date_time.add(TimeDelta::try_seconds(offset_seconds).unwrap()); - let adjusted_ts = adjusted_date_time.and_utc().timestamp_nanos_opt().unwrap(); + // let adjusted_ts = adjusted_date_time.and_utc().timestamp_nanos_opt().unwrap(); // println!( // "chunchun - adjust_to_local_time:\ninput timestamp: {:?}\nin NavieDateTime: {:?}\noffset: {:?}\nadjusted_date_time: {:?}\nadjusted_ts: {:?}\n", // ts, date_time, offset_seconds, adjusted_date_time, adjusted_ts // ); - adjusted_ts + adjusted_date_time.and_utc().timestamp_nanos_opt().unwrap() } impl ScalarUDFImpl for ToLocalTimeFunc { From 54f1bdc5e83044d61b4eb278e22559d01140ebb3 Mon Sep 17 00:00:00 2001 From: Chunchun <14298407+appletreeisyellow@users.noreply.github.com> Date: Fri, 28 Jun 2024 17:22:44 -0500 Subject: [PATCH 5/6] feat: support `date_bin` with timezone display --- datafusion/functions/src/datetime/date_bin.rs | 130 +++++++++++++++++- 1 file changed, 124 insertions(+), 6 deletions(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index e777e5ea95d0..5dc1ad7c5e91 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -16,9 +16,11 @@ // under the License. use std::any::Any; +use std::ops::Sub; use std::sync::Arc; use arrow::array::temporal_conversions::NANOSECONDS; +use arrow::array::timezone::Tz; use arrow::array::types::{ ArrowTimestampType, IntervalDayTimeType, IntervalMonthDayNanoType, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, @@ -38,7 +40,7 @@ use datafusion_expr::{ ColumnarValue, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD, }; -use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc}; +use chrono::{DateTime, Datelike, Duration, Months, Offset, TimeDelta, TimeZone, Utc}; #[derive(Debug)] pub struct DateBinFunc { @@ -341,6 +343,7 @@ fn date_bin_impl( Millisecond => NANOSECONDS / 1_000, Second => NANOSECONDS, }; + move |x: i64| stride_fn(stride, x * scale, origin) / scale } @@ -348,32 +351,59 @@ fn date_bin_impl( ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => { let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); + + // TODO chunchun: maybe simplify this part + let date_binned_ts = v.map(apply_stride_fn); + let adjusted_ts = match (date_binned_ts, tz_opt) { + (Some(ts), Some(tz)) => Some(adjust_to_local_time(ts, tz)), + (_, _) => date_binned_ts, + }; + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( - v.map(apply_stride_fn), + adjusted_ts, tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => { let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); + + let date_binned_ts = v.map(apply_stride_fn); + let adjusted_ts = match (date_binned_ts, tz_opt) { + (Some(ts), Some(tz)) => Some(adjust_to_local_time(ts, tz)), + (_, _) => date_binned_ts, + }; + ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( - v.map(apply_stride_fn), + adjusted_ts, tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => { let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); + let date_binned_ts = v.map(apply_stride_fn); + let adjusted_ts = match (date_binned_ts, tz_opt) { + (Some(ts), Some(tz)) => Some(adjust_to_local_time(ts, tz)), + (_, _) => date_binned_ts, + }; ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( - v.map(apply_stride_fn), + adjusted_ts, tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => { let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); + + let date_binned_ts = v.map(apply_stride_fn); + let adjusted_ts = match (date_binned_ts, tz_opt) { + (Some(ts), Some(tz)) => Some(adjust_to_local_time(ts, tz)), + (_, _) => date_binned_ts, + }; + ColumnarValue::Scalar(ScalarValue::TimestampSecond( - v.map(apply_stride_fn), + adjusted_ts, tz_opt.clone(), )) } @@ -392,7 +422,13 @@ fn date_bin_impl( let array = as_primitive_array::(array)?; let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); let array: PrimitiveArray = array - .unary(apply_stride_fn) + .unary(|ts| { + let date_binned_ts = apply_stride_fn(ts); + match tz_opt { + Some(tz) => adjust_to_local_time(date_binned_ts, tz), + None => date_binned_ts, + } + }) .with_timezone_opt(tz_opt.clone()); Ok(ColumnarValue::Array(Arc::new(array))) @@ -435,6 +471,23 @@ fn date_bin_impl( }) } +// TODO chunchun: add description +fn adjust_to_local_time(ts: i64, timezone: &str) -> i64 { + let tz: Tz = timezone.parse().unwrap(); + + let date_time = DateTime::from_timestamp_nanos(ts).naive_utc(); + + let offset_seconds: i64 = tz + .offset_from_utc_datetime(&date_time) + .fix() + .local_minus_utc() as i64; + + let adjusted_date_time = + date_time.sub(TimeDelta::try_seconds(offset_seconds).unwrap()); + + adjusted_date_time.and_utc().timestamp_nanos_opt().unwrap() +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -767,6 +820,71 @@ mod tests { }); } + #[test] + fn test_date_bin_timezones_daylight_saving_time() { + let cases = vec![( + vec![ + "2024-03-31T00:00:00Z", + "2024-03-31T00:30:00Z", + "2024-03-31T01:00:00Z", + "2024-03-31T00:00:00Z", + "2024-03-31T04:20:00Z", + "2024-04-01T04:20:00Z", + "2024-05-01T00:30:00Z", + ], + Some("Europe/Brussels".into()), + "1970-01-01T00:00:00Z", + vec![ + "2024-03-31T00:00:00+01:00", + "2024-03-31T00:00:00+01:00", + "2024-03-31T00:00:00+01:00", + "2024-03-31T00:00:00+01:00", + "2024-03-31T00:00:00+01:00", + "2024-04-01T00:00:00+02:00", + "2024-05-01T00:00:00+02:00", + ], + )]; + + cases + .iter() + .for_each(|(original, tz_opt, origin, expected)| { + let input = original + .iter() + .map(|s| Some(string_to_timestamp_nanos(s).unwrap())) + .collect::() + .with_timezone_opt(tz_opt.clone()); + let right = expected + .iter() + .map(|s| Some(string_to_timestamp_nanos(s).unwrap())) + .collect::() + .with_timezone_opt(tz_opt.clone()); + let result = DateBinFunc::new() + .invoke(&[ + ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)), + ColumnarValue::Array(Arc::new(input)), + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( + Some(string_to_timestamp_nanos(origin).unwrap()), + tz_opt.clone(), + )), + ]) + .unwrap(); + if let ColumnarValue::Array(result) = result { + assert_eq!( + result.data_type(), + &DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone()) + ); + let left = arrow::array::cast::as_primitive_array::< + TimestampNanosecondType, + >(&result); + assert_eq!(left, &right); + } else { + panic!("unexpected column type"); + } + }); + } + + // TODO chunchun: may need to add test for single: nano, milli, macro, sec, ... + #[test] fn test_date_bin_single() { let cases = vec![ From a1211961a4c1383a662c383d969516fa1906b893 Mon Sep 17 00:00:00 2001 From: Chunchun <14298407+appletreeisyellow@users.noreply.github.com> Date: Tue, 2 Jul 2024 12:02:46 -0500 Subject: [PATCH 6/6] chore: update tests --- datafusion/functions/src/datetime/date_bin.rs | 129 +++++++----------- 1 file changed, 51 insertions(+), 78 deletions(-) diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index 5dc1ad7c5e91..516e0cfd31b8 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -726,6 +726,42 @@ mod tests { "2020-09-08T00:00:00Z", ], ), + ( + vec![ + "2024-03-31T00:30:00Z", + "2024-03-31T01:00:00Z", + "2024-03-31T04:20:00Z", + "2024-04-01T04:20:00Z", + "2024-05-01T00:30:00Z", + ], + Some("Europe/Brussels".into()), + "1970-01-01T00:00:00Z", + vec![ + "2024-03-31T00:00:00+01:00", + "2024-03-31T00:00:00+01:00", + "2024-03-31T00:00:00+01:00", + "2024-04-01T00:00:00+02:00", + "2024-05-01T00:00:00+02:00", + ], + ), + ( + vec![ + "2024-03-31T00:30:00", + "2024-03-31T01:00:00", + "2024-03-31T04:20:00", + "2024-04-01T04:20:00", + "2024-05-01T00:30:00", + ], + Some("Europe/Brussels".into()), + "1970-01-01T00:00:00Z", + vec![ + "2024-03-31T00:00:00+01:00", + "2024-03-31T00:00:00+01:00", + "2024-03-31T00:00:00+01:00", + "2024-04-01T00:00:00+02:00", + "2024-05-01T00:00:00+02:00", + ], + ), ( vec![ "2020-09-08T00:00:00Z", @@ -737,11 +773,11 @@ mod tests { Some("-02".into()), "1970-01-01T00:00:00Z", vec![ - "2020-09-08T00:00:00Z", - "2020-09-08T00:00:00Z", - "2020-09-08T00:00:00Z", - "2020-09-08T00:00:00Z", - "2020-09-08T00:00:00Z", + "2020-09-08T00:00:00-02:00", + "2020-09-08T00:00:00-02:00", + "2020-09-08T00:00:00-02:00", + "2020-09-08T00:00:00-02:00", + "2020-09-08T00:00:00-02:00", ], ), ( @@ -755,11 +791,11 @@ mod tests { Some("+05".into()), "1970-01-01T00:00:00+05", vec![ - "2020-09-08T00:00:00+05", - "2020-09-08T00:00:00+05", - "2020-09-08T00:00:00+05", - "2020-09-08T00:00:00+05", - "2020-09-08T00:00:00+05", + "2020-09-07T19:00:00+05:00", + "2020-09-07T19:00:00+05:00", + "2020-09-07T19:00:00+05:00", + "2020-09-07T19:00:00+05:00", + "2020-09-07T19:00:00+05:00", ], ), ( @@ -773,11 +809,11 @@ mod tests { Some("+08".into()), "1970-01-01T00:00:00+08", vec![ - "2020-09-08T00:00:00+08", - "2020-09-08T00:00:00+08", - "2020-09-08T00:00:00+08", - "2020-09-08T00:00:00+08", - "2020-09-08T00:00:00+08", + "2020-09-07T16:00:00+08:00", + "2020-09-07T16:00:00+08:00", + "2020-09-07T16:00:00+08:00", + "2020-09-07T16:00:00+08:00", + "2020-09-07T16:00:00+08:00", ], ), ]; @@ -820,69 +856,6 @@ mod tests { }); } - #[test] - fn test_date_bin_timezones_daylight_saving_time() { - let cases = vec![( - vec![ - "2024-03-31T00:00:00Z", - "2024-03-31T00:30:00Z", - "2024-03-31T01:00:00Z", - "2024-03-31T00:00:00Z", - "2024-03-31T04:20:00Z", - "2024-04-01T04:20:00Z", - "2024-05-01T00:30:00Z", - ], - Some("Europe/Brussels".into()), - "1970-01-01T00:00:00Z", - vec![ - "2024-03-31T00:00:00+01:00", - "2024-03-31T00:00:00+01:00", - "2024-03-31T00:00:00+01:00", - "2024-03-31T00:00:00+01:00", - "2024-03-31T00:00:00+01:00", - "2024-04-01T00:00:00+02:00", - "2024-05-01T00:00:00+02:00", - ], - )]; - - cases - .iter() - .for_each(|(original, tz_opt, origin, expected)| { - let input = original - .iter() - .map(|s| Some(string_to_timestamp_nanos(s).unwrap())) - .collect::() - .with_timezone_opt(tz_opt.clone()); - let right = expected - .iter() - .map(|s| Some(string_to_timestamp_nanos(s).unwrap())) - .collect::() - .with_timezone_opt(tz_opt.clone()); - let result = DateBinFunc::new() - .invoke(&[ - ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)), - ColumnarValue::Array(Arc::new(input)), - ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( - Some(string_to_timestamp_nanos(origin).unwrap()), - tz_opt.clone(), - )), - ]) - .unwrap(); - if let ColumnarValue::Array(result) = result { - assert_eq!( - result.data_type(), - &DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone()) - ); - let left = arrow::array::cast::as_primitive_array::< - TimestampNanosecondType, - >(&result); - assert_eq!(left, &right); - } else { - panic!("unexpected column type"); - } - }); - } - // TODO chunchun: may need to add test for single: nano, milli, macro, sec, ... #[test]