From 83ed4297239e4a83e761904f45c2df1d2f85a6ac Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Mon, 20 Dec 2021 22:24:53 -0800 Subject: [PATCH] fix is distict from and muttable buffer use --- .../src/physical_plan/expressions/binary.rs | 150 ++++++++++++++++-- .../expressions/get_indexed_field.rs | 2 +- .../src/physical_plan/file_format/mod.rs | 13 +- datafusion/src/physical_plan/hash_join.rs | 10 +- datafusion/src/physical_plan/values.rs | 2 +- 5 files changed, 147 insertions(+), 30 deletions(-) diff --git a/datafusion/src/physical_plan/expressions/binary.rs b/datafusion/src/physical_plan/expressions/binary.rs index 7cbb6365c3be..c7bba45d723c 100644 --- a/datafusion/src/physical_plan/expressions/binary.rs +++ b/datafusion/src/physical_plan/expressions/binary.rs @@ -34,6 +34,40 @@ use super::coercion::{ use arrow::scalar::Scalar; use arrow::types::NativeType; +// Simple (low performance) kernels until optimized kernels are added to arrow +// See https://github.com/apache/arrow-rs/issues/960 + +fn is_distinct_from_bool(left: &dyn Array, right: &dyn Array) -> BooleanArray { + // Different from `neq_bool` because `null is distinct from null` is false and not null + let left = left + .as_any() + .downcast_ref::() + .expect("distinct_from op failed to downcast to boolean array"); + let right = right + .as_any() + .downcast_ref::() + .expect("distinct_from op failed to downcast to boolean array"); + left.iter() + .zip(right.iter()) + .map(|(left, right)| Some(left != right)) + .collect() +} + +fn is_not_distinct_from_bool(left: &dyn Array, right: &dyn Array) -> BooleanArray { + let left = left + .as_any() + .downcast_ref::() + .expect("not_distinct_from op failed to downcast to boolean array"); + let right = right + .as_any() + .downcast_ref::() + .expect("not_distinct_from op failed to downcast to boolean array"); + left.iter() + .zip(right.iter()) + .map(|(left, right)| Some(left == right)) + .collect() +} + /// Binary expression #[derive(Debug)] pub struct BinaryExpr { @@ -142,9 +176,9 @@ fn evaluate(lhs: &dyn Array, op: &Operator, rhs: &dyn Array) -> Result) } else if matches!(op, IsDistinctFrom) { - boolean_op!(lhs, rhs, is_distinct_from) + is_distinct_from(lhs, rhs) } else if matches!(op, IsNotDistinctFrom) { - boolean_op!(lhs, rhs, is_not_distinct_from) + is_not_distinct_from(lhs, rhs) } else if matches!(op, Or) { boolean_op!(lhs, rhs, compute::boolean_kleene::or) } else if matches!(op, And) { @@ -543,26 +577,110 @@ impl PhysicalExpr for BinaryExpr { } } -fn is_distinct_from( - left: &PrimitiveArray, - right: &PrimitiveArray, -) -> Result { - Ok(left - .iter() +fn is_distinct_from_primitive( + left: &dyn Array, + right: &dyn Array, +) -> BooleanArray { + let left = left + .as_any() + .downcast_ref::>() + .expect("distinct_from op failed to downcast to primitive array"); + let right = right + .as_any() + .downcast_ref::>() + .expect("distinct_from op failed to downcast to primitive array"); + left.iter() .zip(right.iter()) .map(|(x, y)| Some(x != y)) - .collect()) + .collect() } -fn is_not_distinct_from( - left: &PrimitiveArray, - right: &PrimitiveArray, -) -> Result { - Ok(left - .iter() +fn is_not_distinct_from_primitive( + left: &dyn Array, + right: &dyn Array, +) -> BooleanArray { + let left = left + .as_any() + .downcast_ref::>() + .expect("not_distinct_from op failed to downcast to primitive array"); + let right = right + .as_any() + .downcast_ref::>() + .expect("not_distinct_from op failed to downcast to primitive array"); + left.iter() .zip(right.iter()) .map(|(x, y)| Some(x == y)) - .collect()) + .collect() +} + +fn is_distinct_from(left: &dyn Array, right: &dyn Array) -> Result> { + match (left.data_type(), right.data_type()) { + (DataType::Int8, DataType::Int8) => { + Ok(Arc::new(is_distinct_from_primitive::(left, right))) + } + (DataType::Int32, DataType::Int32) => { + Ok(Arc::new(is_distinct_from_primitive::(left, right))) + } + (DataType::Int64, DataType::Int64) => { + Ok(Arc::new(is_distinct_from_primitive::(left, right))) + } + (DataType::UInt8, DataType::UInt8) => { + Ok(Arc::new(is_distinct_from_primitive::(left, right))) + } + (DataType::UInt16, DataType::UInt16) => { + Ok(Arc::new(is_distinct_from_primitive::(left, right))) + } + (DataType::UInt32, DataType::UInt32) => { + Ok(Arc::new(is_distinct_from_primitive::(left, right))) + } + (DataType::UInt64, DataType::UInt64) => { + Ok(Arc::new(is_distinct_from_primitive::(left, right))) + } + (DataType::Float32, DataType::Float32) => { + Ok(Arc::new(is_distinct_from_primitive::(left, right))) + } + (DataType::Float64, DataType::Float64) => { + Ok(Arc::new(is_distinct_from_primitive::(left, right))) + } + (DataType::Boolean, DataType::Boolean) => { + Ok(Arc::new(is_distinct_from_bool(left, right))) + } + } +} + +fn is_not_distinct_from(left: &dyn Array, right: &dyn Array) -> Result> { + match (left.data_type(), right.data_type()) { + (DataType::Int8, DataType::Int8) => { + Ok(Arc::new(is_not_distinct_from_primitive::(left, right))) + } + (DataType::Int32, DataType::Int32) => { + Ok(Arc::new(is_not_distinct_from_primitive::(left, right))) + } + (DataType::Int64, DataType::Int64) => { + Ok(Arc::new(is_not_distinct_from_primitive::(left, right))) + } + (DataType::UInt8, DataType::UInt8) => { + Ok(Arc::new(is_not_distinct_from_primitive::(left, right))) + } + (DataType::UInt16, DataType::UInt16) => { + Ok(Arc::new(is_not_distinct_from_primitive::(left, right))) + } + (DataType::UInt32, DataType::UInt32) => { + Ok(Arc::new(is_not_distinct_from_primitive::(left, right))) + } + (DataType::UInt64, DataType::UInt64) => { + Ok(Arc::new(is_not_distinct_from_primitive::(left, right))) + } + (DataType::Float32, DataType::Float32) => { + Ok(Arc::new(is_not_distinct_from_primitive::(left, right))) + } + (DataType::Float64, DataType::Float64) => { + Ok(Arc::new(is_not_distinct_from_primitive::(left, right))) + } + (DataType::Boolean, DataType::Boolean) => { + Ok(Arc::new(is_not_distinct_from_bool(left, right))) + } + } } /// return two physical expressions that are optionally coerced to a diff --git a/datafusion/src/physical_plan/expressions/get_indexed_field.rs b/datafusion/src/physical_plan/expressions/get_indexed_field.rs index b8ed71bad42e..9431fed26a5d 100644 --- a/datafusion/src/physical_plan/expressions/get_indexed_field.rs +++ b/datafusion/src/physical_plan/expressions/get_indexed_field.rs @@ -94,7 +94,7 @@ impl PhysicalExpr for GetIndexedFieldExpr { } let sliced_array: Vec> = as_list_array .iter() - .filter_map(|o| o.map(|list| list.slice(*i as usize, 1))) + .filter_map(|o| o.map(|list| list.slice(*i as usize, 1).into())) .collect(); let vec = sliced_array.iter().map(|a| a.as_ref()).collect::>(); let iter = concatenate(vec.as_slice()).unwrap(); diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs index 58fd200a0707..65d72be57224 100644 --- a/datafusion/src/physical_plan/file_format/mod.rs +++ b/datafusion/src/physical_plan/file_format/mod.rs @@ -39,6 +39,7 @@ use crate::{ datasource::{object_store::ObjectStore, PartitionedFile}, scalar::ScalarValue, }; +use arrow::array::UInt8Array; use lazy_static::lazy_static; use std::{ collections::HashMap, @@ -46,7 +47,6 @@ use std::{ sync::Arc, vec, }; -use arrow::array::UInt8Array; use super::{ColumnStatistics, Statistics}; @@ -178,7 +178,7 @@ struct PartitionColumnProjector { /// An Arrow buffer initialized to zeros that represents the key array of all partition /// columns (partition columns are materialized by dictionary arrays with only one /// value in the dictionary, thus all the keys are equal to zero). - key_buffer_cache: Option, + key_array_cache: Option, /// Mapping between the indexes in the list of partition columns and the target /// schema. Sorted by index in the target schema so that we can iterate on it to /// insert the partition columns in the target record batch. @@ -256,13 +256,12 @@ fn create_dict_array( let sliced_keys = match key_array_cache { Some(buf) if buf.len() >= len => buf.slice(0, len), _ => key_array_cache - .insert(UInt8Array::from_trusted_len_values_iter(iter::repeat(0).take(len))) + .insert(UInt8Array::from_trusted_len_values_iter( + iter::repeat(0).take(len), + )) .clone(), }; - Arc::new(DictionaryArray::::from_data( - sliced_keys, - dict_vals, - )) + Arc::new(DictionaryArray::::from_data(sliced_keys, dict_vals)) } #[cfg(test)] diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 8e355522a966..bfa9358a7999 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -687,8 +687,8 @@ fn build_join_indexes( &keys_values, *null_equals_null, )? { - left_indices.append(i); - right_indices.append(row as u32); + left_indices.push(i); + right_indices.push(row as u32); } } } @@ -725,8 +725,8 @@ fn build_join_indexes( &keys_values, *null_equals_null, )? { - left_indices.append_value(i)?; - right_indices.append_value(row as u32)?; + left_indices.push(i); + right_indices.push(row as u32); } } }; @@ -846,7 +846,7 @@ fn equal_rows( } DataType::Timestamp(_, None) => { equal_rows_elem!(Int64Array, l, r, left, right, null_equals_null) - }, + } DataType::Utf8 => { equal_rows_elem!(StringArray, l, r, left, right, null_equals_null) } diff --git a/datafusion/src/physical_plan/values.rs b/datafusion/src/physical_plan/values.rs index f4f8ccb6246a..0a0dfcab64ee 100644 --- a/datafusion/src/physical_plan/values.rs +++ b/datafusion/src/physical_plan/values.rs @@ -57,7 +57,7 @@ impl ValuesExec { schema .fields() .iter() - .map(|field| new_null_array(field.data_type(), 1)) + .map(|field| new_null_array(field.data_type(), 1).into()) .collect::>(), )?; let arr = (0..n_col)