Skip to content

Commit

Permalink
ipdates
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jul 12, 2023
1 parent ba6c525 commit 88ed668
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 9 deletions.
17 changes: 16 additions & 1 deletion datafusion/core/src/physical_plan/aggregates/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,22 @@ impl GroupOrdering {

/// Returns the index of the last completed group, if any
pub fn last_completed(&self) -> Option<usize> {
todo!()
match self.state {
OrderState::Start => None,
OrderState::InProgress(group_index) if group_index == 0 => None,
OrderState::InProgress(group_index) => Some(group_index - 1),
OrderState::Complete => todo!(),
}
}

/// returns true if any groups are completed and thus can be emitted
pub fn can_emit(&self) -> bool {
match self.state {
OrderState::Start => false,
OrderState::InProgress(group_index) if group_index == 0 => false,
OrderState::InProgress(_) => true,
OrderState::Complete => true,
}
}

/// Note that the input is complete so any outstanding groups are done as well
Expand Down
11 changes: 4 additions & 7 deletions datafusion/core/src/physical_plan/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,15 +678,12 @@ impl GroupedHashAggregateStream {
Ok(output)
}

/// Can any of the hash table be emitted?
/// Can at least part of the hash table be emitted?
fn can_emit(&self) -> bool {
let some_groups_done = self
.group_ordering
self.group_ordering
.as_ref()
.map(|group_ordering| group_ordering.last_completed().is_some())
.unwrap_or(false);

self.input_done || some_groups_done
.map(|group_ordering| group_ordering.can_emit())
.unwrap_or(self.input_done)
}

/// Update state when the last input is seen
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/aggregate/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl GroupsAccumulator for CountGroupsAccumulator {
// return arrays for counts
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
if emit_to.is_some() {
panic!("emit_to handling not yet implemented");
panic!("emit_to handling not yet implemented: {emit_to:?}");
}
let counts = std::mem::take(&mut self.counts);
let counts: PrimitiveArray<Int64Type> = Int64Array::from(counts); // zero copy, no nulls
Expand Down

0 comments on commit 88ed668

Please sign in to comment.