Add spill support to TopNRowNumber operator (#7139)
Summary:
TopNRowNumber operator maintains a hash table of unique partition keys and a
RowContainer of input rows with up to N rows per partition.
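
As an illustration only, the state described above can be modeled as a map from partition key to a bounded max-heap of sorting keys. This is a minimal sketch, not the operator's code: `Row`, `TopNAccumulator`, and the use of `std::priority_queue` are assumptions made here, and the real operator stores rows in a RowContainer rather than plain integers.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <queue>
#include <unordered_map>
#include <vector>

// Hypothetical row with a single partition key and a single sorting key.
struct Row {
  int64_t partitionKey;
  int64_t sortKey;
};

// Toy model: partition key -> max-heap holding at most 'limit' smallest
// sorting keys seen so far for that partition.
class TopNAccumulator {
 public:
  explicit TopNAccumulator(size_t limit) : limit_(limit) {}

  void add(const Row& row) {
    auto& heap = partitions_[row.partitionKey];
    if (heap.size() < limit_) {
      heap.push(row.sortKey);
    } else if (!heap.empty() && row.sortKey < heap.top()) {
      // New row beats the current worst retained row; replace it.
      heap.pop();
      heap.push(row.sortKey);
    }
  }

  // Returns the retained sorting keys of one partition in ascending order.
  std::vector<int64_t> topRows(int64_t partitionKey) const {
    std::vector<int64_t> result;
    auto it = partitions_.find(partitionKey);
    if (it == partitions_.end()) {
      return result;
    }
    auto heap = it->second; // Copy, so that draining does not modify state.
    while (!heap.empty()) {
      result.push_back(heap.top());
      heap.pop();
    }
    std::reverse(result.begin(), result.end());
    return result;
  }

  size_t numPartitions() const {
    return partitions_.size();
  }

 private:
  const size_t limit_;
  std::unordered_map<int64_t, std::priority_queue<int64_t>> partitions_;
};
```

With a limit of 2, adding sorting keys 5, 3, 4 to one partition leaves 3 and 4 retained in this model.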

When asked to spill, TopNRowNumber operator sorts accumulated input rows by
partition and sorting keys, spills all of them, clears both hash table and
RowContainer, and continues to accumulate future input into freed up hash table
and container.
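
The spill step above can be sketched roughly as follows. The names `SpillableState` and `spilledRuns` are hypothetical, an in-memory vector stands in for spill files, and an `std::unordered_set` stands in for the hash table of partition keys. The sketch only shows the order of operations: sort, write out, clear, keep accepting input.

```cpp
#include <algorithm>
#include <cstdint>
#include <tuple>
#include <unordered_set>
#include <utility>
#include <vector>

// Hypothetical row with a single partition key and a single sorting key.
struct Row {
  int64_t partitionKey;
  int64_t sortKey;
};

struct SpillableState {
  std::unordered_set<int64_t> partitions; // Stand-in for the hash table.
  std::vector<Row> rows; // Stand-in for the RowContainer.
  std::vector<std::vector<Row>> spilledRuns; // Stand-in for spill files.

  void add(const Row& row) {
    partitions.insert(row.partitionKey);
    rows.push_back(row);
  }

  // Sorts accumulated rows by (partition key, sorting key), stores them as
  // one sorted run, and clears in-memory state so it can be reused.
  void spill() {
    if (rows.empty()) {
      return;
    }
    std::sort(rows.begin(), rows.end(), [](const Row& a, const Row& b) {
      return std::tie(a.partitionKey, a.sortKey) <
          std::tie(b.partitionKey, b.sortKey);
    });
    spilledRuns.push_back(std::move(rows));
    rows.clear();
    partitions.clear();
  }
};
```

Each call to `spill()` in this model produces one run that is already sorted, which is what makes a later sort-merge possible.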

After receiving (and spilling) all input, TopNRowNumber operator sort-merges
spilled data and generates output. Since at this point all data is sorted by
partition and sorting keys, the operator produces data in streaming fashion.
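
A rough sketch of the merge step, assuming each spilled run is already sorted by partition and sorting keys: a min-heap of per-run cursors yields rows in global order, and a counter that resets on each partition change assigns row numbers and drops rows past the limit. `mergeTopN`, `OutputRow`, and the cursor layout are hypothetical names for this illustration, and the result is collected into a vector for simplicity where a real operator would emit batches.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <queue>
#include <tuple>
#include <vector>

// Hypothetical row with a single partition key and a single sorting key.
struct Row {
  int64_t partitionKey;
  int64_t sortKey;
};

struct OutputRow {
  int64_t partitionKey;
  int64_t sortKey;
  int64_t rowNumber;
};

// Merges sorted runs and keeps at most 'limit' rows per partition.
std::vector<OutputRow> mergeTopN(
    const std::vector<std::vector<Row>>& runs,
    int64_t limit) {
  // Cursor: (partition key, sorting key, run index, offset within run).
  using Cursor = std::tuple<int64_t, int64_t, size_t, size_t>;
  std::priority_queue<Cursor, std::vector<Cursor>, std::greater<Cursor>> heap;
  for (size_t i = 0; i < runs.size(); ++i) {
    if (!runs[i].empty()) {
      heap.emplace(runs[i][0].partitionKey, runs[i][0].sortKey, i, size_t{0});
    }
  }

  std::vector<OutputRow> output;
  bool hasCurrent = false;
  int64_t currentPartition = 0;
  int64_t rowNumber = 0;
  while (!heap.empty()) {
    const auto [partitionKey, sortKey, run, offset] = heap.top();
    heap.pop();
    if (!hasCurrent || partitionKey != currentPartition) {
      // Rows arrive grouped by partition, so a key change starts a new one.
      hasCurrent = true;
      currentPartition = partitionKey;
      rowNumber = 0;
    }
    if (rowNumber < limit) {
      output.push_back({partitionKey, sortKey, ++rowNumber});
    }
    if (offset + 1 < runs[run].size()) {
      const Row& next = runs[run][offset + 1];
      heap.emplace(next.partitionKey, next.sortKey, run, offset + 1);
    }
  }
  return output;
}
```

Because only one cursor per run is held at a time, memory use in this model depends on the number of runs rather than on the total number of rows.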

Spilling is enabled by default. It can be disabled by setting
topn_row_number_spill_enabled configuration property to false.
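
The way the flags combine can be sketched with a toy config class. `ToyQueryConfig` and `canSpillTopNRowNumber` are hypothetical and are not part of Velox. The default of `true` for `topn_row_number_spill_enabled` and the non-empty partition keys requirement come from the diff in this commit, while the default of `false` for `spill_enabled` is an assumption of this sketch.

```cpp
#include <cstddef>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for a query config holding boolean properties.
class ToyQueryConfig {
 public:
  void set(const std::string& key, bool value) {
    values_[key] = value;
  }

  // Assumption of this sketch: global spilling is off unless set.
  bool spillEnabled() const {
    return get("spill_enabled", false);
  }

  // Enabled by default, as stated in this commit.
  bool topNRowNumberSpillEnabled() const {
    return get("topn_row_number_spill_enabled", true);
  }

 private:
  bool get(const std::string& key, bool defaultValue) const {
    auto it = values_.find(key);
    return it == values_.end() ? defaultValue : it->second;
  }

  std::unordered_map<std::string, bool> values_;
};

// Spilling applies only when the global flag is set, the operator flag is
// not disabled, and the plan node has at least one partition key.
bool canSpillTopNRowNumber(
    const ToyQueryConfig& config,
    size_t numPartitionKeys) {
  return config.spillEnabled() && numPartitionKeys > 0 &&
      config.topNRowNumberSpillEnabled();
}
```

In this model, setting only `spill_enabled` to true is enough to allow spilling, and setting `topn_row_number_spill_enabled` to false turns it off for this operator alone.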

Spilling after noMoreInput() is not supported yet.

Pull Request resolved: #7139

Reviewed By: xiaoxmeng

Differential Revision: D50455632

Pulled By: mbasmanova

fbshipit-source-id: 76c42d19adec46ebb53ce514e344b6835dff3259
mbasmanova authored and facebook-github-bot committed Oct 21, 2023
1 parent d87866b commit 55d5636
Showing 7 changed files with 483 additions and 22 deletions.
4 changes: 4 additions & 0 deletions velox/core/PlanNode.h
@@ -2267,6 +2267,10 @@ class TopNRowNumberNode : public PlanNode {
     return outputType_;
   }

+  bool canSpill(const QueryConfig& queryConfig) const override {
+    return !partitionKeys_.empty() && queryConfig.topNRowNumberSpillEnabled();
+  }
+
   const RowTypePtr& inputType() const {
     return sources_[0]->outputType();
   }
14 changes: 12 additions & 2 deletions velox/core/QueryConfig.h
@@ -199,6 +199,10 @@ class QueryConfig {
   static constexpr const char* kRowNumberSpillEnabled =
       "row_number_spill_enabled";

+  /// TopNRowNumber spilling flag, only applies if "spill_enabled" flag is set.
+  static constexpr const char* kTopNRowNumberSpillEnabled =
+      "topn_row_number_spill_enabled";
+
   /// The max memory that a final aggregation can use before spilling. If it 0,
   /// then there is no limit.
   static constexpr const char* kAggregationSpillMemoryThreshold =
@@ -479,12 +483,18 @@ class QueryConfig {
     return get<bool>(kWriterSpillEnabled, true);
   }

-  /// Returns 'is row_number spilling enabled' flag. Must also check the
-  /// spillEnabled()!
+  /// Returns true if spilling is enabled for RowNumber operator. Must also
+  /// check the spillEnabled()!
   bool rowNumberSpillEnabled() const {
     return get<bool>(kRowNumberSpillEnabled, true);
   }

+  /// Returns true if spilling is enabled for TopNRowNumber operator. Must also
+  /// check the spillEnabled()!
+  bool topNRowNumberSpillEnabled() const {
+    return get<bool>(kTopNRowNumberSpillEnabled, true);
+  }
+
   /// Returns a percentage of aggregation or join input batches that will be
   /// forced to spill for testing. 0 means no extra spilling.
   int32_t testingSpillPct() const {
4 changes: 4 additions & 0 deletions velox/docs/configs.rst
@@ -203,6 +203,10 @@ Spilling
      - boolean
      - true
      - When `spill_enabled` is true, determines whether RowNumber operator can spill to disk under memory pressure.
+   * - topn_row_number_spill_enabled
+     - boolean
+     - true
+     - When `spill_enabled` is true, determines whether TopNRowNumber operator can spill to disk under memory pressure.
    * - writer_spill_enabled
      - boolean
      - true
2 changes: 1 addition & 1 deletion velox/exec/RowNumber.cpp
@@ -176,7 +176,7 @@ void RowNumber::restoreNextSpillPartition() {
 }

 void RowNumber::ensureInputFits(const RowVectorPtr& input) {
-  if (!spillConfig_.has_value()) {
+  if (!spillEnabled()) {
     // Spilling is disabled.
     return;
   }