Skip to content

Commit

Permalink
improve error messages while downcasting Int32Array (#4146)
Browse files Browse the repository at this point in the history
  • Loading branch information
retikulum authored Nov 8, 2022
1 parent 18c6a0b commit b58ec81
Show file tree
Hide file tree
Showing 14 changed files with 51 additions and 85 deletions.
6 changes: 3 additions & 3 deletions benchmarks/src/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use arrow::array::{
Array, ArrayRef, Decimal128Array, Float64Array, Int32Array, Int64Array, StringArray,
Array, ArrayRef, Decimal128Array, Float64Array, Int64Array, StringArray,
};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
Expand All @@ -26,7 +26,7 @@ use std::path::Path;
use std::sync::Arc;
use std::time::Instant;

use datafusion::common::cast::as_date32_array;
use datafusion::common::cast::{as_date32_array, as_int32_array};
use datafusion::common::ScalarValue;
use datafusion::logical_expr::Cast;
use datafusion::prelude::*;
Expand Down Expand Up @@ -424,7 +424,7 @@ fn col_to_scalar(column: &ArrayRef, row_index: usize) -> ScalarValue {
}
match column.data_type() {
DataType::Int32 => {
let array = column.as_any().downcast_ref::<Int32Array>().unwrap();
let array = as_int32_array(column).unwrap();
ScalarValue::Int32(Some(array.value(row_index)))
}
DataType::Int64 => {
Expand Down
12 changes: 11 additions & 1 deletion datafusion/common/src/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
//! kernels in arrow-rs such as `as_boolean_array` do.

use crate::DataFusionError;
use arrow::array::{Array, Date32Array, StructArray};
use arrow::array::{Array, Date32Array, Int32Array, StructArray};

// Downcast ArrayRef to Date32Array
pub fn as_date32_array(array: &dyn Array) -> Result<&Date32Array, DataFusionError> {
Expand All @@ -42,3 +42,13 @@ pub fn as_struct_array(array: &dyn Array) -> Result<&StructArray, DataFusionErro
))
})
}

// Downcast ArrayRef to Int32Array
pub fn as_int32_array(array: &dyn Array) -> Result<&Int32Array, DataFusionError> {
array.as_any().downcast_ref::<Int32Array>().ok_or_else(|| {
DataFusionError::Internal(format!(
"Expected a Int32Array, got: {}",
array.data_type()
))
})
}
9 changes: 3 additions & 6 deletions datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -975,8 +975,9 @@ mod test {
use crate::arrow::array::Array;
use crate::arrow::datatypes::{Field, TimeUnit};
use crate::avro_to_arrow::{Reader, ReaderBuilder};
use arrow::array::{Int32Array, Int64Array, ListArray, TimestampMicrosecondArray};
use arrow::array::{Int64Array, ListArray, TimestampMicrosecondArray};
use arrow::datatypes::DataType;
use datafusion_common::cast::as_int32_array;
use std::fs::File;

fn build_reader(name: &str, batch_size: usize) -> Reader<File> {
Expand Down Expand Up @@ -1080,11 +1081,7 @@ mod test {
num_batches += 1;
let batch_schema = batch.schema();
assert_eq!(schema, batch_schema);
let a_array = batch
.column(col_id_index)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
let a_array = as_int32_array(batch.column(col_id_index)).unwrap();
sum_id += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i32>();
}
assert_eq!(8, sum_num_rows);
Expand Down
10 changes: 3 additions & 7 deletions datafusion/core/src/datasource/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ mod tests {
use crate::physical_plan::collect;
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{
BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
TimestampMicrosecondArray,
BinaryArray, BooleanArray, Float32Array, Float64Array, TimestampMicrosecondArray,
};
use datafusion_common::cast::as_int32_array;
use futures::StreamExt;

#[tokio::test]
Expand Down Expand Up @@ -229,11 +229,7 @@ mod tests {
assert_eq!(1, batches[0].num_columns());
assert_eq!(8, batches[0].num_rows());

let array = batches[0]
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
let array = as_int32_array(batches[0].column(0))?;
let mut values: Vec<i32> = vec![];
for i in 0..batches[0].num_rows() {
values.push(array.value(i));
Expand Down
9 changes: 3 additions & 6 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,11 +587,12 @@ mod tests {
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{
Array, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array,
Int32Array, StringArray, TimestampNanosecondArray,
StringArray, TimestampNanosecondArray,
};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_common::cast::as_int32_array;
use datafusion_common::ScalarValue;
use futures::stream::BoxStream;
use futures::StreamExt;
Expand Down Expand Up @@ -975,11 +976,7 @@ mod tests {
assert_eq!(1, batches[0].num_columns());
assert_eq!(8, batches[0].num_rows());

let array = batches[0]
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
let array = as_int32_array(batches[0].column(0))?;
let mut values: Vec<i32> = vec![];
for i in 0..batches[0].num_rows() {
values.push(array.value(i));
Expand Down
29 changes: 6 additions & 23 deletions datafusion/core/tests/sql/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use datafusion::{
execution::registry::FunctionRegistry,
physical_plan::{expressions::AvgAccumulator, functions::make_scalar_function},
};
use datafusion_common::cast::as_int32_array;
use datafusion_expr::{create_udaf, LogicalPlanBuilder};

/// test that casting happens on udfs.
Expand Down Expand Up @@ -57,14 +58,8 @@ async fn scalar_udf() -> Result<()> {
ctx.register_batch("t", batch)?;

let myfunc = |args: &[ArrayRef]| {
let l = &args[0]
.as_any()
.downcast_ref::<Int32Array>()
.expect("cast failed");
let r = &args[1]
.as_any()
.downcast_ref::<Int32Array>()
.expect("cast failed");
let l = as_int32_array(&args[0])?;
let r = as_int32_array(&args[1])?;
Ok(Arc::new(add(l, r)?) as ArrayRef)
};
let myfunc = make_scalar_function(myfunc);
Expand Down Expand Up @@ -113,21 +108,9 @@ async fn scalar_udf() -> Result<()> {
assert_batches_eq!(expected, &result);

let batch = &result[0];
let a = batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.expect("failed to cast a");
let b = batch
.column(1)
.as_any()
.downcast_ref::<Int32Array>()
.expect("failed to cast b");
let sum = batch
.column(2)
.as_any()
.downcast_ref::<Int32Array>()
.expect("failed to cast sum");
let a = as_int32_array(batch.column(0))?;
let b = as_int32_array(batch.column(1))?;
let sum = as_int32_array(batch.column(2))?;

assert_eq!(4, a.len());
assert_eq!(4, b.len());
Expand Down
12 changes: 3 additions & 9 deletions datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ mod tests {
datatypes::{DataType, Field, Schema},
};
use chrono::{DateTime, TimeZone, Utc};
use datafusion_common::{DFField, ToDFSchema};
use datafusion_common::{cast::as_int32_array, DFField, ToDFSchema};
use datafusion_expr::*;
use datafusion_physical_expr::{
execution_props::ExecutionProps, functions::make_scalar_function,
Expand Down Expand Up @@ -891,14 +891,8 @@ mod tests {
let return_type = Arc::new(DataType::Int32);

let fun = |args: &[ArrayRef]| {
let arg0 = &args[0]
.as_any()
.downcast_ref::<Int32Array>()
.expect("cast failed");
let arg1 = &args[1]
.as_any()
.downcast_ref::<Int32Array>()
.expect("cast failed");
let arg0 = as_int32_array(&args[0])?;
let arg1 = as_int32_array(&args[1])?;

// 2. perform the computation
let array = arg0
Expand Down
21 changes: 5 additions & 16 deletions datafusion/physical-expr/src/expressions/case.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ mod tests {
use arrow::buffer::Buffer;
use arrow::datatypes::DataType::Float64;
use arrow::datatypes::*;
use datafusion_common::cast::as_int32_array;
use datafusion_common::ScalarValue;
use datafusion_expr::type_coercion::binary::comparison_coercion;
use datafusion_expr::Operator;
Expand All @@ -417,10 +418,7 @@ mod tests {
schema.as_ref(),
)?;
let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
let result = result
.as_any()
.downcast_ref::<Int32Array>()
.expect("failed to downcast to Int32Array");
let result = as_int32_array(&result)?;

let expected = &Int32Array::from(vec![Some(123), None, None, Some(456)]);

Expand Down Expand Up @@ -448,10 +446,7 @@ mod tests {
schema.as_ref(),
)?;
let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
let result = result
.as_any()
.downcast_ref::<Int32Array>()
.expect("failed to downcast to Int32Array");
let result = as_int32_array(&result)?;

let expected =
&Int32Array::from(vec![Some(123), Some(999), Some(999), Some(456)]);
Expand Down Expand Up @@ -523,10 +518,7 @@ mod tests {
schema.as_ref(),
)?;
let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
let result = result
.as_any()
.downcast_ref::<Int32Array>()
.expect("failed to downcast to Int32Array");
let result = as_int32_array(&result)?;

let expected = &Int32Array::from(vec![Some(123), None, None, Some(456)]);

Expand Down Expand Up @@ -605,10 +597,7 @@ mod tests {
schema.as_ref(),
)?;
let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
let result = result
.as_any()
.downcast_ref::<Int32Array>()
.expect("failed to downcast to Int32Array");
let result = as_int32_array(&result)?;

let expected =
&Int32Array::from(vec![Some(123), Some(999), Some(999), Some(456)]);
Expand Down
3 changes: 2 additions & 1 deletion datafusion/physical-expr/src/expressions/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ mod tests {
use super::*;
use arrow::array::Int32Array;
use arrow::datatypes::*;
use datafusion_common::cast::as_int32_array;
use datafusion_common::Result;

#[test]
Expand All @@ -144,7 +145,7 @@ mod tests {
assert_eq!("42", format!("{}", literal_expr));

let literal_array = literal_expr.evaluate(&batch)?.into_array(batch.num_rows());
let literal_array = literal_array.as_any().downcast_ref::<Int32Array>().unwrap();
let literal_array = as_int32_array(&literal_array)?;

// note that the contents of the literal array are unrelated to the batch contents except for the length of the array
assert_eq!(literal_array.len(), 5); // 5 rows in the batch
Expand Down
8 changes: 4 additions & 4 deletions datafusion/physical-expr/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ mod tests {

use super::*;
use arrow::array::Int32Array;
use datafusion_common::Result;
use datafusion_common::{cast::as_int32_array, Result};

#[test]
fn scatter_int() -> Result<()> {
Expand All @@ -235,7 +235,7 @@ mod tests {
let expected =
Int32Array::from_iter(vec![Some(1), Some(10), None, None, Some(11)]);
let result = scatter(&mask, truthy.as_ref())?;
let result = result.as_any().downcast_ref::<Int32Array>().unwrap();
let result = as_int32_array(&result)?;

assert_eq!(&expected, result);
Ok(())
Expand All @@ -250,7 +250,7 @@ mod tests {
let expected =
Int32Array::from_iter(vec![Some(1), None, Some(10), None, None, None]);
let result = scatter(&mask, truthy.as_ref())?;
let result = result.as_any().downcast_ref::<Int32Array>().unwrap();
let result = as_int32_array(&result)?;

assert_eq!(&expected, result);
Ok(())
Expand All @@ -266,7 +266,7 @@ mod tests {
// output should treat nulls as though they are false
let expected = Int32Array::from_iter(vec![None, None, Some(1), Some(10), None]);
let result = scatter(&mask, truthy.as_ref())?;
let result = result.as_any().downcast_ref::<Int32Array>().unwrap();
let result = as_int32_array(&result)?;

assert_eq!(&expected, result);
Ok(())
Expand Down
3 changes: 2 additions & 1 deletion datafusion/physical-expr/src/window/lead_lag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ mod tests {
use crate::expressions::Column;
use arrow::record_batch::RecordBatch;
use arrow::{array::*, datatypes::*};
use datafusion_common::cast::as_int32_array;
use datafusion_common::Result;

fn test_i32_result(expr: WindowShift, expected: Int32Array) -> Result<()> {
Expand All @@ -191,7 +192,7 @@ mod tests {
let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?;
let result = expr.create_evaluator(&batch)?.evaluate(vec![0..8])?;
assert_eq!(1, result.len());
let result = result[0].as_any().downcast_ref::<Int32Array>().unwrap();
let result = as_int32_array(&result[0])?;
assert_eq!(expected, *result);
Ok(())
}
Expand Down
3 changes: 2 additions & 1 deletion datafusion/physical-expr/src/window/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ mod tests {
use crate::expressions::Column;
use arrow::record_batch::RecordBatch;
use arrow::{array::*, datatypes::*};
use datafusion_common::cast::as_int32_array;
use datafusion_common::Result;

fn test_i32_result(expr: NthValue, expected: Int32Array) -> Result<()> {
Expand All @@ -194,7 +195,7 @@ mod tests {
.into_iter()
.collect::<Result<Vec<ScalarValue>>>()?;
let result = ScalarValue::iter_to_array(result.into_iter())?;
let result = result.as_any().downcast_ref::<Int32Array>().unwrap();
let result = as_int32_array(&result)?;
assert_eq!(expected, *result);
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions test-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ edition = "2021"

[dependencies]
arrow = { version = "26.0.0", features = ["prettyprint"] }
datafusion-common = { path = "../datafusion/common", version = "14.0.0" }
env_logger = "0.9.0"
rand = "0.8"
10 changes: 3 additions & 7 deletions test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
// under the License.

//! Common functions used for testing
use arrow::{array::Int32Array, record_batch::RecordBatch};
use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_int32_array;
use rand::prelude::StdRng;
use rand::Rng;

Expand All @@ -32,12 +33,7 @@ pub fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
.iter()
.flat_map(|batch| {
assert_eq!(batch.num_columns(), 1);
batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.iter()
as_int32_array(batch.column(0)).unwrap().iter()
})
.collect()
}
Expand Down

0 comments on commit b58ec81

Please sign in to comment.