Skip to content

Commit

Permalink
feat: support date_bin with timezone display
Browse files Browse the repository at this point in the history
  • Loading branch information
appletreeisyellow committed Jun 28, 2024
1 parent 1574e41 commit 54f1bdc
Showing 1 changed file with 124 additions and 6 deletions.
130 changes: 124 additions & 6 deletions datafusion/functions/src/datetime/date_bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
// under the License.

use std::any::Any;
use std::ops::Sub;
use std::sync::Arc;

use arrow::array::temporal_conversions::NANOSECONDS;
use arrow::array::timezone::Tz;
use arrow::array::types::{
ArrowTimestampType, IntervalDayTimeType, IntervalMonthDayNanoType,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
Expand All @@ -38,7 +40,7 @@ use datafusion_expr::{
ColumnarValue, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD,
};

use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};
use chrono::{DateTime, Datelike, Duration, Months, Offset, TimeDelta, TimeZone, Utc};

#[derive(Debug)]
pub struct DateBinFunc {
Expand Down Expand Up @@ -341,39 +343,67 @@ fn date_bin_impl(
Millisecond => NANOSECONDS / 1_000,
Second => NANOSECONDS,
};

move |x: i64| stride_fn(stride, x * scale, origin) / scale
}

Ok(match array {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
let apply_stride_fn =
stride_map_fn::<TimestampNanosecondType>(origin, stride, stride_fn);

// TODO chunchun: maybe simplify this part
let date_binned_ts = v.map(apply_stride_fn);
let adjusted_ts = match (date_binned_ts, tz_opt) {
(Some(ts), Some(tz)) => Some(adjust_to_local_time(ts, tz)),
(_, _) => date_binned_ts,
};

ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
v.map(apply_stride_fn),
adjusted_ts,
tz_opt.clone(),
))
}
ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
let apply_stride_fn =
stride_map_fn::<TimestampMicrosecondType>(origin, stride, stride_fn);

let date_binned_ts = v.map(apply_stride_fn);
let adjusted_ts = match (date_binned_ts, tz_opt) {
(Some(ts), Some(tz)) => Some(adjust_to_local_time(ts, tz)),
(_, _) => date_binned_ts,
};

ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
v.map(apply_stride_fn),
adjusted_ts,
tz_opt.clone(),
))
}
ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
let apply_stride_fn =
stride_map_fn::<TimestampMillisecondType>(origin, stride, stride_fn);
let date_binned_ts = v.map(apply_stride_fn);
let adjusted_ts = match (date_binned_ts, tz_opt) {
(Some(ts), Some(tz)) => Some(adjust_to_local_time(ts, tz)),
(_, _) => date_binned_ts,
};
ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
v.map(apply_stride_fn),
adjusted_ts,
tz_opt.clone(),
))
}
ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
let apply_stride_fn =
stride_map_fn::<TimestampSecondType>(origin, stride, stride_fn);

let date_binned_ts = v.map(apply_stride_fn);
let adjusted_ts = match (date_binned_ts, tz_opt) {
(Some(ts), Some(tz)) => Some(adjust_to_local_time(ts, tz)),
(_, _) => date_binned_ts,
};

ColumnarValue::Scalar(ScalarValue::TimestampSecond(
v.map(apply_stride_fn),
adjusted_ts,
tz_opt.clone(),
))
}
Expand All @@ -392,7 +422,13 @@ fn date_bin_impl(
let array = as_primitive_array::<T>(array)?;
let apply_stride_fn = stride_map_fn::<T>(origin, stride, stride_fn);
let array: PrimitiveArray<T> = array
.unary(apply_stride_fn)
.unary(|ts| {
let date_binned_ts = apply_stride_fn(ts);
match tz_opt {
Some(tz) => adjust_to_local_time(date_binned_ts, tz),
None => date_binned_ts,
}
})
.with_timezone_opt(tz_opt.clone());

Ok(ColumnarValue::Array(Arc::new(array)))
Expand Down Expand Up @@ -435,6 +471,23 @@ fn date_bin_impl(
})
}

/// Shifts a UTC timestamp backwards by the UTC offset of `timezone`.
///
/// `ts` is read as nanoseconds since the Unix epoch. The offset that
/// `timezone` has at that UTC instant is looked up and subtracted from `ts`,
/// so a value representing `00:00:00` UTC becomes the instant at which the
/// wall clock in `timezone` shows `00:00:00` (provided the zone's offset is
/// the same at both instants).
///
/// NOTE(review): `ts` is always interpreted as nanoseconds. A caller holding
/// a second/millisecond/microsecond value has to scale it to nanoseconds
/// before calling this function and scale the result back afterwards —
/// confirm all call sites do so.
///
/// # Panics
///
/// Panics if `timezone` cannot be parsed into a [`Tz`], or if the shifted
/// timestamp cannot be represented as nanoseconds in an `i64`.
fn adjust_to_local_time(ts: i64, timezone: &str) -> i64 {
    let tz = timezone
        .parse::<Tz>()
        .unwrap_or_else(|e| panic!("invalid timezone '{}': {}", timezone, e));

    let date_time = DateTime::from_timestamp_nanos(ts).naive_utc();

    // Offset (local time minus UTC) that is in effect at the given UTC instant.
    let offset_seconds = i64::from(
        tz.offset_from_utc_datetime(&date_time)
            .fix()
            .local_minus_utc(),
    );

    let adjusted_date_time = date_time.sub(
        TimeDelta::try_seconds(offset_seconds)
            .expect("a UTC offset in seconds always fits in a TimeDelta"),
    );

    adjusted_date_time
        .and_utc()
        .timestamp_nanos_opt()
        .expect("adjusted timestamp is out of range for nanosecond precision")
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down Expand Up @@ -767,6 +820,71 @@ mod tests {
});
}

#[test]
fn test_date_bin_timezones_daylight_saving_time() {
    // Each case is (input timestamps, timezone, origin, expected bin starts).
    // The expected values switch from a +01:00 to a +02:00 offset, i.e. the
    // inputs straddle a daylight-saving transition of the chosen timezone.
    let cases = vec![(
        vec![
            "2024-03-31T00:00:00Z",
            "2024-03-31T00:30:00Z",
            "2024-03-31T01:00:00Z",
            "2024-03-31T00:00:00Z",
            "2024-03-31T04:20:00Z",
            "2024-04-01T04:20:00Z",
            "2024-05-01T00:30:00Z",
        ],
        Some("Europe/Brussels".into()),
        "1970-01-01T00:00:00Z",
        vec![
            "2024-03-31T00:00:00+01:00",
            "2024-03-31T00:00:00+01:00",
            "2024-03-31T00:00:00+01:00",
            "2024-03-31T00:00:00+01:00",
            "2024-03-31T00:00:00+01:00",
            "2024-04-01T00:00:00+02:00",
            "2024-05-01T00:00:00+02:00",
        ],
    )];

    for (inputs, tz_opt, origin, expected) in cases.iter() {
        // Source column, tagged with the timezone under test.
        let input_array = inputs
            .iter()
            .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
            .collect::<TimestampNanosecondArray>()
            .with_timezone_opt(tz_opt.clone());

        // Column the function is expected to produce.
        let expected_array = expected
            .iter()
            .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
            .collect::<TimestampNanosecondArray>()
            .with_timezone_opt(tz_opt.clone());

        let origin_ts = ScalarValue::TimestampNanosecond(
            Some(string_to_timestamp_nanos(origin).unwrap()),
            tz_opt.clone(),
        );

        // Arguments in call order: stride, source column, origin.
        let args = [
            ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
            ColumnarValue::Array(Arc::new(input_array)),
            ColumnarValue::Scalar(origin_ts),
        ];
        let output = DateBinFunc::new().invoke(&args).unwrap();

        // An array input must yield an array output.
        let output = match output {
            ColumnarValue::Array(array) => array,
            _ => panic!("unexpected column type"),
        };

        // The timezone annotation has to survive the binning.
        assert_eq!(
            output.data_type(),
            &DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
        );

        let actual = arrow::array::cast::as_primitive_array::<TimestampNanosecondType>(
            &output,
        );
        assert_eq!(actual, &expected_array);
    }
}

// TODO chunchun: may need to add tests for single values: nano, micro, milli, sec, ...

#[test]
fn test_date_bin_single() {
let cases = vec![
Expand Down

0 comments on commit 54f1bdc

Please sign in to comment.