Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use Substrait's PrecisionTimestamp and PrecisionTimestampTz instead of deprecated Timestamp #11597

Merged
merged 5 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions datafusion/substrait/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ chrono = { workspace = true }
datafusion = { workspace = true, default-features = true }
itertools = { workspace = true }
object_store = { workspace = true }
pbjson-types = "0.6"
prost = "0.12"
substrait = { version = "0.36.0", features = ["serde"] }
pbjson-types = "0.7"
prost = "0.13"
substrait = { version = "0.41", features = ["serde"] }
Copy link
Contributor Author

@Blizzara Blizzara Aug 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The substrait bump is needed so that the precision timestamp types carry a precision field and store their value as i64 instead of u64.

pbjson and prost need to be bumped to match substrait's deps

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @Blizzara

I also have a PR prepared that updates a bunch of these dependencies, queued up for the arrow release next week: #12032

url = { workspace = true }

[dev-dependencies]
Expand Down
244 changes: 177 additions & 67 deletions datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ use crate::variation_const::{
DECIMAL_128_TYPE_VARIATION_REF, DECIMAL_256_TYPE_VARIATION_REF,
DEFAULT_CONTAINER_TYPE_VARIATION_REF, DEFAULT_TYPE_VARIATION_REF,
INTERVAL_MONTH_DAY_NANO_TYPE_NAME, LARGE_CONTAINER_TYPE_VARIATION_REF,
TIMESTAMP_MICRO_TYPE_VARIATION_REF, TIMESTAMP_MILLI_TYPE_VARIATION_REF,
TIMESTAMP_NANO_TYPE_VARIATION_REF, TIMESTAMP_SECOND_TYPE_VARIATION_REF,
UNSIGNED_INTEGER_TYPE_VARIATION_REF,
};
#[allow(deprecated)]
use crate::variation_const::{
INTERVAL_DAY_TIME_TYPE_REF, INTERVAL_MONTH_DAY_NANO_TYPE_REF,
INTERVAL_YEAR_MONTH_TYPE_REF,
INTERVAL_YEAR_MONTH_TYPE_REF, TIMESTAMP_MICRO_TYPE_VARIATION_REF,
TIMESTAMP_MILLI_TYPE_VARIATION_REF, TIMESTAMP_NANO_TYPE_VARIATION_REF,
TIMESTAMP_SECOND_TYPE_VARIATION_REF,
};
use datafusion::arrow::array::{new_empty_array, AsArray};
use datafusion::common::scalar::ScalarStructBuilder;
Expand All @@ -69,6 +69,7 @@ use datafusion::{
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use substrait::proto::exchange_rel::ExchangeKind;
use substrait::proto::expression::literal::interval_day_to_second::PrecisionMode;
use substrait::proto::expression::literal::user_defined::Val;
use substrait::proto::expression::literal::{
IntervalDayToSecond, IntervalYearToMonth, UserDefined,
Expand All @@ -95,6 +96,13 @@ use substrait::proto::{
};
use substrait::proto::{FunctionArgument, SortField};

// Timezone attached to DataFusion timestamps that are produced from Substrait
// PrecisionTimestampTz types and literals.
//
// Substrait PrecisionTimestampTz indicates that the timestamp is relative to UTC, which
// is the same as the expectation for any non-empty timezone in DF. Any non-empty timezone
// therefore results in correct points on the timeline, and we pick UTC as a reasonable default.
// Note, however, that DF also uses the timezone for some arithmetic and display purposes (see e.g.
// https://github.com/apache/arrow-rs/blob/ee5694078c86c8201549654246900a4232d531a9/arrow-cast/src/cast/mod.rs#L1749).
const DEFAULT_TIMEZONE: &str = "UTC";

pub fn name_to_op(name: &str) -> Option<Operator> {
match name {
"equal" => Some(Operator::Eq),
Expand Down Expand Up @@ -877,8 +885,8 @@ fn from_substrait_jointype(join_type: i32) -> Result<JoinType> {
join_rel::JoinType::Left => Ok(JoinType::Left),
join_rel::JoinType::Right => Ok(JoinType::Right),
join_rel::JoinType::Outer => Ok(JoinType::Full),
join_rel::JoinType::Anti => Ok(JoinType::LeftAnti),
join_rel::JoinType::Semi => Ok(JoinType::LeftSemi),
join_rel::JoinType::LeftAnti => Ok(JoinType::LeftAnti),
join_rel::JoinType::LeftSemi => Ok(JoinType::LeftSemi),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed due to bumping substrait. I think this is only a compile-time break, though; the actual protobuf values stay the same.

_ => plan_err!("unsupported join type {substrait_join_type:?}"),
}
} else {
Expand Down Expand Up @@ -1369,23 +1377,51 @@ fn from_substrait_type(
},
r#type::Kind::Fp32(_) => Ok(DataType::Float32),
r#type::Kind::Fp64(_) => Ok(DataType::Float64),
r#type::Kind::Timestamp(ts) => match ts.type_variation_reference {
TIMESTAMP_SECOND_TYPE_VARIATION_REF => {
Ok(DataType::Timestamp(TimeUnit::Second, None))
}
TIMESTAMP_MILLI_TYPE_VARIATION_REF => {
Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
}
TIMESTAMP_MICRO_TYPE_VARIATION_REF => {
Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
}
TIMESTAMP_NANO_TYPE_VARIATION_REF => {
Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
r#type::Kind::Timestamp(ts) => {
// Kept for backwards compatibility, new plans should use PrecisionTimestamp(Tz) instead
#[allow(deprecated)]
match ts.type_variation_reference {
TIMESTAMP_SECOND_TYPE_VARIATION_REF => {
Ok(DataType::Timestamp(TimeUnit::Second, None))
}
TIMESTAMP_MILLI_TYPE_VARIATION_REF => {
Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
}
TIMESTAMP_MICRO_TYPE_VARIATION_REF => {
Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
}
TIMESTAMP_NANO_TYPE_VARIATION_REF => {
Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
}
v => not_impl_err!(
"Unsupported Substrait type variation {v} of type {s_kind:?}"
),
}
v => not_impl_err!(
"Unsupported Substrait type variation {v} of type {s_kind:?}"
),
},
}
r#type::Kind::PrecisionTimestamp(pts) => {
let unit = match pts.precision {
0 => Ok(TimeUnit::Second),
3 => Ok(TimeUnit::Millisecond),
6 => Ok(TimeUnit::Microsecond),
9 => Ok(TimeUnit::Nanosecond),
p => not_impl_err!(
"Unsupported Substrait precision {p} for PrecisionTimestamp"
),
}?;
Ok(DataType::Timestamp(unit, None))
}
r#type::Kind::PrecisionTimestampTz(pts) => {
let unit = match pts.precision {
0 => Ok(TimeUnit::Second),
3 => Ok(TimeUnit::Millisecond),
6 => Ok(TimeUnit::Microsecond),
9 => Ok(TimeUnit::Nanosecond),
p => not_impl_err!(
"Unsupported Substrait precision {p} for PrecisionTimestampTz"
),
}?;
Ok(DataType::Timestamp(unit, Some(DEFAULT_TIMEZONE.into())))
}
r#type::Kind::Date(date) => match date.type_variation_reference {
DATE_32_TYPE_VARIATION_REF => Ok(DataType::Date32),
DATE_64_TYPE_VARIATION_REF => Ok(DataType::Date64),
Expand Down Expand Up @@ -1465,22 +1501,10 @@ fn from_substrait_type(
"Unsupported Substrait type variation {v} of type {s_kind:?}"
),
},
r#type::Kind::IntervalYear(i) => match i.type_variation_reference {
DEFAULT_TYPE_VARIATION_REF => {
Ok(DataType::Interval(IntervalUnit::YearMonth))
}
v => not_impl_err!(
"Unsupported Substrait type variation {v} of type {s_kind:?}"
),
},
r#type::Kind::IntervalDay(i) => match i.type_variation_reference {
DEFAULT_TYPE_VARIATION_REF => {
Ok(DataType::Interval(IntervalUnit::DayTime))
}
v => not_impl_err!(
"Unsupported Substrait type variation {v} of type {s_kind:?}"
),
},
r#type::Kind::IntervalYear(_) => {
Ok(DataType::Interval(IntervalUnit::YearMonth))
}
r#type::Kind::IntervalDay(_) => Ok(DataType::Interval(IntervalUnit::DayTime)),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was just cleanup - we don't check type variations for types where we don't use them

r#type::Kind::UserDefined(u) => {
if let Some(name) = extensions.types.get(&u.type_reference) {
match name.as_ref() {
Expand Down Expand Up @@ -1676,21 +1700,59 @@ fn from_substrait_literal(
},
Some(LiteralType::Fp32(f)) => ScalarValue::Float32(Some(*f)),
Some(LiteralType::Fp64(f)) => ScalarValue::Float64(Some(*f)),
Some(LiteralType::Timestamp(t)) => match lit.type_variation_reference {
TIMESTAMP_SECOND_TYPE_VARIATION_REF => {
ScalarValue::TimestampSecond(Some(*t), None)
}
TIMESTAMP_MILLI_TYPE_VARIATION_REF => {
ScalarValue::TimestampMillisecond(Some(*t), None)
}
TIMESTAMP_MICRO_TYPE_VARIATION_REF => {
ScalarValue::TimestampMicrosecond(Some(*t), None)
Some(LiteralType::Timestamp(t)) => {
// Kept for backwards compatibility, new plans should use PrecisionTimestamp(Tz) instead
#[allow(deprecated)]
match lit.type_variation_reference {
TIMESTAMP_SECOND_TYPE_VARIATION_REF => {
ScalarValue::TimestampSecond(Some(*t), None)
}
TIMESTAMP_MILLI_TYPE_VARIATION_REF => {
ScalarValue::TimestampMillisecond(Some(*t), None)
}
TIMESTAMP_MICRO_TYPE_VARIATION_REF => {
ScalarValue::TimestampMicrosecond(Some(*t), None)
}
TIMESTAMP_NANO_TYPE_VARIATION_REF => {
ScalarValue::TimestampNanosecond(Some(*t), None)
}
others => {
return substrait_err!("Unknown type variation reference {others}");
}
}
TIMESTAMP_NANO_TYPE_VARIATION_REF => {
ScalarValue::TimestampNanosecond(Some(*t), None)
}
Some(LiteralType::PrecisionTimestamp(pt)) => match pt.precision {
0 => ScalarValue::TimestampSecond(Some(pt.value), None),
3 => ScalarValue::TimestampMillisecond(Some(pt.value), None),
6 => ScalarValue::TimestampMicrosecond(Some(pt.value), None),
9 => ScalarValue::TimestampNanosecond(Some(pt.value), None),
p => {
return not_impl_err!(
"Unsupported Substrait precision {p} for PrecisionTimestamp"
);
}
others => {
return substrait_err!("Unknown type variation reference {others}");
},
Some(LiteralType::PrecisionTimestampTz(pt)) => match pt.precision {
0 => ScalarValue::TimestampSecond(
Some(pt.value),
Some(DEFAULT_TIMEZONE.into()),
),
3 => ScalarValue::TimestampMillisecond(
Some(pt.value),
Some(DEFAULT_TIMEZONE.into()),
),
6 => ScalarValue::TimestampMicrosecond(
Some(pt.value),
Some(DEFAULT_TIMEZONE.into()),
),
9 => ScalarValue::TimestampNanosecond(
Some(pt.value),
Some(DEFAULT_TIMEZONE.into()),
),
p => {
return not_impl_err!(
"Unsupported Substrait precision {p} for PrecisionTimestamp"
);
}
},
Some(LiteralType::Date(d)) => ScalarValue::Date32(Some(*d)),
Expand Down Expand Up @@ -1881,10 +1943,24 @@ fn from_substrait_literal(
Some(LiteralType::IntervalDayToSecond(IntervalDayToSecond {
days,
seconds,
microseconds,
subseconds,
precision_mode,
})) => {
// DF only supports millisecond precision, so we lose the micros here
ScalarValue::new_interval_dt(*days, (seconds * 1000) + (microseconds / 1000))
// DF only supports millisecond precision, so for any more granular type we lose precision
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these changes were needed as part of bumping substrait (substrait-io/substrait#665)

let milliseconds = match precision_mode {
Some(PrecisionMode::Microseconds(ms)) => ms / 1000,
Some(PrecisionMode::Precision(0)) => *subseconds as i32 * 1000,
Some(PrecisionMode::Precision(3)) => *subseconds as i32,
Some(PrecisionMode::Precision(6)) => (subseconds / 1000) as i32,
Some(PrecisionMode::Precision(9)) => (subseconds / 1000 / 1000) as i32,
_ => {
return not_impl_err!(
"Unsupported Substrait interval day to second precision mode"
)
}
};

ScalarValue::new_interval_dt(*days, (seconds * 1000) + milliseconds)
}
Some(LiteralType::IntervalYearToMonth(IntervalYearToMonth { years, months })) => {
ScalarValue::new_interval_ym(*years, *months)
Expand Down Expand Up @@ -2026,21 +2102,55 @@ fn from_substrait_null(
},
r#type::Kind::Fp32(_) => Ok(ScalarValue::Float32(None)),
r#type::Kind::Fp64(_) => Ok(ScalarValue::Float64(None)),
r#type::Kind::Timestamp(ts) => match ts.type_variation_reference {
TIMESTAMP_SECOND_TYPE_VARIATION_REF => {
Ok(ScalarValue::TimestampSecond(None, None))
}
TIMESTAMP_MILLI_TYPE_VARIATION_REF => {
Ok(ScalarValue::TimestampMillisecond(None, None))
}
TIMESTAMP_MICRO_TYPE_VARIATION_REF => {
Ok(ScalarValue::TimestampMicrosecond(None, None))
}
TIMESTAMP_NANO_TYPE_VARIATION_REF => {
Ok(ScalarValue::TimestampNanosecond(None, None))
r#type::Kind::Timestamp(ts) => {
// Kept for backwards compatibility, new plans should use PrecisionTimestamp(Tz) instead
#[allow(deprecated)]
match ts.type_variation_reference {
TIMESTAMP_SECOND_TYPE_VARIATION_REF => {
Ok(ScalarValue::TimestampSecond(None, None))
}
TIMESTAMP_MILLI_TYPE_VARIATION_REF => {
Ok(ScalarValue::TimestampMillisecond(None, None))
}
TIMESTAMP_MICRO_TYPE_VARIATION_REF => {
Ok(ScalarValue::TimestampMicrosecond(None, None))
}
TIMESTAMP_NANO_TYPE_VARIATION_REF => {
Ok(ScalarValue::TimestampNanosecond(None, None))
}
v => not_impl_err!(
"Unsupported Substrait type variation {v} of type {kind:?}"
),
}
v => not_impl_err!(
"Unsupported Substrait type variation {v} of type {kind:?}"
}
r#type::Kind::PrecisionTimestamp(pts) => match pts.precision {
0 => Ok(ScalarValue::TimestampSecond(None, None)),
3 => Ok(ScalarValue::TimestampMillisecond(None, None)),
6 => Ok(ScalarValue::TimestampMicrosecond(None, None)),
9 => Ok(ScalarValue::TimestampNanosecond(None, None)),
p => not_impl_err!(
"Unsupported Substrait precision {p} for PrecisionTimestamp"
),
},
r#type::Kind::PrecisionTimestampTz(pts) => match pts.precision {
0 => Ok(ScalarValue::TimestampSecond(
None,
Some(DEFAULT_TIMEZONE.into()),
)),
3 => Ok(ScalarValue::TimestampMillisecond(
None,
Some(DEFAULT_TIMEZONE.into()),
)),
6 => Ok(ScalarValue::TimestampMicrosecond(
None,
Some(DEFAULT_TIMEZONE.into()),
)),
9 => Ok(ScalarValue::TimestampNanosecond(
None,
Some(DEFAULT_TIMEZONE.into()),
)),
p => not_impl_err!(
"Unsupported Substrait precision {p} for PrecisionTimestamp"
),
},
r#type::Kind::Date(date) => match date.type_variation_reference {
Expand Down
Loading