Skip to content

Commit

Permalink
refactor(rust): Add groupby partitioning and parallel groupby finishing to new-streaming engine (#19451)
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp authored Oct 28, 2024
1 parent 75aa4a3 commit 00b4d61
Show file tree
Hide file tree
Showing 14 changed files with 500 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

100 changes: 100 additions & 0 deletions crates/polars-arrow/src/bitmap/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use crate::bitmap::{Bitmap, MutableBitmap};
use crate::storage::SharedStorage;

/// Used to build bitmaps bool-by-bool in sequential order.
#[derive(Default, Clone)]
pub struct BitmapBuilder {
// Pending bits that have not yet been flushed to `bytes`; bits are
// accumulated here and written out one full 64-bit word at a time.
buf: u64,
// Total number of bits pushed so far (including those still in `buf`).
len: usize,
// Number of bits that can be stored before reallocation is needed.
cap: usize,
// Running count of one-bits, used to compute the null/unset count cheaply.
set_bits: usize,
// Completed little-endian 64-bit words, stored as raw bytes.
bytes: Vec<u8>,
}

impl BitmapBuilder {
    /// Creates an empty builder with no capacity.
    pub fn new() -> Self {
        Self::default()
    }

    /// The number of bits pushed so far.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns true if no bits have been pushed yet.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// The number of bits that can be stored without reallocating.
    pub fn capacity(&self) -> usize {
        self.cap
    }

    /// Creates a builder with capacity for at least `bits` bits.
    pub fn with_capacity(bits: usize) -> Self {
        // Storage is flushed one whole 64-bit word (8 bytes) at a time, so
        // round the byte capacity up to a multiple of 8.
        let bytes = Vec::with_capacity(bits.div_ceil(64) * 8);
        // Vec::with_capacity may over-allocate; advertise everything we got.
        let words_available = bytes.capacity() / 8;
        Self {
            buf: 0,
            len: 0,
            cap: words_available * 64,
            set_bits: 0,
            bytes,
        }
    }

    /// Ensures there is capacity for at least `additional` more bits.
    #[inline(always)]
    pub fn reserve(&mut self, additional: usize) {
        if self.len + additional > self.cap {
            self.reserve_slow(additional)
        }
    }

    #[cold]
    #[inline(never)]
    fn reserve_slow(&mut self, additional: usize) {
        let bytes_needed = (self.len + additional).div_ceil(64) * 8;
        // BUGFIX: Vec::reserve takes a count *in addition to self.bytes.len()*,
        // not relative to the current capacity. Subtracting
        // `self.bytes.capacity()` here could under-reserve whenever
        // bytes.len() < bytes.capacity() (reserve would be a no-op), leaving
        // self.cap overstated and letting a later push_unchecked write past
        // the end of the allocation.
        self.bytes.reserve(bytes_needed - self.bytes.len());
        let words_available = self.bytes.capacity() / 8;
        self.cap = words_available * 64;
    }

    /// Pushes a single bit, growing capacity if needed.
    #[inline(always)]
    pub fn push(&mut self, x: bool) {
        self.reserve(1);
        // SAFETY: we just reserved room for at least one more bit.
        unsafe { self.push_unchecked(x) }
    }

    /// Pushes a single bit without checking capacity.
    ///
    /// # Safety
    /// self.len() < self.capacity() must hold.
    #[inline(always)]
    pub unsafe fn push_unchecked(&mut self, x: bool) {
        debug_assert!(self.len < self.cap);
        // Accumulate bits into the staging word; flush once it fills up.
        self.buf |= (x as u64) << (self.len % 64);
        self.len += 1;
        if self.len % 64 == 0 {
            // SAFETY: the capacity invariant guarantees 8 spare bytes, and
            // write_unaligned handles any pointer alignment.
            let p = self.bytes.as_mut_ptr().add(self.bytes.len()).cast::<u64>();
            p.write_unaligned(self.buf.to_le());
            self.bytes.set_len(self.bytes.len() + 8);
            self.set_bits += self.buf.count_ones() as usize;
            self.buf = 0;
        }
    }

    /// Flushes a partially-filled trailing word, if any.
    ///
    /// # Safety
    /// May only be called once at the end.
    unsafe fn finish(&mut self) {
        if self.len % 64 != 0 {
            self.bytes.extend_from_slice(&self.buf.to_le_bytes());
            self.set_bits += self.buf.count_ones() as usize;
        }
    }

    /// Consumes the builder, returning the bits as a `MutableBitmap`.
    pub fn into_mut(mut self) -> MutableBitmap {
        unsafe {
            // SAFETY: finish() is called exactly once, just before consuming self.
            self.finish();
            MutableBitmap::from_vec(self.bytes, self.len)
        }
    }

    /// Consumes the builder, returning the bits as an immutable `Bitmap`.
    pub fn freeze(mut self) -> Bitmap {
        unsafe {
            // SAFETY: finish() is called exactly once, just before consuming self.
            self.finish();
            let storage = SharedStorage::from_vec(self.bytes);
            // Unset count = total bits pushed minus the running one-bit count.
            Bitmap::from_inner_unchecked(storage, 0, self.len, Some(self.len - self.set_bits))
        }
    }
}
3 changes: 3 additions & 0 deletions crates/polars-arrow/src/bitmap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ pub use assign_ops::*;
pub mod utils;

pub mod bitmask;

mod builder;
pub use builder::*;
6 changes: 1 addition & 5 deletions crates/polars-core/src/hashing/vector_hasher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use arrow::bitmap::utils::get_bit_unchecked;
use polars_utils::hashing::folded_multiply;
use polars_utils::total_ord::{ToTotalOrd, TotalHash};
use rayon::prelude::*;
use xxhash_rust::xxh3::xxh3_64_with_seed;
Expand Down Expand Up @@ -30,11 +31,6 @@ pub trait VecHash {
}
}

pub(crate) const fn folded_multiply(s: u64, by: u64) -> u64 {
let result = (s as u128).wrapping_mul(by as u128);
((result & 0xffff_ffff_ffff_ffff) as u64) ^ ((result >> 64) as u64)
}

pub(crate) fn get_null_hash_value(random_state: &PlRandomState) -> u64 {
// we just start with a large prime number and hash that twice
// to get a constant hash value for null/None
Expand Down
1 change: 1 addition & 0 deletions crates/polars-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ polars-plan = { workspace = true }
polars-row = { workspace = true }
polars-time = { workspace = true, optional = true }
polars-utils = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }

[features]
Expand Down
13 changes: 6 additions & 7 deletions crates/polars-expr/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,20 @@ pub trait Grouper: Any + Send {
/// the ith group of other now has group index group_idxs[i] in self.
fn combine(&mut self, other: &dyn Grouper, group_idxs: &mut Vec<IdxSize>);

/// Partitions this Grouper into the given partitions.
/// Partitions this Grouper into the given number of partitions.
///
/// Updates partition_idxs and group_idxs such that the ith group of self
/// has group index group_idxs[i] in partition partition_idxs[i].
/// Updates partition_idxs such that the ith group of self moves to partition
/// partition_idxs[i].
///
/// It is guaranteed that two equal keys in two independent partition_into
/// calls map to the same partition index if the seed and the number of
/// partitions is equal.
fn partition_into(
fn partition(
&self,
seed: u64,
partitions: &mut [Box<dyn Grouper>],
num_partitions: usize,
partition_idxs: &mut Vec<IdxSize>,
group_idxs: &mut Vec<IdxSize>,
);
) -> Vec<Box<dyn Grouper>>;

/// Returns the keys in this Grouper in group order, that is the key for
/// group i is returned in row i.
Expand Down
88 changes: 78 additions & 10 deletions crates/polars-expr/src/groups/row_encoded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ use hashbrown::hash_table::{Entry, HashTable};
use polars_core::chunked_array::ops::row_encode::_get_rows_encoded_unordered;
use polars_row::EncodingField;
use polars_utils::aliases::PlRandomState;
use polars_utils::hashing::{folded_multiply, hash_to_partition};
use polars_utils::itertools::Itertools;
use polars_utils::vec::PushUnchecked;
use rand::Rng;

use super::*;

Expand All @@ -27,24 +29,31 @@ pub struct RowEncodedHashGrouper {
key_schema: Arc<Schema>,
table: HashTable<Group>,
key_data: Vec<u8>,

// Used for computing canonical hashes.
random_state: PlRandomState,

// Internal random seed used to keep hash iteration order decorrelated.
// We simply store a random odd number and multiply the canonical hash by it.
seed: u64,
}

impl RowEncodedHashGrouper {
pub fn new(key_schema: Arc<Schema>, random_state: PlRandomState) -> Self {
Self {
key_schema,
random_state,
seed: rand::random::<u64>() | 1,
..Default::default()
}
}

fn insert_key(&mut self, hash: u64, key: &[u8]) -> IdxSize {
let num_groups = self.table.len();
let entry = self.table.entry(
hash,
hash.wrapping_mul(self.seed),
|g| unsafe { hash == g.key_hash && key == g.key(&self.key_data) },
|g| g.key_hash,
|g| g.key_hash.wrapping_mul(self.seed),
);

match entry {
Expand All @@ -64,6 +73,23 @@ impl RowEncodedHashGrouper {
}
}

/// Insert a key, without checking that it is unique.
///
/// The caller must guarantee the key is not already present in the table;
/// otherwise duplicate groups with distinct group indices are created.
/// Returns the group index assigned to the new key.
fn insert_key_unique(&mut self, hash: u64, key: &[u8]) -> IdxSize {
// New groups always get the next sequential index.
let group_idx = self.table.len().try_into().unwrap();
let group = Group {
key_hash: hash,
// The key bytes are appended to the shared key_data arena; the group
// records its offset and length into that arena.
key_offset: self.key_data.len(),
key_length: key.len().try_into().unwrap(),
group_idx,
};
self.key_data.extend(key);
// The canonical hash is scrambled with the per-grouper odd seed so
// iteration order is decorrelated between grouper instances.
self.table
.insert_unique(hash.wrapping_mul(self.seed), group, |g| {
g.key_hash.wrapping_mul(self.seed)
});
group_idx
}

fn finalize_keys(&self, mut key_rows: Vec<&[u8]>) -> DataFrame {
let key_dtypes = self
.key_schema
Expand Down Expand Up @@ -125,7 +151,9 @@ impl Grouper for RowEncodedHashGrouper {
fn combine(&mut self, other: &dyn Grouper, group_idxs: &mut Vec<IdxSize>) {
let other = other.as_any().downcast_ref::<Self>().unwrap();

self.table.reserve(other.table.len(), |g| g.key_hash); // TODO: cardinality estimation.
// TODO: cardinality estimation.
self.table
.reserve(other.table.len(), |g| g.key_hash.wrapping_mul(self.seed));

unsafe {
group_idxs.clear();
Expand Down Expand Up @@ -167,14 +195,54 @@ impl Grouper for RowEncodedHashGrouper {
)
}

fn partition_into(
fn partition(
&self,
_seed: u64,
_partitions: &mut [Box<dyn Grouper>],
_partition_idxs: &mut Vec<IdxSize>,
_group_idxs: &mut Vec<IdxSize>,
) {
unimplemented!()
seed: u64,
num_partitions: usize,
partition_idxs: &mut Vec<IdxSize>,
) -> Vec<Box<dyn Grouper>> {
assert!(num_partitions > 0);

// Two-pass algorithm to prevent reallocations.
let mut partition_size = vec![(0, 0); num_partitions]; // (keys, bytes)
unsafe {
for group in self.table.iter() {
let ph = folded_multiply(group.key_hash, seed | 1);
let p_idx = hash_to_partition(ph, num_partitions);
let (p_keys, p_bytes) = partition_size.get_unchecked_mut(p_idx as usize);
*p_keys += 1;
*p_bytes += group.key_length as usize;
}
}

let mut rng = rand::thread_rng();
let mut partitions = partition_size
.into_iter()
.map(|(keys, bytes)| Self {
key_schema: self.key_schema.clone(),
table: HashTable::with_capacity(keys),
key_data: Vec::with_capacity(bytes),
random_state: self.random_state.clone(),
seed: rng.gen::<u64>() | 1,
})
.collect_vec();

unsafe {
partition_idxs.clear();
partition_idxs.reserve(self.table.len());
let partition_idxs_out = partition_idxs.spare_capacity_mut();
for group in self.table.iter() {
let ph = folded_multiply(group.key_hash, seed | 1);
let p_idx = hash_to_partition(ph, num_partitions);
let p = partitions.get_unchecked_mut(p_idx);
p.insert_key_unique(group.key_hash, group.key(&self.key_data));
*partition_idxs_out.get_unchecked_mut(group.group_idx as usize) =
MaybeUninit::new(p_idx as IdxSize);
}
partition_idxs.set_len(self.table.len());
}

partitions.into_iter().map(|p| Box::new(p) as _).collect()
}

fn as_any(&self) -> &dyn Any {
Expand Down
12 changes: 12 additions & 0 deletions crates/polars-expr/src/reduce/len.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use polars_core::error::constants::LENGTH_LIMIT_MSG;

use super::*;
use crate::reduce::partition::partition_vec;

#[derive(Default)]
pub struct LenReduce {
Expand Down Expand Up @@ -61,6 +62,17 @@ impl GroupedReduction for LenReduce {
Ok(ca.into_series())
}

/// Splits this reduction's per-group state into one reduction per partition.
///
/// Group i (with its running length count) moves to partition
/// partition_idxs[i]; partition_sizes gives the number of groups per
/// partition. Presumably unsafe because the indices must be in-bounds and
/// consistent — TODO confirm against the GroupedReduction trait contract.
unsafe fn partition(
self: Box<Self>,
partition_sizes: &[IdxSize],
partition_idxs: &[IdxSize],
) -> Vec<Box<dyn GroupedReduction>> {
// partition_vec scatters the groups vector; each resulting vector
// becomes the state of a fresh LenReduce.
partition_vec(self.groups, partition_sizes, partition_idxs)
.into_iter()
.map(|groups| Box::new(Self { groups }) as _)
.collect()
}

fn as_any(&self) -> &dyn Any {
self
}
Expand Down
39 changes: 39 additions & 0 deletions crates/polars-expr/src/reduce/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use polars_utils::float::IsFloat;
use polars_utils::min_max::MinMax;

use super::*;
use crate::reduce::partition::partition_mask;

pub fn new_min_reduction(dtype: DataType, propagate_nans: bool) -> Box<dyn GroupedReduction> {
use DataType::*;
Expand Down Expand Up @@ -344,6 +345,25 @@ impl GroupedReduction for BoolMinGroupedReduction {
Ok(())
}

/// Splits this reduction's per-group state into one reduction per partition.
///
/// Both the value bits and the validity mask are scattered with the same
/// partition_idxs so they stay aligned per group. Presumably unsafe because
/// the indices must be in-bounds and consistent — TODO confirm against the
/// GroupedReduction trait contract.
unsafe fn partition(
self: Box<Self>,
partition_sizes: &[IdxSize],
partition_idxs: &[IdxSize],
) -> Vec<Box<dyn GroupedReduction>> {
// freeze() the mutable bitmaps so partition_mask can slice them; the
// partitions are converted back to mutable state below.
let p_values = partition_mask(&self.values.freeze(), partition_sizes, partition_idxs);
let p_mask = partition_mask(&self.mask.freeze(), partition_sizes, partition_idxs);
p_values
.into_iter()
.zip(p_mask)
.map(|(values, mask)| {
Box::new(Self {
values: values.into_mut(),
mask: mask.into_mut(),
}) as _
})
.collect()
}

fn finalize(&mut self) -> PolarsResult<Series> {
let v = core::mem::take(&mut self.values);
let m = core::mem::take(&mut self.mask);
Expand Down Expand Up @@ -450,6 +470,25 @@ impl GroupedReduction for BoolMaxGroupedReduction {
})
}

/// Splits this reduction's per-group state into one reduction per partition.
///
/// Mirrors BoolMinGroupedReduction: value bits and validity mask are
/// scattered with the same partition_idxs so they stay aligned per group.
/// Presumably unsafe because the indices must be in-bounds and consistent —
/// TODO confirm against the GroupedReduction trait contract.
unsafe fn partition(
self: Box<Self>,
partition_sizes: &[IdxSize],
partition_idxs: &[IdxSize],
) -> Vec<Box<dyn GroupedReduction>> {
// freeze() the mutable bitmaps so partition_mask can slice them; the
// partitions are converted back to mutable state below.
let p_values = partition_mask(&self.values.freeze(), partition_sizes, partition_idxs);
let p_mask = partition_mask(&self.mask.freeze(), partition_sizes, partition_idxs);
p_values
.into_iter()
.zip(p_mask)
.map(|(values, mask)| {
Box::new(Self {
values: values.into_mut(),
mask: mask.into_mut(),
}) as _
})
.collect()
}

fn as_any(&self) -> &dyn Any {
self
}
Expand Down
Loading

0 comments on commit 00b4d61

Please sign in to comment.