diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index a5404532ace6..c72dfaa33828 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -40,6 +40,8 @@ use datafusion_expr::{ use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc}; +use crate::datetime::to_local_time::adjust_to_local_time; + #[derive(Debug)] pub struct DateBinFunc { signature: Signature, @@ -335,6 +337,7 @@ fn date_bin_impl( stride: i64, stride_fn: fn(i64, i64, i64) -> i64, ) -> impl Fn(i64) -> i64 { + println!("chunchun - origin: {:?}\nstride: {:?}\n", origin, stride); let scale = match T::UNIT { Nanosecond => 1, Microsecond => NANOSECONDS / 1_000_000, @@ -348,6 +351,10 @@ fn date_bin_impl( ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => { let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); + let v = match tz_opt { + Some(tz) => Some(adjust_to_local_time(v.unwrap(), tz)), + None => *v, + }; ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( v.map(apply_stride_fn), tz_opt.clone(), @@ -356,6 +363,10 @@ fn date_bin_impl( ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => { let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); + let v = match tz_opt { + Some(tz) => Some(adjust_to_local_time(v.unwrap(), tz)), + None => *v, + }; ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( v.map(apply_stride_fn), tz_opt.clone(), @@ -364,6 +375,10 @@ fn date_bin_impl( ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => { let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); + let v = match tz_opt { + Some(tz) => Some(adjust_to_local_time(v.unwrap(), tz)), + None => *v, + }; ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( v.map(apply_stride_fn), tz_opt.clone(), @@ -372,6 +387,10 @@ fn date_bin_impl( ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => { let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); + let v = match tz_opt { + Some(tz) => Some(adjust_to_local_time(v.unwrap(), tz)), + None => *v, + }; ColumnarValue::Scalar(ScalarValue::TimestampSecond( v.map(apply_stride_fn), tz_opt.clone(), @@ -684,6 +703,28 @@ mod tests { "2020-09-08T00:00:00+08", ], ), + ( + vec![ + "2024-05-01T00:30:00+02:00", + "2024-04-30T23:30:00+02:00", + "2024-05-01T01:30:00+02:00", + "2024-05-01T02:00:00+02:00", + "2024-05-01T02:30:00+02:00", + "2024-05-01T12:30:00+02:00", + "2024-05-01T22:30:00+02:00", + ], + Some("+02".into()), + "1970-01-01T00:00:00+00:00", + vec![ + "2024-04-30T00:00:00+02:00", + "2024-05-01T00:00:00+02:00", + "2024-05-01T00:00:00+02:00", + "2024-05-01T00:00:00+02:00", + "2024-05-01T00:00:00+02:00", + "2024-05-01T00:00:00+02:00", + "2024-05-01T00:00:00+02:00", + ], + ), ]; cases diff --git a/datafusion/functions/src/datetime/to_local_time.rs b/datafusion/functions/src/datetime/to_local_time.rs index d073c0a2c61e..7cb1931b48a0 100644 --- a/datafusion/functions/src/datetime/to_local_time.rs +++ b/datafusion/functions/src/datetime/to_local_time.rs @@ -71,7 +71,7 @@ impl ToLocalTimeFunc { Some(ts), Some(tz), )) => { - let adjusted_ts = adjust_to_local_time(ts, &*tz); + let adjusted_ts = adjust_to_local_time(ts, &tz); Ok(ColumnarValue::Scalar(ScalarValue::TimestampSecond( Some(adjusted_ts), None, @@ -81,7 +81,7 @@ impl ToLocalTimeFunc { Some(ts), Some(tz), )) => { - let adjusted_ts = adjust_to_local_time(ts, &*tz); + let adjusted_ts = adjust_to_local_time(ts, &tz); Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( Some(adjusted_ts), None, @@ -91,7 +91,7 @@ impl ToLocalTimeFunc { Some(ts), Some(tz), )) => { - let adjusted_ts = adjust_to_local_time(ts, &*tz); + let adjusted_ts = adjust_to_local_time(ts, &tz); Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( Some(adjusted_ts), None, @@ -101,7 +101,7 @@ impl ToLocalTimeFunc { Some(ts), Some(tz), )) => { - let adjusted_ts = adjust_to_local_time(ts, &*tz); + let adjusted_ts = adjust_to_local_time(ts, &tz); Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( Some(adjusted_ts), None, @@ -124,7 +124,7 @@ impl ToLocalTimeFunc { } } -fn adjust_to_local_time(ts: i64, timezone: &str) -> i64 { +pub fn adjust_to_local_time(ts: i64, timezone: &str) -> i64 { let tz: Tz = timezone.parse().unwrap(); let date_time = DateTime::from_timestamp_nanos(ts).naive_utc(); @@ -136,13 +136,13 @@ fn adjust_to_local_time(ts: i64, timezone: &str) -> i64 { let adjusted_date_time = date_time.add(TimeDelta::try_seconds(offset_seconds).unwrap()); - let adjusted_ts = adjusted_date_time.and_utc().timestamp_nanos_opt().unwrap(); + // let adjusted_ts = adjusted_date_time.and_utc().timestamp_nanos_opt().unwrap(); // println!( // "chunchun - adjust_to_local_time:\ninput timestamp: {:?}\nin NavieDateTime: {:?}\noffset: {:?}\nadjusted_date_time: {:?}\nadjusted_ts: {:?}\n", // ts, date_time, offset_seconds, adjusted_date_time, adjusted_ts // ); - adjusted_ts + adjusted_date_time.and_utc().timestamp_nanos_opt().unwrap() } impl ScalarUDFImpl for ToLocalTimeFunc {