From 00b4d61956c76a0df055dcbf13ddff254c7cf04e Mon Sep 17 00:00:00 2001 From: Orson Peters Date: Mon, 28 Oct 2024 19:48:08 +0100 Subject: [PATCH] refactor(rust): Add groupby partitioning and parallel groupby finishing to new-streaming engine (#19451) --- Cargo.lock | 1 + crates/polars-arrow/src/bitmap/builder.rs | 100 +++++++++++++++++ crates/polars-arrow/src/bitmap/mod.rs | 3 + .../polars-core/src/hashing/vector_hasher.rs | 6 +- crates/polars-expr/Cargo.toml | 1 + crates/polars-expr/src/groups/mod.rs | 13 +-- crates/polars-expr/src/groups/row_encoded.rs | 88 +++++++++++++-- crates/polars-expr/src/reduce/len.rs | 12 ++ crates/polars-expr/src/reduce/min_max.rs | 39 +++++++ crates/polars-expr/src/reduce/mod.rs | 57 ++++++++++ crates/polars-expr/src/reduce/partition.rs | 105 ++++++++++++++++++ crates/polars-expr/src/reduce/sum.rs | 16 +++ crates/polars-stream/src/nodes/group_by.rs | 83 ++++++++++++-- crates/polars-utils/src/hashing.rs | 5 + 14 files changed, 500 insertions(+), 29 deletions(-) create mode 100644 crates/polars-arrow/src/bitmap/builder.rs create mode 100644 crates/polars-expr/src/reduce/partition.rs diff --git a/Cargo.lock b/Cargo.lock index d10a91b718db..85f52a567c9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2888,6 +2888,7 @@ dependencies = [ "polars-row", "polars-time", "polars-utils", + "rand", "rayon", ] diff --git a/crates/polars-arrow/src/bitmap/builder.rs b/crates/polars-arrow/src/bitmap/builder.rs new file mode 100644 index 000000000000..c507df97c5ba --- /dev/null +++ b/crates/polars-arrow/src/bitmap/builder.rs @@ -0,0 +1,100 @@ +use crate::bitmap::{Bitmap, MutableBitmap}; +use crate::storage::SharedStorage; + +/// Used to build bitmaps bool-by-bool in sequential order. +#[derive(Default, Clone)] +pub struct BitmapBuilder { + buf: u64, + len: usize, + cap: usize, + set_bits: usize, + bytes: Vec, +} + +impl BitmapBuilder { + pub fn new() -> Self { + Self::default() + } + + pub fn len(&self) -> usize { + self.len + } + + pub fn capacity(&self) -> usize { + self.cap + } + + pub fn with_capacity(bits: usize) -> Self { + let bytes = Vec::with_capacity(bits.div_ceil(64) * 8); + let words_available = bytes.capacity() / 8; + Self { + buf: 0, + len: 0, + cap: words_available * 64, + set_bits: 0, + bytes, + } + } + + #[inline(always)] + pub fn reserve(&mut self, additional: usize) { + if self.len + additional > self.cap { + self.reserve_slow(additional) + } + } + + #[cold] + #[inline(never)] + fn reserve_slow(&mut self, additional: usize) { + let bytes_needed = (self.len + additional).div_ceil(64) * 8; + self.bytes.reserve(bytes_needed - self.bytes.capacity()); + let words_available = self.bytes.capacity() / 8; + self.cap = words_available * 64; + } + + #[inline(always)] + pub fn push(&mut self, x: bool) { + self.reserve(1); + unsafe { self.push_unchecked(x) } + } + + /// # Safety + /// self.len() < self.capacity() must hold. + #[inline(always)] + pub unsafe fn push_unchecked(&mut self, x: bool) { + debug_assert!(self.len < self.cap); + self.buf |= (x as u64) << (self.len % 64); + self.len += 1; + if self.len % 64 == 0 { + let p = self.bytes.as_mut_ptr().add(self.bytes.len()).cast::(); + p.write_unaligned(self.buf.to_le()); + self.bytes.set_len(self.bytes.len() + 8); + self.set_bits += self.buf.count_ones() as usize; + self.buf = 0; + } + } + + /// # Safety + /// May only be called once at the end. + unsafe fn finish(&mut self) { + if self.len % 64 != 0 { + self.bytes.extend_from_slice(&self.buf.to_le_bytes()); + self.set_bits += self.buf.count_ones() as usize; + } + } + + pub fn into_mut(mut self) -> MutableBitmap { + unsafe { + self.finish(); + MutableBitmap::from_vec(self.bytes, self.len) + } + } + + pub fn freeze(mut self) -> Bitmap { + unsafe { + self.finish(); + let storage = SharedStorage::from_vec(self.bytes); + Bitmap::from_inner_unchecked(storage, 0, self.len, Some(self.len - self.set_bits)) + } + } +} diff --git a/crates/polars-arrow/src/bitmap/mod.rs b/crates/polars-arrow/src/bitmap/mod.rs index e7ed5fa363e8..6d518bf596b4 100644 --- a/crates/polars-arrow/src/bitmap/mod.rs +++ b/crates/polars-arrow/src/bitmap/mod.rs @@ -19,3 +19,6 @@ pub use assign_ops::*; pub mod utils; pub mod bitmask; + +mod builder; +pub use builder::*; diff --git a/crates/polars-core/src/hashing/vector_hasher.rs b/crates/polars-core/src/hashing/vector_hasher.rs index 7dfb07c64d58..e00e45f1ede8 100644 --- a/crates/polars-core/src/hashing/vector_hasher.rs +++ b/crates/polars-core/src/hashing/vector_hasher.rs @@ -1,4 +1,5 @@ use arrow::bitmap::utils::get_bit_unchecked; +use polars_utils::hashing::folded_multiply; use polars_utils::total_ord::{ToTotalOrd, TotalHash}; use rayon::prelude::*; use xxhash_rust::xxh3::xxh3_64_with_seed; @@ -30,11 +31,6 @@ pub trait VecHash { } } -pub(crate) const fn folded_multiply(s: u64, by: u64) -> u64 { - let result = (s as u128).wrapping_mul(by as u128); - ((result & 0xffff_ffff_ffff_ffff) as u64) ^ ((result >> 64) as u64) -} - pub(crate) fn get_null_hash_value(random_state: &PlRandomState) -> u64 { // we just start with a large prime number and hash that twice // to get a constant hash value for null/None diff --git a/crates/polars-expr/Cargo.toml b/crates/polars-expr/Cargo.toml index 0911445617aa..29aa34652146 100644 --- a/crates/polars-expr/Cargo.toml +++ b/crates/polars-expr/Cargo.toml @@ -24,6 +24,7 @@ polars-plan = { workspace = true } polars-row = { workspace = true } polars-time = { workspace = true, optional = true } polars-utils = { workspace = true } +rand = { workspace = true } rayon = { workspace = true } [features] diff --git a/crates/polars-expr/src/groups/mod.rs b/crates/polars-expr/src/groups/mod.rs index 5eb32b34a052..43091244c661 100644 --- a/crates/polars-expr/src/groups/mod.rs +++ b/crates/polars-expr/src/groups/mod.rs @@ -23,21 +23,20 @@ pub trait Grouper: Any + Send { /// the ith group of other now has group index group_idxs[i] in self. fn combine(&mut self, other: &dyn Grouper, group_idxs: &mut Vec); - /// Partitions this Grouper into the given partitions. + /// Partitions this Grouper into the given number of partitions. /// - /// Updates partition_idxs and group_idxs such that the ith group of self - /// has group index group_idxs[i] in partition partition_idxs[i]. + /// Updates partition_idxs such that the ith group of self moves to partition + /// partition_idxs[i]. /// /// It is guaranteed that two equal keys in two independent partition_into /// calls map to the same partition index if the seed and the number of /// partitions is equal. - fn partition_into( + fn partition( &self, seed: u64, - partitions: &mut [Box], + num_partitions: usize, partition_idxs: &mut Vec, - group_idxs: &mut Vec, - ); + ) -> Vec>; /// Returns the keys in this Grouper in group order, that is the key for /// group i is returned in row i. diff --git a/crates/polars-expr/src/groups/row_encoded.rs b/crates/polars-expr/src/groups/row_encoded.rs index 46ec956106a5..1a2fd5209436 100644 --- a/crates/polars-expr/src/groups/row_encoded.rs +++ b/crates/polars-expr/src/groups/row_encoded.rs @@ -4,8 +4,10 @@ use hashbrown::hash_table::{Entry, HashTable}; use polars_core::chunked_array::ops::row_encode::_get_rows_encoded_unordered; use polars_row::EncodingField; use polars_utils::aliases::PlRandomState; +use polars_utils::hashing::{folded_multiply, hash_to_partition}; use polars_utils::itertools::Itertools; use polars_utils::vec::PushUnchecked; +use rand::Rng; use super::*; @@ -27,7 +29,13 @@ pub struct RowEncodedHashGrouper { key_schema: Arc, table: HashTable, key_data: Vec, + + // Used for computing canonical hashes. random_state: PlRandomState, + + // Internal random seed used to keep hash iteration order decorrelated. + // We simply store a random odd number and multiply the canonical hash by it. + seed: u64, } impl RowEncodedHashGrouper { @@ -35,6 +43,7 @@ impl RowEncodedHashGrouper { Self { key_schema, random_state, + seed: rand::random::() | 1, ..Default::default() } } @@ -42,9 +51,9 @@ impl RowEncodedHashGrouper { fn insert_key(&mut self, hash: u64, key: &[u8]) -> IdxSize { let num_groups = self.table.len(); let entry = self.table.entry( - hash, + hash.wrapping_mul(self.seed), |g| unsafe { hash == g.key_hash && key == g.key(&self.key_data) }, - |g| g.key_hash, + |g| g.key_hash.wrapping_mul(self.seed), ); match entry { @@ -64,6 +73,23 @@ impl RowEncodedHashGrouper { } } + /// Insert a key, without checking that it is unique. + fn insert_key_unique(&mut self, hash: u64, key: &[u8]) -> IdxSize { + let group_idx = self.table.len().try_into().unwrap(); + let group = Group { + key_hash: hash, + key_offset: self.key_data.len(), + key_length: key.len().try_into().unwrap(), + group_idx, + }; + self.key_data.extend(key); + self.table + .insert_unique(hash.wrapping_mul(self.seed), group, |g| { + g.key_hash.wrapping_mul(self.seed) + }); + group_idx + } + fn finalize_keys(&self, mut key_rows: Vec<&[u8]>) -> DataFrame { let key_dtypes = self .key_schema @@ -125,7 +151,9 @@ impl Grouper for RowEncodedHashGrouper { fn combine(&mut self, other: &dyn Grouper, group_idxs: &mut Vec) { let other = other.as_any().downcast_ref::().unwrap(); - self.table.reserve(other.table.len(), |g| g.key_hash); // TODO: cardinality estimation. + // TODO: cardinality estimation. + self.table + .reserve(other.table.len(), |g| g.key_hash.wrapping_mul(self.seed)); unsafe { group_idxs.clear(); @@ -167,14 +195,54 @@ impl Grouper for RowEncodedHashGrouper { ) } - fn partition_into( + fn partition( &self, - _seed: u64, - _partitions: &mut [Box], - _partition_idxs: &mut Vec, - _group_idxs: &mut Vec, - ) { - unimplemented!() + seed: u64, + num_partitions: usize, + partition_idxs: &mut Vec, + ) -> Vec> { + assert!(num_partitions > 0); + + // Two-pass algorithm to prevent reallocations. + let mut partition_size = vec![(0, 0); num_partitions]; // (keys, bytes) + unsafe { + for group in self.table.iter() { + let ph = folded_multiply(group.key_hash, seed | 1); + let p_idx = hash_to_partition(ph, num_partitions); + let (p_keys, p_bytes) = partition_size.get_unchecked_mut(p_idx as usize); + *p_keys += 1; + *p_bytes += group.key_length as usize; + } + } + + let mut rng = rand::thread_rng(); + let mut partitions = partition_size + .into_iter() + .map(|(keys, bytes)| Self { + key_schema: self.key_schema.clone(), + table: HashTable::with_capacity(keys), + key_data: Vec::with_capacity(bytes), + random_state: self.random_state.clone(), + seed: rng.gen::() | 1, + }) + .collect_vec(); + + unsafe { + partition_idxs.clear(); + partition_idxs.reserve(self.table.len()); + let partition_idxs_out = partition_idxs.spare_capacity_mut(); + for group in self.table.iter() { + let ph = folded_multiply(group.key_hash, seed | 1); + let p_idx = hash_to_partition(ph, num_partitions); + let p = partitions.get_unchecked_mut(p_idx); + p.insert_key_unique(group.key_hash, group.key(&self.key_data)); + *partition_idxs_out.get_unchecked_mut(group.group_idx as usize) = + MaybeUninit::new(p_idx as IdxSize); + } + partition_idxs.set_len(self.table.len()); + } + + partitions.into_iter().map(|p| Box::new(p) as _).collect() } fn as_any(&self) -> &dyn Any { diff --git a/crates/polars-expr/src/reduce/len.rs b/crates/polars-expr/src/reduce/len.rs index fa5aedb91f18..57641b1a02b6 100644 --- a/crates/polars-expr/src/reduce/len.rs +++ b/crates/polars-expr/src/reduce/len.rs @@ -1,6 +1,7 @@ use polars_core::error::constants::LENGTH_LIMIT_MSG; use super::*; +use crate::reduce::partition::partition_vec; #[derive(Default)] pub struct LenReduce { @@ -61,6 +62,17 @@ impl GroupedReduction for LenReduce { Ok(ca.into_series()) } + unsafe fn partition( + self: Box, + partition_sizes: &[IdxSize], + partition_idxs: &[IdxSize], + ) -> Vec> { + partition_vec(self.groups, partition_sizes, partition_idxs) + .into_iter() + .map(|groups| Box::new(Self { groups }) as _) + .collect() + } + fn as_any(&self) -> &dyn Any { self } diff --git a/crates/polars-expr/src/reduce/min_max.rs b/crates/polars-expr/src/reduce/min_max.rs index f4541d7a88a1..de25d3efc927 100644 --- a/crates/polars-expr/src/reduce/min_max.rs +++ b/crates/polars-expr/src/reduce/min_max.rs @@ -11,6 +11,7 @@ use polars_utils::float::IsFloat; use polars_utils::min_max::MinMax; use super::*; +use crate::reduce::partition::partition_mask; pub fn new_min_reduction(dtype: DataType, propagate_nans: bool) -> Box { use DataType::*; @@ -344,6 +345,25 @@ impl GroupedReduction for BoolMinGroupedReduction { Ok(()) } + unsafe fn partition( + self: Box, + partition_sizes: &[IdxSize], + partition_idxs: &[IdxSize], + ) -> Vec> { + let p_values = partition_mask(&self.values.freeze(), partition_sizes, partition_idxs); + let p_mask = partition_mask(&self.mask.freeze(), partition_sizes, partition_idxs); + p_values + .into_iter() + .zip(p_mask) + .map(|(values, mask)| { + Box::new(Self { + values: values.into_mut(), + mask: mask.into_mut(), + }) as _ + }) + .collect() + } + fn finalize(&mut self) -> PolarsResult { let v = core::mem::take(&mut self.values); let m = core::mem::take(&mut self.mask); @@ -450,6 +470,25 @@ impl GroupedReduction for BoolMaxGroupedReduction { }) } + unsafe fn partition( + self: Box, + partition_sizes: &[IdxSize], + partition_idxs: &[IdxSize], + ) -> Vec> { + let p_values = partition_mask(&self.values.freeze(), partition_sizes, partition_idxs); + let p_mask = partition_mask(&self.mask.freeze(), partition_sizes, partition_idxs); + p_values + .into_iter() + .zip(p_mask) + .map(|(values, mask)| { + Box::new(Self { + values: values.into_mut(), + mask: mask.into_mut(), + }) as _ + }) + .collect() + } + fn as_any(&self) -> &dyn Any { self } diff --git a/crates/polars-expr/src/reduce/mod.rs b/crates/polars-expr/src/reduce/mod.rs index 170ee6abff6c..bfe4cb56417b 100644 --- a/crates/polars-expr/src/reduce/mod.rs +++ b/crates/polars-expr/src/reduce/mod.rs @@ -2,6 +2,7 @@ mod convert; mod len; mod mean; mod min_max; +mod partition; mod sum; mod var_std; @@ -49,6 +50,22 @@ pub trait GroupedReduction: Any + Send { group_idxs: &[IdxSize], ) -> PolarsResult<()>; + /// Partitions this GroupedReduction into several partitions. + /// + /// The ith group of this GroupedReduction should becomes the group_idxs[i] + /// group in partition partition_idxs[i]. + /// + /// # Safety + /// partitions_idxs[i] < partition_sizes.len() for all i. + /// group_idxs[i] < partition_sizes[partition_idxs[i]] for all i. + /// Each partition p has an associated set of group_idxs, this set contains + /// 0..partition_size[p] exactly once. + unsafe fn partition( + self: Box, + partition_sizes: &[IdxSize], + partition_idxs: &[IdxSize], + ) -> Vec>; + /// Returns the finalized value per group as a Series. /// /// After this operation the number of groups is reset to 0. @@ -245,6 +262,23 @@ where Ok(()) } + unsafe fn partition( + self: Box, + partition_sizes: &[IdxSize], + partition_idxs: &[IdxSize], + ) -> Vec> { + partition::partition_vec(self.values, partition_sizes, partition_idxs) + .into_iter() + .map(|values| { + Box::new(Self { + values, + in_dtype: self.in_dtype.clone(), + reducer: self.reducer.clone(), + }) as _ + }) + .collect() + } + fn finalize(&mut self) -> PolarsResult { let v = core::mem::take(&mut self.values); self.reducer.finish(v, None, &self.in_dtype) @@ -353,6 +387,29 @@ where Ok(()) } + unsafe fn partition( + self: Box, + partition_sizes: &[IdxSize], + partition_idxs: &[IdxSize], + ) -> Vec> { + partition::partition_vec_mask( + self.values, + &self.mask.freeze(), + partition_sizes, + partition_idxs, + ) + .into_iter() + .map(|(values, mask)| { + Box::new(Self { + values, + mask: mask.into_mut(), + in_dtype: self.in_dtype.clone(), + reducer: self.reducer.clone(), + }) as _ + }) + .collect() + } + fn finalize(&mut self) -> PolarsResult { let v = core::mem::take(&mut self.values); let m = core::mem::take(&mut self.mask); diff --git a/crates/polars-expr/src/reduce/partition.rs b/crates/polars-expr/src/reduce/partition.rs new file mode 100644 index 000000000000..0152035879bd --- /dev/null +++ b/crates/polars-expr/src/reduce/partition.rs @@ -0,0 +1,105 @@ +use arrow::bitmap::{Bitmap, BitmapBuilder}; +use polars_utils::itertools::Itertools; +use polars_utils::vec::PushUnchecked; +use polars_utils::IdxSize; + +/// Partitions this Vec into multiple Vecs. +/// +/// # Safety +/// partitions_idxs[i] < partition_sizes.len() for all i. +/// idx_in_partition[i] < partition_sizes[partition_idxs[i]] for all i. +/// Each partition p has an associated set of idx_in_partition, this set +/// contains 0..partition_size[p] exactly once. +pub unsafe fn partition_vec( + v: Vec, + partition_sizes: &[IdxSize], + partition_idxs: &[IdxSize], +) -> Vec> { + assert!(partition_idxs.len() == v.len()); + + let mut partitions = partition_sizes + .iter() + .map(|sz| Vec::::with_capacity(*sz as usize)) + .collect_vec(); + + unsafe { + // Scatter into each partition. + for (i, val) in v.into_iter().enumerate() { + let p_idx = *partition_idxs.get_unchecked(i) as usize; + debug_assert!(p_idx < partitions.len()); + let p = partitions.get_unchecked_mut(p_idx); + p.push_unchecked(val); + } + + for (p, sz) in partitions.iter_mut().zip(partition_sizes) { + p.set_len(*sz as usize); + } + } + + partitions +} + +/// # Safety +/// Same as partition_vec. +pub unsafe fn partition_mask( + m: &Bitmap, + partition_sizes: &[IdxSize], + partition_idxs: &[IdxSize], +) -> Vec { + assert!(partition_idxs.len() == m.len()); + + let mut partitions = partition_sizes + .iter() + .map(|sz| BitmapBuilder::with_capacity(*sz as usize)) + .collect_vec(); + + unsafe { + // Scatter into each partition. + for i in 0..m.len() { + let p_idx = *partition_idxs.get_unchecked(i) as usize; + let p = partitions.get_unchecked_mut(p_idx); + p.push_unchecked(m.get_bit_unchecked(i)); + } + } + + partitions +} + +/// A fused loop of partition_vec and partition_mask. +/// # Safety +/// Same as partition_vec. +pub unsafe fn partition_vec_mask( + v: Vec, + m: &Bitmap, + partition_sizes: &[IdxSize], + partition_idxs: &[IdxSize], +) -> Vec<(Vec, BitmapBuilder)> { + assert!(partition_idxs.len() == v.len()); + assert!(m.len() == v.len()); + + let mut partitions = partition_sizes + .iter() + .map(|sz| { + ( + Vec::::with_capacity(*sz as usize), + BitmapBuilder::with_capacity(*sz as usize), + ) + }) + .collect_vec(); + + unsafe { + // Scatter into each partition. + for (i, val) in v.into_iter().enumerate() { + let p_idx = *partition_idxs.get_unchecked(i) as usize; + let (pv, pm) = partitions.get_unchecked_mut(p_idx); + pv.push_unchecked(val); + pm.push_unchecked(m.get_bit_unchecked(i)); + } + + for (p, sz) in partitions.iter_mut().zip(partition_sizes) { + p.0.set_len(*sz as usize); + } + } + + partitions +} diff --git a/crates/polars-expr/src/reduce/sum.rs b/crates/polars-expr/src/reduce/sum.rs index 111d69eec4f2..466d5ffb9f9d 100644 --- a/crates/polars-expr/src/reduce/sum.rs +++ b/crates/polars-expr/src/reduce/sum.rs @@ -126,6 +126,22 @@ where Ok(()) } + unsafe fn partition( + self: Box, + partition_sizes: &[IdxSize], + partition_idxs: &[IdxSize], + ) -> Vec> { + partition::partition_vec(self.sums, partition_sizes, partition_idxs) + .into_iter() + .map(|sums| { + Box::new(Self { + sums, + in_dtype: self.in_dtype.clone(), + }) as _ + }) + .collect() + } + fn finalize(&mut self) -> PolarsResult { let v = core::mem::take(&mut self.sums); let arr = Box::new(PrimitiveArray::::from_vec(v)); diff --git a/crates/polars-stream/src/nodes/group_by.rs b/crates/polars-stream/src/nodes/group_by.rs index 6954263bb99b..9827268a4ff5 100644 --- a/crates/polars-stream/src/nodes/group_by.rs +++ b/crates/polars-stream/src/nodes/group_by.rs @@ -1,9 +1,14 @@ +use std::mem::ManuallyDrop; use std::sync::Arc; use polars_core::prelude::IntoColumn; use polars_core::schema::Schema; +use polars_core::utils::accumulate_dataframes_vertical_unchecked; use polars_expr::groups::Grouper; use polars_expr::reduce::GroupedReduction; +use polars_utils::itertools::Itertools; +use polars_utils::sync::SyncPtr; +use rayon::prelude::*; use super::compute_node_prelude::*; use crate::async_primitives::connector::Receiver; @@ -77,12 +82,13 @@ impl GroupBySinkState { } } - fn into_source(mut self, output_schema: &Schema) -> PolarsResult { - // TODO: parallelize this with partitions. + fn combine_locals( + output_schema: &Schema, + mut locals: Vec, + ) -> PolarsResult { let mut group_idxs = Vec::new(); - let num_pipelines = self.local.len(); - let mut combined = self.local.pop().unwrap(); - for local in self.local { + let mut combined = locals.pop().unwrap(); + for local in locals { combined.grouper.combine(&*local.grouper, &mut group_idxs); for (l, r) in combined .grouped_reductions @@ -102,10 +108,73 @@ impl GroupBySinkState { out.with_column_unchecked(r.finalize()?.with_name(name.clone()).into_column()); } } - let mut source_node = InMemorySourceNode::new(Arc::new(out)); - source_node.initialize(num_pipelines); + Ok(out) + } + + fn into_source_parallel(self, output_schema: &Schema) -> PolarsResult { + let num_partitions = self.local.len(); + let seed = 0xdeadbeef; + let partitioned_locals: Vec<_> = self + .local + .into_par_iter() + .with_max_len(1) + .map(|local| { + let mut partition_idxs = Vec::new(); + let p_groupers = local + .grouper + .partition(seed, num_partitions, &mut partition_idxs); + let partition_sizes = p_groupers.iter().map(|g| g.num_groups()).collect_vec(); + let grouped_reductions_p = local + .grouped_reductions + .into_iter() + .map(|r| unsafe { r.partition(&partition_sizes, &partition_idxs) }) + .collect_vec(); + (p_groupers, grouped_reductions_p) + }) + .collect(); + + let frames = unsafe { + let mut partitioned_locals = ManuallyDrop::new(partitioned_locals); + let partitioned_locals_ptr = SyncPtr::new(partitioned_locals.as_mut_ptr()); + (0..num_partitions) + .into_par_iter() + .with_max_len(1) + .map(|p| { + let locals_in_p = (0..num_partitions) + .map(|l| { + let partitioned_local = &*partitioned_locals_ptr.get().add(l); + let (p_groupers, grouped_reductions_p) = partitioned_local; + LocalGroupBySinkState { + grouper: p_groupers.as_ptr().add(p).read(), + grouped_reductions: grouped_reductions_p + .iter() + .map(|r| r.as_ptr().add(p).read()) + .collect(), + } + }) + .collect(); + Self::combine_locals(output_schema, locals_in_p) + }) + .collect::>>() + }; + + let df = accumulate_dataframes_vertical_unchecked(frames?); + let mut source_node = InMemorySourceNode::new(Arc::new(df)); + source_node.initialize(num_partitions); Ok(source_node) } + + fn into_source(self, output_schema: &Schema) -> PolarsResult { + if std::env::var("POLARS_PARALLEL_GROUPBY_FINALIZE").as_deref() == Ok("1") { + self.into_source_parallel(output_schema) + } else { + let num_pipelines = self.local.len(); + let df = Self::combine_locals(output_schema, self.local); + let mut source_node = InMemorySourceNode::new(Arc::new(df?)); + source_node.initialize(num_pipelines); + Ok(source_node) + } + } } enum GroupByState { diff --git a/crates/polars-utils/src/hashing.rs b/crates/polars-utils/src/hashing.rs index 0682cf371457..63f4c661a2c3 100644 --- a/crates/polars-utils/src/hashing.rs +++ b/crates/polars-utils/src/hashing.rs @@ -2,6 +2,11 @@ use std::hash::{Hash, Hasher}; use crate::nulls::IsNull; +pub const fn folded_multiply(a: u64, b: u64) -> u64 { + let full = (a as u128).wrapping_mul(b as u128); + (full as u64) ^ ((full >> 64) as u64) +} + /// Contains a byte slice and a precomputed hash for that string. /// During rehashes, we will rehash the hash instead of the string, that makes /// rehashing cheap and allows cache coherent small hash tables.