Skip to content

Commit

Permalink
Consolidate BoundedAggregateStream
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jul 14, 2023
1 parent 8012c4d commit 981862f
Show file tree
Hide file tree
Showing 14 changed files with 1,113 additions and 111 deletions.
20 changes: 7 additions & 13 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
//! Aggregates functionalities

use crate::physical_plan::aggregates::{
bounded_aggregate_stream::BoundedAggregateStream, no_grouping::AggregateStream,
row_hash::GroupedHashAggregateStream,
no_grouping::AggregateStream, row_hash::GroupedHashAggregateStream,
};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
Expand All @@ -46,8 +45,10 @@ use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

#[allow(dead_code)] // TODO remove
mod bounded_aggregate_stream;
mod no_grouping;
mod order;
mod row_hash;
mod utils;

Expand Down Expand Up @@ -89,7 +90,7 @@ pub enum AggregateMode {
/// Specifically, each distinct combination of the relevant columns
/// are contiguous in the input, and once a new combination is seen
/// previous combinations are guaranteed never to appear again
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupByOrderMode {
/// The input is not (known to be) ordered by any of the
/// expressions in the GROUP BY clause.
Expand Down Expand Up @@ -212,15 +213,15 @@ impl PartialEq for PhysicalGroupBy {
enum StreamType {
AggregateStream(AggregateStream),
GroupedHashAggregateStream(GroupedHashAggregateStream),
BoundedAggregate(BoundedAggregateStream),
//BoundedAggregate(BoundedAggregateStream),
}

impl From<StreamType> for SendableRecordBatchStream {
fn from(stream: StreamType) -> Self {
match stream {
StreamType::AggregateStream(stream) => Box::pin(stream),
StreamType::GroupedHashAggregateStream(stream) => Box::pin(stream),
StreamType::BoundedAggregate(stream) => Box::pin(stream),
//StreamType::BoundedAggregate(stream) => Box::pin(stream),
}
}
}
Expand Down Expand Up @@ -719,14 +720,6 @@ impl AggregateExec {
Ok(StreamType::AggregateStream(AggregateStream::new(
self, context, partition,
)?))
} else if let Some(aggregation_ordering) = &self.aggregation_ordering {
let aggregation_ordering = aggregation_ordering.clone();
Ok(StreamType::BoundedAggregate(BoundedAggregateStream::new(
self,
context,
partition,
aggregation_ordering,
)?))
} else {
Ok(StreamType::GroupedHashAggregateStream(
GroupedHashAggregateStream::new(self, context, partition)?,
Expand Down Expand Up @@ -1105,6 +1098,7 @@ fn create_accumulators(
.collect::<Result<Vec<_>>>()
}

#[allow(dead_code)]
fn create_row_accumulators(
aggr_expr: &[Arc<dyn AggregateExpr>],
) -> Result<Vec<RowAccumulatorItem>> {
Expand Down
157 changes: 157 additions & 0 deletions datafusion/core/src/physical_plan/aggregates/order/full.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::physical_expr::EmitTo;

/// Tracks grouping state when the data is ordered entirely by its
/// group keys
///
/// When the group values are sorted, as soon as we see group `n+1` we
/// know we will never see any rows for group `n again and thus they
/// can be emitted.
///
/// ```text
/// SUM(amt) GROUP BY id
///
/// The input is sorted by id
///
///
/// ┌─────┐ ┌──────────────────┐
/// │┌───┐│ │ ┌──────────────┐ │ ┏━━━━━━━━━━━━━━┓
/// ││ 0 ││ │ │ 123 │ │ ┌─────┃ 13 ┃
/// │└───┘│ │ └──────────────┘ │ │ ┗━━━━━━━━━━━━━━┛
/// │ ... │ │ ... │ │
/// │┌───┐│ │ ┌──────────────┐ │ │ current
/// ││12 ││ │ │ 234 │ │ │
/// │├───┤│ │ ├──────────────┤ │ │
/// ││12 ││ │ │ 234 │ │ │
/// │├───┤│ │ ├──────────────┤ │ │
/// ││13 ││ │ │ 456 │◀┼───┘
/// │└───┘│ │ └──────────────┘ │
/// └─────┘ └──────────────────┘
///
/// group indices group_values current tracks the most
/// (in group value recent group index
/// order)
/// ```
///
/// In the above diagram the current group is `13` groups `0..12` can
/// be emitted. Group `13` can not be emitted because it may have more
/// values in the next batch.
#[derive(Debug)]
pub(crate) struct GroupOrderingFull {
state: State,
/// Hash values for groups in 0..completed
hashes: Vec<u64>,
}

#[derive(Debug)]
enum State {
/// Have seen no input yet
Start,

/// Have seen all groups with indexes less than `completed_index`
InProgress {
/// index of the current group for which values are being
/// generated (can emit current - 1)
current: usize,
},

/// Seen end of input, all groups can be emitted
Complete,
}

impl GroupOrderingFull {
pub fn new() -> Self {
Self {
state: State::Start,
hashes: vec![],
}
}

// How far can data be emitted? Returns None if no data can be
// emitted
pub fn emit_to(&self) -> Option<EmitTo> {
match &self.state {
State::Start => None,
State::InProgress { current, .. } => {
// Can not emit if we are still on the first row,
// otherwise emit all rows prior to the current group
if *current == 0 {
None
} else {
Some(EmitTo::First(*current - 1))
}
}
State::Complete { .. } => Some(EmitTo::All),
}
}

/// removes the first n groups from this ordering, shifting all
/// existing indexes down by N and returns a reference to the
/// updated hashes
pub fn remove_groups(&mut self, n: usize) -> &[u64] {
println!("remove_groups n:{n}, self: {self:?}");
match &mut self.state {
State::Start => panic!("invalid state: start"),
State::InProgress { current } => {
// shift down by n
assert!(*current >= n);
*current = *current - n;
self.hashes.drain(0..n);
}
State::Complete { .. } => panic!("invalid state: complete"),
};
&self.hashes
}

/// Note that the input is complete so any outstanding groups are done as well
pub fn input_done(&mut self) {
println!("input done");
self.state = match self.state {
State::Start => State::Complete,
State::InProgress { .. } => State::Complete,
State::Complete => State::Complete,
};
}

/// Note that we saw a new distinct group
pub fn new_group(&mut self, group_index: usize, hash: u64) {
println!("new group: group_index: {group_index}");
self.state = match self.state {
State::Start => {
assert_eq!(group_index, 0);
self.hashes.push(hash);
State::InProgress {
current: group_index,
}
}
State::InProgress { current } => {
// expect to see group_index the next after this
assert_eq!(group_index, self.hashes.len());
assert_eq!(group_index, current + 1);
self.hashes.push(hash);
State::InProgress {
current: group_index,
}
}
State::Complete { .. } => {
panic!("Saw new group after input was complete");
}
};
}
}
117 changes: 117 additions & 0 deletions datafusion/core/src/physical_plan/aggregates/order/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Order tracking for memory bounded grouping

use arrow_schema::Schema;
use datafusion_common::Result;
use datafusion_physical_expr::EmitTo;

use super::{AggregationOrdering, GroupByOrderMode};

mod full;
mod partial;

pub(crate) use full::GroupOrderingFull;
pub(crate) use partial::GroupOrderingPartial;

/// Group ordering state, if present, for each group in the hash
/// table.
#[derive(Debug)]
pub(crate) enum GroupOrdering {
/// Groups are not ordered
None,
/// Groups are orderd by some pre-set of the group keys
Partial(GroupOrderingPartial),
/// Groups are entirely contiguous,
Full(GroupOrderingFull),
/// The ordering was temporarily taken / borrowed
/// Note: `Self::Taken` is left when the GroupOrdering is temporarily
/// taken to satisfy the borrow checker. If an error happens
/// before it can be restored the ordering information is lost and
/// execution can not proceed. By panic'ing the behavior remains
/// well defined if something tries to use a ordering that was
/// taken.
Taken,
}

// Default is used for `std::mem::take` to satisfy the borrow checker
impl Default for GroupOrdering {
fn default() -> Self {
Self::Taken
}
}

impl GroupOrdering {
/// Create a `GroupOrdering` for the ordering
pub fn try_new(
input_schema: &Schema,
ordering: &AggregationOrdering,
) -> Result<Self> {
let AggregationOrdering {
mode,
order_indices,
ordering,
} = ordering;

Ok(match mode {
GroupByOrderMode::None => GroupOrdering::None,
GroupByOrderMode::PartiallyOrdered => {
let partial =
GroupOrderingPartial::try_new(input_schema, order_indices, ordering)?;
GroupOrdering::Partial(partial)
}
GroupByOrderMode::FullyOrdered => {
GroupOrdering::Full(GroupOrderingFull::new())
}
})
}

// How far can data be emitted based on groups seen so far?
// Returns `None` if nothing can be emitted at this point based on
// ordering information
pub fn emit_to(&self) -> Option<EmitTo> {
match self {
GroupOrdering::Taken => panic!("group state taken"),
GroupOrdering::None => None,
GroupOrdering::Partial(partial) => partial.emit_to(),
GroupOrdering::Full(full) => full.emit_to(),
}
}

/// Updates the state the input is done
pub fn input_done(&mut self) {
match self {
GroupOrdering::Taken => panic!("group state taken"),
GroupOrdering::None => {}
GroupOrdering::Partial(partial) => partial.input_done(),
GroupOrdering::Full(full) => full.input_done(),
}
}

/// removes the first n groups from this ordering, shifting all
/// existing indexes down by N and returns a reference to the
/// updated hashes
pub fn remove_groups(&mut self, n: usize) -> &[u64] {
match self {
GroupOrdering::Taken => panic!("group state taken"),
GroupOrdering::None => &[],
GroupOrdering::Partial(partial) => partial.remove_groups(n),
GroupOrdering::Full(full) => full.remove_groups(n),
}
}
}
Loading

0 comments on commit 981862f

Please sign in to comment.