Skip to content

Commit

Permalink
consume and produce precisiontimestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
Blizzara committed Jul 24, 2024
1 parent 1b8a9a6 commit 350e00f
Show file tree
Hide file tree
Showing 3 changed files with 263 additions and 94 deletions.
219 changes: 157 additions & 62 deletions datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ use crate::variation_const::{
DECIMAL_128_TYPE_VARIATION_REF, DECIMAL_256_TYPE_VARIATION_REF,
DEFAULT_CONTAINER_TYPE_VARIATION_REF, DEFAULT_TYPE_VARIATION_REF,
INTERVAL_MONTH_DAY_NANO_TYPE_NAME, LARGE_CONTAINER_TYPE_VARIATION_REF,
TIMESTAMP_MICRO_TYPE_VARIATION_REF, TIMESTAMP_MILLI_TYPE_VARIATION_REF,
TIMESTAMP_NANO_TYPE_VARIATION_REF, TIMESTAMP_SECOND_TYPE_VARIATION_REF,
UNSIGNED_INTEGER_TYPE_VARIATION_REF,
};
#[allow(deprecated)]
use crate::variation_const::{
INTERVAL_DAY_TIME_TYPE_REF, INTERVAL_MONTH_DAY_NANO_TYPE_REF,
INTERVAL_YEAR_MONTH_TYPE_REF,
INTERVAL_YEAR_MONTH_TYPE_REF, TIMESTAMP_MICRO_TYPE_VARIATION_REF,
TIMESTAMP_MILLI_TYPE_VARIATION_REF, TIMESTAMP_NANO_TYPE_VARIATION_REF,
TIMESTAMP_SECOND_TYPE_VARIATION_REF,
};
use datafusion::arrow::array::{new_empty_array, AsArray};
use datafusion::common::scalar::ScalarStructBuilder;
Expand Down Expand Up @@ -96,6 +96,13 @@ use substrait::proto::{
};
use substrait::proto::{FunctionArgument, SortField};

// Timezone attached to every DataFusion (DF) timestamp produced from a Substrait
// PrecisionTimestampTz type, literal, or typed null.
//
// Substrait PrecisionTimestampTz values are relative to UTC, which is also what DF expects
// of a timestamp carrying any non-empty timezone. Any non-empty timezone therefore yields
// the correct points on the timeline, and UTC is picked as a reasonable default.
//
// The choice is not purely cosmetic, however: DF also uses the timezone for some arithmetic
// and display purposes (see e.g.
// https://github.com/apache/arrow-rs/blob/ee5694078c86c8201549654246900a4232d531a9/arrow-cast/src/cast/mod.rs#L1749).
const DEFAULT_TIMEZONE: &str = "UTC";

pub fn name_to_op(name: &str) -> Option<Operator> {
match name {
"equal" => Some(Operator::Eq),
Expand Down Expand Up @@ -1374,23 +1381,51 @@ fn from_substrait_type(
},
r#type::Kind::Fp32(_) => Ok(DataType::Float32),
r#type::Kind::Fp64(_) => Ok(DataType::Float64),
r#type::Kind::Timestamp(ts) => match ts.type_variation_reference {
TIMESTAMP_SECOND_TYPE_VARIATION_REF => {
Ok(DataType::Timestamp(TimeUnit::Second, None))
}
TIMESTAMP_MILLI_TYPE_VARIATION_REF => {
Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
}
TIMESTAMP_MICRO_TYPE_VARIATION_REF => {
Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
}
TIMESTAMP_NANO_TYPE_VARIATION_REF => {
Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
r#type::Kind::Timestamp(ts) => {
// Kept for backwards compatibility, new plans should use PrecisionTimestamp(Tz) instead
#[allow(deprecated)]
match ts.type_variation_reference {
TIMESTAMP_SECOND_TYPE_VARIATION_REF => {
Ok(DataType::Timestamp(TimeUnit::Second, None))
}
TIMESTAMP_MILLI_TYPE_VARIATION_REF => {
Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
}
TIMESTAMP_MICRO_TYPE_VARIATION_REF => {
Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
}
TIMESTAMP_NANO_TYPE_VARIATION_REF => {
Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
}
v => not_impl_err!(
"Unsupported Substrait type variation {v} of type {s_kind:?}"
),
}
v => not_impl_err!(
"Unsupported Substrait type variation {v} of type {s_kind:?}"
),
},
}
r#type::Kind::PrecisionTimestamp(pts) => {
let unit = match pts.precision {
0 => Ok(TimeUnit::Second),
3 => Ok(TimeUnit::Millisecond),
6 => Ok(TimeUnit::Microsecond),
9 => Ok(TimeUnit::Nanosecond),
p => not_impl_err!(
"Unsupported Substrait precision {p} for PrecisionTimestamp"
),
}?;
Ok(DataType::Timestamp(unit, None))
}
r#type::Kind::PrecisionTimestampTz(pts) => {
let unit = match pts.precision {
0 => Ok(TimeUnit::Second),
3 => Ok(TimeUnit::Millisecond),
6 => Ok(TimeUnit::Microsecond),
9 => Ok(TimeUnit::Nanosecond),
p => not_impl_err!(
"Unsupported Substrait precision {p} for PrecisionTimestampTz"
),
}?;
Ok(DataType::Timestamp(unit, Some(DEFAULT_TIMEZONE.into())))
}
r#type::Kind::Date(date) => match date.type_variation_reference {
DATE_32_TYPE_VARIATION_REF => Ok(DataType::Date32),
DATE_64_TYPE_VARIATION_REF => Ok(DataType::Date64),
Expand Down Expand Up @@ -1470,22 +1505,10 @@ fn from_substrait_type(
"Unsupported Substrait type variation {v} of type {s_kind:?}"
),
},
r#type::Kind::IntervalYear(i) => match i.type_variation_reference {
DEFAULT_TYPE_VARIATION_REF => {
Ok(DataType::Interval(IntervalUnit::YearMonth))
}
v => not_impl_err!(
"Unsupported Substrait type variation {v} of type {s_kind:?}"
),
},
r#type::Kind::IntervalDay(i) => match i.type_variation_reference {
DEFAULT_TYPE_VARIATION_REF => {
Ok(DataType::Interval(IntervalUnit::DayTime))
}
v => not_impl_err!(
"Unsupported Substrait type variation {v} of type {s_kind:?}"
),
},
r#type::Kind::IntervalYear(i) => {
Ok(DataType::Interval(IntervalUnit::YearMonth))
}
r#type::Kind::IntervalDay(i) => Ok(DataType::Interval(IntervalUnit::DayTime)),
r#type::Kind::UserDefined(u) => {
if let Some(name) = extensions.types.get(&u.type_reference) {
match name.as_ref() {
Expand Down Expand Up @@ -1681,21 +1704,59 @@ fn from_substrait_literal(
},
Some(LiteralType::Fp32(f)) => ScalarValue::Float32(Some(*f)),
Some(LiteralType::Fp64(f)) => ScalarValue::Float64(Some(*f)),
Some(LiteralType::Timestamp(t)) => match lit.type_variation_reference {
TIMESTAMP_SECOND_TYPE_VARIATION_REF => {
ScalarValue::TimestampSecond(Some(*t), None)
}
TIMESTAMP_MILLI_TYPE_VARIATION_REF => {
ScalarValue::TimestampMillisecond(Some(*t), None)
}
TIMESTAMP_MICRO_TYPE_VARIATION_REF => {
ScalarValue::TimestampMicrosecond(Some(*t), None)
Some(LiteralType::Timestamp(t)) => {
// Kept for backwards compatibility, new plans should use PrecisionTimestamp(Tz) instead
#[allow(deprecated)]
match lit.type_variation_reference {
TIMESTAMP_SECOND_TYPE_VARIATION_REF => {
ScalarValue::TimestampSecond(Some(*t), None)
}
TIMESTAMP_MILLI_TYPE_VARIATION_REF => {
ScalarValue::TimestampMillisecond(Some(*t), None)
}
TIMESTAMP_MICRO_TYPE_VARIATION_REF => {
ScalarValue::TimestampMicrosecond(Some(*t), None)
}
TIMESTAMP_NANO_TYPE_VARIATION_REF => {
ScalarValue::TimestampNanosecond(Some(*t), None)
}
others => {
return substrait_err!("Unknown type variation reference {others}");
}
}
TIMESTAMP_NANO_TYPE_VARIATION_REF => {
ScalarValue::TimestampNanosecond(Some(*t), None)
}
Some(LiteralType::PrecisionTimestamp(pt)) => match pt.precision {
0 => ScalarValue::TimestampSecond(Some(pt.value as i64), None),
3 => ScalarValue::TimestampMillisecond(Some(pt.value as i64), None),
6 => ScalarValue::TimestampMicrosecond(Some(pt.value as i64), None),
9 => ScalarValue::TimestampNanosecond(Some(pt.value as i64), None),
p => {
return not_impl_err!(
"Unsupported Substrait precision {p} for PrecisionTimestamp"
);
}
others => {
return substrait_err!("Unknown type variation reference {others}");
},
Some(LiteralType::PrecisionTimestampTz(pt)) => match pt.precision {
0 => ScalarValue::TimestampSecond(
Some(pt.value as i64),
Some(DEFAULT_TIMEZONE.into()),
),
3 => ScalarValue::TimestampMillisecond(
Some(pt.value as i64),
Some(DEFAULT_TIMEZONE.into()),
),
6 => ScalarValue::TimestampMicrosecond(
Some(pt.value as i64),
Some(DEFAULT_TIMEZONE.into()),
),
9 => ScalarValue::TimestampNanosecond(
Some(pt.value as i64),
Some(DEFAULT_TIMEZONE.into()),
),
p => {
return not_impl_err!(
"Unsupported Substrait precision {p} for PrecisionTimestamp"
);
}
},
Some(LiteralType::Date(d)) => ScalarValue::Date32(Some(*d)),
Expand Down Expand Up @@ -2031,21 +2092,55 @@ fn from_substrait_null(
},
r#type::Kind::Fp32(_) => Ok(ScalarValue::Float32(None)),
r#type::Kind::Fp64(_) => Ok(ScalarValue::Float64(None)),
r#type::Kind::Timestamp(ts) => match ts.type_variation_reference {
TIMESTAMP_SECOND_TYPE_VARIATION_REF => {
Ok(ScalarValue::TimestampSecond(None, None))
}
TIMESTAMP_MILLI_TYPE_VARIATION_REF => {
Ok(ScalarValue::TimestampMillisecond(None, None))
}
TIMESTAMP_MICRO_TYPE_VARIATION_REF => {
Ok(ScalarValue::TimestampMicrosecond(None, None))
}
TIMESTAMP_NANO_TYPE_VARIATION_REF => {
Ok(ScalarValue::TimestampNanosecond(None, None))
r#type::Kind::Timestamp(ts) => {
// Kept for backwards compatibility, new plans should use PrecisionTimestamp(Tz) instead
#[allow(deprecated)]
match ts.type_variation_reference {
TIMESTAMP_SECOND_TYPE_VARIATION_REF => {
Ok(ScalarValue::TimestampSecond(None, None))
}
TIMESTAMP_MILLI_TYPE_VARIATION_REF => {
Ok(ScalarValue::TimestampMillisecond(None, None))
}
TIMESTAMP_MICRO_TYPE_VARIATION_REF => {
Ok(ScalarValue::TimestampMicrosecond(None, None))
}
TIMESTAMP_NANO_TYPE_VARIATION_REF => {
Ok(ScalarValue::TimestampNanosecond(None, None))
}
v => not_impl_err!(
"Unsupported Substrait type variation {v} of type {kind:?}"
),
}
v => not_impl_err!(
"Unsupported Substrait type variation {v} of type {kind:?}"
}
r#type::Kind::PrecisionTimestamp(pts) => match pts.precision {
0 => Ok(ScalarValue::TimestampSecond(None, None)),
3 => Ok(ScalarValue::TimestampMillisecond(None, None)),
6 => Ok(ScalarValue::TimestampMicrosecond(None, None)),
9 => Ok(ScalarValue::TimestampNanosecond(None, None)),
p => not_impl_err!(
"Unsupported Substrait precision {p} for PrecisionTimestamp"
),
},
r#type::Kind::PrecisionTimestampTz(pts) => match pts.precision {
0 => Ok(ScalarValue::TimestampSecond(
None,
Some(DEFAULT_TIMEZONE.into()),
)),
3 => Ok(ScalarValue::TimestampMillisecond(
None,
Some(DEFAULT_TIMEZONE.into()),
)),
6 => Ok(ScalarValue::TimestampMicrosecond(
None,
Some(DEFAULT_TIMEZONE.into()),
)),
9 => Ok(ScalarValue::TimestampNanosecond(
None,
Some(DEFAULT_TIMEZONE.into()),
)),
p => not_impl_err!(
"Unsupported Substrait precision {p} for PrecisionTimestamp"
),
},
r#type::Kind::Date(date) => match date.type_variation_reference {
Expand Down
Loading

0 comments on commit 350e00f

Please sign in to comment.