From 6bac4581b07bc3449f2b21c1eaf02aa416888972 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 12 Oct 2024 20:45:17 +0800 Subject: [PATCH] feat: Implement Decimal from/to bytes representation Signed-off-by: Xuanwo --- Cargo.toml | 1 + crates/iceberg/Cargo.toml | 1 + crates/iceberg/src/spec/values.rs | 65 ++++++++++++++++++++++++++++--- 3 files changed, 62 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5e2b89730..a8eab95de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,6 +70,7 @@ itertools = "0.13" log = "0.4" mockito = "1" murmur3 = "0.5.2" +num-bigint = "0.4.6" once_cell = "1" opendal = "0.50" ordered-float = "4" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 1307cc6f3..d23ba2592 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -63,6 +63,7 @@ futures = { workspace = true } itertools = { workspace = true } moka = { version = "0.12.8", features = ["future"] } murmur3 = { workspace = true } +num-bigint = { workspace = true } once_cell = { workspace = true } opendal = { workspace = true } ordered-float = { workspace = true } diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index 3c6e2aa68..eb691149f 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -29,7 +29,9 @@ use std::str::FromStr; pub use _serde::RawLiteral; use bitvec::vec::BitVec; use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc}; +use num_bigint::BigInt; use ordered_float::OrderedFloat; +use rust_decimal::prelude::ToPrimitive; use rust_decimal::Decimal; use serde::de::{ MapAccess, {self}, @@ -422,10 +424,15 @@ impl Datum { } PrimitiveType::Fixed(_) => PrimitiveLiteral::Binary(Vec::from(bytes)), PrimitiveType::Binary => PrimitiveLiteral::Binary(Vec::from(bytes)), - PrimitiveType::Decimal { - precision: _, - scale: _, - } => todo!(), + PrimitiveType::Decimal { .. 
} => { + let unscaled_value = BigInt::from_signed_bytes_be(bytes); + PrimitiveLiteral::Int128(unscaled_value.to_i128().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Can't convert bytes to i128: {:?}", bytes), + ) + })?) + } }; Ok(Datum::new(data_type, literal)) } @@ -449,7 +456,30 @@ impl Datum { PrimitiveLiteral::String(val) => ByteBuf::from(val.as_bytes()), PrimitiveLiteral::UInt128(val) => ByteBuf::from(val.to_be_bytes()), PrimitiveLiteral::Binary(val) => ByteBuf::from(val.as_slice()), - PrimitiveLiteral::Int128(_) => todo!(), + PrimitiveLiteral::Int128(val) => { + let PrimitiveType::Decimal { precision, .. } = self.r#type else { + unreachable!( + "PrimitiveLiteral Int128 must be PrimitiveType Decimal but got {}", + &self.r#type + ) + }; + + // It's required by iceberg spec that we must keep the minimum + // number of bytes for the value + let required_bytes = Type::decimal_required_bytes(precision) + .expect("PrimitiveType must has valid precision") + as usize; + + // The primitive literal is unscaled value. + let unscaled_value = BigInt::from(*val); + // Convert into two's-complement byte representation of the BigInt + // in big-endian byte order. + let mut bytes = unscaled_value.to_signed_bytes_be(); + // Truncate with required bytes to make sure. + bytes.truncate(required_bytes); + + ByteBuf::from(bytes) + } } } @@ -3031,6 +3061,31 @@ mod tests { check_avro_bytes_serde(bytes, Datum::string("iceberg"), &PrimitiveType::String); } + #[test] + fn avro_bytes_decimal() { + let bytes = vec![4u8, 210u8]; + + check_avro_bytes_serde( + bytes, + Datum::decimal(Decimal::new(1234, 2)).unwrap(), + &PrimitiveType::Decimal { + precision: 38, + scale: 2, + }, + ); + + let bytes = vec![251u8, 46u8]; + + check_avro_bytes_serde( + bytes, + Datum::decimal(Decimal::new(-1234, 2)).unwrap(), + &PrimitiveType::Decimal { + precision: 38, + scale: 2, + }, + ); + } + #[test] fn avro_convert_test_int() { check_convert_with_avro(