From 0289bdb0be45e68466433efc4056f6daa6aaabef Mon Sep 17 00:00:00 2001 From: Sevenannn Date: Thu, 1 Aug 2024 22:58:32 -0700 Subject: [PATCH 1/2] Postgres Type::TIME support --- src/sql/arrow_sql_gen/postgres.rs | 34 +++++++++++++++++++++++++++++- src/sql/arrow_sql_gen/statement.rs | 7 +++++- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/src/sql/arrow_sql_gen/postgres.rs b/src/sql/arrow_sql_gen/postgres.rs index ab04d08..b3490e7 100644 --- a/src/sql/arrow_sql_gen/postgres.rs +++ b/src/sql/arrow_sql_gen/postgres.rs @@ -7,13 +7,15 @@ use arrow::array::{ ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, LargeBinaryBuilder, LargeStringBuilder, ListBuilder, RecordBatch, RecordBatchOptions, - StringBuilder, StructBuilder, TimestampMillisecondBuilder, UInt32Builder, + StringBuilder, StructBuilder, Time64NanosecondBuilder, TimestampMillisecondBuilder, + UInt32Builder, }; use arrow::datatypes::{DataType, Date32Type, Field, Schema, TimeUnit}; use bigdecimal::num_bigint::BigInt; use bigdecimal::num_bigint::Sign; use bigdecimal::BigDecimal; use bigdecimal::ToPrimitive; +use chrono::Timelike; use composite::CompositeType; use sea_query::{Alias, ColumnType, SeaRc}; use snafu::prelude::*; @@ -250,6 +252,35 @@ pub fn rows_to_arrow(rows: &[Row]) -> Result { Type::BOOL => { handle_primitive_type!(builder, Type::BOOL, BooleanBuilder, bool, row, i); } + Type::TIME => { + let Some(builder) = builder else { + return NoBuilderForIndexSnafu { index: i }.fail(); + }; + let Some(builder) = builder + .as_any_mut() + .downcast_mut::() + else { + return FailedToDowncastBuilderSnafu { + postgres_type: format!("{postgres_type}"), + } + .fail(); + }; + let v = row + .try_get::>(i) + .with_context(|_| FailedToGetRowValueSnafu { + pg_type: Type::TIME, + })?; + + match v { + Some(v) => { + let timestamp: i64 = i64::from(v.num_seconds_from_midnight()) + * 1_000_000_000 + + i64::from(v.nanosecond()); + builder.append_value(timestamp); + } + None => builder.append_null(), + } + } Type::NUMERIC => { let v: Option = row.try_get(i).context(FailedToGetRowValueSnafu { @@ -532,6 +563,7 @@ fn map_column_type_to_data_type(column_type: &Type) -> Option { Some(DataType::Timestamp(TimeUnit::Millisecond, None)) } Type::DATE => Some(DataType::Date32), + Type::TIME => Some(DataType::Time64(TimeUnit::Nanosecond)), Type::INT2_ARRAY => Some(DataType::List(Arc::new(Field::new( "item", DataType::Int16, diff --git a/src/sql/arrow_sql_gen/statement.rs b/src/sql/arrow_sql_gen/statement.rs index b6f7a2f..821f43d 100644 --- a/src/sql/arrow_sql_gen/statement.rs +++ b/src/sql/arrow_sql_gen/statement.rs @@ -267,7 +267,12 @@ impl InsertBuilder { row_values.push(Keyword::Null.into()); continue; } - row_values.push(valid_array.value(row).into()); + insert_timestamp_into_row_values( + OffsetDateTime::from_unix_timestamp_nanos(i128::from( + valid_array.value(row), + )), + &mut row_values, + )?; } } DataType::Timestamp(TimeUnit::Second, _) => { From 90546f4f918fbca9f29d10fe30249d228c585391 Mon Sep 17 00:00:00 2001 From: Sevenannn Date: Mon, 5 Aug 2024 14:43:55 -0700 Subject: [PATCH 2/2] Add test for chrono naive time to arrow type --- src/sql/arrow_sql_gen/postgres.rs | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/sql/arrow_sql_gen/postgres.rs b/src/sql/arrow_sql_gen/postgres.rs index b3490e7..b5c527b 100644 --- a/src/sql/arrow_sql_gen/postgres.rs +++ b/src/sql/arrow_sql_gen/postgres.rs @@ -732,6 +732,8 @@ impl<'a> FromSql<'a> for BigDecimalFromSql { #[cfg(test)] mod tests { use super::*; + use arrow::array::{Time64NanosecondArray, Time64NanosecondBuilder}; + use chrono::NaiveTime; use std::str::FromStr; #[allow(clippy::cast_possible_truncation)] @@ -759,4 +761,27 @@ mod tests { .expect("Failed to run FromSql"); assert_eq!(negative_result.inner, negative); } + + #[test] + fn test_chrono_naive_time_to_time64nanosecond() { + let chrono_naive_vec = vec![ + NaiveTime::from_hms_opt(10, 30, 00).unwrap_or_default(), + NaiveTime::from_hms_opt(10, 45, 15).unwrap_or_default(), + ]; + + let time_array: Time64NanosecondArray = vec![ + (10 * 3600 + 30 * 60) * 1_000_000_000, + (10 * 3600 + 45 * 60 + 15) * 1_000_000_000, + ] + .into(); + + let mut builder = Time64NanosecondBuilder::new(); + for time in chrono_naive_vec { + let timestamp: i64 = i64::from(time.num_seconds_from_midnight()) * 1_000_000_000 + + i64::from(time.nanosecond()); + builder.append_value(timestamp); + } + let converted_result = builder.finish(); + assert_eq!(converted_result, time_array); + } }