Skip to content

Commit

Permalink
chore: remove panics in datafusion-common::scalar (#7901)
Browse files Browse the repository at this point in the history
  • Loading branch information
junjunjd authored Nov 11, 2023
1 parent ceb09b2 commit e642cc2
Show file tree
Hide file tree
Showing 54 changed files with 723 additions and 427 deletions.
2 changes: 1 addition & 1 deletion datafusion/common/src/pyarrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl FromPyArrow for ScalarValue {

impl ToPyArrow for ScalarValue {
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let array = self.to_array();
let array = self.to_array()?;
// convert to pyarrow array using C data interface
let pyarray = array.to_data().to_pyarrow(py)?;
let pyscalar = pyarray.call_method1(py, "__getitem__", (0,))?;
Expand Down
598 changes: 351 additions & 247 deletions datafusion/common/src/scalar.rs

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion datafusion/core/benches/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,15 @@ fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("to_array_of_size 100000", |b| {
let scalar = ScalarValue::Int32(Some(100));

b.iter(|| assert_eq!(scalar.to_array_of_size(100000).null_count(), 0))
b.iter(|| {
assert_eq!(
scalar
.to_array_of_size(100000)
.expect("Failed to convert to array of size")
.null_count(),
0
)
})
});
}

Expand Down
5 changes: 4 additions & 1 deletion datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,10 @@ async fn prune_partitions(
// Applies `filter` to `batch` returning `None` on error
let do_filter = |filter| -> Option<ArrayRef> {
let expr = create_physical_expr(filter, &df_schema, &schema, &props).ok()?;
Some(expr.evaluate(&batch).ok()?.into_array(partitions.len()))
expr.evaluate(&batch)
.ok()?
.into_array(partitions.len())
.ok()
};

// Compute the conjunction of the filters, ignoring errors
Expand Down
12 changes: 6 additions & 6 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ impl PartitionColumnProjector {
&mut self.key_buffer_cache,
partition_value.as_ref(),
file_batch.num_rows(),
),
)?,
)
}

Expand Down Expand Up @@ -396,11 +396,11 @@ fn create_dict_array<T>(
dict_val: &ScalarValue,
len: usize,
data_type: DataType,
) -> ArrayRef
) -> Result<ArrayRef>
where
T: ArrowNativeType,
{
let dict_vals = dict_val.to_array();
let dict_vals = dict_val.to_array()?;

let sliced_key_buffer = buffer_gen.get_buffer(len);

Expand All @@ -409,16 +409,16 @@ where
.len(len)
.add_buffer(sliced_key_buffer);
builder = builder.add_child_data(dict_vals.to_data());
Arc::new(DictionaryArray::<UInt16Type>::from(
Ok(Arc::new(DictionaryArray::<UInt16Type>::from(
builder.build().unwrap(),
))
)))
}

fn create_output_array(
key_buffer_cache: &mut ZeroBufferGenerators,
val: &ScalarValue,
len: usize,
) -> ArrayRef {
) -> Result<ArrayRef> {
if let ScalarValue::Dictionary(key_type, dict_val) = &val {
match key_type.as_ref() {
DataType::Int8 => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl ArrowPredicate for DatafusionArrowPredicate {
match self
.physical_expr
.evaluate(&batch)
.map(|v| v.into_array(batch.num_rows()))
.and_then(|v| v.into_array(batch.num_rows()))
{
Ok(array) => {
let bool_arr = as_boolean_array(&array)?.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ macro_rules! get_min_max_values {
.flatten()
// column either didn't have statistics at all or didn't have min/max values
.or_else(|| Some(null_scalar.clone()))
.map(|s| s.to_array())
.and_then(|s| s.to_array().ok())
}}
}

Expand All @@ -425,7 +425,7 @@ macro_rules! get_null_count_values {
},
);

Some(value.to_array())
value.to_array().ok()
}};
}

Expand Down
14 changes: 9 additions & 5 deletions datafusion/expr/src/columnar_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use arrow::array::ArrayRef;
use arrow::array::NullArray;
use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;
use datafusion_common::{Result, ScalarValue};
use std::sync::Arc;

/// Represents the result of evaluating an expression: either a single
Expand All @@ -47,11 +47,15 @@ impl ColumnarValue {

/// Convert a columnar value into an ArrayRef. [`Self::Scalar`] is
/// converted by repeating the same scalar multiple times.
pub fn into_array(self, num_rows: usize) -> ArrayRef {
match self {
///
/// # Errors
///
/// Errors if `self` is a Scalar that fails to be converted into an array of size
pub fn into_array(self, num_rows: usize) -> Result<ArrayRef> {
Ok(match self {
ColumnarValue::Array(array) => array,
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
}
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows)?,
})
}

/// null columnar values are implemented as a null array in order to pass batch
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/window_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl WindowAggState {
}

pub fn new(out_type: &DataType) -> Result<Self> {
let empty_out_col = ScalarValue::try_from(out_type)?.to_array_of_size(0);
let empty_out_col = ScalarValue::try_from(out_type)?.to_array_of_size(0)?;
Ok(Self {
window_frame_range: Range { start: 0, end: 0 },
window_frame_ctx: None,
Expand Down
8 changes: 6 additions & 2 deletions datafusion/optimizer/src/unwrap_cast_in_comparison.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1089,8 +1089,12 @@ mod tests {
// Verify that calling the arrow
// cast kernel yields the same results
// input array
let literal_array = literal.to_array_of_size(1);
let expected_array = expected_value.to_array_of_size(1);
let literal_array = literal
.to_array_of_size(1)
.expect("Failed to convert to array of size");
let expected_array = expected_value
.to_array_of_size(1)
.expect("Failed to convert to array of size");
let cast_array = cast_with_options(
&literal_array,
&target_type,
Expand Down
12 changes: 8 additions & 4 deletions datafusion/physical-expr/src/aggregate/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,13 +505,17 @@ mod tests {

let values1 = expr1
.iter()
.map(|e| e.evaluate(batch1))
.map(|r| r.map(|v| v.into_array(batch1.num_rows())))
.map(|e| {
e.evaluate(batch1)
.and_then(|v| v.into_array(batch1.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
let values2 = expr2
.iter()
.map(|e| e.evaluate(batch2))
.map(|r| r.map(|v| v.into_array(batch2.num_rows())))
.map(|e| {
e.evaluate(batch2)
.and_then(|v| v.into_array(batch2.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
accum1.update_batch(&values1)?;
accum2.update_batch(&values2)?;
Expand Down
12 changes: 8 additions & 4 deletions datafusion/physical-expr/src/aggregate/covariance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,13 +754,17 @@ mod tests {

let values1 = expr1
.iter()
.map(|e| e.evaluate(batch1))
.map(|r| r.map(|v| v.into_array(batch1.num_rows())))
.map(|e| {
e.evaluate(batch1)
.and_then(|v| v.into_array(batch1.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
let values2 = expr2
.iter()
.map(|e| e.evaluate(batch2))
.map(|r| r.map(|v| v.into_array(batch2.num_rows())))
.map(|e| {
e.evaluate(batch2)
.and_then(|v| v.into_array(batch2.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
accum1.update_batch(&values1)?;
accum2.update_batch(&values2)?;
Expand Down
10 changes: 8 additions & 2 deletions datafusion/physical-expr/src/aggregate/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,10 @@ mod tests {
let mut states = vec![];

for idx in 0..state1.len() {
states.push(concat(&[&state1[idx].to_array(), &state2[idx].to_array()])?);
states.push(concat(&[
&state1[idx].to_array()?,
&state2[idx].to_array()?,
])?);
}

let mut first_accumulator =
Expand All @@ -614,7 +617,10 @@ mod tests {
let mut states = vec![];

for idx in 0..state1.len() {
states.push(concat(&[&state1[idx].to_array(), &state2[idx].to_array()])?);
states.push(concat(&[
&state1[idx].to_array()?,
&state2[idx].to_array()?,
])?);
}

let mut last_accumulator =
Expand Down
12 changes: 8 additions & 4 deletions datafusion/physical-expr/src/aggregate/stddev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,13 +445,17 @@ mod tests {

let values1 = expr1
.iter()
.map(|e| e.evaluate(batch1))
.map(|r| r.map(|v| v.into_array(batch1.num_rows())))
.map(|e| {
e.evaluate(batch1)
.and_then(|v| v.into_array(batch1.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
let values2 = expr2
.iter()
.map(|e| e.evaluate(batch2))
.map(|r| r.map(|v| v.into_array(batch2.num_rows())))
.map(|e| {
e.evaluate(batch2)
.and_then(|v| v.into_array(batch2.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
accum1.update_batch(&values1)?;
accum2.update_batch(&values2)?;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/aggregate/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ use std::sync::Arc;
pub fn get_accum_scalar_values_as_arrays(
accum: &dyn Accumulator,
) -> Result<Vec<ArrayRef>> {
Ok(accum
accum
.state()?
.iter()
.map(|s| s.to_array_of_size(1))
.collect::<Vec<_>>())
.collect::<Result<Vec<_>>>()
}

/// Computes averages for `Decimal128`/`Decimal256` values, checking for overflow
Expand Down
12 changes: 8 additions & 4 deletions datafusion/physical-expr/src/aggregate/variance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,13 +519,17 @@ mod tests {

let values1 = expr1
.iter()
.map(|e| e.evaluate(batch1))
.map(|r| r.map(|v| v.into_array(batch1.num_rows())))
.map(|e| {
e.evaluate(batch1)
.and_then(|v| v.into_array(batch1.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
let values2 = expr2
.iter()
.map(|e| e.evaluate(batch2))
.map(|r| r.map(|v| v.into_array(batch2.num_rows())))
.map(|e| {
e.evaluate(batch2)
.and_then(|v| v.into_array(batch2.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
accum1.update_batch(&values1)?;
accum2.update_batch(&values2)?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/conditional_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if value.is_null() {
continue;
} else {
let last_value = value.to_array_of_size(size);
let last_value = value.to_array_of_size(size)?;
current_value =
zip(&remainder, &last_value, current_value.as_ref())?;
break;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@ pub fn date_part(args: &[ColumnarValue]) -> Result<ColumnarValue> {

let array = match array {
ColumnarValue::Array(array) => array.clone(),
ColumnarValue::Scalar(scalar) => scalar.to_array(),
ColumnarValue::Scalar(scalar) => scalar.to_array()?,
};

let arr = match date_part.to_lowercase().as_str() {
Expand Down
Loading

0 comments on commit e642cc2

Please sign in to comment.