From 4ccf61713de4a8ed9fc880eae958fbc55e33d07a Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 18 Oct 2023 11:33:40 -0700 Subject: [PATCH 01/19] Batching support for ROW-based FIRST() window function This commit adds support for `FIRST()` window functions, running in a ROWS context. This does not currently support ignore/include nulls. Signed-off-by: MithunR --- .../nvidia/spark/rapids/GpuWindowExec.scala | 15 +++- .../spark/rapids/GpuWindowExpression.scala | 86 ++++++++++++++++++- .../spark/sql/rapids/AggregateFunctions.scala | 12 +++ 3 files changed, 110 insertions(+), 3 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index b52670d6c1e..37bd05a4f31 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -30,11 +30,11 @@ import com.nvidia.spark.rapids.shims.{GpuWindowUtil, ShimUnaryExecNode} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, AttributeSeq, AttributeSet, CurrentRow, Expression, FrameType, NamedExpression, RangeFrame, RowFrame, SortOrder, UnboundedFollowing, UnboundedPreceding} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, AttributeSeq, AttributeSet, CurrentRow, Expression, FrameType, Literal, NamedExpression, RangeFrame, RowFrame, SortOrder, UnboundedFollowing, UnboundedPreceding} import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.window.WindowExec -import org.apache.spark.sql.rapids.GpuAggregateExpression +import org.apache.spark.sql.rapids.{GpuAggregateExpression, GpuFirst, GpuNthValue} import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} import org.apache.spark.unsafe.types.CalendarInterval @@ -483,6 +483,10 @@ object GpuWindowExec { val isFuncOkay = func match { case _: GpuBatchedRunningWindowWithFixer => true case GpuAggregateExpression(_: GpuBatchedRunningWindowWithFixer, _, _, _ , _) => true + case GpuNthValue(_, offset, _) + if (offset.isInstanceOf[Literal] && offset.asInstanceOf[Literal].value == 1) => + print(offset) + true case _ => false } isSpecOkay && isFuncOkay @@ -1529,6 +1533,13 @@ class GpuRunningWindowIterator( Some((index, f.newFixer())) case GpuAggregateExpression(f: GpuBatchedRunningWindowWithFixer, _, _, _, _) => Some((index, f.newFixer())) + case GpuNthValue(child@_, offset, ignoreNulls@_) => + GpuOverrides.extractLit(offset) match { + case Some(Literal(value@_, IntegerType)) if value == 1 => + // Piggyback on GpuFirst. + Some((index, GpuFirst(child, ignoreNulls).newFixer())) + case _ => None + } case _ => None } case _ => None diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala index 9d9f4101e26..53244182ba1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala @@ -19,11 +19,12 @@ package com.nvidia.spark.rapids import java.util.concurrent.TimeUnit import ai.rapids.cudf -import ai.rapids.cudf.{BinaryOp, ColumnVector, DType, GroupByScanAggregation, RollingAggregation, RollingAggregationOnColumn, Scalar, ScanAggregation} +import ai.rapids.cudf.{BinaryOp, ColumnVector, ColumnView, DType, GroupByScanAggregation, RollingAggregation, RollingAggregationOnColumn, Scalar, ScanAggregation} import com.nvidia.spark.Retryable import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.GpuOverrides.wrapExpr import com.nvidia.spark.rapids.shims.{GpuWindowUtil, ShimExpression} +import scala.util.{Left, Right} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.analysis.TypeCheckResult @@ -1133,6 +1134,89 @@ class BatchedRunningWindowBinaryFixer(val binOp: BinaryOp, val name: String) } } +class FirstRunningWindowFixer(ignoreNulls: Boolean = false) + extends BatchedRunningWindowFixer with Logging { + private val name = "first" + private var previousResult: Option[Scalar] = None + private var chkptPreviousResult: Option[Scalar] = None + /** + * Fix up `windowedColumnOutput` with any stored state from previous batches. + * Like all window operations the input data will have been sorted by the partition + * by columns and the order by columns. + * + * @param samePartitionMask a mask that uses `true` to indicate the row + * is for the same partition by keys that was the last row in the + * previous batch or `false` to indicate it is not. If this is known + * to be all true or all false values a single boolean is used. If + * it can change for different rows than a column vector is provided. + * Only values that are for the same partition by keys should be + * modified. Because the input data is sorted by the partition by + * columns the boolean values will be grouped together. + * @param sameOrderMask a mask just like `samePartitionMask` but for ordering. This happens + * for some operations like `rank` and `dense_rank` that use the ordering + * columns in a row based query. This is not needed for all fixers and is not + * free to calculate, so you must set `needsOrderMask` to true if you are + * going to use it. + * @param unfixedWindowResults the output of the windowAggregation without anything + * fixed/modified. This should not be closed by `fixUp` as it will be + * handled by the framework. + * @return a fixed ColumnVector that was with outputs updated for items that were in the same + * group by key as the last row in the previous batch. + */ + override def fixUp(samePartitionMask: Either[ColumnVector, Boolean], + sameOrderMask: Option[Either[ColumnVector, Boolean]], + unfixedWindowResults: ColumnView): ColumnVector = { + // Ignore `ignoreNulls` for the moment. + // Also, `sameOrderMask` is irrelevant for this operation. + logDebug(s"$name: fix up $previousResult $samePartitionMask") + val ret = (previousResult, samePartitionMask) match { + case (None, _) => + // No previous result. Current result needs no fixing. + incRef(unfixedWindowResults) + case (Some(prev), Right(allRowsInSamePartition)) => // Boolean flag. + // All the current batch results may be replaced. + if (allRowsInSamePartition) { + ColumnVector.fromScalar(prev, unfixedWindowResults.getRowCount.toInt) + } else { + // No rows in the same partition. Current result needs no fixing. + incRef(unfixedWindowResults) + } + case (Some(prev), Left(someRowsInSamePartition)) => // Boolean vector. + someRowsInSamePartition.ifElse(prev, unfixedWindowResults) + } + // Reset previous result. + ret + } + + /** + * Save the state, so it can be restored in the case of a retry. + * (This is called inside a Spark task context on executors.) + */ + override def checkpoint(): Unit = chkptPreviousResult = previousResult + + /** + * Restore the state that was saved by calling to "checkpoint". + * (This is called inside a Spark task context on executors.) + */ + override def restore(): Unit = { + // If there is a previous checkpoint result, restore it to previousResult. + if (chkptPreviousResult.isDefined) { + // Close erstwhile previousResult. + previousResult match { + case Some(r) if r != chkptPreviousResult.get => r.close() + case _ => // Nothing to close if result is None, or matches the checkpoint. + } + } + previousResult = chkptPreviousResult + chkptPreviousResult = None + } + + override def close(): Unit = { + previousResult.foreach(_.close) + previousResult = None + } +} + /** * This class fixes up batched running windows for sum. Sum is a lot like other binary op * fixers, but it has to special case nulls and that is not super generic. In the future we diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index 618ba14b792..885fa4051f2 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -1788,7 +1788,9 @@ case class GpuDecimal128Average(child: Expression, dt: DecimalType) */ case class GpuFirst(child: Expression, ignoreNulls: Boolean) extends GpuAggregateFunction + with GpuBatchedRunningWindowWithFixer with GpuAggregateWindowFunction +// with GpuRunningWindowFunction with GpuDeterministicFirstLastCollectShim with ImplicitCastInputTypes with Serializable { @@ -1839,6 +1841,16 @@ case class GpuFirst(child: Expression, ignoreNulls: Boolean) RollingAggregation.nth(0, if (ignoreNulls) NullPolicy.EXCLUDE else NullPolicy.INCLUDE) .onColumn(inputs.head._2) + override def newFixer(): BatchedRunningWindowFixer = new FirstRunningWindowFixer(ignoreNulls) + + /* + // RUNNING WINDOW + + override def groupByScanInputProjection(isRunningBatched: Boolean): Seq[Expression] = inputProjection + + override def groupByScanAggregation(isRunningBatched: Boolean): Seq[AggAndReplace[GroupByScanAggregation]] = + Seq(AggAndReplace(GroupByScanAggregation.fi)) + */ } case class GpuLast(child: Expression, ignoreNulls: Boolean) From 7d5adc90589aecdac71a2f2271948e15e01f4e92 Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 18 Oct 2023 13:02:28 -0700 Subject: [PATCH 02/19] Support for ignoreNulls. --- .../spark/rapids/GpuWindowExpression.scala | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala index 53244182ba1..a8a87bc3ac8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala @@ -1152,11 +1152,7 @@ class FirstRunningWindowFixer(ignoreNulls: Boolean = false) * Only values that are for the same partition by keys should be * modified. Because the input data is sorted by the partition by * columns the boolean values will be grouped together. - * @param sameOrderMask a mask just like `samePartitionMask` but for ordering. This happens - * for some operations like `rank` and `dense_rank` that use the ordering - * columns in a row based query. This is not needed for all fixers and is not - * free to calculate, so you must set `needsOrderMask` to true if you are - * going to use it. + * @param sameOrderMask a mask just like `samePartitionMask` but for ordering. Unused for `FIRST`. * @param unfixedWindowResults the output of the windowAggregation without anything * fixed/modified. This should not be closed by `fixUp` as it will be * handled by the framework. @@ -1166,8 +1162,7 @@ class FirstRunningWindowFixer(ignoreNulls: Boolean = false) override def fixUp(samePartitionMask: Either[ColumnVector, Boolean], sameOrderMask: Option[Either[ColumnVector, Boolean]], unfixedWindowResults: ColumnView): ColumnVector = { - // Ignore `ignoreNulls` for the moment. - // Also, `sameOrderMask` is irrelevant for this operation. + // `sameOrderMask` is irrelevant for this operation. logDebug(s"$name: fix up $previousResult $samePartitionMask") val ret = (previousResult, samePartitionMask) match { case (None, _) => @@ -1176,13 +1171,24 @@ class FirstRunningWindowFixer(ignoreNulls: Boolean = false) case (Some(prev), Right(allRowsInSamePartition)) => // Boolean flag. // All the current batch results may be replaced. if (allRowsInSamePartition) { - ColumnVector.fromScalar(prev, unfixedWindowResults.getRowCount.toInt) + if (!ignoreNulls || prev.isValid) { + // If !ignoreNulls, `prev` is the result for all rows. + // If ignoreNulls *AND* `prev` isn't null, `prev` is the result for all rows. + ColumnVector.fromScalar(prev, unfixedWindowResults.getRowCount.toInt) + } else { + // If ignoreNulls, *AND* `prev` is null, keep the current result. + incRef(unfixedWindowResults) + } } else { // No rows in the same partition. Current result needs no fixing. incRef(unfixedWindowResults) } case (Some(prev), Left(someRowsInSamePartition)) => // Boolean vector. - someRowsInSamePartition.ifElse(prev, unfixedWindowResults) + if (!ignoreNulls || prev.isValid) { + someRowsInSamePartition.ifElse(prev, unfixedWindowResults) + } else { + incRef(unfixedWindowResults) + } } // Reset previous result. ret From 28743c510981ad685449280082b646c3eea08680 Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 18 Oct 2023 15:33:55 -0700 Subject: [PATCH 03/19] Fixed backing up previous from current batch. Plus, tests for partitioned/unpartitioned, and keep/ignore nulls. --- .../src/main/python/window_function_test.py | 10 +++++++--- .../spark/rapids/GpuWindowExpression.scala | 17 ++++++++++++++++- .../spark/sql/rapids/AggregateFunctions.scala | 9 --------- 3 files changed, 23 insertions(+), 13 deletions(-) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index b4708b89668..dd2a55cdfba 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -494,7 +494,9 @@ def test_window_running_no_part(b_gen, batch_size): 'dense_rank() over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dense_rank_val', 'count(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as count_col', 'min(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as min_col', - 'max(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as max_col'] + 'max(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as max_col', + 'FIRST(b) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls', + 'FIRST(b, TRUE) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls'] if isinstance(b_gen.data_type, NumericType) and not isinstance(b_gen, FloatGen) and not isinstance(b_gen, DoubleGen): query_parts.append('sum(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as sum_col') @@ -595,7 +597,7 @@ def test_window_running_rank(data_gen): @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches @pytest.mark.parametrize('b_gen, c_gen', [(long_gen, x) for x in running_part_and_order_gens] + [(x, long_gen) for x in all_basic_gens + [decimal_gen_32bit]], ids=idfn) -def test_window_running(b_gen, c_gen, batch_size): +def test_window_running_foo(b_gen, c_gen, batch_size): conf = {'spark.rapids.sql.batchSizeBytes': batch_size, 'spark.rapids.sql.variableFloatAgg.enabled': True, 'spark.rapids.sql.castFloatToDecimal.enabled': True} @@ -604,7 +606,9 @@ def test_window_running(b_gen, c_gen, batch_size): 'dense_rank() over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dense_rank_val', 'count(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as count_col', 'min(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as min_col', - 'max(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as max_col'] + 'max(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as max_col', + 'FIRST(c) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls', + 'FIRST(c, TRUE) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls'] # Decimal precision can grow too large. Float and Double can get odd results for Inf/-Inf because of ordering if isinstance(c_gen.data_type, NumericType) and (not isinstance(c_gen, FloatGen)) and (not isinstance(c_gen, DoubleGen)) and (not isinstance(c_gen, DecimalGen)): diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala index a8a87bc3ac8..512caa2650e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala @@ -1139,6 +1139,18 @@ class FirstRunningWindowFixer(ignoreNulls: Boolean = false) private val name = "first" private var previousResult: Option[Scalar] = None private var chkptPreviousResult: Option[Scalar] = None + + private[this] def resetPrevious(finalOutputColumn: cudf.ColumnVector): Unit = { + val numRows = finalOutputColumn.getRowCount.toInt + if (numRows > 0) { + val lastIndex = numRows - 1 + logDebug(s"$name: updateState from $previousResult to...") + previousResult.foreach(_.close) + previousResult = Some(finalOutputColumn.getScalarElement(lastIndex)) + logDebug(s"$name: ... $previousResult") + } + } + /** * Fix up `windowedColumnOutput` with any stored state from previous batches. * Like all window operations the input data will have been sorted by the partition @@ -1191,7 +1203,10 @@ class FirstRunningWindowFixer(ignoreNulls: Boolean = false) } } // Reset previous result. - ret + closeOnExcept(ret) { ret => + resetPrevious(ret) + ret + } } /** diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index 885fa4051f2..bf75d43ceaf 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -1842,15 +1842,6 @@ case class GpuFirst(child: Expression, ignoreNulls: Boolean) .onColumn(inputs.head._2) override def newFixer(): BatchedRunningWindowFixer = new FirstRunningWindowFixer(ignoreNulls) - - /* - // RUNNING WINDOW - - override def groupByScanInputProjection(isRunningBatched: Boolean): Seq[Expression] = inputProjection - - override def groupByScanAggregation(isRunningBatched: Boolean): Seq[AggAndReplace[GroupByScanAggregation]] = - Seq(AggAndReplace(GroupByScanAggregation.fi)) - */ } case class GpuLast(child: Expression, ignoreNulls: Boolean) From b39a4f36397bef013963976949a7f1bd604e33f5 Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 18 Oct 2023 15:53:50 -0700 Subject: [PATCH 04/19] Reworded GpuNthValue Literal check with match. --- .../src/main/python/window_function_test.py | 2 +- .../com/nvidia/spark/rapids/GpuWindowExec.scala | 15 +++++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index dd2a55cdfba..ad5b6f18a26 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -597,7 +597,7 @@ def test_window_running_rank(data_gen): @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches @pytest.mark.parametrize('b_gen, c_gen', [(long_gen, x) for x in running_part_and_order_gens] + [(x, long_gen) for x in all_basic_gens + [decimal_gen_32bit]], ids=idfn) -def test_window_running_foo(b_gen, c_gen, batch_size): +def test_window_running(b_gen, c_gen, batch_size): conf = {'spark.rapids.sql.batchSizeBytes': batch_size, 'spark.rapids.sql.variableFloatAgg.enabled': True, 'spark.rapids.sql.castFloatToDecimal.enabled': True} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index 37bd05a4f31..da91df6f948 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -483,10 +483,13 @@ object GpuWindowExec { val isFuncOkay = func match { case _: GpuBatchedRunningWindowWithFixer => true case GpuAggregateExpression(_: GpuBatchedRunningWindowWithFixer, _, _, _ , _) => true - case GpuNthValue(_, offset, _) - if (offset.isInstanceOf[Literal] && offset.asInstanceOf[Literal].value == 1) => - print(offset) - true + case GpuNthValue(_, offset, _) => + GpuOverrides.extractLit(offset) match { + case Some(Literal(value, IntegerType)) if value == 1 => + // FIRST()! Can be solved as batched running window. + true + case _ => false + } case _ => false } isSpecOkay && isFuncOkay @@ -1533,9 +1536,9 @@ class GpuRunningWindowIterator( Some((index, f.newFixer())) case GpuAggregateExpression(f: GpuBatchedRunningWindowWithFixer, _, _, _, _) => Some((index, f.newFixer())) - case GpuNthValue(child@_, offset, ignoreNulls@_) => + case GpuNthValue(child, offset, ignoreNulls) => GpuOverrides.extractLit(offset) match { - case Some(Literal(value@_, IntegerType)) if value == 1 => + case Some(Literal(value, IntegerType)) if value == 1 => // Piggyback on GpuFirst. Some((index, GpuFirst(child, ignoreNulls).newFixer())) case _ => None From 5cc19856f14302329c680c2e95f625f44e649ac7 Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 19 Oct 2023 11:04:10 -0700 Subject: [PATCH 05/19] Fix package path. --- .../src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index da91df6f948..be7274f6c97 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Attribut import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.window.WindowExec -import org.apache.spark.sql.rapids.{GpuAggregateExpression, GpuFirst, GpuNthValue} +import org.apache.spark.sql.rapids.aggregate.{GpuAggregateExpression, GpuFirst, GpuNthValue} import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} import org.apache.spark.unsafe.types.CalendarInterval From 5c6e85de25045f6f9f6226126ffcbb95ba7f535a Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 19 Oct 2023 11:39:08 -0700 Subject: [PATCH 06/19] Code-style fix: Fixed line length. Also removed unused import. --- .../scala/com/nvidia/spark/rapids/GpuWindowExpression.scala | 2 +- .../apache/spark/sql/rapids/aggregate/aggregateFunctions.scala | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala index 6b6a03f710e..923078572fd 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala @@ -1165,7 +1165,7 @@ class FirstRunningWindowFixer(ignoreNulls: Boolean = false) * Only values that are for the same partition by keys should be * modified. Because the input data is sorted by the partition by * columns the boolean values will be grouped together. - * @param sameOrderMask a mask just like `samePartitionMask` but for ordering. Unused for `FIRST`. + * @param sameOrderMask Similar mask for ordering. Unused for `FIRST`. * @param unfixedWindowResults the output of the windowAggregation without anything * fixed/modified. This should not be closed by `fixUp` as it will be * handled by the framework. diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala index 687a8a45bfc..ae5a3ba251e 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala @@ -1522,7 +1522,6 @@ case class GpuFirst(child: Expression, ignoreNulls: Boolean) extends GpuAggregateFunction with GpuBatchedRunningWindowWithFixer with GpuAggregateWindowFunction -// with GpuRunningWindowFunction with GpuDeterministicFirstLastCollectShim with ImplicitCastInputTypes with Serializable { From 9373b7a02ec77d3340903b925e8eaefa7abe872e Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 23 Oct 2023 13:54:05 -0700 Subject: [PATCH 07/19] Moved First-specific logic out of GpuWindowExec. Signed-off-by: MithunR --- .../nvidia/spark/rapids/GpuWindowExec.scala | 24 ++++----------- .../spark/rapids/GpuWindowExpression.scala | 10 +++---- .../rapids/aggregate/aggregateFunctions.scala | 30 ++++++++++++------- 3 files changed, 30 insertions(+), 34 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index be7274f6c97..2fabf761f4c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -30,11 +30,11 @@ import com.nvidia.spark.rapids.shims.{GpuWindowUtil, ShimUnaryExecNode} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, AttributeSeq, AttributeSet, CurrentRow, Expression, FrameType, Literal, NamedExpression, RangeFrame, RowFrame, SortOrder, UnboundedFollowing, UnboundedPreceding} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, AttributeSeq, AttributeSet, CurrentRow, Expression, FrameType, NamedExpression, RangeFrame, RowFrame, SortOrder, UnboundedFollowing, UnboundedPreceding} import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.window.WindowExec -import org.apache.spark.sql.rapids.aggregate.{GpuAggregateExpression, GpuFirst, GpuNthValue} +import org.apache.spark.sql.rapids.aggregate.GpuAggregateExpression import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} import org.apache.spark.unsafe.types.CalendarInterval @@ -481,15 +481,8 @@ object GpuWindowExec { def isBatchedRunningFunc(func: Expression, spec: GpuWindowSpecDefinition): Boolean = { val isSpecOkay = isRunningWindow(spec) val isFuncOkay = func match { - case _: GpuBatchedRunningWindowWithFixer => true + case f: GpuBatchedRunningWindowWithFixer => f.newFixer().isDefined case GpuAggregateExpression(_: GpuBatchedRunningWindowWithFixer, _, _, _ , _) => true - case GpuNthValue(_, offset, _) => - GpuOverrides.extractLit(offset) match { - case Some(Literal(value, IntegerType)) if value == 1 => - // FIRST()! Can be solved as batched running window. - true - case _ => false - } case _ => false } isSpecOkay && isFuncOkay @@ -1533,16 +1526,9 @@ class GpuRunningWindowIterator( case (GpuAlias(GpuWindowExpression(func, _), _), index) => func match { case f: GpuBatchedRunningWindowWithFixer => - Some((index, f.newFixer())) + f.newFixer().map(fixer => (index, fixer)) case GpuAggregateExpression(f: GpuBatchedRunningWindowWithFixer, _, _, _, _) => - Some((index, f.newFixer())) - case GpuNthValue(child, offset, ignoreNulls) => - GpuOverrides.extractLit(offset) match { - case Some(Literal(value, IntegerType)) if value == 1 => - // Piggyback on GpuFirst. - Some((index, GpuFirst(child, ignoreNulls).newFixer())) - case _ => None - } + f.newFixer().map(fixer => (index, fixer)) case _ => None } case _ => None diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala index 923078572fd..059da7ef87f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala @@ -950,7 +950,7 @@ trait GpuBatchedRunningWindowWithFixer { /** * Get a new class that can be used to fix up batched running window operations. */ - def newFixer(): BatchedRunningWindowFixer + def newFixer(): Option[BatchedRunningWindowFixer] } /** @@ -1832,7 +1832,7 @@ case class GpuRank(children: Seq[Expression]) extends GpuRunningWindowFunction } } - override def newFixer(): BatchedRunningWindowFixer = new RankFixer() + override def newFixer(): Option[BatchedRunningWindowFixer] = Some(new RankFixer()) } /** @@ -1873,7 +1873,7 @@ case class GpuDenseRank(children: Seq[Expression]) extends GpuRunningWindowFunct override def scanAggregation(isRunningBatched: Boolean): Seq[AggAndReplace[ScanAggregation]] = Seq(AggAndReplace(ScanAggregation.denseRank(), None)) - override def newFixer(): BatchedRunningWindowFixer = new DenseRankFixer() + override def newFixer(): Option[BatchedRunningWindowFixer] = Some(new DenseRankFixer()) } /** @@ -1887,8 +1887,8 @@ case object GpuRowNumber extends GpuRunningWindowFunction override def children: Seq[Expression] = Nil - override def newFixer(): BatchedRunningWindowFixer = - new BatchedRunningWindowBinaryFixer(BinaryOp.ADD, "row_number") + override def newFixer(): Option[BatchedRunningWindowFixer] = + Some(new BatchedRunningWindowBinaryFixer(BinaryOp.ADD, "row_number")) // For group by scans cudf does not support ROW_NUMBER so we will do a SUM // on a column of 1s. We could do a COUNT_ALL too, but it would not be as consistent diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala index ae5a3ba251e..df992e6d2c0 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala @@ -26,7 +26,7 @@ import com.nvidia.spark.rapids.shims.{GpuDeterministicFirstLastCollectShim, Shim import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckSuccess -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ImplicitCastInputTypes, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ImplicitCastInputTypes, Literal, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, TypeUtils} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids._ @@ -332,8 +332,8 @@ abstract class GpuMin(child: Expression) extends GpuAggregateFunction RollingAggregation.min().onColumn(inputs.head._2) // RUNNING WINDOW - override def newFixer(): BatchedRunningWindowFixer = - new BatchedRunningWindowBinaryFixer(BinaryOp.NULL_MIN, "min") + override def newFixer(): Option[BatchedRunningWindowFixer] = + Some(new BatchedRunningWindowBinaryFixer(BinaryOp.NULL_MIN, "min")) // UNBOUNDED TO UNBOUNDED WINDOW override def newUnboundedToUnboundedFixer: BatchedUnboundedToUnboundedWindowFixer = @@ -516,8 +516,8 @@ abstract class GpuMax(child: Expression) extends GpuAggregateFunction RollingAggregation.max().onColumn(inputs.head._2) // RUNNING WINDOW - override def newFixer(): BatchedRunningWindowFixer = - new BatchedRunningWindowBinaryFixer(BinaryOp.NULL_MAX, "max") + override def newFixer(): Option[BatchedRunningWindowFixer] = + Some(new BatchedRunningWindowBinaryFixer(BinaryOp.NULL_MAX, "max")) // UNBOUNDED TO UNBOUNDED WINDOW override def newUnboundedToUnboundedFixer: BatchedUnboundedToUnboundedWindowFixer = @@ -928,8 +928,8 @@ abstract class GpuSum( override def windowOutput(result: ColumnVector): ColumnVector = result.incRefCount() // RUNNING WINDOW - override def newFixer(): BatchedRunningWindowFixer = - new SumBinaryFixer(resultType, failOnErrorOverride) + override def newFixer(): Option[BatchedRunningWindowFixer] = + Some(new SumBinaryFixer(resultType, failOnErrorOverride)) override def groupByScanInputProjection(isRunningBatched: Boolean): Seq[Expression] = windowInputProjection @@ -1286,8 +1286,8 @@ case class GpuCount(children: Seq[Expression], } // RUNNING WINDOW - override def newFixer(): BatchedRunningWindowFixer = - new BatchedRunningWindowBinaryFixer(BinaryOp.ADD, "count") + override def newFixer(): Option[BatchedRunningWindowFixer] = + Some(new BatchedRunningWindowBinaryFixer(BinaryOp.ADD, "count")) // Scan and group by scan do not support COUNT with nulls excluded. // one of them does not even support count at all, so we are going to SUM @@ -1572,7 +1572,8 @@ case class GpuFirst(child: Expression, ignoreNulls: Boolean) RollingAggregation.nth(0, if (ignoreNulls) NullPolicy.EXCLUDE else NullPolicy.INCLUDE) .onColumn(inputs.head._2) - override def newFixer(): BatchedRunningWindowFixer = new FirstRunningWindowFixer(ignoreNulls) + override def newFixer(): Option[BatchedRunningWindowFixer] = + Some(new FirstRunningWindowFixer(ignoreNulls)) } case class GpuLast(child: Expression, ignoreNulls: Boolean) @@ -1630,6 +1631,7 @@ case class GpuLast(child: Expression, ignoreNulls: Boolean) case class GpuNthValue(child: Expression, offset: Expression, ignoreNulls: Boolean) extends GpuAggregateWindowFunction + with GpuBatchedRunningWindowWithFixer // Only if the N == 1. with ImplicitCastInputTypes with Serializable { @@ -1662,6 +1664,14 @@ case class GpuNthValue(child: Expression, offset: Expression, ignoreNulls: Boole RollingAggregation.nth(offsetVal - 1, if (ignoreNulls) NullPolicy.EXCLUDE else NullPolicy.INCLUDE) .onColumn(inputs.head._2) + + override def newFixer(): Option[BatchedRunningWindowFixer] = { + GpuOverrides.extractLit(offset) match { + case Some(Literal(value, IntegerType)) if value == 1 => + Some(new FirstRunningWindowFixer(ignoreNulls)) + case _ => None + } + } } trait GpuCollectBase From af4938959ee4f04029f22de04489af213e850d26 Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 24 Oct 2023 12:14:27 -0700 Subject: [PATCH 08/19] Added test for nth_value. --- integration_tests/src/main/python/window_function_test.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 39d921175cb..dc243582f98 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -496,7 +496,9 @@ def test_window_running_no_part(b_gen, batch_size): 'min(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as min_col', 'max(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as max_col', 'FIRST(b) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls', - 'FIRST(b, TRUE) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls'] + 'FIRST(b, TRUE) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls', + 'NTH_VALUE(b, 1) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls', + 'NTH_VALUE(b, 1) IGNORE NULLS OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_ignore_nulls'] if isinstance(b_gen.data_type, NumericType) and not isinstance(b_gen, FloatGen) and not isinstance(b_gen, DoubleGen): query_parts.append('sum(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as sum_col') @@ -608,7 +610,9 @@ def test_window_running(b_gen, c_gen, batch_size): 'min(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as min_col', 'max(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as max_col', 'FIRST(c) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls', - 'FIRST(c, TRUE) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls'] + 'FIRST(c, TRUE) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls', + 'NTH_VALUE(c, 1) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls', + 'NTH_VALUE(c, 1) IGNORE NULLS OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_ignore_nulls'] # Decimal precision can grow too large. Float and Double can get odd results for Inf/-Inf because of ordering if isinstance(c_gen.data_type, NumericType) and (not isinstance(c_gen, FloatGen)) and (not isinstance(c_gen, DoubleGen)) and (not isinstance(c_gen, DecimalGen)): From 42ec8924c0a97a64cd755ec0a52682a73bb26683 Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 24 Oct 2023 13:44:20 -0700 Subject: [PATCH 09/19] Explicit canFix() check. --- .../nvidia/spark/rapids/GpuWindowExec.scala | 12 +++---- .../spark/rapids/GpuWindowExpression.scala | 16 +++++++--- .../rapids/aggregate/aggregateFunctions.scala | 32 +++++++++++-------- 3 files changed, 35 insertions(+), 25 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index 2fabf761f4c..735fba38437 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -481,8 +481,8 @@ object GpuWindowExec { def isBatchedRunningFunc(func: Expression, spec: GpuWindowSpecDefinition): Boolean = { val isSpecOkay = isRunningWindow(spec) val isFuncOkay = func match { - case f: GpuBatchedRunningWindowWithFixer => f.newFixer().isDefined - case GpuAggregateExpression(_: GpuBatchedRunningWindowWithFixer, _, _, _ , _) => true + case f: GpuBatchedRunningWindowWithFixer => f.canFixUp + case GpuAggregateExpression(f: GpuBatchedRunningWindowWithFixer, _, _, _ , _) => f.canFixUp case _ => false } isSpecOkay && isFuncOkay @@ -1525,10 +1525,10 @@ class GpuRunningWindowIterator( boundWindowOps.zipWithIndex.flatMap { case (GpuAlias(GpuWindowExpression(func, _), _), index) => func match { - case f: GpuBatchedRunningWindowWithFixer => - f.newFixer().map(fixer => (index, fixer)) - case GpuAggregateExpression(f: GpuBatchedRunningWindowWithFixer, _, _, _, _) => - f.newFixer().map(fixer => (index, fixer)) + case f: GpuBatchedRunningWindowWithFixer if f.canFixUp => + Some((index, f.newFixer())) + case GpuAggregateExpression(f: GpuBatchedRunningWindowWithFixer, _, _, _, _) + if f.canFixUp => Some((index, f.newFixer())) case _ => None } case _ => None diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala index 059da7ef87f..fbcf1cdbb81 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala @@ -947,10 +947,16 @@ trait BatchedUnboundedToUnboundedWindowFixer extends AutoCloseable { */ trait GpuBatchedRunningWindowWithFixer { + /** + * Checks whether the running window can be fixed up. This should be called before + * newFixer(), to check whether the fixer would work. + */ + def canFixUp: Boolean = true + /** * Get a new class that can be used to fix up batched running window operations. */ - def newFixer(): Option[BatchedRunningWindowFixer] + def newFixer(): BatchedRunningWindowFixer } /** @@ -1832,7 +1838,7 @@ case class GpuRank(children: Seq[Expression]) extends GpuRunningWindowFunction } } - override def newFixer(): Option[BatchedRunningWindowFixer] = Some(new RankFixer()) + override def newFixer(): BatchedRunningWindowFixer = new RankFixer() } /** @@ -1873,7 +1879,7 @@ case class GpuDenseRank(children: Seq[Expression]) extends GpuRunningWindowFunct override def scanAggregation(isRunningBatched: Boolean): Seq[AggAndReplace[ScanAggregation]] = Seq(AggAndReplace(ScanAggregation.denseRank(), None)) - override def newFixer(): Option[BatchedRunningWindowFixer] = Some(new DenseRankFixer()) + override def newFixer(): BatchedRunningWindowFixer = new DenseRankFixer() } /** @@ -1887,8 +1893,8 @@ case object GpuRowNumber extends GpuRunningWindowFunction override def children: Seq[Expression] = Nil - override def newFixer(): Option[BatchedRunningWindowFixer] = - Some(new BatchedRunningWindowBinaryFixer(BinaryOp.ADD, "row_number")) + override def newFixer(): BatchedRunningWindowFixer = + new BatchedRunningWindowBinaryFixer(BinaryOp.ADD, "row_number") // For group by scans cudf does not support ROW_NUMBER so we will do a SUM // on a column of 1s. We could do a COUNT_ALL too, but it would not be as consistent diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala index df992e6d2c0..29d22e98baf 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala @@ -332,8 +332,8 @@ abstract class GpuMin(child: Expression) extends GpuAggregateFunction RollingAggregation.min().onColumn(inputs.head._2) // RUNNING WINDOW - override def newFixer(): Option[BatchedRunningWindowFixer] = - Some(new BatchedRunningWindowBinaryFixer(BinaryOp.NULL_MIN, "min")) + override def newFixer(): BatchedRunningWindowFixer = + new BatchedRunningWindowBinaryFixer(BinaryOp.NULL_MIN, "min") // UNBOUNDED TO UNBOUNDED WINDOW override def newUnboundedToUnboundedFixer: BatchedUnboundedToUnboundedWindowFixer = @@ -516,8 +516,8 @@ abstract class GpuMax(child: Expression) extends GpuAggregateFunction RollingAggregation.max().onColumn(inputs.head._2) // RUNNING WINDOW - override def newFixer(): Option[BatchedRunningWindowFixer] = - Some(new BatchedRunningWindowBinaryFixer(BinaryOp.NULL_MAX, "max")) + override def newFixer(): BatchedRunningWindowFixer = + new BatchedRunningWindowBinaryFixer(BinaryOp.NULL_MAX, "max") // UNBOUNDED TO UNBOUNDED WINDOW override def newUnboundedToUnboundedFixer: BatchedUnboundedToUnboundedWindowFixer = @@ -928,8 +928,8 @@ abstract class GpuSum( override def windowOutput(result: ColumnVector): ColumnVector = result.incRefCount() // RUNNING WINDOW - override def newFixer(): Option[BatchedRunningWindowFixer] = - Some(new SumBinaryFixer(resultType, failOnErrorOverride)) + override def newFixer(): BatchedRunningWindowFixer = + new SumBinaryFixer(resultType, failOnErrorOverride) override def groupByScanInputProjection(isRunningBatched: Boolean): Seq[Expression] = windowInputProjection @@ -1286,8 +1286,8 @@ case class GpuCount(children: Seq[Expression], } // RUNNING WINDOW - override def newFixer(): Option[BatchedRunningWindowFixer] = - Some(new BatchedRunningWindowBinaryFixer(BinaryOp.ADD, "count")) + override def newFixer(): BatchedRunningWindowFixer = + new BatchedRunningWindowBinaryFixer(BinaryOp.ADD, "count") // Scan and group by scan do not support COUNT with nulls excluded. // one of them does not even support count at all, so we are going to SUM @@ -1572,8 +1572,8 @@ case class GpuFirst(child: Expression, ignoreNulls: Boolean) RollingAggregation.nth(0, if (ignoreNulls) NullPolicy.EXCLUDE else NullPolicy.INCLUDE) .onColumn(inputs.head._2) - override def newFixer(): Option[BatchedRunningWindowFixer] = - Some(new FirstRunningWindowFixer(ignoreNulls)) + override def newFixer(): BatchedRunningWindowFixer = + new FirstRunningWindowFixer(ignoreNulls) } case class GpuLast(child: Expression, ignoreNulls: Boolean) @@ -1665,13 +1665,17 @@ case class GpuNthValue(child: Expression, offset: Expression, ignoreNulls: Boole if (ignoreNulls) NullPolicy.EXCLUDE else NullPolicy.INCLUDE) .onColumn(inputs.head._2) - override def newFixer(): Option[BatchedRunningWindowFixer] = { + override def canFixUp: Boolean = { GpuOverrides.extractLit(offset) match { - case Some(Literal(value, IntegerType)) if value == 1 => - Some(new FirstRunningWindowFixer(ignoreNulls)) - case _ => None + case Some(Literal(value, IntegerType)) if value == 1 => true // Only FIRST() is supported. + case _ => false } } + + override def newFixer(): BatchedRunningWindowFixer = { + assert(canFixUp, "NthValue fixup cannot be done when offset != 1.") + new FirstRunningWindowFixer(ignoreNulls) + } } trait GpuCollectBase From 0c27e154c6d1aecb2127481c49555ce81b5047bf Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 23 Oct 2023 19:02:35 -0700 Subject: [PATCH 10/19] First swipe. --- .../src/main/python/window_function_test.py | 2 +- .../nvidia/spark/rapids/GpuWindowExec.scala | 58 ++++++++++++++++--- 2 files changed, 50 insertions(+), 10 deletions(-) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index dc243582f98..1a386a8fe0c 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -599,7 +599,7 @@ def test_window_running_rank(data_gen): @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches @pytest.mark.parametrize('b_gen, c_gen', [(long_gen, x) for x in running_part_and_order_gens] + [(x, long_gen) for x in all_basic_gens + [decimal_gen_32bit]], ids=idfn) -def test_window_running(b_gen, c_gen, batch_size): +def test_window_running_foo(b_gen, c_gen, batch_size): conf = {'spark.rapids.sql.batchSizeBytes': batch_size, 'spark.rapids.sql.variableFloatAgg.enabled': True, 'spark.rapids.sql.castFloatToDecimal.enabled': True} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index 735fba38437..40004a04524 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -17,17 +17,14 @@ package com.nvidia.spark.rapids import java.util.concurrent.TimeUnit - import scala.collection.mutable import scala.collection.mutable.ArrayBuffer - import ai.rapids.cudf import ai.rapids.cudf.{AggregationOverWindow, DType, GroupByOptions, GroupByScanAggregation, NullPolicy, NvtxColor, ReplacePolicy, ReplacePolicyWithColumn, Scalar, ScanAggregation, ScanType, Table, WindowOptions} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource, withResourceIfAllowed} import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRestoreOnRetry, withRetry, withRetryNoSplit} import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import com.nvidia.spark.rapids.shims.{GpuWindowUtil, ShimUnaryExecNode} - import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, AttributeSeq, AttributeSet, CurrentRow, Expression, FrameType, NamedExpression, RangeFrame, RowFrame, SortOrder, UnboundedFollowing, UnboundedPreceding} @@ -36,7 +33,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.rapids.aggregate.GpuAggregateExpression import org.apache.spark.sql.types._ -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} +import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} import org.apache.spark.unsafe.types.CalendarInterval /** @@ -464,11 +461,21 @@ object GpuWindowExec { } def isRunningWindow(spec: GpuWindowSpecDefinition): Boolean = spec match { - case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame(RowFrame, - GpuSpecialFrameBoundary(UnboundedPreceding), GpuSpecialFrameBoundary(CurrentRow))) => true - case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame(RowFrame, - GpuSpecialFrameBoundary(UnboundedPreceding), GpuLiteral(value, _))) if value == 0 => true - case _ => false + case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( + RowFrame, + GpuSpecialFrameBoundary(UnboundedPreceding), + GpuSpecialFrameBoundary(CurrentRow))) => true + case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( + RowFrame, + GpuSpecialFrameBoundary(UnboundedPreceding), GpuLiteral(value, _))) if value == 0 => true + case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( + RangeFrame, + GpuSpecialFrameBoundary(UnboundedPreceding), + GpuSpecialFrameBoundary(CurrentRow))) => true + case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( + RangeFrame, + GpuSpecialFrameBoundary(UnboundedPreceding), GpuLiteral(value, _))) if value == 0 => true + case _ => false } def isUnboundedToUnboundedWindow(spec: GpuWindowSpecDefinition): Boolean = spec match { @@ -1668,6 +1675,39 @@ case class GpuRunningWindowExec( override def otherCopyArgs: Seq[AnyRef] = cpuPartitionSpec :: cpuOrderSpec :: Nil + override def childrenCoalesceGoal: Seq[CoalesceGoal] = Seq(outputBatching) + + override def outputBatching: CoalesceGoal = { + val isRangeFrame = windowOps.exists { + // TODO: Also handle case where it's not a GpuAlias? + case GpuAlias(GpuWindowExpression(_, + GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame(RangeFrame, _, _))), + _) => true + case _ => false + } + if (!isRangeFrame) { + return null // NO batching restrictions on ROW frames. + } + if (gpuPartitionSpec.isEmpty) { + // windowOps + // .filter( GpuWindowExpression OR GpuAlias for GpuWindowExpression where windowFunction is + // GpuFirst or NthValue(1)) + // .filter( window-frame.type == RowFrame && boundaries == [UNB_PREC, CURRENT] ) + // Then, BatchedByKey(gpuOrderSpec)(cpuOrderSpec) + /* + windowOps.withFilter( e => e match { + case GpuWindowExpression(winFoo: Expression, _) if winFoo.isInstanceOf[GpuNthValue] => true + case _ => false + }) + */ + BatchedByKey(gpuOrderSpec)(cpuOrderSpec) + } else { + // BatchedByKey(gpuPartitionOrdering)(cpuPartitionOrdering) + // Same as for unpartitioned, but include gpu/cpuPartitionOrdering. + BatchedByKey(gpuPartitionOrdering ++ gpuOrderSpec)(cpuPartitionOrdering ++ cpuOrderSpec) + } + } + override protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] = { val numOutputBatches = gpuLongMetric(GpuMetric.NUM_OUTPUT_BATCHES) val numOutputRows = gpuLongMetric(GpuMetric.NUM_OUTPUT_ROWS) From a14bb8e868d0a0e4c5f7d09e931e710072e76cef Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 25 Oct 2023 12:03:15 -0700 Subject: [PATCH 11/19] Removed unnecessary TODO. GpuAlias covers the isRangeFrame check. --- .../nvidia/spark/rapids/GpuWindowExec.scala | 23 +++++-------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index 40004a04524..b8cecaa91e4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -1679,31 +1679,20 @@ case class GpuRunningWindowExec( override def outputBatching: CoalesceGoal = { val isRangeFrame = windowOps.exists { - // TODO: Also handle case where it's not a GpuAlias? - case GpuAlias(GpuWindowExpression(_, - GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame(RangeFrame, _, _))), - _) => true + case GpuAlias( + GpuWindowExpression( + _, GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame(RangeFrame, _, _))), + _) => true case _ => false } if (!isRangeFrame) { return null // NO batching restrictions on ROW frames. } if (gpuPartitionSpec.isEmpty) { - // windowOps - // .filter( GpuWindowExpression OR GpuAlias for GpuWindowExpression where windowFunction is - // GpuFirst or NthValue(1)) - // .filter( window-frame.type == RowFrame && boundaries == [UNB_PREC, CURRENT] ) - // Then, BatchedByKey(gpuOrderSpec)(cpuOrderSpec) - /* - windowOps.withFilter( e => e match { - case GpuWindowExpression(winFoo: Expression, _) if winFoo.isInstanceOf[GpuNthValue] => true - case _ => false - }) - */ + // If unpartitioned, batch on the order-by column. BatchedByKey(gpuOrderSpec)(cpuOrderSpec) } else { - // BatchedByKey(gpuPartitionOrdering)(cpuPartitionOrdering) - // Same as for unpartitioned, but include gpu/cpuPartitionOrdering. + // If partitioned, batch on partition-columns + order-by columns. BatchedByKey(gpuPartitionOrdering ++ gpuOrderSpec)(cpuPartitionOrdering ++ cpuOrderSpec) } } From d2915d6b2c14df40f959aa072288f1ba618aa3fb Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 27 Oct 2023 15:25:40 -0700 Subject: [PATCH 12/19] Fixed rebase error. --- .../src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index 180e9db82dc..86c6fcad527 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -25,11 +25,8 @@ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource, withResourceIfA import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRestoreOnRetry, withRetry, withRetryNoSplit} import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import com.nvidia.spark.rapids.shims.{GpuWindowUtil, ShimUnaryExecNode} -<<<<<<< HEAD -======= import org.apache.spark.TaskContext ->>>>>>> origin/branch-23.12 import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, AttributeSeq, AttributeSet, CurrentRow, Expression, FrameType, NamedExpression, RangeFrame, RowFrame, SortOrder, UnboundedFollowing, UnboundedPreceding} From 917fa07bd5a4760e71b7a9366cce98275ff8dd70 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 27 Oct 2023 15:49:53 -0700 Subject: [PATCH 13/19] Range [UNB, CURRENT] tests. Round#1. --- .../src/main/python/window_function_test.py | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 23b155f346d..0a6157b0a7b 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -515,6 +515,37 @@ def test_window_running_no_part(b_gen, batch_size): validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], conf = conf) + +# TODO: ROW vs RANGE parametrization? +@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches +@pytest.mark.parametrize('b_gen', all_basic_gens + [decimal_gen_32bit, decimal_gen_128bit], ids=meta_idfn('data:')) +def test_range_running_window_no_part(b_gen, batch_size): + conf = {'spark.rapids.sql.batchSizeBytes': batch_size, + 'spark.rapids.sql.castFloatToDecimal.enabled': True} + query_parts = ['COUNT(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS count_col', + 'MIN(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS min_col', + 'MAX(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS max_col', + 'FIRST(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls', + 'FIRST(b, TRUE) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls', + 'NTH_VALUE(b, 1) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls'] + + if isinstance(b_gen.data_type, NumericType) and not isinstance(b_gen, FloatGen) and not isinstance(b_gen, DoubleGen): + query_parts.append('SUM(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sum_col') + + if spark_version() > "3.1.1": + query_parts.append('NTH_VALUE(b, 1) IGNORE NULLS OVER ' + '(ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_ignore_nulls') + + assert_gpu_and_cpu_are_equal_sql( + lambda spark : two_col_df(spark, UniqueLongGen(), b_gen, length=1024 * 14), + "window_agg_table", + 'select ' + + ', '.join(query_parts) + + ' from window_agg_table ', + validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], + conf = conf) + + # Test that we can do a running window sum on floats and doubles. This becomes problematic because we do the agg in parallel # which means that the result can switch back and forth from Inf to not Inf depending on the order of aggregations. # We test this by limiting the range of the values in the sum to never hit Inf, and by using abs so we don't have From b8bec1b9ea4fce816ef70721b6c11368695cb83a Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 27 Oct 2023 15:59:14 -0700 Subject: [PATCH 14/19] Fixed verify errors. Signed-off-by: MithunR --- .../nvidia/spark/rapids/GpuWindowExec.scala | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index 86c6fcad527..760196dbf94 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -17,8 +17,10 @@ package com.nvidia.spark.rapids import java.util.concurrent.TimeUnit + import scala.collection.mutable import scala.collection.mutable.ArrayBuffer + import ai.rapids.cudf import ai.rapids.cudf.{AggregationOverWindow, DType, GroupByOptions, GroupByScanAggregation, NullPolicy, NvtxColor, ReplacePolicy, ReplacePolicyWithColumn, Scalar, ScanAggregation, ScanType, Table, WindowOptions} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource, withResourceIfAllowed} @@ -35,7 +37,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.rapids.aggregate.GpuAggregateExpression import org.apache.spark.sql.types._ -import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} import org.apache.spark.unsafe.types.CalendarInterval /** @@ -467,16 +469,18 @@ object GpuWindowExec { RowFrame, GpuSpecialFrameBoundary(UnboundedPreceding), GpuSpecialFrameBoundary(CurrentRow))) => true - case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( - RowFrame, - GpuSpecialFrameBoundary(UnboundedPreceding), GpuLiteral(value, _))) if value == 0 => true + case GpuWindowSpecDefinition(_, _, + GpuSpecifiedWindowFrame(RowFrame, + GpuSpecialFrameBoundary(UnboundedPreceding), GpuLiteral(value, _))) + if value == 0 => true case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( RangeFrame, GpuSpecialFrameBoundary(UnboundedPreceding), GpuSpecialFrameBoundary(CurrentRow))) => true - case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( - RangeFrame, - GpuSpecialFrameBoundary(UnboundedPreceding), GpuLiteral(value, _))) if value == 0 => true + case GpuWindowSpecDefinition(_, _, + GpuSpecifiedWindowFrame(RangeFrame, + GpuSpecialFrameBoundary(UnboundedPreceding), GpuLiteral(value, _))) + if value == 0 => true case _ => false } From 22aaf7ce673755c77ee59bc7e40e4ebca6ccb563 Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 31 Oct 2023 12:57:21 -0700 Subject: [PATCH 15/19] More comprehensive RANGE window tests. --- .../src/main/python/window_function_test.py | 216 +++++++++++------- 1 file changed, 133 insertions(+), 83 deletions(-) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 19d72150475..2e2730a1b04 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -481,68 +481,68 @@ def test_window_batched_unbounded(b_gen, batch_size): validate_execs_in_gpu_plan = ['GpuCachedDoublePassWindowExec'], conf = conf) -# This is for aggregations that work with a running window optimization. They don't need to be batched -# specially, but it only works if all of the aggregations can support this. -# the order returned should be consistent because the data ends up in a single task (no partitioning) + @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches @pytest.mark.parametrize('b_gen', all_basic_gens + [decimal_gen_32bit, decimal_gen_128bit], ids=meta_idfn('data:')) def test_window_running_no_part(b_gen, batch_size): + """ + This is for aggregations that work with a running window optimization. They don't need to be batched + specially, but it only works if all of the aggregations can support this. + The order returned should be consistent because the data ends up in a single task (no partitioning). + + The running window optimization applies to both ROW-based and RANGE-based window specifications, + so long as the bounds are defined as [UNBOUNDED PRECEDING, CURRENT ROW]. For both categories, + this test verifies the following: + 1. All tested aggregations invoke `GpuRunningWindowExec`, indicating that the running window + optimization is in effect. + 2. The execution is batched, i.e. does not require that the entire input is loaded at once. + 3. The CPU and GPU runs produce the same results, regardless of batch size. + + Note that none of the ranking functions (including ROW_NUMBER) can be tested as a RANGE query. + By definition, ranking functions require ROW frames. + """ conf = {'spark.rapids.sql.batchSizeBytes': batch_size, 'spark.rapids.sql.castFloatToDecimal.enabled': True} - query_parts = ['row_number() over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as row_num', - 'rank() over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as rank_val', - 'dense_rank() over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dense_rank_val', - 'count(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as count_col', - 'min(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as min_col', - 'max(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as max_col', - 'FIRST(b) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls', - 'FIRST(b, TRUE) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls', - 'NTH_VALUE(b, 1) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls'] + row_query_parts = [ + 'ROW_NUMBER() OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS row_num', + 'RANK() OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank_val', + 'DENSE_RANK() OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS dense_rank_val', + 'COUNT(b) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS count_col', + 'MIN(b) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS min_col', + 'MAX(b) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS max_col', + 'FIRST(b) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls', + 'FIRST(b, TRUE) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls', + 'NTH_VALUE(b, 1) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls', + ] + + range_query_parts = [ + 'COUNT(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_count_col', + 'MIN(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_min_col', + 'MAX(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_max_col', + 'FIRST(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_first_keep_nulls', + 'FIRST(b, TRUE) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_first_ignore_nulls', + 'NTH_VALUE(b, 1) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_nth_1_keep_nulls', + ] if isinstance(b_gen.data_type, NumericType) and not isinstance(b_gen, FloatGen) and not isinstance(b_gen, DoubleGen): - query_parts.append('sum(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as sum_col') + row_query_parts.append('SUM(b) OVER ' + '(ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sum_col') + range_query_parts.append('SUM(b) OVER ' + '(ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_sum_col') # The option to IGNORE NULLS in NTH_VALUE is not available prior to Spark 3.2.1. if spark_version() >= "3.2.1": - query_parts.append('NTH_VALUE(b, 1) IGNORE NULLS OVER ' - '(ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_ignore_nulls') - - assert_gpu_and_cpu_are_equal_sql( - lambda spark : two_col_df(spark, UniqueLongGen(), b_gen, length=1024 * 14), - "window_agg_table", - 'select ' + - ', '.join(query_parts) + - ' from window_agg_table ', - validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], - conf = conf) - - -# TODO: ROW vs RANGE parametrization? -@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches -@pytest.mark.parametrize('b_gen', all_basic_gens + [decimal_gen_32bit, decimal_gen_128bit], ids=meta_idfn('data:')) -def test_range_running_window_no_part(b_gen, batch_size): - conf = {'spark.rapids.sql.batchSizeBytes': batch_size, - 'spark.rapids.sql.castFloatToDecimal.enabled': True} - query_parts = ['COUNT(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS count_col', - 'MIN(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS min_col', - 'MAX(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS max_col', - 'FIRST(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls', - 'FIRST(b, TRUE) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls', - 'NTH_VALUE(b, 1) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls'] - - if isinstance(b_gen.data_type, NumericType) and not isinstance(b_gen, FloatGen) and not isinstance(b_gen, DoubleGen): - query_parts.append('SUM(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sum_col') - - if spark_version() > "3.1.1": - query_parts.append('NTH_VALUE(b, 1) IGNORE NULLS OVER ' - '(ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_ignore_nulls') + row_query_parts.append('NTH_VALUE(b, 1) IGNORE NULLS OVER ' + '(ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_ignore_nulls') + range_query_parts.append('NTH_VALUE(b, 1) IGNORE NULLS OVER ' + '(ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_nth_1_ignore_nulls') assert_gpu_and_cpu_are_equal_sql( lambda spark : two_col_df(spark, UniqueLongGen(), b_gen, length=1024 * 14), "window_agg_table", - 'select ' + - ', '.join(query_parts) + - ' from window_agg_table ', + 'SELECT ' + + ', '.join(row_query_parts + range_query_parts) + + ' FROM window_agg_table ', validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], conf = conf) @@ -558,18 +558,27 @@ def test_running_float_sum_no_part(batch_size): conf = {'spark.rapids.sql.batchSizeBytes': batch_size, 'spark.rapids.sql.variableFloatAgg.enabled': True, 'spark.rapids.sql.castFloatToDecimal.enabled': True} - query_parts = ['a', - 'sum(cast(b as double)) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as shrt_dbl_sum', - 'sum(abs(dbl)) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dbl_sum', - 'sum(cast(b as float)) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as shrt_flt_sum', - 'sum(abs(flt)) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as flt_sum'] + row_query_parts = [ + 'a', + 'SUM(CAST(b as DOUBLE)) OVER (ORDER BY a rows BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS shrt_dbl_sum', + 'SUM(ABS(dbl)) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS dbl_sum', + 'SUM(CAST(b AS FLOAT)) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS shrt_flt_sum', + 'SUM(ABS(flt)) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS flt_sum' + ] + + range_query_parts = [ + 'SUM(CAST(b AS DOUBLE)) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_shrt_dbl_sum', + 'SUM(ABS(dbl)) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_dbl_sum', + 'SUM(CAST(b AS FLOAT)) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_shrt_flt_sum', + 'SUM(ABS(flt)) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_flt_sum' + ] gen = StructGen([('a', UniqueLongGen()),('b', short_gen),('flt', float_gen),('dbl', double_gen)], nullable=False) assert_gpu_and_cpu_are_equal_sql( lambda spark : gen_df(spark, gen, length=1024 * 14), "window_agg_table", 'select ' + - ', '.join(query_parts) + + ', '.join(row_query_parts + range_query_parts) + ' from window_agg_table ', validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], conf = conf) @@ -627,43 +636,73 @@ def test_window_running_rank(data_gen): validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], conf = conf) -# This is for aggregations that work with a running window optimization. They don't need to be batched -# specially, but it only works if all of the aggregations can support this. -# In a distributed setup the order of the partitions returned might be different, so we must ignore the order -# but small batch sizes can make sort very slow, so do the final order by locally + @ignore_order(local=True) @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches @pytest.mark.parametrize('b_gen, c_gen', [(long_gen, x) for x in running_part_and_order_gens] + [(x, long_gen) for x in all_basic_gens + [decimal_gen_32bit]], ids=idfn) -def test_window_running(b_gen, c_gen, batch_size): +def test_window_running_foo(b_gen, c_gen, batch_size): + """ + This is for aggregations that work with a running window optimization. They don't need to be batched + specially, but it only works if all of the aggregations can support this. + In a distributed setup the order of the partitions returned might be different, so we must ignore the order + but small batch sizes can make sort very slow, so do the final order by locally. + + The running window optimization applies to both ROW-based and RANGE-based window specifications, + so long as the bounds are defined as [UNBOUNDED PRECEDING, CURRENT ROW]. For both categories, + this test verifies the following: + 1. All tested aggregations invoke `GpuRunningWindowExec`, indicating that the running window + optimization is in effect. + 2. The execution is batched, i.e. does not require that the entire input is loaded at once. + 3. The CPU and GPU runs produce the same results, regardless of batch size. + + Note that none of the ranking functions (including ROW_NUMBER) can be tested as a RANGE query. + By definition, ranking functions require ROW frames. + """ + conf = {'spark.rapids.sql.batchSizeBytes': batch_size, 'spark.rapids.sql.variableFloatAgg.enabled': True, 'spark.rapids.sql.castFloatToDecimal.enabled': True} - query_parts = ['b', 'a', 'row_number() over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as row_num', - 'rank() over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as rank_val', - 'dense_rank() over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dense_rank_val', - 'count(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as count_col', - 'min(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as min_col', - 'max(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as max_col', - 'FIRST(c) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls', - 'FIRST(c, TRUE) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls', - 'NTH_VALUE(c, 1) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls'] + row_query_parts = [ + 'b', 'a', + 'ROW_NUMBER() OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS row_num', + 'RANK() OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank_val', + 'DENSE_RANK() OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS dense_rank_val', + 'COUNT(c) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS count_col', + 'MIN(c) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS min_col', + 'MAX(c) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS max_col', + 'FIRST(c) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls', + 'FIRST(c, TRUE) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls', + 'NTH_VALUE(c, 1) OVER (PARTITION BY B ORDER BY A ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls' + ] + + range_query_parts = [ + 'COUNT(c) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_count', + 'MIN(c) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_min', + 'MAX(c) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_max', + 'FIRST(c) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_first_nulls', + 'FIRST(c, TRUE) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_first_no_nulls', + 'NTH_VALUE(c, 1) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_nth_1_keep_nulls' + ] # Decimal precision can grow too large. Float and Double can get odd results for Inf/-Inf because of ordering if isinstance(c_gen.data_type, NumericType) and (not isinstance(c_gen, FloatGen)) and (not isinstance(c_gen, DoubleGen)) and (not isinstance(c_gen, DecimalGen)): - query_parts.append('sum(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as sum_col') + row_query_parts.append('SUM(c) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sum_col') + range_query_parts.append('SUM(c) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_sum') # The option to IGNORE NULLS in NTH_VALUE is not available prior to Spark 3.2.1. if spark_version() >= "3.2.1": - query_parts.append('NTH_VALUE(c, 1) IGNORE NULLS OVER ' - '(PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_ignore_nulls') + row_query_parts.append('NTH_VALUE(c, 1) IGNORE NULLS OVER ' + '(PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_ignore_nulls') + range_query_parts.append('NTH_VALUE(c, 1) IGNORE NULLS OVER ' + '(PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_nth_1_no_nulls') assert_gpu_and_cpu_are_equal_sql( lambda spark : three_col_df(spark, UniqueLongGen(), RepeatSeqGen(b_gen, length=100), c_gen, length=1024 * 14), "window_agg_table", - 'select ' + - ', '.join(query_parts) + - ' from window_agg_table ', + 'SELECT ' + + ', '.join(row_query_parts + range_query_parts) + + ' FROM window_agg_table ', validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], conf = conf) @@ -674,26 +713,37 @@ def test_window_running(b_gen, c_gen, batch_size): # decimal is problematic if the precision is so high it falls back to the CPU. # In a distributed setup the order of the partitions returned might be different, so we must ignore the order # but small batch sizes can make sort very slow, so do the final order by locally +@approximate_float @ignore_order(local=True) @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches def test_window_running_float_decimal_sum(batch_size): conf = {'spark.rapids.sql.batchSizeBytes': batch_size, 'spark.rapids.sql.variableFloatAgg.enabled': True, 'spark.rapids.sql.castFloatToDecimal.enabled': True} - query_parts = ['b', 'a', - 'sum(cast(c as double)) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dbl_sum', - 'sum(abs(dbl)) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dbl_sum', - 'sum(cast(c as float)) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as flt_sum', - 'sum(abs(flt)) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as flt_sum', - 'sum(cast(c as Decimal(6,1))) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dec_sum'] + row_query_parts = [ + 'b', 'a', + 'SUM(CAST(c AS DOUBLE)) OVER (PARTITION BY b ORDER BY A ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS dbl_sum', + 'SUM(ABS(dbl)) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS dbl_sum', + 'SUM(CAST(c AS FLOAT)) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS flt_sum', + 'SUM(ABS(flt)) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS flt_sum', + 'SUM(CAST(c AS DECIMAL(6,1))) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS dec_sum' + ] + + range_query_parts = [ + 'SUM(CAST(c AS DOUBLE)) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_dbl_sum', + 'SUM(ABS(dbl)) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_abs_dbl_sum', + 'SUM(CAST(c AS FLOAT)) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_flt_sum', + 'SUM(ABS(FLT)) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_abs_flt_sum', + 'SUM(CAST(c AS DECIMAL(6,1))) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_dec_sum' + ] gen = StructGen([('a', UniqueLongGen()),('b', RepeatSeqGen(int_gen, length=1000)),('c', short_gen),('flt', float_gen),('dbl', double_gen)], nullable=False) assert_gpu_and_cpu_are_equal_sql( lambda spark : gen_df(spark, gen, length=1024 * 14), "window_agg_table", - 'select ' + - ', '.join(query_parts) + - ' from window_agg_table ', + 'SELECT ' + + ', '.join(row_query_parts + range_query_parts) + + ' FROM window_agg_table ', validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], conf = conf) From f6f6c62412f162528e0d5ec226c2393b9fcd2609 Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 31 Oct 2023 15:53:27 -0700 Subject: [PATCH 16/19] Revert "More comprehensive RANGE window tests." This reverts commit 22aaf7ce673755c77ee59bc7e40e4ebca6ccb563. Best not combine the running window ROW/RANGE tests. --- .../src/main/python/window_function_test.py | 216 +++++++----------- 1 file changed, 83 insertions(+), 133 deletions(-) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 2e2730a1b04..19d72150475 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -481,68 +481,68 @@ def test_window_batched_unbounded(b_gen, batch_size): validate_execs_in_gpu_plan = ['GpuCachedDoublePassWindowExec'], conf = conf) - +# This is for aggregations that work with a running window optimization. They don't need to be batched +# specially, but it only works if all of the aggregations can support this. +# the order returned should be consistent because the data ends up in a single task (no partitioning) @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches @pytest.mark.parametrize('b_gen', all_basic_gens + [decimal_gen_32bit, decimal_gen_128bit], ids=meta_idfn('data:')) def test_window_running_no_part(b_gen, batch_size): - """ - This is for aggregations that work with a running window optimization. They don't need to be batched - specially, but it only works if all of the aggregations can support this. - The order returned should be consistent because the data ends up in a single task (no partitioning). - - The running window optimization applies to both ROW-based and RANGE-based window specifications, - so long as the bounds are defined as [UNBOUNDED PRECEDING, CURRENT ROW]. For both categories, - this test verifies the following: - 1. All tested aggregations invoke `GpuRunningWindowExec`, indicating that the running window - optimization is in effect. - 2. The execution is batched, i.e. does not require that the entire input is loaded at once. - 3. The CPU and GPU runs produce the same results, regardless of batch size. - - Note that none of the ranking functions (including ROW_NUMBER) can be tested as a RANGE query. - By definition, ranking functions require ROW frames. - """ conf = {'spark.rapids.sql.batchSizeBytes': batch_size, 'spark.rapids.sql.castFloatToDecimal.enabled': True} - row_query_parts = [ - 'ROW_NUMBER() OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS row_num', - 'RANK() OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank_val', - 'DENSE_RANK() OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS dense_rank_val', - 'COUNT(b) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS count_col', - 'MIN(b) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS min_col', - 'MAX(b) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS max_col', - 'FIRST(b) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls', - 'FIRST(b, TRUE) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls', - 'NTH_VALUE(b, 1) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls', - ] - - range_query_parts = [ - 'COUNT(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_count_col', - 'MIN(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_min_col', - 'MAX(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_max_col', - 'FIRST(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_first_keep_nulls', - 'FIRST(b, TRUE) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_first_ignore_nulls', - 'NTH_VALUE(b, 1) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_nth_1_keep_nulls', - ] + query_parts = ['row_number() over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as row_num', + 'rank() over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as rank_val', + 'dense_rank() over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dense_rank_val', + 'count(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as count_col', + 'min(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as min_col', + 'max(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as max_col', + 'FIRST(b) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls', + 'FIRST(b, TRUE) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls', + 'NTH_VALUE(b, 1) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls'] if isinstance(b_gen.data_type, NumericType) and not isinstance(b_gen, FloatGen) and not isinstance(b_gen, DoubleGen): - row_query_parts.append('SUM(b) OVER ' - '(ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sum_col') - range_query_parts.append('SUM(b) OVER ' - '(ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_sum_col') + query_parts.append('sum(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as sum_col') # The option to IGNORE NULLS in NTH_VALUE is not available prior to Spark 3.2.1. if spark_version() >= "3.2.1": - row_query_parts.append('NTH_VALUE(b, 1) IGNORE NULLS OVER ' - '(ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_ignore_nulls') - range_query_parts.append('NTH_VALUE(b, 1) IGNORE NULLS OVER ' - '(ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_nth_1_ignore_nulls') + query_parts.append('NTH_VALUE(b, 1) IGNORE NULLS OVER ' + '(ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_ignore_nulls') assert_gpu_and_cpu_are_equal_sql( lambda spark : two_col_df(spark, UniqueLongGen(), b_gen, length=1024 * 14), "window_agg_table", - 'SELECT ' + - ', '.join(row_query_parts + range_query_parts) + - ' FROM window_agg_table ', + 'select ' + + ', '.join(query_parts) + + ' from window_agg_table ', + validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], + conf = conf) + + +# TODO: ROW vs RANGE parametrization? +@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches +@pytest.mark.parametrize('b_gen', all_basic_gens + [decimal_gen_32bit, decimal_gen_128bit], ids=meta_idfn('data:')) +def test_range_running_window_no_part(b_gen, batch_size): + conf = {'spark.rapids.sql.batchSizeBytes': batch_size, + 'spark.rapids.sql.castFloatToDecimal.enabled': True} + query_parts = ['COUNT(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS count_col', + 'MIN(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS min_col', + 'MAX(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS max_col', + 'FIRST(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls', + 'FIRST(b, TRUE) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls', + 'NTH_VALUE(b, 1) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls'] + + if isinstance(b_gen.data_type, NumericType) and not isinstance(b_gen, FloatGen) and not isinstance(b_gen, DoubleGen): + query_parts.append('SUM(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sum_col') + + if spark_version() > "3.1.1": + query_parts.append('NTH_VALUE(b, 1) IGNORE NULLS OVER ' + '(ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_ignore_nulls') + + assert_gpu_and_cpu_are_equal_sql( + lambda spark : two_col_df(spark, UniqueLongGen(), b_gen, length=1024 * 14), + "window_agg_table", + 'select ' + + ', '.join(query_parts) + + ' from window_agg_table ', validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], conf = conf) @@ -558,27 +558,18 @@ def test_running_float_sum_no_part(batch_size): conf = {'spark.rapids.sql.batchSizeBytes': batch_size, 'spark.rapids.sql.variableFloatAgg.enabled': True, 'spark.rapids.sql.castFloatToDecimal.enabled': True} - row_query_parts = [ - 'a', - 'SUM(CAST(b as DOUBLE)) OVER (ORDER BY a rows BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS shrt_dbl_sum', - 'SUM(ABS(dbl)) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS dbl_sum', - 'SUM(CAST(b AS FLOAT)) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS shrt_flt_sum', - 'SUM(ABS(flt)) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS flt_sum' - ] - - range_query_parts = [ - 'SUM(CAST(b AS DOUBLE)) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_shrt_dbl_sum', - 'SUM(ABS(dbl)) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_dbl_sum', - 'SUM(CAST(b AS FLOAT)) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_shrt_flt_sum', - 'SUM(ABS(flt)) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_flt_sum' - ] + query_parts = ['a', + 'sum(cast(b as double)) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as shrt_dbl_sum', + 'sum(abs(dbl)) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dbl_sum', + 'sum(cast(b as float)) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as shrt_flt_sum', + 'sum(abs(flt)) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as flt_sum'] gen = StructGen([('a', UniqueLongGen()),('b', short_gen),('flt', float_gen),('dbl', double_gen)], nullable=False) assert_gpu_and_cpu_are_equal_sql( lambda spark : gen_df(spark, gen, length=1024 * 14), "window_agg_table", 'select ' + - ', '.join(row_query_parts + range_query_parts) + + ', '.join(query_parts) + ' from window_agg_table ', validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], conf = conf) @@ -636,73 +627,43 @@ def test_window_running_rank(data_gen): validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], conf = conf) - +# This is for aggregations that work with a running window optimization. They don't need to be batched +# specially, but it only works if all of the aggregations can support this. +# In a distributed setup the order of the partitions returned might be different, so we must ignore the order +# but small batch sizes can make sort very slow, so do the final order by locally @ignore_order(local=True) @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches @pytest.mark.parametrize('b_gen, c_gen', [(long_gen, x) for x in running_part_and_order_gens] + [(x, long_gen) for x in all_basic_gens + [decimal_gen_32bit]], ids=idfn) -def test_window_running_foo(b_gen, c_gen, batch_size): - """ - This is for aggregations that work with a running window optimization. They don't need to be batched - specially, but it only works if all of the aggregations can support this. - In a distributed setup the order of the partitions returned might be different, so we must ignore the order - but small batch sizes can make sort very slow, so do the final order by locally. - - The running window optimization applies to both ROW-based and RANGE-based window specifications, - so long as the bounds are defined as [UNBOUNDED PRECEDING, CURRENT ROW]. For both categories, - this test verifies the following: - 1. All tested aggregations invoke `GpuRunningWindowExec`, indicating that the running window - optimization is in effect. - 2. The execution is batched, i.e. does not require that the entire input is loaded at once. - 3. The CPU and GPU runs produce the same results, regardless of batch size. - - Note that none of the ranking functions (including ROW_NUMBER) can be tested as a RANGE query. - By definition, ranking functions require ROW frames. - """ - +def test_window_running(b_gen, c_gen, batch_size): conf = {'spark.rapids.sql.batchSizeBytes': batch_size, 'spark.rapids.sql.variableFloatAgg.enabled': True, 'spark.rapids.sql.castFloatToDecimal.enabled': True} - row_query_parts = [ - 'b', 'a', - 'ROW_NUMBER() OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS row_num', - 'RANK() OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank_val', - 'DENSE_RANK() OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS dense_rank_val', - 'COUNT(c) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS count_col', - 'MIN(c) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS min_col', - 'MAX(c) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS max_col', - 'FIRST(c) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls', - 'FIRST(c, TRUE) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls', - 'NTH_VALUE(c, 1) OVER (PARTITION BY B ORDER BY A ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls' - ] - - range_query_parts = [ - 'COUNT(c) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_count', - 'MIN(c) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_min', - 'MAX(c) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_max', - 'FIRST(c) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_first_nulls', - 'FIRST(c, TRUE) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_first_no_nulls', - 'NTH_VALUE(c, 1) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_nth_1_keep_nulls' - ] + query_parts = ['b', 'a', 'row_number() over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as row_num', + 'rank() over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as rank_val', + 'dense_rank() over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dense_rank_val', + 'count(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as count_col', + 'min(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as min_col', + 'max(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as max_col', + 'FIRST(c) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls', + 'FIRST(c, TRUE) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls', + 'NTH_VALUE(c, 1) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls'] # Decimal precision can grow too large. Float and Double can get odd results for Inf/-Inf because of ordering if isinstance(c_gen.data_type, NumericType) and (not isinstance(c_gen, FloatGen)) and (not isinstance(c_gen, DoubleGen)) and (not isinstance(c_gen, DecimalGen)): - row_query_parts.append('SUM(c) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sum_col') - range_query_parts.append('SUM(c) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_sum') + query_parts.append('sum(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as sum_col') # The option to IGNORE NULLS in NTH_VALUE is not available prior to Spark 3.2.1. if spark_version() >= "3.2.1": - row_query_parts.append('NTH_VALUE(c, 1) IGNORE NULLS OVER ' - '(PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_ignore_nulls') - range_query_parts.append('NTH_VALUE(c, 1) IGNORE NULLS OVER ' - '(PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_nth_1_no_nulls') + query_parts.append('NTH_VALUE(c, 1) IGNORE NULLS OVER ' + '(PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_ignore_nulls') assert_gpu_and_cpu_are_equal_sql( lambda spark : three_col_df(spark, UniqueLongGen(), RepeatSeqGen(b_gen, length=100), c_gen, length=1024 * 14), "window_agg_table", - 'SELECT ' + - ', '.join(row_query_parts + range_query_parts) + - ' FROM window_agg_table ', + 'select ' + + ', '.join(query_parts) + + ' from window_agg_table ', validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], conf = conf) @@ -713,37 +674,26 @@ def test_window_running_foo(b_gen, c_gen, batch_size): # decimal is problematic if the precision is so high it falls back to the CPU. # In a distributed setup the order of the partitions returned might be different, so we must ignore the order # but small batch sizes can make sort very slow, so do the final order by locally -@approximate_float @ignore_order(local=True) @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches def test_window_running_float_decimal_sum(batch_size): conf = {'spark.rapids.sql.batchSizeBytes': batch_size, 'spark.rapids.sql.variableFloatAgg.enabled': True, 'spark.rapids.sql.castFloatToDecimal.enabled': True} - row_query_parts = [ - 'b', 'a', - 'SUM(CAST(c AS DOUBLE)) OVER (PARTITION BY b ORDER BY A ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS dbl_sum', - 'SUM(ABS(dbl)) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS dbl_sum', - 'SUM(CAST(c AS FLOAT)) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS flt_sum', - 'SUM(ABS(flt)) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS flt_sum', - 'SUM(CAST(c AS DECIMAL(6,1))) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS dec_sum' - ] - - range_query_parts = [ - 'SUM(CAST(c AS DOUBLE)) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_dbl_sum', - 'SUM(ABS(dbl)) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_abs_dbl_sum', - 'SUM(CAST(c AS FLOAT)) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_flt_sum', - 'SUM(ABS(FLT)) OVER (PARTITION BY b ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_abs_flt_sum', - 'SUM(CAST(c AS DECIMAL(6,1))) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_dec_sum' - ] + query_parts = ['b', 'a', + 'sum(cast(c as double)) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dbl_sum', + 'sum(abs(dbl)) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dbl_sum', + 'sum(cast(c as float)) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as flt_sum', + 'sum(abs(flt)) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as flt_sum', + 'sum(cast(c as Decimal(6,1))) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as dec_sum'] gen = StructGen([('a', UniqueLongGen()),('b', RepeatSeqGen(int_gen, length=1000)),('c', short_gen),('flt', float_gen),('dbl', double_gen)], nullable=False) assert_gpu_and_cpu_are_equal_sql( lambda spark : gen_df(spark, gen, length=1024 * 14), "window_agg_table", - 'SELECT ' + - ', '.join(row_query_parts + range_query_parts) + - ' FROM window_agg_table ', + 'select ' + + ', '.join(query_parts) + + ' from window_agg_table ', validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], conf = conf) From 2d8eb6530436031cc79ec38bf9bb0b2b62376869 Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 1 Nov 2023 10:50:44 -0700 Subject: [PATCH 17/19] Separate tests for ROWS and RANGE, with repeated rows in order-by. --- .../src/main/python/window_function_test.py | 91 +++++++++++++++---- 1 file changed, 74 insertions(+), 17 deletions(-) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 19d72150475..23797f9fe8b 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -18,7 +18,7 @@ from data_gen import * from marks import * from pyspark.sql.types import * -from pyspark.sql.types import NumericType +from pyspark.sql.types import DateType, TimestampType, NumericType from pyspark.sql.window import Window import pyspark.sql.functions as f from spark_session import is_before_spark_320, is_databricks113_or_later, spark_version @@ -517,28 +517,52 @@ def test_window_running_no_part(b_gen, batch_size): conf = conf) -# TODO: ROW vs RANGE parametrization? -@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches -@pytest.mark.parametrize('b_gen', all_basic_gens + [decimal_gen_32bit, decimal_gen_128bit], ids=meta_idfn('data:')) -def test_range_running_window_no_part(b_gen, batch_size): +@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # Testing multiple batch sizes. +@pytest.mark.parametrize('a_gen', integral_gens + [string_gen, date_gen, timestamp_gen], ids=meta_idfn('data:')) +def test_running_window_without_partitions_runs_batched(a_gen, batch_size): + """ + This tests the running window optimization as applied to RANGE-based window specifications, + so long as the bounds are defined as [UNBOUNDED PRECEDING, CURRENT ROW]. + This test verifies the following: + 1. All tested aggregations invoke `GpuRunningWindowExec`, indicating that the running window + optimization is in effect. + 2. The execution is batched, i.e. does not require that the entire input is loaded at once. + 3. The CPU and GPU runs produce the same results, regardless of batch size. + + Note that none of the ranking functions (including ROW_NUMBER) can be tested as a RANGE query. + By definition, ranking functions require ROW frames. + + Note, also, that the order-by column is not generated via `UniqueLongGen()`. This is specifically + to test the case where `CURRENT ROW` might include more than a single row (as is possible in + RANGE queries). To mitigate the occurrence of non-deterministic results, the order-by column + is also used in the aggregation. This way, regardless of order, the same value is aggregated. + """ conf = {'spark.rapids.sql.batchSizeBytes': batch_size, 'spark.rapids.sql.castFloatToDecimal.enabled': True} - query_parts = ['COUNT(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS count_col', - 'MIN(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS min_col', - 'MAX(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS max_col', - 'FIRST(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls', - 'FIRST(b, TRUE) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls', - 'NTH_VALUE(b, 1) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls'] + query_parts = [ + 'COUNT(a) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS count_col', + 'MIN(a) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS min_col', + 'MAX(a) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS max_col', + 'FIRST(a) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls', + 'FIRST(a, TRUE) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls', + 'NTH_VALUE(a, 1) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls' + ] + + def supports_sum(gen): + if isinstance(gen, DateType) or isinstance(gen.data_type, TimestampType): + return False + return isinstance(gen.data_type, NumericType) and \ + not isinstance(gen, FloatGen) and not isinstance(gen, DoubleGen) + + if supports_sum(a_gen): + query_parts.append('SUM(a) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sum_col') - if isinstance(b_gen.data_type, NumericType) and not isinstance(b_gen, FloatGen) and not isinstance(b_gen, DoubleGen): - query_parts.append('SUM(b) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sum_col') - - if spark_version() > "3.1.1": - query_parts.append('NTH_VALUE(b, 1) IGNORE NULLS OVER ' + if spark_version() >= "3.2.1": + query_parts.append('NTH_VALUE(a, 1) IGNORE NULLS OVER ' '(ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_ignore_nulls') assert_gpu_and_cpu_are_equal_sql( - lambda spark : two_col_df(spark, UniqueLongGen(), b_gen, length=1024 * 14), + lambda spark: gen_df(spark, StructGen([('a', a_gen)], nullable=False), length=1024*14), "window_agg_table", 'select ' + ', '.join(query_parts) + @@ -574,6 +598,39 @@ def test_running_float_sum_no_part(batch_size): validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], conf = conf) + +@approximate_float +@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # Tests different batch sizes. +def test_running_window_float_sum_without_partitions_runs_batched(batch_size): + """ + This test is very similar to test_running_float_sum_no_part, except that it checks that RANGE window SUM + aggregations can run in batched mode. + Note that in the RANGE case, the test needs to check the case where there are repeats in the order-by column. + This covers the case where `CURRENT ROW` might refer to multiple rows in the order-by column. This does introduce + the possibility of non-deterministic results, because the ordering with repeated values isn't deterministic. + This is mitigated by aggregating on the same column as the order-by column, such that the same value is aggregated + for the repeated keys. + """ + conf = {'spark.rapids.sql.batchSizeBytes': batch_size, + 'spark.rapids.sql.variableFloatAgg.enabled': True, + 'spark.rapids.sql.castFloatToDecimal.enabled': True} + query_parts = ['b', + 'SUM(CAST(b AS DOUBLE)) OVER (ORDER BY b RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS shrt_dbl_sum', + 'SUM(ABS(dbl)) OVER (ORDER BY b RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS dbl_sum', + 'SUM(CAST(b AS FLOAT)) OVER (ORDER BY b RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS shrt_flt_sum', + 'SUM(ABS(flt)) OVER (ORDER BY b RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS flt_sum'] + + gen = StructGen([('b', short_gen), ('flt', float_gen), ('dbl', double_gen)], nullable=False) + assert_gpu_and_cpu_are_equal_sql( + lambda spark : gen_df(spark, gen, length=1024 * 14), + "window_agg_table", + 'SELECT ' + + ', '.join(query_parts) + + ' FROM window_agg_table ', + validate_execs_in_gpu_plan=['GpuRunningWindowExec'], + conf=conf) + + # Rank aggregations are running window aggregations but they care about the ordering. In most tests we don't # allow duplicate ordering, because that makes the results ambiguous. If two rows end up being switched even # if the order-by column is the same then we can get different results for say a running sum. Here we are going From d9540f9d7aa0fc31f38854a4c00d5892f8aa4407 Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 1 Nov 2023 14:31:15 -0700 Subject: [PATCH 18/19] Partitioned test for batched RANGE query. Signed-off-by: MithunR --- .../src/main/python/window_function_test.py | 72 ++++++++++++++++++- 1 file changed, 69 insertions(+), 3 deletions(-) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 23797f9fe8b..02abb213fe0 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -548,13 +548,14 @@ def test_running_window_without_partitions_runs_batched(a_gen, batch_size): 'NTH_VALUE(a, 1) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls' ] - def supports_sum(gen): + def must_test_sum_aggregation(gen): if isinstance(gen, DateType) or isinstance(gen.data_type, TimestampType): - return False + return False # These types do not support SUM(). + # For Float/Double types, skip `SUM()` test. This is tested in test_running_float_sum_no_part. return isinstance(gen.data_type, NumericType) and \ not isinstance(gen, FloatGen) and not isinstance(gen, DoubleGen) - if supports_sum(a_gen): + if must_test_sum_aggregation(a_gen): query_parts.append('SUM(a) OVER (ORDER BY a RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sum_col') if spark_version() >= "3.2.1": @@ -724,6 +725,71 @@ def test_window_running(b_gen, c_gen, batch_size): validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], conf = conf) + +@ignore_order(local=True) +@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # Test different batch sizes. +@pytest.mark.parametrize('part_gen', [int_gen, long_gen], ids=idfn) # Partitioning is not really the focus of the test. +@pytest.mark.parametrize('order_gen', [x for x in all_basic_gens_no_null if x not in boolean_gens] + [decimal_gen_32bit], ids=idfn) +def test_range_running_window_runs_batched(part_gen, order_gen, batch_size): + """ + This tests the running window optimization as applied to RANGE-based window specifications, + so long as the bounds are defined as [UNBOUNDED PRECEDING, CURRENT ROW]. + This test verifies the following: + 1. All tested aggregations invoke `GpuRunningWindowExec`, indicating that the running window + optimization is in effect. + 2. The execution is batched, i.e. does not require that the entire input is loaded at once. + 3. The CPU and GPU runs produce the same results, regardless of batch size. + + Note that none of the ranking functions (including ROW_NUMBER) can be tested as a RANGE query. + By definition, ranking functions require ROW frames. + + Note, also, that the order-by column is not generated via `UniqueLongGen()`. This is specifically + to test the case where `CURRENT ROW` might include more than a single row (as is possible in + RANGE queries). To mitigate the occurrence of non-deterministic results, the order-by column + is also used in the aggregation. This way, regardless of order, the same value is aggregated. + """ + conf = {'spark.rapids.sql.batchSizeBytes': batch_size, + 'spark.rapids.sql.variableFloatAgg.enabled': True, + 'spark.rapids.sql.castFloatToDecimal.enabled': True} + window = "(PARTITION BY p ORDER BY oby RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " + query_parts = [ + 'p', 'oby', + 'COUNT(oby) OVER ' + window + ' AS count_col', + 'MIN(oby) OVER ' + window + ' AS min_col', + 'MAX(oby) OVER ' + window + ' AS max_col', + 'FIRST(oby) OVER ' + window + ' AS first_keep_nulls', + 'FIRST(oby, TRUE) OVER ' + window + ' AS first_ignore_nulls', + 'NTH_VALUE(oby, 1) OVER ' + window + ' AS nth_1_keep_nulls' + ] + + def must_test_sum_aggregation(gen): + if isinstance(gen, DateType) or isinstance(gen.data_type, TimestampType): + return False # These types do not support SUM(). + # For Float/Double types, skip `SUM()` test. This is tested later. + # Decimal precision can grow too large. Float and Double can get odd results for Inf/-Inf because of ordering + return isinstance(gen.data_type, NumericType) and \ + not isinstance(gen, FloatGen) and not isinstance(gen, DoubleGen) and not isinstance(gen, DecimalGen) + + if must_test_sum_aggregation(order_gen): + query_parts.append('SUM(oby) OVER ' + window + ' AS sum_col') + + # The option to IGNORE NULLS in NTH_VALUE is not available prior to Spark 3.2.1. + if spark_version() >= "3.2.1": + query_parts.append('NTH_VALUE(oby, 1) IGNORE NULLS OVER ' + window + ' AS nth_1_ignore_nulls') + + assert_gpu_and_cpu_are_equal_sql( + lambda spark: gen_df(spark, + StructGen([('p', RepeatSeqGen(part_gen, length=100)), + ('oby', order_gen)], nullable=False), + length=1024*14), + "window_agg_table", + 'SELECT ' + + ', '.join(query_parts) + + ' FROM window_agg_table ', + validate_execs_in_gpu_plan=['GpuRunningWindowExec'], + conf=conf) + + # Test that we can do a running window sum on floats and doubles and decimal. This becomes problematic because we do the agg in parallel # which means that the result can switch back and forth from Inf to not Inf depending on the order of aggregations. # We test this by limiting the range of the values in the sum to never hit Inf, and by using abs so we don't have From 9b1ac51d80768816737f16d983855a20357c5f9b Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 1 Nov 2023 15:38:48 -0700 Subject: [PATCH 19/19] Partitioned tests for batched range window functions. Signed-off-by: MithunR --- .../src/main/python/window_function_test.py | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 02abb213fe0..e03f5b25a60 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -820,6 +820,50 @@ def test_window_running_float_decimal_sum(batch_size): validate_execs_in_gpu_plan = ['GpuRunningWindowExec'], conf = conf) + +@approximate_float +@ignore_order(local=True) +@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # Test different batch sizes. +def test_range_running_window_float_decimal_sum_runs_batched(batch_size): + """ + This test is very similar to test_window_running_float_decimal_sum, except that it checks that RANGE window SUM + aggregations can run in batched mode. + Note that in the RANGE case, the test needs to check the case where there are repeats in the order-by column. + This covers the case where `CURRENT ROW` might refer to multiple rows in the order-by column. This does introduce + the possibility of non-deterministic results, because the ordering with repeated values isn't deterministic. + This is mitigated by aggregating on the same column as the order-by column, such that the same value is aggregated + for the repeated keys. + """ + conf = {'spark.rapids.sql.batchSizeBytes': batch_size, + 'spark.rapids.sql.variableFloatAgg.enabled': True, + 'spark.rapids.sql.castFloatToDecimal.enabled': True} + + def window(oby_column): + return "(PARTITION BY p ORDER BY " + oby_column + " RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " + + query_parts = [ + 'p', 'oby', + 'SUM(CAST(oby AS DOUBLE)) OVER ' + window('CAST(oby AS DOUBLE)') + ' AS short_double_sum', + 'SUM(ABS(dbl)) OVER ' + window('ABS(dbl)') + ' AS double_sum', + 'SUM(CAST(oby AS FLOAT)) OVER ' + window('CAST(oby AS FLOAT)') + ' AS short_float_sum', + 'SUM(ABS(flt)) OVER ' + window('ABS(flt)') + ' AS float_sum', + 'SUM(CAST(oby AS DECIMAL(6,1))) OVER ' + window('CAST(oby AS DECIMAL(6,1))') + ' AS dec_sum' + ] + + gen = StructGen([('p', RepeatSeqGen(int_gen, length=1000)), + ('oby', short_gen), + ('flt', float_gen), + ('dbl', double_gen)], nullable=False) + assert_gpu_and_cpu_are_equal_sql( + lambda spark: gen_df(spark, gen, length=1024 * 14), + "window_agg_table", + 'SELECT ' + + ', '.join(query_parts) + + ' FROM window_agg_table ', + validate_execs_in_gpu_plan=['GpuRunningWindowExec'], + conf=conf) + + # In a distributed setup the order of the partitions returned might be different, so we must ignore the order # but small batch sizes can make sort very slow, so do the final order by locally @ignore_order(local=True)