Skip to content

Commit

Permalink
fix is distinct from and mutable buffer use
Browse files Browse the repository at this point in the history
  • Loading branch information
houqp committed Dec 21, 2021
1 parent 626591f commit 83ed429
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 30 deletions.
150 changes: 134 additions & 16 deletions datafusion/src/physical_plan/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,40 @@ use super::coercion::{
use arrow::scalar::Scalar;
use arrow::types::NativeType;

// Simple (low performance) kernels until optimized kernels are added to arrow
// See https://github.com/apache/arrow-rs/issues/960

/// Element-wise `IS DISTINCT FROM` over two boolean arrays.
///
/// This differs from `neq_bool`: `null IS DISTINCT FROM null` evaluates to
/// `false` rather than null, so every slot of the returned array is valid.
/// Elements are paired with `zip`, so the output is as long as the shorter
/// of the two inputs.
///
/// # Panics
///
/// Panics if either argument is not a `BooleanArray`.
fn is_distinct_from_bool(left: &dyn Array, right: &dyn Array) -> BooleanArray {
    let lhs = left
        .as_any()
        .downcast_ref::<BooleanArray>()
        .expect("distinct_from op failed to downcast to boolean array");
    let rhs = right
        .as_any()
        .downcast_ref::<BooleanArray>()
        .expect("distinct_from op failed to downcast to boolean array");

    // Comparing the optional items directly treats two nulls as equal and a
    // null as different from any concrete value.
    let pairs = lhs.iter().zip(rhs.iter());
    pairs.map(|(l, r)| Some(l != r)).collect()
}

/// Element-wise `IS NOT DISTINCT FROM` over two boolean arrays.
///
/// Two nulls compare as equal (`true`) and a null compared with a concrete
/// value yields `false`, so every slot of the returned array is valid.
/// Elements are paired with `zip`, so the output is as long as the shorter
/// of the two inputs.
///
/// # Panics
///
/// Panics if either argument is not a `BooleanArray`.
fn is_not_distinct_from_bool(left: &dyn Array, right: &dyn Array) -> BooleanArray {
    let lhs = left
        .as_any()
        .downcast_ref::<BooleanArray>()
        .expect("not_distinct_from op failed to downcast to boolean array");
    let rhs = right
        .as_any()
        .downcast_ref::<BooleanArray>()
        .expect("not_distinct_from op failed to downcast to boolean array");

    // Equality on the optional items is exactly the null-safe equality wanted
    // here: `None == None` is true.
    let pairs = lhs.iter().zip(rhs.iter());
    pairs.map(|(l, r)| Some(l == r)).collect()
}

/// Binary expression
#[derive(Debug)]
pub struct BinaryExpr {
Expand Down Expand Up @@ -142,9 +176,9 @@ fn evaluate(lhs: &dyn Array, op: &Operator, rhs: &dyn Array) -> Result<Arc<dyn A
};
Ok(Arc::new(arr) as Arc<dyn Array>)
} else if matches!(op, IsDistinctFrom) {
boolean_op!(lhs, rhs, is_distinct_from)
is_distinct_from(lhs, rhs)
} else if matches!(op, IsNotDistinctFrom) {
boolean_op!(lhs, rhs, is_not_distinct_from)
is_not_distinct_from(lhs, rhs)
} else if matches!(op, Or) {
boolean_op!(lhs, rhs, compute::boolean_kleene::or)
} else if matches!(op, And) {
Expand Down Expand Up @@ -543,26 +577,110 @@ impl PhysicalExpr for BinaryExpr {
}
}

fn is_distinct_from<T: NativeType>(
left: &PrimitiveArray<T>,
right: &PrimitiveArray<T>,
) -> Result<BooleanArray> {
Ok(left
.iter()
fn is_distinct_from_primitive<T: NativeType>(
left: &dyn Array,
right: &dyn Array,
) -> BooleanArray {
let left = left
.as_any()
.downcast_ref::<PrimitiveArray<T>>()
.expect("distinct_from op failed to downcast to primitive array");
let right = right
.as_any()
.downcast_ref::<PrimitiveArray<T>>()
.expect("distinct_from op failed to downcast to primitive array");
left.iter()
.zip(right.iter())
.map(|(x, y)| Some(x != y))
.collect())
.collect()
}

fn is_not_distinct_from<T: NativeType>(
left: &PrimitiveArray<T>,
right: &PrimitiveArray<T>,
) -> Result<BooleanArray> {
Ok(left
.iter()
fn is_not_distinct_from_primitive<T: NativeType>(
left: &dyn Array,
right: &dyn Array,
) -> BooleanArray {
let left = left
.as_any()
.downcast_ref::<PrimitiveArray<T>>()
.expect("not_distinct_from op failed to downcast to primitive array");
let right = right
.as_any()
.downcast_ref::<PrimitiveArray<T>>()
.expect("not_distinct_from op failed to downcast to primitive array");
left.iter()
.zip(right.iter())
.map(|(x, y)| Some(x == y))
.collect())
.collect()
}

fn is_distinct_from(left: &dyn Array, right: &dyn Array) -> Result<Arc<dyn Array>> {
match (left.data_type(), right.data_type()) {
(DataType::Int8, DataType::Int8) => {
Ok(Arc::new(is_distinct_from_primitive::<i8>(left, right)))
}
(DataType::Int32, DataType::Int32) => {
Ok(Arc::new(is_distinct_from_primitive::<i32>(left, right)))
}
(DataType::Int64, DataType::Int64) => {
Ok(Arc::new(is_distinct_from_primitive::<i64>(left, right)))
}
(DataType::UInt8, DataType::UInt8) => {
Ok(Arc::new(is_distinct_from_primitive::<u8>(left, right)))
}
(DataType::UInt16, DataType::UInt16) => {
Ok(Arc::new(is_distinct_from_primitive::<u16>(left, right)))
}
(DataType::UInt32, DataType::UInt32) => {
Ok(Arc::new(is_distinct_from_primitive::<u32>(left, right)))
}
(DataType::UInt64, DataType::UInt64) => {
Ok(Arc::new(is_distinct_from_primitive::<u64>(left, right)))
}
(DataType::Float32, DataType::Float32) => {
Ok(Arc::new(is_distinct_from_primitive::<f32>(left, right)))
}
(DataType::Float64, DataType::Float64) => {
Ok(Arc::new(is_distinct_from_primitive::<f64>(left, right)))
}
(DataType::Boolean, DataType::Boolean) => {
Ok(Arc::new(is_distinct_from_bool(left, right)))
}
}
}

fn is_not_distinct_from(left: &dyn Array, right: &dyn Array) -> Result<Arc<dyn Array>> {
match (left.data_type(), right.data_type()) {
(DataType::Int8, DataType::Int8) => {
Ok(Arc::new(is_not_distinct_from_primitive::<i8>(left, right)))
}
(DataType::Int32, DataType::Int32) => {
Ok(Arc::new(is_not_distinct_from_primitive::<i32>(left, right)))
}
(DataType::Int64, DataType::Int64) => {
Ok(Arc::new(is_not_distinct_from_primitive::<i64>(left, right)))
}
(DataType::UInt8, DataType::UInt8) => {
Ok(Arc::new(is_not_distinct_from_primitive::<u8>(left, right)))
}
(DataType::UInt16, DataType::UInt16) => {
Ok(Arc::new(is_not_distinct_from_primitive::<u16>(left, right)))
}
(DataType::UInt32, DataType::UInt32) => {
Ok(Arc::new(is_not_distinct_from_primitive::<u32>(left, right)))
}
(DataType::UInt64, DataType::UInt64) => {
Ok(Arc::new(is_not_distinct_from_primitive::<u64>(left, right)))
}
(DataType::Float32, DataType::Float32) => {
Ok(Arc::new(is_not_distinct_from_primitive::<f32>(left, right)))
}
(DataType::Float64, DataType::Float64) => {
Ok(Arc::new(is_not_distinct_from_primitive::<f64>(left, right)))
}
(DataType::Boolean, DataType::Boolean) => {
Ok(Arc::new(is_not_distinct_from_bool(left, right)))
}
}
}

/// return two physical expressions that are optionally coerced to a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl PhysicalExpr for GetIndexedFieldExpr {
}
let sliced_array: Vec<Arc<dyn Array>> = as_list_array
.iter()
.filter_map(|o| o.map(|list| list.slice(*i as usize, 1)))
.filter_map(|o| o.map(|list| list.slice(*i as usize, 1).into()))
.collect();
let vec = sliced_array.iter().map(|a| a.as_ref()).collect::<Vec<&dyn Array>>();
let iter = concatenate(vec.as_slice()).unwrap();
Expand Down
13 changes: 6 additions & 7 deletions datafusion/src/physical_plan/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ use crate::{
datasource::{object_store::ObjectStore, PartitionedFile},
scalar::ScalarValue,
};
use arrow::array::UInt8Array;
use lazy_static::lazy_static;
use std::{
collections::HashMap,
fmt::{Display, Formatter, Result as FmtResult},
sync::Arc,
vec,
};
use arrow::array::UInt8Array;

use super::{ColumnStatistics, Statistics};

Expand Down Expand Up @@ -178,7 +178,7 @@ struct PartitionColumnProjector {
/// An Arrow buffer initialized to zeros that represents the key array of all partition
/// columns (partition columns are materialized by dictionary arrays with only one
/// value in the dictionary, thus all the keys are equal to zero).
key_buffer_cache: Option<UInt8Array>,
key_array_cache: Option<UInt8Array>,
/// Mapping between the indexes in the list of partition columns and the target
/// schema. Sorted by index in the target schema so that we can iterate on it to
/// insert the partition columns in the target record batch.
Expand Down Expand Up @@ -256,13 +256,12 @@ fn create_dict_array(
let sliced_keys = match key_array_cache {
Some(buf) if buf.len() >= len => buf.slice(0, len),
_ => key_array_cache
.insert(UInt8Array::from_trusted_len_values_iter(iter::repeat(0).take(len)))
.insert(UInt8Array::from_trusted_len_values_iter(
iter::repeat(0).take(len),
))
.clone(),
};
Arc::new(DictionaryArray::<u8>::from_data(
sliced_keys,
dict_vals,
))
Arc::new(DictionaryArray::<u8>::from_data(sliced_keys, dict_vals))
}

#[cfg(test)]
Expand Down
10 changes: 5 additions & 5 deletions datafusion/src/physical_plan/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,8 +687,8 @@ fn build_join_indexes(
&keys_values,
*null_equals_null,
)? {
left_indices.append(i);
right_indices.append(row as u32);
left_indices.push(i);
right_indices.push(row as u32);
}
}
}
Expand Down Expand Up @@ -725,8 +725,8 @@ fn build_join_indexes(
&keys_values,
*null_equals_null,
)? {
left_indices.append_value(i)?;
right_indices.append_value(row as u32)?;
left_indices.push(i);
right_indices.push(row as u32);
}
}
};
Expand Down Expand Up @@ -846,7 +846,7 @@ fn equal_rows(
}
DataType::Timestamp(_, None) => {
equal_rows_elem!(Int64Array, l, r, left, right, null_equals_null)
},
}
DataType::Utf8 => {
equal_rows_elem!(StringArray, l, r, left, right, null_equals_null)
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl ValuesExec {
schema
.fields()
.iter()
.map(|field| new_null_array(field.data_type(), 1))
.map(|field| new_null_array(field.data_type(), 1).into())
.collect::<Vec<_>>(),
)?;
let arr = (0..n_col)
Expand Down

0 comments on commit 83ed429

Please sign in to comment.