From ac99ed9e404070e881efa151f6553ab526d801fb Mon Sep 17 00:00:00 2001 From: retikulum Date: Thu, 17 Nov 2022 23:31:29 +0300 Subject: [PATCH] improve error messages while downcasting uint and boolean array --- datafusion/common/src/cast.rs | 37 ++++++++++++++++++- datafusion/common/src/scalar.rs | 24 ++++++------ .../core/src/datasource/file_format/avro.rs | 12 +++--- .../src/datasource/file_format/parquet.rs | 12 +++--- .../core/src/datasource/listing/helpers.rs | 13 +++---- .../file_format/parquet/row_filter.rs | 18 ++++----- .../core/src/physical_plan/joins/hash_join.rs | 8 ++-- .../physical-expr/src/expressions/binary.rs | 28 +++++--------- .../physical-expr/src/expressions/case.rs | 7 +--- .../physical-expr/src/expressions/in_list.rs | 13 ++++--- .../src/expressions/is_not_null.rs | 7 ++-- .../physical-expr/src/expressions/is_null.rs | 7 ++-- .../physical-expr/src/expressions/not.rs | 22 +++-------- .../physical-expr/src/expressions/nullif.rs | 11 ++---- datafusion/physical-expr/src/functions.rs | 15 +++----- datafusion/physical-expr/src/hash_utils.rs | 7 +++- datafusion/physical-expr/src/physical_expr.rs | 7 +++- datafusion/physical-expr/src/window/rank.rs | 4 +- .../physical-expr/src/window/row_number.rs | 6 +-- 19 files changed, 128 insertions(+), 130 deletions(-) diff --git a/datafusion/common/src/cast.rs b/datafusion/common/src/cast.rs index 79327406080e..9401689474bb 100644 --- a/datafusion/common/src/cast.rs +++ b/datafusion/common/src/cast.rs @@ -22,8 +22,8 @@ use crate::DataFusionError; use arrow::array::{ - Array, Date32Array, Decimal128Array, Float32Array, Float64Array, Int32Array, - Int64Array, StringArray, StructArray, + Array, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, + Int32Array, Int64Array, StringArray, StructArray, UInt32Array, UInt64Array, }; // Downcast ArrayRef to Date32Array @@ -116,3 +116,36 @@ pub fn as_string_array(array: &dyn Array) -> Result<&StringArray, DataFusionErro )) }) } + +// Downcast ArrayRef to UInt32Array +pub fn as_uint32_array(array: &dyn Array) -> Result<&UInt32Array, DataFusionError> { + array.as_any().downcast_ref::().ok_or_else(|| { + DataFusionError::Internal(format!( + "Expected a UInt32Array, got: {}", + array.data_type() + )) + }) +} + +// Downcast ArrayRef to UInt64Array +pub fn as_uint64_array(array: &dyn Array) -> Result<&UInt64Array, DataFusionError> { + array.as_any().downcast_ref::().ok_or_else(|| { + DataFusionError::Internal(format!( + "Expected a UInt64Array, got: {}", + array.data_type() + )) + }) +} + +// Downcast ArrayRef to BooleanArray +pub fn as_boolean_array(array: &dyn Array) -> Result<&BooleanArray, DataFusionError> { + array + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Internal(format!( + "Expected a BooleanArray, got: {}", + array.data_type() + )) + }) +} diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 71cce82b2307..36ea78580d14 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -2657,7 +2657,7 @@ mod tests { use arrow::compute::kernels; use arrow::datatypes::ArrowPrimitiveType; - use crate::cast::as_string_array; + use crate::cast::{as_string_array, as_uint32_array, as_uint64_array}; use crate::from_slice::FromSlice; use super::*; @@ -2792,35 +2792,37 @@ mod tests { } #[test] - fn scalar_value_to_array_u64() { + fn scalar_value_to_array_u64() -> Result<()> { let value = ScalarValue::UInt64(Some(13u64)); let array = value.to_array(); - let array = array.as_any().downcast_ref::().unwrap(); + let array = as_uint64_array(&array)?; assert_eq!(array.len(), 1); assert!(!array.is_null(0)); assert_eq!(array.value(0), 13); let value = ScalarValue::UInt64(None); let array = value.to_array(); - let array = array.as_any().downcast_ref::().unwrap(); + let array = as_uint64_array(&array)?; assert_eq!(array.len(), 1); assert!(array.is_null(0)); + Ok(()) } #[test] - fn scalar_value_to_array_u32() { + fn scalar_value_to_array_u32() -> Result<()> { let value = ScalarValue::UInt32(Some(13u32)); let array = value.to_array(); - let array = array.as_any().downcast_ref::().unwrap(); + let array = as_uint32_array(&array)?; assert_eq!(array.len(), 1); assert!(!array.is_null(0)); assert_eq!(array.value(0), 13); let value = ScalarValue::UInt32(None); let array = value.to_array(); - let array = array.as_any().downcast_ref::().unwrap(); + let array = as_uint32_array(&array)?; assert_eq!(array.len(), 1); assert!(array.is_null(0)); + Ok(()) } #[test] @@ -2838,7 +2840,7 @@ mod tests { } #[test] - fn scalar_list_to_array() { + fn scalar_list_to_array() -> Result<()> { let list_array_ref = ScalarValue::List( Some(vec![ ScalarValue::UInt64(Some(100)), @@ -2854,14 +2856,12 @@ mod tests { assert_eq!(list_array.values().len(), 3); let prim_array_ref = list_array.value(0); - let prim_array = prim_array_ref - .as_any() - .downcast_ref::() - .unwrap(); + let prim_array = as_uint64_array(&prim_array_ref)?; assert_eq!(prim_array.len(), 3); assert_eq!(prim_array.value(0), 100); assert!(prim_array.is_null(1)); assert_eq!(prim_array.value(2), 101); + Ok(()) } /// Creates array directly and via ScalarValue and ensures they are the same diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs index cb1b29fc3940..c4dbf873b717 100644 --- a/datafusion/core/src/datasource/file_format/avro.rs +++ b/datafusion/core/src/datasource/file_format/avro.rs @@ -92,8 +92,10 @@ mod tests { use crate::datasource::file_format::test_util::scan_format; use crate::physical_plan::collect; use crate::prelude::{SessionConfig, SessionContext}; - use arrow::array::{BinaryArray, BooleanArray, TimestampMicrosecondArray}; - use datafusion_common::cast::{as_float32_array, as_float64_array, as_int32_array}; + use arrow::array::{BinaryArray, TimestampMicrosecondArray}; + use datafusion_common::cast::{ + as_boolean_array, as_float32_array, as_float64_array, as_int32_array, + }; use futures::StreamExt; #[tokio::test] @@ -197,11 +199,7 @@ mod tests { assert_eq!(1, batches[0].num_columns()); assert_eq!(8, batches[0].num_rows()); - let array = batches[0] - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); + let array = as_boolean_array(batches[0].column(0))?; let mut values: Vec = vec![]; for i in 0..batches[0].num_rows() { values.push(array.value(i)); diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index b579e838482f..715a29190cf2 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -586,12 +586,14 @@ mod tests { use crate::physical_plan::metrics::MetricValue; use crate::prelude::{SessionConfig, SessionContext}; use arrow::array::{ - Array, ArrayRef, BinaryArray, BooleanArray, StringArray, TimestampNanosecondArray, + Array, ArrayRef, BinaryArray, StringArray, TimestampNanosecondArray, }; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use bytes::Bytes; - use datafusion_common::cast::{as_float32_array, as_float64_array, as_int32_array}; + use datafusion_common::cast::{ + as_boolean_array, as_float32_array, as_float64_array, as_int32_array, + }; use datafusion_common::ScalarValue; use futures::stream::BoxStream; use futures::StreamExt; @@ -945,11 +947,7 @@ mod tests { assert_eq!(1, batches[0].num_columns()); assert_eq!(8, batches[0].num_rows()); - let array = batches[0] - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); + let array = as_boolean_array(batches[0].column(0))?; let mut values: Vec = vec![]; for i in 0..batches[0].num_rows() { values.push(array.value(i)); diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 2fdae289fe15..f1a34e665a60 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use arrow::{ array::{ Array, ArrayBuilder, ArrayRef, Date64Array, Date64Builder, StringBuilder, - UInt64Array, UInt64Builder, + UInt64Builder, }, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, @@ -38,7 +38,10 @@ use crate::{ use super::PartitionedFile; use crate::datasource::listing::ListingTableUrl; -use datafusion_common::{cast::as_string_array, Column, DataFusionError}; +use datafusion_common::{ + cast::{as_string_array, as_uint64_array}, + Column, DataFusionError, +}; use datafusion_expr::{ expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion}, Expr, Volatility, @@ -300,11 +303,7 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Result> { .iter() .flat_map(|batch| { let key_array = as_string_array(batch.column(0)).unwrap(); - let length_array = batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); + let length_array = as_uint64_array(batch.column(1)).unwrap(); let modified_array = batch .column(2) .as_any() diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs index 353162c7f686..782c3f5d8eca 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs @@ -19,6 +19,7 @@ use arrow::array::{Array, BooleanArray}; use arrow::datatypes::{DataType, Schema}; use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::record_batch::RecordBatch; +use datafusion_common::cast::as_boolean_array; use datafusion_common::{Column, DataFusionError, Result, ScalarValue, ToDFSchema}; use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion}; @@ -133,17 +134,12 @@ impl ArrowPredicate for DatafusionArrowPredicate { .map(|v| v.into_array(batch.num_rows())) { Ok(array) => { - if let Some(mask) = array.as_any().downcast_ref::() { - let bool_arr = BooleanArray::from(mask.data().clone()); - let num_filtered = bool_arr.len() - bool_arr.true_count(); - self.rows_filtered.add(num_filtered); - timer.stop(); - Ok(bool_arr) - } else { - Err(ArrowError::ComputeError( - "Unexpected result of predicate evaluation, expected BooleanArray".to_owned(), - )) - } + let mask = as_boolean_array(&array)?; + let bool_arr = BooleanArray::from(mask.data().clone()); + let num_filtered = bool_arr.len() - bool_arr.true_count(); + self.rows_filtered.add(num_filtered); + timer.stop(); + Ok(bool_arr) } Err(e) => Err(ArrowError::ComputeError(format!( "Error evaluating filter predicate: {:?}", diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index f0974c6a9c2d..86448ff58b2f 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -40,7 +40,7 @@ use std::{time::Instant, vec}; use futures::{ready, Stream, StreamExt, TryStreamExt}; -use arrow::array::{as_boolean_array, new_null_array, Array}; +use arrow::array::{new_null_array, Array}; use arrow::datatypes::{ArrowNativeType, DataType}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::Result as ArrowResult; @@ -52,7 +52,7 @@ use arrow::array::{ UInt8Array, }; -use datafusion_common::cast::as_string_array; +use datafusion_common::cast::{as_boolean_array, as_string_array}; use hashbrown::raw::RawTable; @@ -1027,7 +1027,7 @@ fn apply_join_filter( .expression() .evaluate(&intermediate_batch)? .into_array(intermediate_batch.num_rows()); - let mask = as_boolean_array(&filter_result); + let mask = as_boolean_array(&filter_result)?; let left_filtered = PrimitiveArray::::from( compute::filter(&left_indices, mask)?.data().clone(), @@ -1050,7 +1050,7 @@ fn apply_join_filter( .expression() .evaluate_selection(&intermediate_batch, &has_match)? .into_array(intermediate_batch.num_rows()); - let mask = as_boolean_array(&filter_result); + let mask = as_boolean_array(&filter_result)?; let mut left_rebuilt = UInt64Builder::with_capacity(0); let mut right_rebuilt = UInt32Builder::with_capacity(0); diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index df33cd0c9e68..e067e34b556a 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -75,7 +75,7 @@ use arrow::record_batch::RecordBatch; use crate::physical_expr::down_cast_any_ref; use crate::{AnalysisContext, ExprBoundaries, PhysicalExpr}; -use datafusion_common::cast::as_decimal128_array; +use datafusion_common::cast::{as_boolean_array, as_decimal128_array}; use datafusion_common::ScalarValue; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::type_coercion::binary::binary_operator_data_type; @@ -472,14 +472,8 @@ macro_rules! binary_array_op { /// Invoke a boolean kernel on a pair of arrays macro_rules! boolean_op { ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{ - let ll = $LEFT - .as_any() - .downcast_ref::() - .expect("boolean_op failed to downcast array"); - let rr = $RIGHT - .as_any() - .downcast_ref::() - .expect("boolean_op failed to downcast array"); + let ll = as_boolean_array($LEFT).expect("boolean_op failed to downcast array"); + let rr = as_boolean_array($RIGHT).expect("boolean_op failed to downcast array"); Ok(Arc::new($OP(&ll, &rr)?)) }}; } @@ -1003,7 +997,7 @@ impl BinaryExpr { Operator::Modulo => binary_primitive_array_op!(left, right, modulus), Operator::And => { if left_data_type == &DataType::Boolean { - boolean_op!(left, right, and_kleene) + boolean_op!(&left, &right, and_kleene) } else { Err(DataFusionError::Internal(format!( "Cannot evaluate binary expression {:?} with types {:?} and {:?}", @@ -1015,7 +1009,7 @@ impl BinaryExpr { } Operator::Or => { if left_data_type == &DataType::Boolean { - boolean_op!(left, right, or_kleene) + boolean_op!(&left, &right, or_kleene) } else { Err(DataFusionError::Internal(format!( "Cannot evaluate binary expression {:?} with types {:?} and {:?}", @@ -1110,10 +1104,8 @@ mod tests { assert_eq!(result.len(), 5); let expected = vec![false, false, true, true, true]; - let result = result - .as_any() - .downcast_ref::() - .expect("failed to downcast to BooleanArray"); + let result = + as_boolean_array(&result).expect("failed to downcast to BooleanArray"); for (i, &expected_item) in expected.iter().enumerate().take(5) { assert_eq!(result.value(i), expected_item); } @@ -1156,10 +1148,8 @@ mod tests { assert_eq!(result.len(), 5); let expected = vec![true, true, false, true, false]; - let result = result - .as_any() - .downcast_ref::() - .expect("failed to downcast to BooleanArray"); + let result = + as_boolean_array(&result).expect("failed to downcast to BooleanArray"); for (i, &expected_item) in expected.iter().enumerate().take(5) { assert_eq!(result.value(i), expected_item); } diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index af3dfba7e5ac..ddf1d6bae81f 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -26,7 +26,7 @@ use arrow::compute::kernels::zip::zip; use arrow::compute::{and, eq_dyn, is_null, not, or, or_kleene}; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::{cast::as_boolean_array, DataFusionError, Result}; use datafusion_expr::ColumnarValue; use itertools::Itertools; @@ -195,10 +195,7 @@ impl CaseExpr { _ => when_value, }; let when_value = when_value.into_array(batch.num_rows()); - let when_value = when_value - .as_ref() - .as_any() - .downcast_ref::() + let when_value = as_boolean_array(&when_value) .expect("WHEN expression did not return a BooleanArray"); let then_value = self.when_then_expr[i] diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index 8aff51d84357..63fe2292a6c4 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -32,7 +32,10 @@ use arrow::datatypes::*; use arrow::record_batch::RecordBatch; use arrow::util::bit_iterator::BitIndexIterator; use arrow::{downcast_dictionary_array, downcast_primitive_array}; -use datafusion_common::{cast::as_string_array, DataFusionError, Result, ScalarValue}; +use datafusion_common::{ + cast::{as_boolean_array, as_string_array}, + DataFusionError, Result, ScalarValue, +}; use datafusion_expr::ColumnarValue; use hashbrown::hash_map::RawEntryMut; use hashbrown::HashMap; @@ -171,7 +174,7 @@ fn make_set(array: &dyn Array) -> Result> { Ok(downcast_primitive_array! { array => Box::new(ArraySet::new(array, make_hash_set(array))), DataType::Boolean => { - let array = as_boolean_array(array); + let array = as_boolean_array(array)?; Box::new(ArraySet::new(array, make_hash_set(array))) }, DataType::Decimal128(_, _) => { @@ -424,10 +427,8 @@ mod tests { let (cast_expr, cast_list_exprs) = in_list_cast($COL, $LIST, $SCHEMA)?; let expr = in_list(cast_expr, cast_list_exprs, $NEGATED, $SCHEMA).unwrap(); let result = expr.evaluate(&$BATCH)?.into_array($BATCH.num_rows()); - let result = result - .as_any() - .downcast_ref::() - .expect("failed to downcast to BooleanArray"); + let result = + as_boolean_array(&result).expect("failed to downcast to BooleanArray"); let expected = &BooleanArray::from($EXPECTED); assert_eq!(expected, result); }}; diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index b5003ac232c2..32e53e0c1ede 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -115,6 +115,7 @@ mod tests { datatypes::*, record_batch::RecordBatch, }; + use datafusion_common::cast::as_boolean_array; use std::sync::Arc; #[test] @@ -126,10 +127,8 @@ mod tests { // expression: "a is not null" let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); - let result = result - .as_any() - .downcast_ref::() - .expect("failed to downcast to BooleanArray"); + let result = + as_boolean_array(&result).expect("failed to downcast to BooleanArray"); let expected = &BooleanArray::from(vec![true, false]); diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index 976df2bc04c8..85e111440aaf 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -116,6 +116,7 @@ mod tests { datatypes::*, record_batch::RecordBatch, }; + use datafusion_common::cast::as_boolean_array; use std::sync::Arc; #[test] @@ -128,10 +129,8 @@ mod tests { let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)])?; let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); - let result = result - .as_any() - .downcast_ref::() - .expect("failed to downcast to BooleanArray"); + let result = + as_boolean_array(&result).expect("failed to downcast to BooleanArray"); let expected = &BooleanArray::from(vec![false, true]); diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs index 00f1670af7dc..bf935aa97e61 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr/src/expressions/not.rs @@ -23,11 +23,9 @@ use std::sync::Arc; use crate::physical_expr::down_cast_any_ref; use crate::PhysicalExpr; -use arrow::array::BooleanArray; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; -use datafusion_common::ScalarValue; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::{cast::as_boolean_array, DataFusionError, Result, ScalarValue}; use datafusion_expr::ColumnarValue; /// Not expression @@ -73,15 +71,7 @@ impl PhysicalExpr for NotExpr { let evaluate_arg = self.arg.evaluate(batch)?; match evaluate_arg { ColumnarValue::Array(array) => { - let array = - array - .as_any() - .downcast_ref::() - .ok_or_else(|| { - DataFusionError::Internal( - "boolean_op failed to downcast array".to_owned(), - ) - })?; + let array = as_boolean_array(&array)?; Ok(ColumnarValue::Array(Arc::new( arrow::compute::kernels::boolean::not(array)?, ))) @@ -135,7 +125,7 @@ pub fn not(arg: Arc) -> Result> { mod tests { use super::*; use crate::expressions::col; - use arrow::datatypes::*; + use arrow::{array::BooleanArray, datatypes::*}; use datafusion_common::Result; #[test] @@ -153,10 +143,8 @@ mod tests { RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(input)])?; let result = expr.evaluate(&batch)?.into_array(batch.num_rows()); - let result = result - .as_any() - .downcast_ref::() - .expect("failed to downcast to BooleanArray"); + let result = + as_boolean_array(&result).expect("failed to downcast to BooleanArray"); assert_eq!(result, expected); Ok(()) diff --git a/datafusion/physical-expr/src/expressions/nullif.rs b/datafusion/physical-expr/src/expressions/nullif.rs index 2ef40272a8ee..312246d35929 100644 --- a/datafusion/physical-expr/src/expressions/nullif.rs +++ b/datafusion/physical-expr/src/expressions/nullif.rs @@ -22,7 +22,7 @@ use arrow::array::*; use arrow::compute::eq_dyn; use arrow::compute::kernels::boolean::nullif; use arrow::datatypes::DataType; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::{cast::as_boolean_array, DataFusionError, Result}; use datafusion_expr::ColumnarValue; use super::binary::array_eq_scalar; @@ -34,10 +34,7 @@ macro_rules! compute_bool_array_op { .as_any() .downcast_ref::<$DT>() .expect("compute_op failed to downcast array"); - let rr = $RIGHT - .as_any() - .downcast_ref::() - .expect("compute_op failed to downcast array"); + let rr = as_boolean_array($RIGHT).expect("compute_op failed to downcast array"); Ok(Arc::new($OP(&ll, &rr)?) as ArrayRef) }}; } @@ -82,7 +79,7 @@ pub fn nullif_func(args: &[ColumnarValue]) -> Result { (ColumnarValue::Array(lhs), ColumnarValue::Scalar(rhs)) => { let cond_array = array_eq_scalar(lhs, rhs)?; - let array = primitive_bool_array_op!(lhs, *cond_array, nullif)?; + let array = primitive_bool_array_op!(lhs, &cond_array, nullif)?; Ok(ColumnarValue::Array(array)) } @@ -91,7 +88,7 @@ pub fn nullif_func(args: &[ColumnarValue]) -> Result { let cond_array = eq_dyn(lhs, rhs)?; // Now, invoke nullif on the result - let array = primitive_bool_array_op!(lhs, cond_array, nullif)?; + let array = primitive_bool_array_op!(lhs, &cond_array, nullif)?; Ok(ColumnarValue::Array(array)) } _ => Err(DataFusionError::NotImplemented( diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 7678e7dc557e..c84ee24c5c10 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -784,6 +784,7 @@ mod tests { datatypes::Field, record_batch::RecordBatch, }; + use datafusion_common::cast::as_uint64_array; use datafusion_common::{Result, ScalarValue}; /// $FUNC function to test @@ -2943,16 +2944,12 @@ mod tests { } fn unpack_uint64_array(col: Result) -> Result> { - match col? { - ColumnarValue::Array(array) => Ok(array - .as_any() - .downcast_ref::() - .unwrap() - .values() - .to_vec()), - ColumnarValue::Scalar(_) => Err(DataFusionError::Internal( + if let ColumnarValue::Array(array) = col? { + Ok(as_uint64_array(&array)?.values().to_vec()) + } else { + Err(DataFusionError::Internal( "Unexpected scalar created by a test function".to_string(), - )), + )) } } diff --git a/datafusion/physical-expr/src/hash_utils.rs b/datafusion/physical-expr/src/hash_utils.rs index 4e697f84f09b..d6cde1e7e49b 100644 --- a/datafusion/physical-expr/src/hash_utils.rs +++ b/datafusion/physical-expr/src/hash_utils.rs @@ -22,7 +22,10 @@ use arrow::array::*; use arrow::datatypes::*; use arrow::{downcast_dictionary_array, downcast_primitive_array}; use arrow_buffer::i256; -use datafusion_common::{cast::as_string_array, DataFusionError, Result}; +use datafusion_common::{ + cast::{as_boolean_array, as_string_array}, + DataFusionError, Result, +}; use std::sync::Arc; // Combines two hashes into one hash @@ -211,7 +214,7 @@ pub fn create_hashes<'a>( downcast_primitive_array! { array => hash_array(array, random_state, hashes_buffer, multi_col), DataType::Null => hash_null(random_state, hashes_buffer, multi_col), - DataType::Boolean => hash_array(as_boolean_array(array), random_state, hashes_buffer, multi_col), + DataType::Boolean => hash_array(as_boolean_array(array)?, random_state, hashes_buffer, multi_col), DataType::Utf8 => hash_array(as_string_array(array)?, random_state, hashes_buffer, multi_col), DataType::LargeUtf8 => hash_array(as_largestring_array(array), random_state, hashes_buffer, multi_col), DataType::Binary => hash_array(as_generic_binary_array::(array), random_state, hashes_buffer, multi_col), diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index 653f2f854878..02b6eeb4efea 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -273,7 +273,10 @@ mod tests { use super::*; use arrow::array::Int32Array; - use datafusion_common::{cast::as_int32_array, Result}; + use datafusion_common::{ + cast::{as_boolean_array, as_int32_array}, + Result, + }; #[test] fn scatter_int() -> Result<()> { @@ -335,7 +338,7 @@ mod tests { Some(false), ]); let result = scatter(&mask, truthy.as_ref())?; - let result = result.as_any().downcast_ref::().unwrap(); + let result = as_boolean_array(&result)?; assert_eq!(&expected, result); Ok(()) diff --git a/datafusion/physical-expr/src/window/rank.rs b/datafusion/physical-expr/src/window/rank.rs index da6734543089..ec9aca532cdd 100644 --- a/datafusion/physical-expr/src/window/rank.rs +++ b/datafusion/physical-expr/src/window/rank.rs @@ -167,7 +167,7 @@ impl PartitionEvaluator for RankEvaluator { mod tests { use super::*; use arrow::{array::*, datatypes::*}; - use datafusion_common::cast::as_float64_array; + use datafusion_common::cast::{as_float64_array, as_uint64_array}; fn test_with_rank(expr: &Rank, expected: Vec) -> Result<()> { test_i32_result( @@ -217,7 +217,7 @@ mod tests { .create_evaluator(&batch)? .evaluate_with_rank(vec![0..8], ranks)?; assert_eq!(1, result.len()); - let result = result[0].as_any().downcast_ref::().unwrap(); + let result = as_uint64_array(&result[0])?; let result = result.values(); assert_eq!(expected, result); Ok(()) diff --git a/datafusion/physical-expr/src/window/row_number.rs b/datafusion/physical-expr/src/window/row_number.rs index fb3bc770442f..11f4f620dc9b 100644 --- a/datafusion/physical-expr/src/window/row_number.rs +++ b/datafusion/physical-expr/src/window/row_number.rs @@ -86,7 +86,7 @@ mod tests { use super::*; use arrow::record_batch::RecordBatch; use arrow::{array::*, datatypes::*}; - use datafusion_common::Result; + use datafusion_common::{cast::as_uint64_array, Result}; #[test] fn row_number_all_null() -> Result<()> { @@ -98,7 +98,7 @@ mod tests { let row_number = RowNumber::new("row_number".to_owned()); let result = row_number.create_evaluator(&batch)?.evaluate(vec![0..8])?; assert_eq!(1, result.len()); - let result = result[0].as_any().downcast_ref::().unwrap(); + let result = as_uint64_array(&result[0])?; let result = result.values(); assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], result); Ok(()) @@ -114,7 +114,7 @@ mod tests { let row_number = RowNumber::new("row_number".to_owned()); let result = row_number.create_evaluator(&batch)?.evaluate(vec![0..8])?; assert_eq!(1, result.len()); - let result = result[0].as_any().downcast_ref::().unwrap(); + let result = as_uint64_array(&result[0])?; let result = result.values(); assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], result); Ok(())