Skip to content

Commit

Permalink
Revert "Wip"
Browse files Browse the repository at this point in the history
This reverts commit f2fbcc0.
  • Loading branch information
Dandandan committed Oct 11, 2024
1 parent f2fbcc0 commit e1dfa62
Show file tree
Hide file tree
Showing 16 changed files with 43 additions and 56 deletions.
27 changes: 13 additions & 14 deletions datafusion/common/src/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,8 @@ pub fn combine_hashes(l: u64, r: u64) -> u64 {
}

#[cfg(not(feature = "force_hash_collisions"))]
fn hash_null<H: BuildHasher>(
random_state: &H,
hashes_buffer: &'_ mut [u64],
mul_col: bool,
) {
fn hash_null<H: BuildHasher>(random_state: &H, hashes_buffer: &'_ mut [u64], mul_col: bool) {

if mul_col {
hashes_buffer.iter_mut().for_each(|hash| {
// stable hash for null value
Expand All @@ -64,11 +61,11 @@ fn hash_null<H: BuildHasher>(
}

pub trait HashValue {
fn hash_one<H: BuildHasher>(&self, state: &H) -> u64;
fn hash_one<H:BuildHasher>(&self, state: &H) -> u64;
}

impl<'a, T: HashValue + ?Sized> HashValue for &'a T {
fn hash_one<H: BuildHasher>(&self, state: &H) -> u64 {
fn hash_one<H:BuildHasher>(&self, state: &H) -> u64 {
T::hash_one(self, state)
}
}
Expand Down Expand Up @@ -100,7 +97,7 @@ hash_float_value!((half::f16, u16), (f32, u32), (f64, u64));
/// If `rehash==true` this combines the previous hash value in the buffer
/// with the new hash using `combine_hashes`
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_array_primitive<T, H: BuildHasher>(
fn hash_array_primitive<T, H:BuildHasher>(
array: &PrimitiveArray<T>,
random_state: &H,
hashes_buffer: &mut [u64],
Expand Down Expand Up @@ -379,6 +376,7 @@ pub fn create_hashes<'a, H: BuildHasher>(
random_state: &H,
hashes_buffer: &'a mut Vec<u64>,
) -> Result<&'a mut Vec<u64>> {

for (i, col) in arrays.iter().enumerate() {
let array = col.as_ref();
// combine hashes with `combine_hashes` for all columns besides the first
Expand Down Expand Up @@ -445,6 +443,7 @@ pub fn create_hashes<'a, H: BuildHasher>(
mod tests {
use std::sync::Arc;


use arrow::array::*;
#[cfg(not(feature = "force_hash_collisions"))]
use arrow::datatypes::*;
Expand Down Expand Up @@ -541,8 +540,8 @@ mod tests {
let fixed_size_binary_array =
Arc::new(FixedSizeBinaryArray::try_from_iter(input_arg.into_iter()).unwrap());

let random_state = RandomState::default();
let hashes_buff = &mut vec![0; fixed_size_binary_array.len()];
let random_state = RandomState::default();
let hashes_buff = &mut vec![0; fixed_size_binary_array.len()];
let hashes =
create_hashes(&[fixed_size_binary_array], &random_state, hashes_buff)?;
assert_eq!(hashes.len(), 3,);
Expand Down Expand Up @@ -667,8 +666,8 @@ mod tests {
];
let list_array =
Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(data)) as ArrayRef;
let random_state = RandomState::default();
let mut hashes = vec![0; list_array.len()];
let random_state = RandomState::default();
let mut hashes = vec![0; list_array.len()];
create_hashes(&[list_array], &random_state, &mut hashes).unwrap();
assert_eq!(hashes[0], hashes[5]);
assert_eq!(hashes[1], hashes[4]);
Expand All @@ -692,8 +691,8 @@ mod tests {
Arc::new(FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
data, 3,
)) as ArrayRef;
let random_state = RandomState::default();
let mut hashes = vec![0; list_array.len()];
let random_state = RandomState::default();
let mut hashes = vec![0; list_array.len()];
create_hashes(&[list_array], &random_state, &mut hashes).unwrap();
assert_eq!(hashes[0], hashes[5]);
assert_eq!(hashes[1], hashes[4]);
Expand Down
3 changes: 1 addition & 2 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,8 +771,7 @@ impl std::hash::Hash for ScalarValue {
fn hash_nested_array<H: std::hash::Hasher>(arr: ArrayRef, state: &mut H) {
let arrays = vec![arr.to_owned()];
let hashes_buffer = &mut vec![0; arr.len()];
let hashes =
create_hashes(&arrays, &FixedState::with_seed(0), hashes_buffer).unwrap();
let hashes = create_hashes(&arrays, &FixedState::with_seed(0), hashes_buffer).unwrap();
// Hash back to std::hash::Hasher
hashes.hash(state);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;

use foldhash::fast::RandomState;
use arrow::array::types::ArrowPrimitiveType;
use arrow::array::ArrayRef;
use arrow::array::PrimitiveArray;
use arrow::datatypes::DataType;
use foldhash::fast::RandomState;

use datafusion_common::cast::{as_list_array, as_primitive_array};
use datafusion_common::utils::array_into_list_array_nullable;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-aggregate/src/bit_and_or_xor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ use std::any::Any;
use std::collections::HashSet;
use std::fmt::{Display, Formatter};

use foldhash::fast::RandomState;
use arrow::array::{downcast_integer, Array, ArrayRef, AsArray};
use arrow::datatypes::{
ArrowNativeType, ArrowNumericType, DataType, Int16Type, Int32Type, Int64Type,
Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};
use arrow_schema::Field;
use foldhash::fast::RandomState;

use datafusion_common::cast::as_list_array;
use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-aggregate/src/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use foldhash::fast::RandomState;
use datafusion_common::stats::Precision;
use datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator;
use datafusion_physical_expr::expressions;
use foldhash::fast::RandomState;
use std::collections::HashSet;
use std::ops::BitAnd;
use std::{fmt::Debug, sync::Arc};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-aggregate/src/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

//! Defines `SUM` and `SUM DISTINCT` aggregate accumulators

use datafusion_expr::utils::AggregateOrderSensitivity;
use foldhash::fast::RandomState;
use datafusion_expr::utils::AggregateOrderSensitivity;
use std::any::Any;
use std::collections::HashSet;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr-common/src/binary_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! [`ArrowBytesMap`] and [`ArrowBytesSet`] for storing maps/sets of values from
//! StringArray / LargeStringArray / BinaryArray / LargeBinaryArray.

use foldhash::fast::RandomState;
use arrow::array::cast::AsArray;
use arrow::array::types::{ByteArrayType, GenericBinaryType, GenericStringType};
use arrow::array::{
Expand All @@ -28,7 +29,6 @@ use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::DataType;
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::utils::proxy::{RawTableAllocExt, VecAllocExt};
use foldhash::fast::RandomState;
use std::any::type_name;
use std::fmt::Debug;
use std::mem;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr-common/src/binary_view_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
//! `StringViewArray`/`BinaryViewArray`.
//! Much of the code is from `binary_map.rs`, but with a simpler implementation because we directly use the
//! [`GenericByteViewBuilder`].
use foldhash::fast::RandomState;
use arrow::array::cast::AsArray;
use arrow::array::{Array, ArrayBuilder, ArrayRef, GenericByteViewBuilder};
use arrow::datatypes::{BinaryViewType, ByteViewType, DataType, StringViewType};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::utils::proxy::{RawTableAllocExt, VecAllocExt};
use foldhash::fast::RandomState;
use std::fmt::Debug;
use std::sync::Arc;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::aggregates::group_values::group_column::{
ByteGroupValueBuilder, GroupColumn, PrimitiveGroupValueBuilder,
};
use crate::aggregates::group_values::GroupValues;
use foldhash::fast::RandomState;
use arrow::compute::cast;
use arrow::datatypes::{
Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
Expand All @@ -32,7 +33,6 @@ use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use datafusion_expr::EmitTo;
use datafusion_physical_expr::binary_map::OutputType;
use foldhash::fast::RandomState;

use hashbrown::raw::RawTable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use crate::aggregates::group_values::GroupValues;
use foldhash::fast::RandomState;
use arrow::array::BooleanBufferBuilder;
use arrow::buffer::NullBuffer;
use arrow::datatypes::i256;
Expand All @@ -27,7 +28,6 @@ use arrow_schema::DataType;
use datafusion_common::Result;
use datafusion_execution::memory_pool::proxy::VecAllocExt;
use datafusion_expr::EmitTo;
use foldhash::fast::RandomState;
use half::f16;
use hashbrown::raw::RawTable;
use std::sync::Arc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use crate::aggregates::group_values::GroupValues;
use foldhash::fast::RandomState;
use arrow::compute::cast;
use arrow::record_batch::RecordBatch;
use arrow::row::{RowConverter, Rows, SortField};
Expand All @@ -25,7 +26,6 @@ use datafusion_common::hash_utils::create_hashes;
use datafusion_common::Result;
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use datafusion_expr::EmitTo;
use foldhash::fast::RandomState;
use hashbrown::raw::RawTable;
use std::sync::Arc;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/aggregates/topk/hash_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use crate::aggregates::group_values::primitive::HashValue;
use crate::aggregates::topk::heap::Comparable;
use foldhash::fast::RandomState;
use arrow::datatypes::i256;
use arrow_array::builder::PrimitiveBuilder;
use arrow_array::cast::AsArray;
Expand All @@ -29,7 +30,6 @@ use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
use arrow_schema::DataType;
use datafusion_common::DataFusionError;
use datafusion_common::Result;
use foldhash::fast::RandomState;
use half::f16;
use hashbrown::raw::RawTable;
use std::fmt::Debug;
Expand Down
12 changes: 6 additions & 6 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ use datafusion_physical_expr::equivalence::{
};
use datafusion_physical_expr::PhysicalExprRef;

use foldhash::fast::RandomState;
use datafusion_expr::Operator;
use datafusion_physical_expr_common::datum::compare_op_for_nested;
use foldhash::fast::FixedState;
use futures::{ready, Stream, StreamExt, TryStreamExt};
use parking_lot::Mutex;

Expand Down Expand Up @@ -310,7 +310,7 @@ pub struct HashJoinExec {
/// Future that consumes left input and builds the hash table
left_fut: OnceAsync<JoinLeftData>,
/// Shares the `RandomState` for the hashing algorithm
random_state: FixedState,
random_state: RandomState,
/// Partitioning mode to use
pub mode: PartitionMode,
/// Execution metrics
Expand Down Expand Up @@ -355,7 +355,7 @@ impl HashJoinExec {
let (join_schema, column_indices) =
build_join_schema(&left_schema, &right_schema, join_type);

let random_state = FixedState::default();
let random_state = RandomState::default();

let join_schema = Arc::new(join_schema);

Expand Down Expand Up @@ -798,7 +798,7 @@ impl ExecutionPlan for HashJoinExec {
#[allow(clippy::too_many_arguments)]
async fn collect_left_input(
partition: Option<usize>,
random_state: FixedState,
random_state: RandomState,
left: Arc<dyn ExecutionPlan>,
on_left: Vec<PhysicalExprRef>,
context: Arc<TaskContext>,
Expand Down Expand Up @@ -910,7 +910,7 @@ pub fn update_hash<T>(
batch: &RecordBatch,
hash_map: &mut T,
offset: usize,
random_state: &FixedState,
random_state: &RandomState,
hashes_buffer: &mut Vec<u64>,
deleted_offset: usize,
fifo_hashmap: bool,
Expand Down Expand Up @@ -1074,7 +1074,7 @@ struct HashJoinStream {
/// right (probe) input
right: SendableRecordBatchStream,
/// Random state used for hashing initialization
random_state: FixedState,
random_state: RandomState,
/// Metrics
join_metrics: BuildProbeJoinMetrics,
/// Information of index and left / right placement of columns
Expand Down
14 changes: 7 additions & 7 deletions datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ use datafusion_physical_expr::equivalence::join_equivalence_properties;
use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement};

use foldhash::fast::RandomState;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use foldhash::fast::{FixedState, RandomState};
use futures::{ready, Stream, StreamExt};
use hashbrown::HashSet;
use parking_lot::Mutex;
Expand Down Expand Up @@ -177,7 +177,7 @@ pub struct SymmetricHashJoinExec {
/// How the join is performed
pub(crate) join_type: JoinType,
/// Shares the `RandomState` for the hashing algorithm
random_state: FixedState,
random_state: RandomState,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Information of index and left / right placement of columns
Expand Down Expand Up @@ -231,7 +231,7 @@ impl SymmetricHashJoinExec {
build_join_schema(&left_schema, &right_schema, join_type);

// Initialize the random state for the join operation:
let random_state = FixedState::default();
let random_state = RandomState::default();
let schema = Arc::new(schema);
let cache =
Self::compute_properties(&left, &right, Arc::clone(&schema), *join_type, &on);
Expand Down Expand Up @@ -547,7 +547,7 @@ struct SymmetricHashJoinStream {
// Right globally sorted filter expr
right_sorted_filter_expr: Option<SortedFilterExpr>,
/// Random state used for hashing initialization
random_state: FixedState,
random_state: RandomState,
/// If null_equals_null is true, null == null else null != null
null_equals_null: bool,
/// Metrics
Expand Down Expand Up @@ -786,7 +786,7 @@ pub(crate) fn join_with_probe_batch(
filter: Option<&JoinFilter>,
probe_batch: &RecordBatch,
column_indices: &[ColumnIndex],
random_state: &FixedState,
random_state: &RandomState,
null_equals_null: bool,
) -> Result<Option<RecordBatch>> {
if build_hash_joiner.input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 {
Expand Down Expand Up @@ -879,7 +879,7 @@ fn lookup_join_hashmap(
probe_batch: &RecordBatch,
build_on: &[PhysicalExprRef],
probe_on: &[PhysicalExprRef],
random_state: &FixedState,
random_state: &RandomState,
null_equals_null: bool,
hashes_buffer: &mut Vec<u64>,
deleted_offset: Option<usize>,
Expand Down Expand Up @@ -1010,7 +1010,7 @@ impl OneSideHashJoiner {
pub(crate) fn update_internal_state(
&mut self,
batch: &RecordBatch,
random_state: &FixedState,
random_state: &RandomState,
) -> Result<()> {
// Merge the incoming batch with the existing input buffer:
self.input_buffer = concat_batches(&batch.schema(), [&self.input_buffer, batch])?;
Expand Down
Loading

0 comments on commit e1dfa62

Please sign in to comment.