From 88ed668940aa14b3e85f996996b1bbc28452a022 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 12 Jul 2023 11:37:11 -0400 Subject: [PATCH] ipdates --- .../core/src/physical_plan/aggregates/order.rs | 17 ++++++++++++++++- .../src/physical_plan/aggregates/row_hash.rs | 11 ++++------- datafusion/physical-expr/src/aggregate/count.rs | 2 +- 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/order.rs b/datafusion/core/src/physical_plan/aggregates/order.rs index 665e107c928d..5dbcbd9887f3 100644 --- a/datafusion/core/src/physical_plan/aggregates/order.rs +++ b/datafusion/core/src/physical_plan/aggregates/order.rs @@ -83,7 +83,22 @@ impl GroupOrdering { /// Returns the index of the last completed group, if any pub fn last_completed(&self) -> Option { - todo!() + match self.state { + OrderState::Start => None, + OrderState::InProgress(group_index) if group_index == 0 => None, + OrderState::InProgress(group_index) => Some(group_index - 1), + OrderState::Complete => todo!(), + } + } + + /// returns true if any groups are completed and thus can be emitted + pub fn can_emit(&self) -> bool { + match self.state { + OrderState::Start => false, + OrderState::InProgress(group_index) if group_index == 0 => false, + OrderState::InProgress(_) => true, + OrderState::Complete => true, + } } /// Note that the input is complete so any outstanding groups are done as well diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index 79ca039978e5..9451a584f079 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -678,15 +678,12 @@ impl GroupedHashAggregateStream { Ok(output) } - /// Can any of the hash table be emitted? + /// Can at least part of the hash table be emitted? fn can_emit(&self) -> bool { - let some_groups_done = self - .group_ordering + self.group_ordering .as_ref() - .map(|group_ordering| group_ordering.last_completed().is_some()) - .unwrap_or(false); - - self.input_done || some_groups_done + .map(|group_ordering| group_ordering.can_emit()) + .unwrap_or(self.input_done) } /// Update state when the last input is seen diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs index 73a212f1f1da..9ca3a847fc7b 100644 --- a/datafusion/physical-expr/src/aggregate/count.rs +++ b/datafusion/physical-expr/src/aggregate/count.rs @@ -188,7 +188,7 @@ impl GroupsAccumulator for CountGroupsAccumulator { // return arrays for counts fn state(&mut self, emit_to: EmitTo) -> Result> { if emit_to.is_some() { - panic!("emit_to handling not yet implemented"); + panic!("emit_to handling not yet implemented: {emit_to:?}"); } let counts = std::mem::take(&mut self.counts); let counts: PrimitiveArray = Int64Array::from(counts); // zero copy, no nulls