From ea7c6b2aa21ec9512647074924db04d73d80811f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 11 Oct 2024 16:53:59 +0200 Subject: [PATCH] Use fixed for repartitioner --- datafusion/common/src/hash_utils.rs | 27 ++++++++++--------- datafusion/common/src/scalar/mod.rs | 3 ++- .../src/aggregate/count_distinct/native.rs | 2 +- .../functions-aggregate/src/bit_and_or_xor.rs | 2 +- datafusion/functions-aggregate/src/count.rs | 2 +- datafusion/functions-aggregate/src/sum.rs | 2 +- .../physical-expr-common/src/binary_map.rs | 2 +- .../src/binary_view_map.rs | 2 +- .../src/aggregates/group_values/column.rs | 2 +- .../src/aggregates/group_values/primitive.rs | 2 +- .../src/aggregates/group_values/row.rs | 2 +- .../src/aggregates/topk/hash_table.rs | 2 +- .../physical-plan/src/joins/hash_join.rs | 2 +- .../src/joins/symmetric_hash_join.rs | 2 +- .../physical-plan/src/repartition/mod.rs | 23 +++++++++++----- .../src/windows/bounded_window_agg_exec.rs | 2 +- 16 files changed, 46 insertions(+), 33 deletions(-) diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index 1e733b94711e..1819e2089087 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -46,8 +46,11 @@ pub fn combine_hashes(l: u64, r: u64) -> u64 { } #[cfg(not(feature = "force_hash_collisions"))] -fn hash_null(random_state: &H, hashes_buffer: &'_ mut [u64], mul_col: bool) { - +fn hash_null( + random_state: &H, + hashes_buffer: &'_ mut [u64], + mul_col: bool, +) { if mul_col { hashes_buffer.iter_mut().for_each(|hash| { // stable hash for null value @@ -61,11 +64,11 @@ fn hash_null(random_state: &H, hashes_buffer: &'_ mut [u64], mul } pub trait HashValue { - fn hash_one(&self, state: &H) -> u64; + fn hash_one(&self, state: &H) -> u64; } impl<'a, T: HashValue + ?Sized> HashValue for &'a T { - fn hash_one(&self, state: &H) -> u64 { + fn hash_one(&self, state: &H) -> u64 { T::hash_one(self, state) } } @@ -97,7 +100,7 @@ hash_float_value!((half::f16, u16), (f32, u32), (f64, u64)); /// If `rehash==true` this combines the previous hash value in the buffer /// with the new hash using `combine_hashes` #[cfg(not(feature = "force_hash_collisions"))] -fn hash_array_primitive( +fn hash_array_primitive( array: &PrimitiveArray, random_state: &H, hashes_buffer: &mut [u64], @@ -376,7 +379,6 @@ pub fn create_hashes<'a, H: BuildHasher>( random_state: &H, hashes_buffer: &'a mut Vec, ) -> Result<&'a mut Vec> { - for (i, col) in arrays.iter().enumerate() { let array = col.as_ref(); // combine hashes with `combine_hashes` for all columns besides the first @@ -443,7 +445,6 @@ pub fn create_hashes<'a, H: BuildHasher>( mod tests { use std::sync::Arc; - use arrow::array::*; #[cfg(not(feature = "force_hash_collisions"))] use arrow::datatypes::*; @@ -540,8 +541,8 @@ mod tests { let fixed_size_binary_array = Arc::new(FixedSizeBinaryArray::try_from_iter(input_arg.into_iter()).unwrap()); - let random_state = RandomState::default(); - let hashes_buff = &mut vec![0; fixed_size_binary_array.len()]; + let random_state = RandomState::default(); + let hashes_buff = &mut vec![0; fixed_size_binary_array.len()]; let hashes = create_hashes(&[fixed_size_binary_array], &random_state, hashes_buff)?; assert_eq!(hashes.len(), 3,); @@ -666,8 +667,8 @@ mod tests { ]; let list_array = Arc::new(ListArray::from_iter_primitive::(data)) as ArrayRef; - let random_state = RandomState::default(); - let mut hashes = vec![0; list_array.len()]; + let random_state = RandomState::default(); + let mut hashes = vec![0; list_array.len()]; create_hashes(&[list_array], &random_state, &mut hashes).unwrap(); assert_eq!(hashes[0], hashes[5]); assert_eq!(hashes[1], hashes[4]); @@ -691,8 +692,8 @@ mod tests { Arc::new(FixedSizeListArray::from_iter_primitive::( data, 3, )) as ArrayRef; - let random_state = RandomState::default(); - let mut hashes = vec![0; list_array.len()]; + let random_state = RandomState::default(); + let mut hashes = vec![0; list_array.len()]; create_hashes(&[list_array], &random_state, &mut hashes).unwrap(); assert_eq!(hashes[0], hashes[5]); assert_eq!(hashes[1], hashes[4]); diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 8731a799f304..7ed2518e7fc3 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -771,7 +771,8 @@ impl std::hash::Hash for ScalarValue { fn hash_nested_array(arr: ArrayRef, state: &mut H) { let arrays = vec![arr.to_owned()]; let hashes_buffer = &mut vec![0; arr.len()]; - let hashes = create_hashes(&arrays, &FixedState::with_seed(0), hashes_buffer).unwrap(); + let hashes = + create_hashes(&arrays, &FixedState::with_seed(0), hashes_buffer).unwrap(); // Hash back to std::hash::Hasher hashes.hash(state); } diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs index 830cb65b346e..3a856177fa92 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs @@ -25,11 +25,11 @@ use std::fmt::Debug; use std::hash::Hash; use std::sync::Arc; -use foldhash::fast::RandomState; use arrow::array::types::ArrowPrimitiveType; use arrow::array::ArrayRef; use arrow::array::PrimitiveArray; use arrow::datatypes::DataType; +use foldhash::fast::RandomState; use datafusion_common::cast::{as_list_array, as_primitive_array}; use datafusion_common::utils::array_into_list_array_nullable; diff --git a/datafusion/functions-aggregate/src/bit_and_or_xor.rs b/datafusion/functions-aggregate/src/bit_and_or_xor.rs index 94e3fb3eb7a5..5de8726d8e9b 100644 --- a/datafusion/functions-aggregate/src/bit_and_or_xor.rs +++ b/datafusion/functions-aggregate/src/bit_and_or_xor.rs @@ -21,13 +21,13 @@ use std::any::Any; use std::collections::HashSet; use std::fmt::{Display, Formatter}; -use foldhash::fast::RandomState; use arrow::array::{downcast_integer, Array, ArrayRef, AsArray}; use arrow::datatypes::{ ArrowNativeType, ArrowNumericType, DataType, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; use arrow_schema::Field; +use foldhash::fast::RandomState; use datafusion_common::cast::as_list_array; use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue}; diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index b9aabdecea5a..153b324a8c8a 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -use foldhash::fast::RandomState; use datafusion_common::stats::Precision; use datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator; use datafusion_physical_expr::expressions; +use foldhash::fast::RandomState; use std::collections::HashSet; use std::ops::BitAnd; use std::{fmt::Debug, sync::Arc}; diff --git a/datafusion/functions-aggregate/src/sum.rs b/datafusion/functions-aggregate/src/sum.rs index 113b90619eec..925ffd6636e7 100644 --- a/datafusion/functions-aggregate/src/sum.rs +++ b/datafusion/functions-aggregate/src/sum.rs @@ -17,8 +17,8 @@ //! Defines `SUM` and `SUM DISTINCT` aggregate accumulators -use foldhash::fast::RandomState; use datafusion_expr::utils::AggregateOrderSensitivity; +use foldhash::fast::RandomState; use std::any::Any; use std::collections::HashSet; diff --git a/datafusion/physical-expr-common/src/binary_map.rs b/datafusion/physical-expr-common/src/binary_map.rs index 18abddb03271..d01dabdb4662 100644 --- a/datafusion/physical-expr-common/src/binary_map.rs +++ b/datafusion/physical-expr-common/src/binary_map.rs @@ -18,7 +18,6 @@ //! [`ArrowBytesMap`] and [`ArrowBytesSet`] for storing maps/sets of values from //! StringArray / LargeStringArray / BinaryArray / LargeBinaryArray. -use foldhash::fast::RandomState; use arrow::array::cast::AsArray; use arrow::array::types::{ByteArrayType, GenericBinaryType, GenericStringType}; use arrow::array::{ @@ -29,6 +28,7 @@ use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer}; use arrow::datatypes::DataType; use datafusion_common::hash_utils::create_hashes; use datafusion_common::utils::proxy::{RawTableAllocExt, VecAllocExt}; +use foldhash::fast::RandomState; use std::any::type_name; use std::fmt::Debug; use std::mem; diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index c54d1a1b7311..bb384e370c0f 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -19,12 +19,12 @@ //! `StringViewArray`/`BinaryViewArray`. //! Much of the code is from `binary_map.rs`, but with simpler implementation because we directly use the //! [`GenericByteViewBuilder`]. -use foldhash::fast::RandomState; use arrow::array::cast::AsArray; use arrow::array::{Array, ArrayBuilder, ArrayRef, GenericByteViewBuilder}; use arrow::datatypes::{BinaryViewType, ByteViewType, DataType, StringViewType}; use datafusion_common::hash_utils::create_hashes; use datafusion_common::utils::proxy::{RawTableAllocExt, VecAllocExt}; +use foldhash::fast::RandomState; use std::fmt::Debug; use std::sync::Arc; diff --git a/datafusion/physical-plan/src/aggregates/group_values/column.rs b/datafusion/physical-plan/src/aggregates/group_values/column.rs index d3cbf3eaeccd..c70d5a580a69 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/column.rs @@ -19,7 +19,6 @@ use crate::aggregates::group_values::group_column::{ ByteGroupValueBuilder, GroupColumn, PrimitiveGroupValueBuilder, }; use crate::aggregates::group_values::GroupValues; -use foldhash::fast::RandomState; use arrow::compute::cast; use arrow::datatypes::{ Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, @@ -33,6 +32,7 @@ use datafusion_common::{not_impl_err, DataFusionError, Result}; use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt}; use datafusion_expr::EmitTo; use datafusion_physical_expr::binary_map::OutputType; +use foldhash::fast::RandomState; use hashbrown::raw::RawTable; diff --git a/datafusion/physical-plan/src/aggregates/group_values/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/primitive.rs index 165b643cf521..82b71191b39c 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/primitive.rs @@ -16,7 +16,6 @@ // under the License. use crate::aggregates::group_values::GroupValues; -use foldhash::fast::RandomState; use arrow::array::BooleanBufferBuilder; use arrow::buffer::NullBuffer; use arrow::datatypes::i256; @@ -28,6 +27,7 @@ use arrow_schema::DataType; use datafusion_common::Result; use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_expr::EmitTo; +use foldhash::fast::RandomState; use half::f16; use hashbrown::raw::RawTable; use std::sync::Arc; diff --git a/datafusion/physical-plan/src/aggregates/group_values/row.rs b/datafusion/physical-plan/src/aggregates/group_values/row.rs index 0087fb89a201..596b3ce805cb 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/row.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/row.rs @@ -16,7 +16,6 @@ // under the License. use crate::aggregates::group_values::GroupValues; -use foldhash::fast::RandomState; use arrow::compute::cast; use arrow::record_batch::RecordBatch; use arrow::row::{RowConverter, Rows, SortField}; @@ -26,6 +25,7 @@ use datafusion_common::hash_utils::create_hashes; use datafusion_common::Result; use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt}; use datafusion_expr::EmitTo; +use foldhash::fast::RandomState; use hashbrown::raw::RawTable; use std::sync::Arc; diff --git a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs index a05c9d80324c..f7580d85c54d 100644 --- a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs +++ b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs @@ -19,7 +19,6 @@ use crate::aggregates::group_values::primitive::HashValue; use crate::aggregates::topk::heap::Comparable; -use foldhash::fast::RandomState; use arrow::datatypes::i256; use arrow_array::builder::PrimitiveBuilder; use arrow_array::cast::AsArray; @@ -30,6 +29,7 @@ use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use arrow_schema::DataType; use datafusion_common::DataFusionError; use datafusion_common::Result; +use foldhash::fast::RandomState; use half::f16; use hashbrown::raw::RawTable; use std::fmt::Debug; diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 87dc3cc9b31a..61382cce595c 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -70,9 +70,9 @@ use datafusion_physical_expr::equivalence::{ }; use datafusion_physical_expr::PhysicalExprRef; -use foldhash::fast::RandomState; use datafusion_expr::Operator; use datafusion_physical_expr_common::datum::compare_op_for_nested; +use foldhash::fast::RandomState; use futures::{ready, Stream, StreamExt, TryStreamExt}; use parking_lot::Mutex; diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 8f1998f24587..b193118219dd 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -71,8 +71,8 @@ use datafusion_physical_expr::equivalence::join_equivalence_properties; use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph; use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement}; -use foldhash::fast::RandomState; use datafusion_physical_expr_common::sort_expr::LexRequirement; +use foldhash::fast::RandomState; use futures::{ready, Stream, StreamExt}; use hashbrown::HashSet; use parking_lot::Mutex; diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 100f8bafa576..ebca41b6e430 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -47,7 +47,7 @@ use datafusion_common_runtime::SpawnedTask; use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr, PhysicalSortExpr}; -use foldhash::fast::RandomState; +use foldhash::fast::FixedState; use crate::execution_plan::CardinalityEffect; use futures::stream::Stream; @@ -124,6 +124,8 @@ impl RepartitionExecState { // launch one async task per *input* partition let mut spawned_tasks = Vec::with_capacity(num_input_partitions); + let random_state = FixedState::default(); + for i in 0..num_input_partitions { let txs: HashMap<_, _> = channels .iter() @@ -141,6 +143,7 @@ impl RepartitionExecState { partitioning.clone(), r_metrics, Arc::clone(&context), + random_state, )); // In a separate task, wait for each input to be done @@ -183,7 +186,7 @@ pub struct BatchPartitioner { enum BatchPartitionerState { Hash { - random_state: foldhash::fast::RandomState, + random_state: foldhash::fast::FixedState, exprs: Vec>, num_partitions: usize, hash_buffer: Vec, @@ -198,7 +201,11 @@ impl BatchPartitioner { /// Create a new [`BatchPartitioner`] with the provided [`Partitioning`] /// /// The time spent repartitioning will be recorded to `timer` - pub fn try_new(partitioning: Partitioning, timer: metrics::Time) -> Result { + pub fn try_new( + partitioning: Partitioning, + timer: metrics::Time, + random_state: FixedState, + ) -> Result { let state = match partitioning { Partitioning::RoundRobinBatch(num_partitions) => { BatchPartitionerState::RoundRobin { @@ -209,7 +216,7 @@ impl BatchPartitioner { Partitioning::Hash(exprs, num_partitions) => BatchPartitionerState::Hash { exprs, num_partitions, - random_state: RandomState::default(), + random_state: random_state, hash_buffer: vec![], }, other => return not_impl_err!("Unsupported repartitioning scheme {other:?}"), @@ -780,9 +787,13 @@ impl RepartitionExec { partitioning: Partitioning, metrics: RepartitionMetrics, context: Arc, + random_state: FixedState, ) -> Result<()> { - let mut partitioner = - BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?; + let mut partitioner = BatchPartitioner::try_new( + partitioning, + metrics.repartition_time.clone(), + random_state, + )?; // execute the child operator let timer = metrics.fetch_time.timer(); diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 534319b8919a..7370b201a7c2 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -39,7 +39,6 @@ use crate::{ ExecutionPlanProperties, InputOrderMode, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr, }; -use foldhash::fast::RandomState; use arrow::{ array::{Array, ArrayRef, RecordBatchOptions, UInt32Builder}, compute::{concat, concat_batches, sort_to_indices}, @@ -61,6 +60,7 @@ use datafusion_physical_expr::window::{ }; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::LexRequirement; +use foldhash::fast::RandomState; use futures::stream::Stream; use futures::{ready, StreamExt}; use hashbrown::raw::RawTable;