Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support batching for RANGE running window aggregations. Including on [databricks] #9544

Merged
merged 23 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
4ccf617
Batching support for ROW-based FIRST() window function
mythrocks Oct 18, 2023
7d5adc9
Support for ignoreNulls.
mythrocks Oct 18, 2023
28743c5
Fixed backing up previous from current batch.
mythrocks Oct 18, 2023
b39a4f3
Reworded GpuNthValue Literal check with match.
mythrocks Oct 18, 2023
6987164
Merge remote-tracking branch 'origin/branch-23.12' into batched-windo…
mythrocks Oct 19, 2023
5cc1985
Fix package path.
mythrocks Oct 19, 2023
5c6e85d
Code-style fix: Fixed line length.
mythrocks Oct 19, 2023
9373b7a
Moved First-specific logic out of GpuWindowExec.
mythrocks Oct 23, 2023
af49389
Added test for nth_value.
mythrocks Oct 24, 2023
42ec892
Explicit canFix() check.
mythrocks Oct 24, 2023
11ec8db
Merge remote-tracking branch 'origin/branch-23.12' into batched-windo…
mythrocks Oct 25, 2023
0c27e15
First swipe.
mythrocks Oct 24, 2023
a14bb8e
Removed unnecessary TODO.
mythrocks Oct 25, 2023
b98560d
Merge remote-tracking branch 'origin/branch-23.12' into batched-range…
mythrocks Oct 27, 2023
d2915d6
Fixed rebase error.
mythrocks Oct 27, 2023
917fa07
Range [UNB, CURRENT] tests. Round#1.
mythrocks Oct 27, 2023
b8bec1b
Fixed verify errors.
mythrocks Oct 27, 2023
3ba6480
Merge remote-tracking branch 'origin/branch-23.12' into batched-range…
mythrocks Oct 31, 2023
22aaf7c
More comprehensive RANGE window tests.
mythrocks Oct 31, 2023
f6f6c62
Revert "More comprehensive RANGE window tests."
mythrocks Oct 31, 2023
2d8eb65
Separate tests for ROWS and RANGE, with repeated rows in order-by.
mythrocks Nov 1, 2023
d9540f9
Partitioned test for batched RANGE query.
mythrocks Nov 1, 2023
9b1ac51
Partitioned tests for batched range window functions.
mythrocks Nov 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions integration_tests/src/main/python/window_function_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ def test_window_running_no_part(b_gen, batch_size):
'FIRST(b) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls',
'FIRST(b, TRUE) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls',
'NTH_VALUE(b, 1) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls']

if isinstance(b_gen.data_type, NumericType) and not isinstance(b_gen, FloatGen) and not isinstance(b_gen, DoubleGen):
query_parts.append('sum(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as sum_col')

Expand All @@ -514,6 +515,37 @@ def test_window_running_no_part(b_gen, batch_size):
validate_execs_in_gpu_plan = ['GpuRunningWindowExec'],
conf = conf)


# TODO: ROW vs RANGE parametrization?
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn)  # set the batch size so we can test multiple stream batches
@pytest.mark.parametrize('b_gen', all_basic_gens + [decimal_gen_32bit, decimal_gen_128bit], ids=meta_idfn('data:'))
def test_range_running_window_no_part(b_gen, batch_size):
    """
    Tests that unpartitioned RANGE-based running window aggregations
    (UNBOUNDED PRECEDING to CURRENT ROW) produce the same results on GPU and CPU,
    across multiple input batch sizes.
    """
    conf = {'spark.rapids.sql.batchSizeBytes': batch_size,
            'spark.rapids.sql.castFloatToDecimal.enabled': True}
    query_parts = ['COUNT(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS count_col',
                   'MIN(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS min_col',
                   'MAX(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS max_col',
                   'FIRST(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls',
                   'FIRST(b, TRUE) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls',
                   'NTH_VALUE(b, 1) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls']

    # SUM is only exercised for exact numeric types; float/double sums are order-dependent.
    if isinstance(b_gen.data_type, NumericType) and not isinstance(b_gen, FloatGen) and not isinstance(b_gen, DoubleGen):
        query_parts.append('SUM(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sum_col')

    # NTH_VALUE ... IGNORE NULLS syntax is only available after Spark 3.1.1.
    if spark_version() > "3.1.1":
        query_parts.append('NTH_VALUE(b, 1) IGNORE NULLS OVER '
                           '(ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_ignore_nulls')

    assert_gpu_and_cpu_are_equal_sql(
        lambda spark: two_col_df(spark, UniqueLongGen(), b_gen, length=1024 * 14),
        "window_agg_table",
        'select ' +
        ', '.join(query_parts) +
        ' from window_agg_table ',
        validate_execs_in_gpu_plan = ['GpuRunningWindowExec'],
        conf = conf)


# Test that we can do a running window sum on floats and doubles. This becomes problematic because we do the agg in parallel
# which means that the result can switch back and forth from Inf to not Inf depending on the order of aggregations.
# We test this by limiting the range of the values in the sum to never hit Inf, and by using abs so we don't have
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,11 +465,23 @@ object GpuWindowExec {
}

def isRunningWindow(spec: GpuWindowSpecDefinition): Boolean = spec match {
case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame(RowFrame,
GpuSpecialFrameBoundary(UnboundedPreceding), GpuSpecialFrameBoundary(CurrentRow))) => true
case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame(RowFrame,
GpuSpecialFrameBoundary(UnboundedPreceding), GpuLiteral(value, _))) if value == 0 => true
case _ => false
case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame(
RowFrame,
GpuSpecialFrameBoundary(UnboundedPreceding),
GpuSpecialFrameBoundary(CurrentRow))) => true
case GpuWindowSpecDefinition(_, _,
GpuSpecifiedWindowFrame(RowFrame,
GpuSpecialFrameBoundary(UnboundedPreceding), GpuLiteral(value, _)))
if value == 0 => true
case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame(
RangeFrame,
GpuSpecialFrameBoundary(UnboundedPreceding),
GpuSpecialFrameBoundary(CurrentRow))) => true
case GpuWindowSpecDefinition(_, _,
GpuSpecifiedWindowFrame(RangeFrame,
GpuSpecialFrameBoundary(UnboundedPreceding), GpuLiteral(value, _)))
if value == 0 => true
case _ => false
}

def isUnboundedToUnboundedWindow(spec: GpuWindowSpecDefinition): Boolean = spec match {
Expand Down Expand Up @@ -1674,6 +1686,28 @@ case class GpuRunningWindowExec(

// Extra constructor args (the CPU-side specs) that must survive a copy of this node.
override def otherCopyArgs: Seq[AnyRef] = cpuPartitionSpec :: cpuOrderSpec :: Nil

// Children must be batched the same way this exec batches its own output.
override def childrenCoalesceGoal: Seq[CoalesceGoal] = Seq(outputBatching)

override def outputBatching: CoalesceGoal = {
val isRangeFrame = windowOps.exists {
case GpuAlias(
revans2 marked this conversation as resolved.
Show resolved Hide resolved
GpuWindowExpression(
_, GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame(RangeFrame, _, _))),
_) => true
case _ => false
}
if (!isRangeFrame) {
return null // NO batching restrictions on ROW frames.
}
if (gpuPartitionSpec.isEmpty) {
// If unpartitioned, batch on the order-by column.
BatchedByKey(gpuOrderSpec)(cpuOrderSpec)
} else {
// If partitioned, batch on partition-columns + order-by columns.
BatchedByKey(gpuPartitionOrdering ++ gpuOrderSpec)(cpuPartitionOrdering ++ cpuOrderSpec)
}
}

override protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputBatches = gpuLongMetric(GpuMetric.NUM_OUTPUT_BATCHES)
val numOutputRows = gpuLongMetric(GpuMetric.NUM_OUTPUT_ROWS)
Expand Down