Skip to content

Commit

Permalink
Merge branch 'apache:main' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
comphead authored Jul 25, 2023
2 parents 80ac8a8 + 8fca61e commit 4d6ff35
Show file tree
Hide file tree
Showing 24 changed files with 826 additions and 41 deletions.
3 changes: 2 additions & 1 deletion datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ pub async fn exec_from_lines(
}

// run the left over query if the last statement doesn't contain ‘;’
if !query.is_empty() {
// ignore if it only consists of '\n'
if query.contains(|c| c != '\n') {
match exec_and_print(ctx, print_options, query).await {
Ok(_) => {}
Err(err) => println!("{err}"),
Expand Down
1 change: 1 addition & 0 deletions datafusion-examples/examples/csv_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async fn main() -> Result<()> {
Some(vec![12, 0]),
true,
b',',
b'"',
object_store,
);

Expand Down
6 changes: 6 additions & 0 deletions datafusion/common/src/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use arrow::{
},
datatypes::{ArrowDictionaryKeyType, ArrowPrimitiveType},
};
use arrow_array::Decimal256Array;

// Downcast ArrayRef to Date32Array
pub fn as_date32_array(array: &dyn Array) -> Result<&Date32Array> {
Expand Down Expand Up @@ -65,6 +66,11 @@ pub fn as_decimal128_array(array: &dyn Array) -> Result<&Decimal128Array> {
Ok(downcast_value!(array, Decimal128Array))
}

// Downcast ArrayRef to Decimal256Array
pub fn as_decimal256_array(array: &dyn Array) -> Result<&Decimal256Array> {
Ok(downcast_value!(array, Decimal256Array))
}

// Downcast ArrayRef to Float32Array
pub fn as_float32_array(array: &dyn Array) -> Result<&Float32Array> {
Ok(downcast_value!(array, Float32Array))
Expand Down
161 changes: 148 additions & 13 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ use std::str::FromStr;
use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc};

use crate::cast::{
as_decimal128_array, as_dictionary_array, as_fixed_size_binary_array,
as_fixed_size_list_array, as_list_array, as_struct_array,
as_decimal128_array, as_decimal256_array, as_dictionary_array,
as_fixed_size_binary_array, as_fixed_size_list_array, as_list_array, as_struct_array,
};
use crate::delta::shift_months;
use crate::error::{DataFusionError, Result};
use arrow::buffer::NullBuffer;
use arrow::compute::nullif;
use arrow::datatypes::{FieldRef, Fields, SchemaBuilder};
use arrow::datatypes::{i256, FieldRef, Fields, SchemaBuilder};
use arrow::{
array::*,
compute::kernels::cast::{cast_with_options, CastOptions},
Expand All @@ -47,6 +47,7 @@ use arrow::{
},
};
use arrow_array::timezone::Tz;
use arrow_array::ArrowNativeTypeOp;
use chrono::{Datelike, Duration, NaiveDate, NaiveDateTime};

// Constants we use throughout this file:
Expand Down Expand Up @@ -75,6 +76,8 @@ pub enum ScalarValue {
Float64(Option<f64>),
/// 128bit decimal, using the i128 to represent the decimal, precision scale
Decimal128(Option<i128>, u8, i8),
/// 256bit decimal, using the i256 to represent the decimal, precision scale
Decimal256(Option<i256>, u8, i8),
/// signed 8bit int
Int8(Option<i8>),
/// signed 16bit int
Expand Down Expand Up @@ -160,6 +163,10 @@ impl PartialEq for ScalarValue {
v1.eq(v2) && p1.eq(p2) && s1.eq(s2)
}
(Decimal128(_, _, _), _) => false,
(Decimal256(v1, p1, s1), Decimal256(v2, p2, s2)) => {
v1.eq(v2) && p1.eq(p2) && s1.eq(s2)
}
(Decimal256(_, _, _), _) => false,
(Boolean(v1), Boolean(v2)) => v1.eq(v2),
(Boolean(_), _) => false,
(Float32(v1), Float32(v2)) => match (v1, v2) {
Expand Down Expand Up @@ -283,6 +290,15 @@ impl PartialOrd for ScalarValue {
}
}
(Decimal128(_, _, _), _) => None,
(Decimal256(v1, p1, s1), Decimal256(v2, p2, s2)) => {
if p1.eq(p2) && s1.eq(s2) {
v1.partial_cmp(v2)
} else {
// Two decimal values can be compared if they have the same precision and scale.
None
}
}
(Decimal256(_, _, _), _) => None,
(Boolean(v1), Boolean(v2)) => v1.partial_cmp(v2),
(Boolean(_), _) => None,
(Float32(v1), Float32(v2)) => match (v1, v2) {
Expand Down Expand Up @@ -1038,6 +1054,7 @@ macro_rules! impl_op_arithmetic {
get_sign!($OPERATION),
true,
)))),
// todo: Add Decimal256 support
_ => Err(DataFusionError::Internal(format!(
"Operator {} is not implemented for types {:?} and {:?}",
stringify!($OPERATION),
Expand Down Expand Up @@ -1516,6 +1533,11 @@ impl std::hash::Hash for ScalarValue {
p.hash(state);
s.hash(state)
}
Decimal256(v, p, s) => {
v.hash(state);
p.hash(state);
s.hash(state)
}
Boolean(v) => v.hash(state),
Float32(v) => v.map(Fl).hash(state),
Float64(v) => v.map(Fl).hash(state),
Expand Down Expand Up @@ -1994,6 +2016,9 @@ impl ScalarValue {
ScalarValue::Decimal128(_, precision, scale) => {
DataType::Decimal128(*precision, *scale)
}
ScalarValue::Decimal256(_, precision, scale) => {
DataType::Decimal256(*precision, *scale)
}
ScalarValue::TimestampSecond(_, tz_opt) => {
DataType::Timestamp(TimeUnit::Second, tz_opt.clone())
}
Expand Down Expand Up @@ -2083,6 +2108,9 @@ impl ScalarValue {
ScalarValue::Decimal128(Some(v), precision, scale) => {
Ok(ScalarValue::Decimal128(Some(-v), *precision, *scale))
}
ScalarValue::Decimal256(Some(v), precision, scale) => Ok(
ScalarValue::Decimal256(Some(v.neg_wrapping()), *precision, *scale),
),
value => Err(DataFusionError::Internal(format!(
"Can not run arithmetic negative on scalar value {value:?}"
))),
Expand Down Expand Up @@ -2154,6 +2182,7 @@ impl ScalarValue {
ScalarValue::Float32(v) => v.is_none(),
ScalarValue::Float64(v) => v.is_none(),
ScalarValue::Decimal128(v, _, _) => v.is_none(),
ScalarValue::Decimal256(v, _, _) => v.is_none(),
ScalarValue::Int8(v) => v.is_none(),
ScalarValue::Int16(v) => v.is_none(),
ScalarValue::Int32(v) => v.is_none(),
Expand Down Expand Up @@ -2415,10 +2444,10 @@ impl ScalarValue {
ScalarValue::iter_to_decimal_array(scalars, *precision, *scale)?;
Arc::new(decimal_array)
}
DataType::Decimal256(_, _) => {
return Err(DataFusionError::Internal(
"Decimal256 is not supported for ScalarValue".to_string(),
));
DataType::Decimal256(precision, scale) => {
let decimal_array =
ScalarValue::iter_to_decimal256_array(scalars, *precision, *scale)?;
Arc::new(decimal_array)
}
DataType::Null => ScalarValue::iter_to_null_array(scalars),
DataType::Boolean => build_array_primitive!(BooleanArray, Boolean),
Expand Down Expand Up @@ -2680,6 +2709,22 @@ impl ScalarValue {
Ok(array)
}

fn iter_to_decimal256_array(
scalars: impl IntoIterator<Item = ScalarValue>,
precision: u8,
scale: i8,
) -> Result<Decimal256Array> {
let array = scalars
.into_iter()
.map(|element: ScalarValue| match element {
ScalarValue::Decimal256(v1, _, _) => v1,
_ => unreachable!(),
})
.collect::<Decimal256Array>()
.with_precision_and_scale(precision, scale)?;
Ok(array)
}

fn iter_to_array_list(
scalars: impl IntoIterator<Item = ScalarValue>,
data_type: &DataType,
Expand Down Expand Up @@ -2764,12 +2809,28 @@ impl ScalarValue {
}
}

fn build_decimal256_array(
value: Option<i256>,
precision: u8,
scale: i8,
size: usize,
) -> Decimal256Array {
std::iter::repeat(value)
.take(size)
.collect::<Decimal256Array>()
.with_precision_and_scale(precision, scale)
.unwrap()
}

/// Converts a scalar value into an array of `size` rows.
pub fn to_array_of_size(&self, size: usize) -> ArrayRef {
match self {
ScalarValue::Decimal128(e, precision, scale) => Arc::new(
ScalarValue::build_decimal_array(*e, *precision, *scale, size),
),
ScalarValue::Decimal256(e, precision, scale) => Arc::new(
ScalarValue::build_decimal256_array(*e, *precision, *scale, size),
),
ScalarValue::Boolean(e) => {
Arc::new(BooleanArray::from(vec![*e; size])) as ArrayRef
}
Expand Down Expand Up @@ -3044,12 +3105,28 @@ impl ScalarValue {
precision: u8,
scale: i8,
) -> Result<ScalarValue> {
let array = as_decimal128_array(array)?;
if array.is_null(index) {
Ok(ScalarValue::Decimal128(None, precision, scale))
} else {
let value = array.value(index);
Ok(ScalarValue::Decimal128(Some(value), precision, scale))
match array.data_type() {
DataType::Decimal128(_, _) => {
let array = as_decimal128_array(array)?;
if array.is_null(index) {
Ok(ScalarValue::Decimal128(None, precision, scale))
} else {
let value = array.value(index);
Ok(ScalarValue::Decimal128(Some(value), precision, scale))
}
}
DataType::Decimal256(_, _) => {
let array = as_decimal256_array(array)?;
if array.is_null(index) {
Ok(ScalarValue::Decimal256(None, precision, scale))
} else {
let value = array.value(index);
Ok(ScalarValue::Decimal256(Some(value), precision, scale))
}
}
_ => Err(DataFusionError::Internal(
"Unsupported decimal type".to_string(),
)),
}
}

Expand All @@ -3067,6 +3144,11 @@ impl ScalarValue {
array, index, *precision, *scale,
)?
}
DataType::Decimal256(precision, scale) => {
ScalarValue::get_decimal_value_from_array(
array, index, *precision, *scale,
)?
}
DataType::Boolean => typed_cast!(array, index, BooleanArray, Boolean),
DataType::Float64 => typed_cast!(array, index, Float64Array, Float64),
DataType::Float32 => typed_cast!(array, index, Float32Array, Float32),
Expand Down Expand Up @@ -3265,6 +3347,25 @@ impl ScalarValue {
}
}

fn eq_array_decimal256(
array: &ArrayRef,
index: usize,
value: Option<&i256>,
precision: u8,
scale: i8,
) -> Result<bool> {
let array = as_decimal256_array(array)?;
if array.precision() != precision || array.scale() != scale {
return Ok(false);
}
let is_null = array.is_null(index);
if let Some(v) = value {
Ok(!array.is_null(index) && array.value(index) == *v)
} else {
Ok(is_null)
}
}

/// Compares a single row of array @ index for equality with self,
/// in an optimized fashion.
///
Expand Down Expand Up @@ -3294,6 +3395,16 @@ impl ScalarValue {
)
.unwrap()
}
ScalarValue::Decimal256(v, precision, scale) => {
ScalarValue::eq_array_decimal256(
array,
index,
v.as_ref(),
*precision,
*scale,
)
.unwrap()
}
ScalarValue::Boolean(val) => {
eq_array_primitive!(array, index, BooleanArray, val)
}
Expand Down Expand Up @@ -3416,6 +3527,7 @@ impl ScalarValue {
| ScalarValue::Float32(_)
| ScalarValue::Float64(_)
| ScalarValue::Decimal128(_, _, _)
| ScalarValue::Decimal256(_, _, _)
| ScalarValue::Int8(_)
| ScalarValue::Int16(_)
| ScalarValue::Int32(_)
Expand Down Expand Up @@ -3647,6 +3759,22 @@ impl TryFrom<ScalarValue> for i128 {
}
}

// special implementation for i256 because of Decimal128
impl TryFrom<ScalarValue> for i256 {
type Error = DataFusionError;

fn try_from(value: ScalarValue) -> Result<Self> {
match value {
ScalarValue::Decimal256(Some(inner_value), _, _) => Ok(inner_value),
_ => Err(DataFusionError::Internal(format!(
"Cannot convert {:?} to {}",
value,
std::any::type_name::<Self>()
))),
}
}
}

impl_try_from!(UInt8, u8);
impl_try_from!(UInt16, u16);
impl_try_from!(UInt32, u32);
Expand Down Expand Up @@ -3684,6 +3812,9 @@ impl TryFrom<&DataType> for ScalarValue {
DataType::Decimal128(precision, scale) => {
ScalarValue::Decimal128(None, *precision, *scale)
}
DataType::Decimal256(precision, scale) => {
ScalarValue::Decimal256(None, *precision, *scale)
}
DataType::Utf8 => ScalarValue::Utf8(None),
DataType::LargeUtf8 => ScalarValue::LargeUtf8(None),
DataType::Binary => ScalarValue::Binary(None),
Expand Down Expand Up @@ -3753,6 +3884,9 @@ impl fmt::Display for ScalarValue {
ScalarValue::Decimal128(v, p, s) => {
write!(f, "{v:?},{p:?},{s:?}")?;
}
ScalarValue::Decimal256(v, p, s) => {
write!(f, "{v:?},{p:?},{s:?}")?;
}
ScalarValue::Boolean(e) => format_option!(f, e)?,
ScalarValue::Float32(e) => format_option!(f, e)?,
ScalarValue::Float64(e) => format_option!(f, e)?,
Expand Down Expand Up @@ -3830,6 +3964,7 @@ impl fmt::Debug for ScalarValue {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
ScalarValue::Decimal128(_, _, _) => write!(f, "Decimal128({self})"),
ScalarValue::Decimal256(_, _, _) => write!(f, "Decimal256({self})"),
ScalarValue::Boolean(_) => write!(f, "Boolean({self})"),
ScalarValue::Float32(_) => write!(f, "Float32({self})"),
ScalarValue::Float64(_) => write!(f, "Float64({self})"),
Expand Down
Loading

0 comments on commit 4d6ff35

Please sign in to comment.