Skip to content

Commit

Permalink
rework
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jul 14, 2023
1 parent 23f125b commit 98a6933
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 66 deletions.
94 changes: 47 additions & 47 deletions datafusion/core/src/physical_plan/aggregates/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,15 @@ impl GroupOrdering {
}
}

/// returns true if any groups can be emitted prior to seeing the
/// end of input
pub fn can_emit(&self) -> bool {
// How far can data be emitted based on groups seen so far?
// Returns `None` if nothing can be emitted at this point based on
// ordering information
pub fn emit_to(&self) -> Option<EmitTo> {
match self {
GroupOrdering::Taken => panic!("group state taken"),
GroupOrdering::None => false,
GroupOrdering::None => None,
GroupOrdering::Partial(partial) => todo!(),
GroupOrdering::Full(full) => full.can_emit(),
}
}

// How far can data be emitted?
pub fn emit_to(&self) -> EmitTo {
match self {
GroupOrdering::Taken => panic!("group state taken"),
GroupOrdering::None => EmitTo::All,
GroupOrdering::Partial(partial) => todo!(),
GroupOrdering::Full(full) => full.emit_to(),
GroupOrdering::Full(full) => full.emit_to()
}
}

Expand Down Expand Up @@ -123,26 +114,41 @@ pub(crate) enum GroupOrderingPartial {
/// Tracks grouping state when the data is ordered entirely by its
/// group keys
///
/// For example, when the group values are sorted, as soon as we see
/// group `n+1` we know we will never see any rows for group `n` again
/// and thus they can be emitted.
///
/// For a query with GROUP BY c1, c2 with input ORDER BY c1, c2
///
/// group_index (group by value) Hash value
/// Not stored here (stored here)
/// 0 A,X 123
/// 1 A,Y 456 InProgress(1) -- group is still in progress
/// 2 B,Z 789
/// ```text
/// ┌─────┐ ┌──────────────────┐
/// │┌───┐│ │ ┌──────────────┐ │ ┏━━━━━━━━━━━━━━┓
/// ││ 0 ││ │ │ 123 │ │ ┌─────┃ 12 ┃
/// │└───┘│ │ └──────────────┘ │ │ ┗━━━━━━━━━━━━━━┛
/// │ ... │ │ ... │ │
/// │┌───┐│ │ ┌──────────────┐ │ │ current
/// ││12 ││ │ │ 234 │ │ │
/// │├───┤│ │ ├──────────────┤ │ │
/// ││12 ││ │ │ 234 │ │ │
/// │├───┤│ │ ├──────────────┤ │ │
/// ││13 ││ │ │ 456 │◀┼───┘
/// │└───┘│ │ └──────────────┘ │
/// └─────┘ └──────────────────┘
///
/// when we see group_index2 the state will progress to `InProgress(1)`
/// group indices       group_values         current tracks the most
/// (in group value                          recent group. This group may
/// order)                                   yet see more input (in the
/// ```
///
/// In the above diagram
#[derive(Debug)]
pub(crate) enum GroupOrderingFull {
/// Have seen no input yet
Start,

/// Have seen all groups with indexes less than `current`
InProgress {
/// Largest completed index
completed: usize,
/// index of the current group for which values are being
/// generated (can emit current - 1)
current: usize,
/// Hash values for groups in 0..completed
hashes: Vec<u64>,
},
Expand All @@ -166,12 +172,15 @@ impl GroupOrderingFull {
Self::Start
}

// How far can data be emitted?
pub fn emit_to(&self) -> EmitTo {
// How far can data be emitted? Returns None if no data can be
// emitted
pub fn emit_to(&self) -> Option<EmitTo> {
match self {
Self::Start => panic!("Can not emit data witout any seen"),
Self::InProgress { completed, .. } => EmitTo::First(*completed),
Self::Complete { .. } => EmitTo::All,
Self::Start => None,
Self::InProgress { current, .. } if *current == 0 => None,
// emit all rows prior to the current group
Self::InProgress { current, .. } => Some(EmitTo::First(*current - 1)),
Self::Complete { .. } => Some(EmitTo::All),
}
}

Expand All @@ -182,26 +191,17 @@ impl GroupOrderingFull {
println!("remove_groups n:{n}, self: {self:?}");
match self {
Self::Start => panic!("invalid state: start"),
Self::InProgress { completed, hashes } => {
Self::InProgress { current, hashes } => {
// shift down by n
assert!(*completed >= n);
*completed -= n;
assert!(*current >= n);
*current -= n;
hashes.drain(0..n);
hashes
}
Self::Complete { .. } => panic!("invalid state: start"),
}
}

/// returns true if any groups are completed and thus can be
/// emitted prior to the end of input
pub fn can_emit(&self) -> bool {
match self {
Self::Start => false,
Self::InProgress { .. } => true,
Self::Complete { .. } => true,
}
}

/// Note that the input is complete so any outstanding groups are done as well
pub fn input_done(&mut self) {
Expand All @@ -220,22 +220,22 @@ impl GroupOrderingFull {
Self::Start => {
assert_eq!(group_index, 0);
Self::InProgress {
completed: group_index,
current: group_index,
hashes: vec![hash],
}
}
Self::InProgress {
completed,
current,
mut hashes,
} => {
// expect to see group_index the next after this
assert_eq!(group_index, hashes.len());
assert_eq!(group_index, completed + 1);
assert_eq!(group_index, current + 1);
hashes.push(hash);
Self::InProgress { completed, hashes }
Self::InProgress { current, hashes }
}
Self::Complete { .. } => {
panic!("Saw new group after completion");
panic!("Saw new group after input was complete");
}
}
}
Expand Down
28 changes: 13 additions & 15 deletions datafusion/core/src/physical_plan/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,14 @@ impl Stream for GroupedHashAggregateStream {

// If we can begin emitting rows, do so,
// otherwise keep consuming input
if self.can_emit() {
let batch = extract_ok!(self.create_batch_from_map());
let to_emit = if self.input_done {
Some(EmitTo::All)
} else {
self.group_ordering.emit_to()
};

if let Some(to_emit) = to_emit {
let batch = extract_ok!(self.create_batch_from_map(to_emit));
self.exec_state = ExecutionState::ProducingOutput(batch);
}
timer.done();
Expand All @@ -370,11 +376,11 @@ impl Stream for GroupedHashAggregateStream {
return Poll::Ready(Some(Err(e)));
}
None => {
// inner is done, switch to producing output
// inner is done, emit all rows and switch to producing output
self.input_done = true;
self.group_ordering.input_done();
let timer = elapsed_compute.timer();
let batch = extract_ok!(self.create_batch_from_map());
let batch = extract_ok!(self.create_batch_from_map(EmitTo::All));
self.exec_state = ExecutionState::ProducingOutput(batch);
timer.done();
}
Expand Down Expand Up @@ -629,17 +635,13 @@ impl GroupedHashAggregateStream {
Ok(allocated)
}

/// Create an output RecordBatch with all group keys and
/// accumulator states/values
///
/// If there is group_ordering, only emit groups marked as "CanEmit"
fn create_batch_from_map(&mut self) -> Result<RecordBatch> {
/// Create an output RecordBatch with the group keys and
/// accumulator states/values specified in emit_to
fn create_batch_from_map(&mut self, emit_to: EmitTo) -> Result<RecordBatch> {
if self.group_values.num_rows() == 0 {
return Ok(RecordBatch::new_empty(self.schema()));
}

// How many groups should be emitted?
let emit_to = self.group_ordering.emit_to();
let output = self.build_output(emit_to)?;
self.remove_emitted(emit_to)?;
let batch = RecordBatch::try_new(self.schema(), output)?;
Expand Down Expand Up @@ -674,10 +676,6 @@ impl GroupedHashAggregateStream {
Ok(output)
}

/// Can at least part of the hash table be emitted?
fn can_emit(&self) -> bool {
self.input_done || self.group_ordering.can_emit()
}

/// Removes the first `n` groups, and adjust all
/// group_indices appropriately
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/tests/sqllogictests/test_files/aal.slt
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,26 @@ GROUP BY a;


# Run it
query II
query II rowsort
SELECT a, SUM(b)
FROM t
GROUP BY a;
----
1 125
0 25
1 125

# use partial grouping (data sorted on a, b, c but group by d, a)

# # Run it
# query
# query rowsort
# SELECT d, a, COUNT(b)
# FROM t
# GROUP BY d, a;



# # Run it
# query
# query rowsort
# SELECT d, a, SUM(b)
# FROM t
# GROUP BY d, a;

0 comments on commit 98a6933

Please sign in to comment.