Vectorized hash grouping #6904

Merged · 32 commits · Jul 13, 2023
Changes from 1 commit

Commits (32)
54a5c95
Vectorized hash grouping
alamb Jul 10, 2023
1991a76
Prepare for merge to main
alamb Jul 10, 2023
8464816
Improve comments and update size calculations
alamb Jul 10, 2023
4bd3066
Implement test for accumulate_boolean
alamb Jul 10, 2023
7c97b24
Use resize instead of resize_with
alamb Jul 10, 2023
3ca27ac
fix avg size calculation
alamb Jul 10, 2023
e4a52f9
Simplify sum accumulator
alamb Jul 10, 2023
f9eaa68
Add comments explaining i64 as counts
alamb Jul 11, 2023
fc96b13
Clarify `aggreate_arguments`
alamb Jul 11, 2023
9db6f4b
Apply suggestions from code review
alamb Jul 11, 2023
edc8c43
Merge remote-tracking branch 'apache/main' into alamb/fast_gby_hash
alamb Jul 11, 2023
19b8981
Merge branch 'alamb/fast_gby_hash' of github.com:alamb/arrow-datafusi…
alamb Jul 11, 2023
90f8730
Clarify rationale for ScratchSpace being a field
alamb Jul 11, 2023
3369ec1
use slice syntax
alamb Jul 12, 2023
4124bfa
Merge remote-tracking branch 'apache/main' into alamb/fast_gby_hash
alamb Jul 12, 2023
58e3b6d
Update datafusion/physical-expr/src/aggregate/average.rs
alamb Jul 12, 2023
47135ba
Update datafusion/physical-expr/src/aggregate/count.rs
alamb Jul 12, 2023
744b4aa
Update datafusion/physical-expr/src/aggregate/groups_accumulator/adap…
alamb Jul 12, 2023
c3d5ff2
fix diagram
alamb Jul 12, 2023
92f6234
Update datafusion/physical-expr/src/aggregate/groups_accumulator/adap…
alamb Jul 12, 2023
f35f2ae
Merge branch 'alamb/fast_gby_hash' of github.com:alamb/arrow-datafusi…
alamb Jul 12, 2023
d19c41e
simplify the supported logic
alamb Jul 12, 2023
da911a3
Add a log message when using slow adapter
alamb Jul 12, 2023
de7b250
fmt
alamb Jul 12, 2023
b313278
Revert "chore(deps): update bigdecimal requirement from 0.3.0 to 0.4.…
alamb Jul 12, 2023
2bff155
Make FileScanConfig::project pub (#6931)
Dandandan Jul 12, 2023
453b71e
feat: add round trip test of physical plan in tpch unit tests (#6918)
r4ntix Jul 12, 2023
32ff16e
Use thiserror to implement the From trait for DFSqlLogicTestError (#6…
jonahgao Jul 12, 2023
54f96e6
parallel csv scan (#6801)
2010YOUY01 Jul 12, 2023
d96dfa2
Add additional test coverage for aggregaes using dates/times/timestam…
alamb Jul 12, 2023
a98b6a0
Support timestamp types for min/max
alamb Jul 13, 2023
5ab75b1
Fix aggregate nullability calculation
alamb Jul 13, 2023
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -70,4 +70,4 @@ lto = false
 opt-level = 3
 overflow-checks = false
 panic = 'unwind'
-rpath = false
+rpath = false
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

21 changes: 18 additions & 3 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -49,6 +49,7 @@ use std::sync::Arc;
 mod bounded_aggregate_stream;
 mod no_grouping;
 mod row_hash;
+mod row_hash2;
 mod utils;

 pub use datafusion_expr::AggregateFunction;
@@ -58,6 +59,7 @@ use datafusion_physical_expr::utils::{
     get_finer_ordering, ordering_satisfy_requirement_concrete,
 };

+use self::row_hash2::GroupedHashAggregateStream2;
 use super::DisplayAs;

 /// Hash aggregate modes
@@ -212,6 +214,7 @@ impl PartialEq for PhysicalGroupBy {
 enum StreamType {
     AggregateStream(AggregateStream),
     GroupedHashAggregateStream(GroupedHashAggregateStream),
+    GroupedHashAggregateStream2(GroupedHashAggregateStream2),
     BoundedAggregate(BoundedAggregateStream),
 }

@@ -220,6 +223,7 @@ impl From<StreamType> for SendableRecordBatchStream {
         match stream {
             StreamType::AggregateStream(stream) => Box::pin(stream),
             StreamType::GroupedHashAggregateStream(stream) => Box::pin(stream),
+            StreamType::GroupedHashAggregateStream2(stream) => Box::pin(stream),
             StreamType::BoundedAggregate(stream) => Box::pin(stream),
         }
     }
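A note on the conversion above: each stream variant is pinned and boxed exactly once at this boundary, so everything downstream handles a single `SendableRecordBatchStream` trait object instead of matching on concrete stream types. A minimal, self-contained sketch of the same dispatch pattern, with toy stand-ins for the stream trait (the real `RecordBatchStream` is an async `Stream` of `RecordBatch`es, and the real trait object is not `Unpin`; `Unpin` is added here only so the toy can call a `&mut self` method through the `Pin`):

use std::pin::Pin;

// stand-in for RecordBatchStream
trait BatchStream {
    fn next_batch(&mut self) -> Option<u32>;
}

// stand-in for SendableRecordBatchStream
type SendableBatchStream = Pin<Box<dyn BatchStream + Send + Unpin>>;

struct FastStream(u32);
struct SlowStream(u32);

impl BatchStream for FastStream {
    fn next_batch(&mut self) -> Option<u32> {
        (self.0 < 2).then(|| {
            self.0 += 1;
            self.0
        })
    }
}

impl BatchStream for SlowStream {
    fn next_batch(&mut self) -> Option<u32> {
        None
    }
}

enum StreamType {
    Fast(FastStream),
    Slow(SlowStream),
}

impl From<StreamType> for SendableBatchStream {
    fn from(stream: StreamType) -> Self {
        // one Box::pin per variant; callers only ever see the trait object
        match stream {
            StreamType::Fast(s) => Box::pin(s),
            StreamType::Slow(s) => Box::pin(s),
        }
    }
}

fn main() {
    let mut stream: SendableBatchStream = StreamType::Fast(FastStream(0)).into();
    while let Some(batch) = stream.next_batch() {
        println!("batch {batch}");
    }
}

With this shape, adding a variant such as `GroupedHashAggregateStream2` only touches the enum and this one `match`.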
@@ -727,12 +731,23 @@ impl AggregateExec {
             partition,
             aggregation_ordering,
         )?))
+        } else if self.use_poc_group_by() {
+            Ok(StreamType::GroupedHashAggregateStream2(
+                GroupedHashAggregateStream2::new(self, context, partition)?,
+            ))
         } else {
             Ok(StreamType::GroupedHashAggregateStream(
                 GroupedHashAggregateStream::new(self, context, partition)?,
             ))
         }
     }

+    /// Returns true if we should use the POC group by stream
+    /// TODO: check for actually supported aggregates, etc
+    fn use_poc_group_by(&self) -> bool {
+        //info!("AAL Checking POC group by: {self:#?}");
+        true
+    }
 }

 impl DisplayAs for AggregateExec {
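The TODO above says the gate should eventually consult the actual aggregates rather than hard-coding `true`. A hedged sketch of what such a capability probe could look like — the trait, method name, and function here are illustrative assumptions, not something this commit introduces:

// hypothetical capability probe; `groups_accumulator_supported` is an
// illustrative name, not part of this diff
trait AggregateExprCapabilities {
    fn groups_accumulator_supported(&self) -> bool;
}

struct Count;
impl AggregateExprCapabilities for Count {
    fn groups_accumulator_supported(&self) -> bool {
        true
    }
}

// take the new stream only when every aggregate in the plan supports a
// vectorized groups accumulator; otherwise fall back to the row-based path
fn use_vectorized_group_by(aggr_expr: &[Box<dyn AggregateExprCapabilities>]) -> bool {
    aggr_expr.iter().all(|agg| agg.groups_accumulator_supported())
}

fn main() {
    let aggregates: Vec<Box<dyn AggregateExprCapabilities>> = vec![Box::new(Count)];
    assert!(use_vectorized_group_by(&aggregates));
}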
@@ -998,7 +1013,7 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
     Arc::new(Schema::new(group_fields))
 }

-/// returns physical expressions to evaluate against a batch
+/// returns physical expressions for arguments to evaluate against a batch
 /// The expressions are different depending on `mode`:
 /// * Partial: AggregateExpr::expressions
 /// * Final: columns of `AggregateExpr::state_fields()`
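The distinction this doc comment draws matters for two-phase aggregation: a Partial operator evaluates the aggregate's own argument expressions against raw input rows, while a Final operator reads back the intermediate state columns that the partial phase emitted. A toy illustration with strings standing in for physical expressions (all names here are illustrative, not DataFusion's API):

#[derive(Clone, Copy)]
enum AggregateMode {
    Partial,
    Final,
}

struct AggExprInfo {
    // argument expressions, e.g. ["price"] for SUM(price)
    expressions: Vec<&'static str>,
    // intermediate state columns, e.g. ["SUM(price)[sum]"]
    state_fields: Vec<&'static str>,
}

fn aggregate_arguments(agg: &AggExprInfo, mode: AggregateMode) -> Vec<&'static str> {
    match mode {
        // Partial: evaluate the aggregate's inputs directly
        AggregateMode::Partial => agg.expressions.clone(),
        // Final: consume the partial phase's state columns instead
        AggregateMode::Final => agg.state_fields.clone(),
    }
}

fn main() {
    let sum = AggExprInfo {
        expressions: vec!["price"],
        state_fields: vec!["SUM(price)[sum]"],
    };
    assert_eq!(aggregate_arguments(&sum, AggregateMode::Partial), ["price"]);
    assert_eq!(aggregate_arguments(&sum, AggregateMode::Final), ["SUM(price)[sum]"]);
}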
@@ -1801,10 +1816,10 @@ mod tests {
                 assert!(matches!(stream, StreamType::AggregateStream(_)));
             }
             1 => {
-                assert!(matches!(stream, StreamType::GroupedHashAggregateStream(_)));
+                assert!(matches!(stream, StreamType::GroupedHashAggregateStream2(_)));
             }
             2 => {
-                assert!(matches!(stream, StreamType::GroupedHashAggregateStream(_)));
+                assert!(matches!(stream, StreamType::GroupedHashAggregateStream2(_)));
             }
             _ => panic!("Unknown version: {version}"),
         }
17 changes: 12 additions & 5 deletions datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -17,6 +17,7 @@

 //! Hash aggregation through row format

+use log::info;
 use std::cmp::min;
 use std::ops::Range;
 use std::sync::Arc;
@@ -110,6 +111,8 @@ pub(crate) struct GroupedHashAggregateStream {
     /// first element in the array corresponds to normal accumulators
     /// second element in the array corresponds to row accumulators
     indices: [Vec<Range<usize>>; 2],
+    // buffer to be reused to store hashes
+    hashes_buffer: Vec<u64>,
 }
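Keeping `hashes_buffer` as a field rather than a local means the allocation survives across batches: each call clears and resizes the same `Vec<u64>` instead of allocating a fresh one per batch. A minimal sketch of the pattern in isolation (toy names, not the PR's types):

struct HashScratch {
    // owned by the operator so its capacity is recycled across batches
    hashes_buffer: Vec<u64>,
}

impl HashScratch {
    fn prepare(&mut self, n_rows: usize) -> &mut [u64] {
        // clear + resize keeps existing capacity once the buffer has grown
        // to the largest batch size seen so far
        self.hashes_buffer.clear();
        self.hashes_buffer.resize(n_rows, 0);
        &mut self.hashes_buffer
    }
}

fn main() {
    let mut scratch = HashScratch { hashes_buffer: vec![] };
    scratch.prepare(1024);
    let cap_after_first = scratch.hashes_buffer.capacity();
    scratch.prepare(512);
    // no reallocation for the smaller second batch
    assert_eq!(scratch.hashes_buffer.capacity(), cap_after_first);
}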

 impl GroupedHashAggregateStream {
@@ -119,6 +122,7 @@ impl GroupedHashAggregateStream {
         context: Arc<TaskContext>,
         partition: usize,
     ) -> Result<Self> {
+        info!("Creating GroupedHashAggregateStream");
         let agg_schema = Arc::clone(&agg.schema);
         let agg_group_by = agg.group_by.clone();
         let agg_filter_expr = agg.filter_expr.clone();
@@ -229,6 +233,7 @@ impl GroupedHashAggregateStream {
             scalar_update_factor,
             row_group_skip_position: 0,
             indices: [normal_agg_indices, row_agg_indices],
+            hashes_buffer: vec![],
         })
     }
 }
@@ -322,15 +327,17 @@ impl GroupedHashAggregateStream {
         let mut groups_with_rows = vec![];

         // 1.1 Calculate the group keys for the group values
-        let mut batch_hashes = vec![0; n_rows];
-        create_hashes(group_values, &self.random_state, &mut batch_hashes)?;
+        let batch_hashes = &mut self.hashes_buffer;
+        batch_hashes.clear();
+        batch_hashes.resize(n_rows, 0);
+        create_hashes(group_values, &self.random_state, batch_hashes)?;

         let AggregationState {
             map, group_states, ..
         } = &mut self.aggr_state;

-        for (row, hash) in batch_hashes.into_iter().enumerate() {
-            let entry = map.get_mut(hash, |(_hash, group_idx)| {
+        for (row, hash) in batch_hashes.iter_mut().enumerate() {
+            let entry = map.get_mut(*hash, |(_hash, group_idx)| {
                 // verify that a group that we are inserting with hash is
                 // actually the same key value as the group in
                 // existing_idx (aka group_values @ row)
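The `create_hashes` call near the top of this hunk hashes whole Arrow columns into the reused buffer in one vectorized pass. A hedged sketch of calling it standalone — note the import path is an assumption, since `hash_utils` has lived in more than one crate across DataFusion versions, so verify it against your release:

use std::sync::Arc;

use ahash::RandomState;
use arrow::array::{ArrayRef, Int32Array};
// import path is version-dependent (assumption): hash_utils has moved
// between datafusion::physical_plan and datafusion_common across releases
use datafusion_common::hash_utils::create_hashes;

fn main() -> datafusion_common::Result<()> {
    let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 1, 3]));

    // fixed seeds so hashes are comparable within one process
    let random_state = RandomState::with_seeds(0, 0, 0, 0);

    // the reused buffer, cleared and resized per batch as in the diff above
    let mut hashes_buffer: Vec<u64> = vec![];
    hashes_buffer.clear();
    hashes_buffer.resize(col.len(), 0);
    create_hashes(&[col], &random_state, &mut hashes_buffer)?;

    // rows 0 and 2 carry equal group keys, so their hashes agree
    assert_eq!(hashes_buffer[0], hashes_buffer[2]);
    Ok(())
}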
@@ -385,7 +392,7 @@

                 // for hasher function, use precomputed hash value
                 map.insert_accounted(
-                    (hash, group_idx),
+                    (*hash, group_idx),
                     |(hash, _group_index)| *hash,
                     allocated,
                 );
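`map` here is a hashbrown `RawTable<(u64, usize)>` holding `(hash, group_index)` pairs: both the probe and the insert reuse the precomputed hash instead of re-hashing the key, and `insert_accounted` is DataFusion's wrapper that additionally tracks allocated memory. A sketch of the underlying lookup-or-insert idiom with hashbrown's raw API (requires hashbrown's `raw` feature; plain `insert` stands in for the memory-accounting wrapper):

use hashbrown::raw::RawTable;

// Find the group index for a key with precomputed `hash`, inserting a new
// group if the key has not been seen. `keys_equal(idx)` compares the probe
// key against the stored key for group `idx` (a full equality check is
// required, since different keys can share a hash).
fn lookup_or_insert(
    map: &mut RawTable<(u64, usize)>,
    hash: u64,
    next_group_idx: usize,
    keys_equal: impl Fn(usize) -> bool,
) -> usize {
    if let Some((_h, group_idx)) = map.get_mut(hash, |(_h, idx)| keys_equal(*idx)) {
        *group_idx
    } else {
        // store the hash alongside the index so future resizes can
        // re-bucket entries without access to the original keys
        map.insert(hash, (hash, next_group_idx), |(h, _)| *h);
        next_group_idx
    }
}

fn main() {
    let mut map: RawTable<(u64, usize)> = RawTable::new();
    let first = lookup_or_insert(&mut map, 42, 0, |_| true);
    let again = lookup_or_insert(&mut map, 42, 1, |_| true);
    assert_eq!(first, 0);
    assert_eq!(again, 0); // same hash + equal key resolves to the same group
}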