Skip to content

Commit

Permalink
Use fixed for repartitioner
Browse files Browse the repository at this point in the history
  • Loading branch information
Dandandan committed Oct 11, 2024
1 parent e1dfa62 commit ea7c6b2
Show file tree
Hide file tree
Showing 16 changed files with 46 additions and 33 deletions.
27 changes: 14 additions & 13 deletions datafusion/common/src/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@ pub fn combine_hashes(l: u64, r: u64) -> u64 {
}

#[cfg(not(feature = "force_hash_collisions"))]
fn hash_null<H: BuildHasher>(random_state: &H, hashes_buffer: &'_ mut [u64], mul_col: bool) {

fn hash_null<H: BuildHasher>(
random_state: &H,
hashes_buffer: &'_ mut [u64],
mul_col: bool,
) {
if mul_col {
hashes_buffer.iter_mut().for_each(|hash| {
// stable hash for null value
Expand All @@ -61,11 +64,11 @@ fn hash_null<H: BuildHasher>(random_state: &H, hashes_buffer: &'_ mut [u64], mul
}

pub trait HashValue {
fn hash_one<H:BuildHasher>(&self, state: &H) -> u64;
fn hash_one<H: BuildHasher>(&self, state: &H) -> u64;
}

impl<'a, T: HashValue + ?Sized> HashValue for &'a T {
fn hash_one<H:BuildHasher>(&self, state: &H) -> u64 {
fn hash_one<H: BuildHasher>(&self, state: &H) -> u64 {
T::hash_one(self, state)
}
}
Expand Down Expand Up @@ -97,7 +100,7 @@ hash_float_value!((half::f16, u16), (f32, u32), (f64, u64));
/// If `rehash==true` this combines the previous hash value in the buffer
/// with the new hash using `combine_hashes`
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_array_primitive<T, H:BuildHasher>(
fn hash_array_primitive<T, H: BuildHasher>(
array: &PrimitiveArray<T>,
random_state: &H,
hashes_buffer: &mut [u64],
Expand Down Expand Up @@ -376,7 +379,6 @@ pub fn create_hashes<'a, H: BuildHasher>(
random_state: &H,
hashes_buffer: &'a mut Vec<u64>,
) -> Result<&'a mut Vec<u64>> {

for (i, col) in arrays.iter().enumerate() {
let array = col.as_ref();
// combine hashes with `combine_hashes` for all columns besides the first
Expand Down Expand Up @@ -443,7 +445,6 @@ pub fn create_hashes<'a, H: BuildHasher>(
mod tests {
use std::sync::Arc;


use arrow::array::*;
#[cfg(not(feature = "force_hash_collisions"))]
use arrow::datatypes::*;
Expand Down Expand Up @@ -540,8 +541,8 @@ mod tests {
let fixed_size_binary_array =
Arc::new(FixedSizeBinaryArray::try_from_iter(input_arg.into_iter()).unwrap());

let random_state = RandomState::default();
let hashes_buff = &mut vec![0; fixed_size_binary_array.len()];
let random_state = RandomState::default();
let hashes_buff = &mut vec![0; fixed_size_binary_array.len()];
let hashes =
create_hashes(&[fixed_size_binary_array], &random_state, hashes_buff)?;
assert_eq!(hashes.len(), 3,);
Expand Down Expand Up @@ -666,8 +667,8 @@ mod tests {
];
let list_array =
Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(data)) as ArrayRef;
let random_state = RandomState::default();
let mut hashes = vec![0; list_array.len()];
let random_state = RandomState::default();
let mut hashes = vec![0; list_array.len()];
create_hashes(&[list_array], &random_state, &mut hashes).unwrap();
assert_eq!(hashes[0], hashes[5]);
assert_eq!(hashes[1], hashes[4]);
Expand All @@ -691,8 +692,8 @@ mod tests {
Arc::new(FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
data, 3,
)) as ArrayRef;
let random_state = RandomState::default();
let mut hashes = vec![0; list_array.len()];
let random_state = RandomState::default();
let mut hashes = vec![0; list_array.len()];
create_hashes(&[list_array], &random_state, &mut hashes).unwrap();
assert_eq!(hashes[0], hashes[5]);
assert_eq!(hashes[1], hashes[4]);
Expand Down
3 changes: 2 additions & 1 deletion datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,8 @@ impl std::hash::Hash for ScalarValue {
fn hash_nested_array<H: std::hash::Hasher>(arr: ArrayRef, state: &mut H) {
let arrays = vec![arr.to_owned()];
let hashes_buffer = &mut vec![0; arr.len()];
let hashes = create_hashes(&arrays, &FixedState::with_seed(0), hashes_buffer).unwrap();
let hashes =
create_hashes(&arrays, &FixedState::with_seed(0), hashes_buffer).unwrap();
// Hash back to std::hash::Hasher
hashes.hash(state);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;

use foldhash::fast::RandomState;
use arrow::array::types::ArrowPrimitiveType;
use arrow::array::ArrayRef;
use arrow::array::PrimitiveArray;
use arrow::datatypes::DataType;
use foldhash::fast::RandomState;

use datafusion_common::cast::{as_list_array, as_primitive_array};
use datafusion_common::utils::array_into_list_array_nullable;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-aggregate/src/bit_and_or_xor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ use std::any::Any;
use std::collections::HashSet;
use std::fmt::{Display, Formatter};

use foldhash::fast::RandomState;
use arrow::array::{downcast_integer, Array, ArrayRef, AsArray};
use arrow::datatypes::{
ArrowNativeType, ArrowNumericType, DataType, Int16Type, Int32Type, Int64Type,
Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};
use arrow_schema::Field;
use foldhash::fast::RandomState;

use datafusion_common::cast::as_list_array;
use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-aggregate/src/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use foldhash::fast::RandomState;
use datafusion_common::stats::Precision;
use datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator;
use datafusion_physical_expr::expressions;
use foldhash::fast::RandomState;
use std::collections::HashSet;
use std::ops::BitAnd;
use std::{fmt::Debug, sync::Arc};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-aggregate/src/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

//! Defines `SUM` and `SUM DISTINCT` aggregate accumulators

use foldhash::fast::RandomState;
use datafusion_expr::utils::AggregateOrderSensitivity;
use foldhash::fast::RandomState;
use std::any::Any;
use std::collections::HashSet;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr-common/src/binary_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
//! [`ArrowBytesMap`] and [`ArrowBytesSet`] for storing maps/sets of values from
//! StringArray / LargeStringArray / BinaryArray / LargeBinaryArray.

use foldhash::fast::RandomState;
use arrow::array::cast::AsArray;
use arrow::array::types::{ByteArrayType, GenericBinaryType, GenericStringType};
use arrow::array::{
Expand All @@ -29,6 +28,7 @@ use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::DataType;
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::utils::proxy::{RawTableAllocExt, VecAllocExt};
use foldhash::fast::RandomState;
use std::any::type_name;
use std::fmt::Debug;
use std::mem;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr-common/src/binary_view_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
//! `StringViewArray`/`BinaryViewArray`.
//! Much of the code is from `binary_map.rs`, but with simpler implementation because we directly use the
//! [`GenericByteViewBuilder`].
use foldhash::fast::RandomState;
use arrow::array::cast::AsArray;
use arrow::array::{Array, ArrayBuilder, ArrayRef, GenericByteViewBuilder};
use arrow::datatypes::{BinaryViewType, ByteViewType, DataType, StringViewType};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::utils::proxy::{RawTableAllocExt, VecAllocExt};
use foldhash::fast::RandomState;
use std::fmt::Debug;
use std::sync::Arc;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use crate::aggregates::group_values::group_column::{
ByteGroupValueBuilder, GroupColumn, PrimitiveGroupValueBuilder,
};
use crate::aggregates::group_values::GroupValues;
use foldhash::fast::RandomState;
use arrow::compute::cast;
use arrow::datatypes::{
Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
Expand All @@ -33,6 +32,7 @@ use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use datafusion_expr::EmitTo;
use datafusion_physical_expr::binary_map::OutputType;
use foldhash::fast::RandomState;

use hashbrown::raw::RawTable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

use crate::aggregates::group_values::GroupValues;
use foldhash::fast::RandomState;
use arrow::array::BooleanBufferBuilder;
use arrow::buffer::NullBuffer;
use arrow::datatypes::i256;
Expand All @@ -28,6 +27,7 @@ use arrow_schema::DataType;
use datafusion_common::Result;
use datafusion_execution::memory_pool::proxy::VecAllocExt;
use datafusion_expr::EmitTo;
use foldhash::fast::RandomState;
use half::f16;
use hashbrown::raw::RawTable;
use std::sync::Arc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

use crate::aggregates::group_values::GroupValues;
use foldhash::fast::RandomState;
use arrow::compute::cast;
use arrow::record_batch::RecordBatch;
use arrow::row::{RowConverter, Rows, SortField};
Expand All @@ -26,6 +25,7 @@ use datafusion_common::hash_utils::create_hashes;
use datafusion_common::Result;
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use datafusion_expr::EmitTo;
use foldhash::fast::RandomState;
use hashbrown::raw::RawTable;
use std::sync::Arc;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/aggregates/topk/hash_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

use crate::aggregates::group_values::primitive::HashValue;
use crate::aggregates::topk::heap::Comparable;
use foldhash::fast::RandomState;
use arrow::datatypes::i256;
use arrow_array::builder::PrimitiveBuilder;
use arrow_array::cast::AsArray;
Expand All @@ -30,6 +29,7 @@ use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
use arrow_schema::DataType;
use datafusion_common::DataFusionError;
use datafusion_common::Result;
use foldhash::fast::RandomState;
use half::f16;
use hashbrown::raw::RawTable;
use std::fmt::Debug;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ use datafusion_physical_expr::equivalence::{
};
use datafusion_physical_expr::PhysicalExprRef;

use foldhash::fast::RandomState;
use datafusion_expr::Operator;
use datafusion_physical_expr_common::datum::compare_op_for_nested;
use foldhash::fast::RandomState;
use futures::{ready, Stream, StreamExt, TryStreamExt};
use parking_lot::Mutex;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ use datafusion_physical_expr::equivalence::join_equivalence_properties;
use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement};

use foldhash::fast::RandomState;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use foldhash::fast::RandomState;
use futures::{ready, Stream, StreamExt};
use hashbrown::HashSet;
use parking_lot::Mutex;
Expand Down
23 changes: 17 additions & 6 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::memory_pool::MemoryConsumer;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr, PhysicalSortExpr};
use foldhash::fast::RandomState;
use foldhash::fast::FixedState;

use crate::execution_plan::CardinalityEffect;
use futures::stream::Stream;
Expand Down Expand Up @@ -124,6 +124,8 @@ impl RepartitionExecState {

// launch one async task per *input* partition
let mut spawned_tasks = Vec::with_capacity(num_input_partitions);
let random_state = FixedState::default();

for i in 0..num_input_partitions {
let txs: HashMap<_, _> = channels
.iter()
Expand All @@ -141,6 +143,7 @@ impl RepartitionExecState {
partitioning.clone(),
r_metrics,
Arc::clone(&context),
random_state,
));

// In a separate task, wait for each input to be done
Expand Down Expand Up @@ -183,7 +186,7 @@ pub struct BatchPartitioner {

enum BatchPartitionerState {
Hash {
random_state: foldhash::fast::RandomState,
random_state: foldhash::fast::FixedState,
exprs: Vec<Arc<dyn PhysicalExpr>>,
num_partitions: usize,
hash_buffer: Vec<u64>,
Expand All @@ -198,7 +201,11 @@ impl BatchPartitioner {
/// Create a new [`BatchPartitioner`] with the provided [`Partitioning`]
///
/// The time spent repartitioning will be recorded to `timer`
pub fn try_new(partitioning: Partitioning, timer: metrics::Time) -> Result<Self> {
pub fn try_new(
partitioning: Partitioning,
timer: metrics::Time,
random_state: FixedState,
) -> Result<Self> {
let state = match partitioning {
Partitioning::RoundRobinBatch(num_partitions) => {
BatchPartitionerState::RoundRobin {
Expand All @@ -209,7 +216,7 @@ impl BatchPartitioner {
Partitioning::Hash(exprs, num_partitions) => BatchPartitionerState::Hash {
exprs,
num_partitions,
random_state: RandomState::default(),
random_state: random_state,
hash_buffer: vec![],
},
other => return not_impl_err!("Unsupported repartitioning scheme {other:?}"),
Expand Down Expand Up @@ -780,9 +787,13 @@ impl RepartitionExec {
partitioning: Partitioning,
metrics: RepartitionMetrics,
context: Arc<TaskContext>,
random_state: FixedState,
) -> Result<()> {
let mut partitioner =
BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?;
let mut partitioner = BatchPartitioner::try_new(
partitioning,
metrics.repartition_time.clone(),
random_state,
)?;

// execute the child operator
let timer = metrics.fetch_time.timer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use crate::{
ExecutionPlanProperties, InputOrderMode, PlanProperties, RecordBatchStream,
SendableRecordBatchStream, Statistics, WindowExpr,
};
use foldhash::fast::RandomState;
use arrow::{
array::{Array, ArrayRef, RecordBatchOptions, UInt32Builder},
compute::{concat, concat_batches, sort_to_indices},
Expand All @@ -61,6 +60,7 @@ use datafusion_physical_expr::window::{
};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use foldhash::fast::RandomState;
use futures::stream::Stream;
use futures::{ready, StreamExt};
use hashbrown::raw::RawTable;
Expand Down

0 comments on commit ea7c6b2

Please sign in to comment.