From ae3eec0163aaea9797d50aa9bfe3717093b0577c Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 14 Nov 2023 13:53:10 -0800 Subject: [PATCH 01/23] Initial swipe: Batched, bounded windows. --- .../rapids/GpuBatchedBoundedWindowExec.scala | 112 +++++++++++++++++ .../nvidia/spark/rapids/GpuWindowExec.scala | 117 +++++++++++++++--- 2 files changed, 213 insertions(+), 16 deletions(-) create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala new file mode 100644 index 00000000000..37ebd7be585 --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import ai.rapids.cudf.NvtxColor +import com.nvidia.spark.rapids.Arm.withResource +import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, SortOrder} +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.ColumnarBatch + +class GpuBatchedBoundedWindowIterator( + input: Iterator[ColumnarBatch], + override val boundWindowOps: Seq[GpuExpression], + override val boundPartitionSpec: Seq[GpuExpression], + override val boundOrderSpec: Seq[SortOrder], + val outputTypes: Array[DataType], + numOutputBatches: GpuMetric, + numOutputRows: GpuMetric, + opTime: GpuMetric) extends Iterator[ColumnarBatch] with BasicWindowCalc { + + override def isRunningBatched: Boolean = false // Not "Running Window" optimized. + // This is strictly for batching. + + override def hasNext: Boolean = onDeck.isDefined || input.hasNext + + var onDeck: Option[SpillableColumnarBatch] = None + + override def next(): ColumnarBatch = { + val cbSpillable = onDeck match { + case Some(x) => + onDeck = None + x + case _ => + getNext() + } + withRetryNoSplit(cbSpillable) { _ => + withResource(cbSpillable.getColumnarBatch()) { cb => + withResource(new NvtxWithMetrics("window", NvtxColor.CYAN, opTime)) { _ => + val ret = withResource(computeBasicWindow(cb)) { cols => + convertToBatch(outputTypes, cols) + } + GpuColumnVector.debug("Output: ", ret) + numOutputBatches += 1 + numOutputRows += ret.numRows() + ret + } + } + } + } + + def getNext(): SpillableColumnarBatch = { + SpillableColumnarBatch(input.next(), SpillPriorities.ACTIVE_BATCHING_PRIORITY) + } + +} + +/// Window Exec used exclusively for batching bounded window functions. +class GpuBatchedBoundedWindowExec( + override val windowOps: Seq[NamedExpression], + override val gpuPartitionSpec: Seq[Expression], + override val gpuOrderSpec: Seq[SortOrder], + override val child: SparkPlan)( + override val cpuPartitionSpec: Seq[Expression], + override val cpuOrderSpec: Seq[SortOrder] +) extends GpuWindowExec(windowOps, + gpuPartitionSpec, + gpuOrderSpec, + child)(cpuPartitionSpec, cpuOrderSpec) { + + override def otherCopyArgs: Seq[AnyRef] = cpuPartitionSpec :: cpuOrderSpec :: Nil + + override def childrenCoalesceGoal: Seq[CoalesceGoal] = Seq(outputBatching) + + override def outputBatching: CoalesceGoal = if (gpuPartitionSpec.isEmpty) { + RequireSingleBatch + } else { + BatchedByKey(gpuPartitionOrdering)(cpuPartitionOrdering) + } + + override protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] = { + val numOutputBatches = gpuLongMetric(GpuMetric.NUM_OUTPUT_BATCHES) + val numOutputRows = gpuLongMetric(GpuMetric.NUM_OUTPUT_ROWS) + val opTime = gpuLongMetric(GpuMetric.OP_TIME) + + val boundWindowOps = GpuBindReferences.bindGpuReferences(windowOps, child.output) + val boundPartitionSpec = GpuBindReferences.bindGpuReferences(gpuPartitionSpec, child.output) + val boundOrderSpec = GpuBindReferences.bindReferences(gpuOrderSpec, child.output) + + child.executeColumnar().mapPartitions { iter => + new GpuBatchedBoundedWindowIterator(iter, boundWindowOps, boundPartitionSpec, + boundOrderSpec, output.map(_.dataType).toArray, numOutputBatches, numOutputRows, opTime) + } + } +} \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index 760196dbf94..3b3c7afb236 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -155,6 +155,19 @@ abstract class GpuBaseWindowExecMeta[WindowExecType <: SparkPlan] (windowExec: W s"Found unexpected expression $other in window exec ${other.getClass}") } + val allBoundedBatched = fixedUpWindowOps.forall { + case GpuAlias(GpuWindowExpression(_, spec), _) => + // Check that the spec is bounded. + GpuWindowExec.isBoundedRowsWindowAndBatchable(spec) + case GpuAlias(_: AttributeReference, _) | _: AttributeReference => + true + case other => + throw new IllegalArgumentException( + s"Unexpected expression $other in window exec ${other.getClass}") + } + + println(s"allBoundedBatched == ${allBoundedBatched}") + val input = if (isPreNeeded) { GpuProjectExec(pre.toList, childPlans.head.convertIfNeeded())() } else { @@ -169,8 +182,14 @@ abstract class GpuBaseWindowExecMeta[WindowExecType <: SparkPlan] (windowExec: W input, getPartitionSpecs, getOrderSpecs) + } else if (allBoundedBatched) { + new GpuBatchedBoundedWindowExec( + fixedUpWindowOps, + partitionSpec.map(_.convertToGpu()), + orderSpec.map(_.convertToGpu().asInstanceOf[SortOrder]), + input)(getPartitionSpecs, getOrderSpecs) } else { - GpuWindowExec( + new GpuWindowExec( fixedUpWindowOps, partitionSpec.map(_.convertToGpu()), orderSpec.map(_.convertToGpu().asInstanceOf[SortOrder]), @@ -233,11 +252,19 @@ class GpuWindowExecMeta(windowExec: WindowExec, } case class BatchedOps(running: Seq[NamedExpression], - unboundedToUnbounded: Seq[NamedExpression], - passThrough: Seq[NamedExpression]) { + unboundedToUnbounded: Seq[NamedExpression], + bounded: Seq[NamedExpression], + passThrough: Seq[NamedExpression]) { + def getRunningExpressionsWithPassthrough: Seq[NamedExpression] = passThrough ++ running + def getDoublePassExpressionsWithRunningAsPassthrough: Seq[NamedExpression] = + passThrough ++ unboundedToUnbounded ++ running.map(_.toAttribute) + + def getBoundedExpressionsWithTheRestAsPassthrough: Seq[NamedExpression] = + passThrough ++ bounded ++ (unboundedToUnbounded ++ running).map(_.toAttribute) + private def getRunningWindowExec( gpuPartitionSpec: Seq[Expression], gpuOrderSpec: Seq[SortOrder], @@ -262,6 +289,17 @@ case class BatchedOps(running: Seq[NamedExpression], gpuOrderSpec, child)(cpuPartitionSpec, cpuOrderSpec) + private def getBatchedBoundedWindowExec(gpuPartitionSpec: Seq[Expression], + gpuOrderSpec: Seq[SortOrder], + child: SparkPlan, + cpuPartitionSpec: Seq[Expression], + cpuOrderSpec: Seq[SortOrder]): GpuExec = + GpuBatchedBoundedWindowExec(getBoundedExpressionsWithTheRestAsPassthrough, + gpuPartitionSpec, + gpuOrderSpec, + child) + (cpuPartitionSpec, cpuOrderSpec) + def getWindowExec( gpuPartitionSpec: Seq[Expression], gpuOrderSpec: Seq[SortOrder], @@ -270,26 +308,46 @@ case class BatchedOps(running: Seq[NamedExpression], cpuOrderSpec: Seq[SortOrder]): GpuExec = { // The order of these matter so we can pass the output of the first through the second one if (hasRunning) { - val running = getRunningWindowExec(gpuPartitionSpec, gpuOrderSpec, child, + val runningExec = getRunningWindowExec(gpuPartitionSpec, gpuOrderSpec, child, cpuPartitionSpec, cpuOrderSpec) if (hasDoublePass) { - getDoublePassWindowExec(gpuPartitionSpec, gpuOrderSpec, running, + val doublePassExec = getDoublePassWindowExec(gpuPartitionSpec, gpuOrderSpec, runningExec, cpuPartitionSpec, cpuOrderSpec) + if (hasBounded) { + getBatchedBoundedWindowExec(gpuPartitionSpec, gpuOrderSpec, doublePassExec, + cpuPartitionSpec, cpuOrderSpec) + } else { + doublePassExec + } } else { - running + if (hasBounded) { + getBatchedBoundedWindowExec(gpuPartitionSpec, gpuOrderSpec, runningExec, + cpuPartitionSpec, cpuOrderSpec) + } + else { + runningExec + } } } else { - getDoublePassWindowExec(gpuPartitionSpec, gpuOrderSpec, child, - cpuPartitionSpec, cpuOrderSpec) + if (hasDoublePass) { + val doublePassExec = getDoublePassWindowExec(gpuPartitionSpec, gpuOrderSpec, child, + cpuPartitionSpec, cpuOrderSpec) + if (hasBounded) { + getBatchedBoundedWindowExec(gpuPartitionSpec, gpuOrderSpec, doublePassExec, + cpuPartitionSpec, cpuOrderSpec) + } else { + doublePassExec + } + } else { + getBatchedBoundedWindowExec(gpuPartitionSpec, gpuOrderSpec, child, + cpuPartitionSpec, cpuOrderSpec) + } } } def hasRunning: Boolean = running.nonEmpty - - def getDoublePassExpressionsWithRunningAsPassthrough: Seq[NamedExpression] = - passThrough ++ unboundedToUnbounded ++ running.map(_.toAttribute) - def hasDoublePass: Boolean = unboundedToUnbounded.nonEmpty + def hasBounded: Boolean = bounded.nonEmpty } object GpuWindowExec { @@ -484,6 +542,30 @@ object GpuWindowExec { case _ => false } + /** + * Checks whether the window spec is both ROWS-based and bounded. + * Window functions of this spec can possibly still be batched. + */ + def isBoundedRowsWindowAndBatchable(spec: GpuWindowSpecDefinition): Boolean = spec match { + case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( + RowFrame, + GpuLiteral(prec: Int, _), + GpuLiteral(foll: Int, _))) => prec <= 0 && foll >= 0 + case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( + RowFrame, + GpuSpecialFrameBoundary(CurrentRow), + GpuLiteral(foll: Int, _))) => foll >= 0 + case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( + RowFrame, + GpuLiteral(prec: Int,_), + GpuSpecialFrameBoundary(CurrentRow))) => prec <= 0 + case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( + RowFrame, + GpuSpecialFrameBoundary(CurrentRow), + GpuSpecialFrameBoundary(CurrentRow))) => true + case _ => false + } + def isUnboundedToUnboundedWindow(spec: GpuWindowSpecDefinition): Boolean = spec match { case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame(_, GpuSpecialFrameBoundary(UnboundedPreceding), @@ -516,6 +598,7 @@ object GpuWindowExec { def splitBatchedOps(windowOps: Seq[NamedExpression]): BatchedOps = { val running = ArrayBuffer[NamedExpression]() val doublePass = ArrayBuffer[NamedExpression]() + val batchedBounded = ArrayBuffer[NamedExpression]() val passThrough = ArrayBuffer[NamedExpression]() windowOps.foreach { case expr@GpuAlias(GpuWindowExpression(func, spec), _) => @@ -523,6 +606,8 @@ object GpuWindowExec { running.append(expr) } else if (isBatchedUnboundedToUnboundedFunc(func, spec)) { doublePass.append(expr) + } else if (isBoundedRowsWindowAndBatchable(spec)) { + batchedBounded.append(expr) } else { throw new IllegalArgumentException( s"Found unexpected expression $expr in window exec ${expr.getClass}") @@ -1264,10 +1349,10 @@ trait BasicWindowCalc { // `orderByPositions` and `partByPositions` are the positions in `initialProjections` for // the order by columns and the part by columns respectively. private val (initialProjections, - passThrough, - aggregations, - orderByPositions, - partByPositions) = { + passThrough, + aggregations, + orderByPositions, + partByPositions) = { val initialProjections = ArrayBuffer[Expression]() val dedupedInitialProjections = mutable.HashMap[Expression, Int]() From 2aa3f5d1eff10bfe62a5d0a01dd8adfd41255c3e Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 14 Nov 2023 15:36:05 -0800 Subject: [PATCH 02/23] Bounded windows routed to BatchedBoundedWindowExec. GpuBatchedBoundedWindowExec is currently identical to GpuWindowExec, in that it does no batching yet. After rerouting, the tests all seem to still pass. --- .../rapids/GpuBatchedBoundedWindowExec.scala | 1 - .../nvidia/spark/rapids/GpuWindowExec.scala | 40 ++++++------------- 2 files changed, 12 insertions(+), 29 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala index 37ebd7be585..bbe1986d718 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala @@ -57,7 +57,6 @@ class GpuBatchedBoundedWindowIterator( val ret = withResource(computeBasicWindow(cb)) { cols => convertToBatch(outputTypes, cols) } - GpuColumnVector.debug("Output: ", ret) numOutputBatches += 1 numOutputRows += ret.numRows() ret diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index 3b3c7afb236..7b48eb9e28e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -155,19 +155,6 @@ abstract class GpuBaseWindowExecMeta[WindowExecType <: SparkPlan] (windowExec: W s"Found unexpected expression $other in window exec ${other.getClass}") } - val allBoundedBatched = fixedUpWindowOps.forall { - case GpuAlias(GpuWindowExpression(_, spec), _) => - // Check that the spec is bounded. - GpuWindowExec.isBoundedRowsWindowAndBatchable(spec) - case GpuAlias(_: AttributeReference, _) | _: AttributeReference => - true - case other => - throw new IllegalArgumentException( - s"Unexpected expression $other in window exec ${other.getClass}") - } - - println(s"allBoundedBatched == ${allBoundedBatched}") - val input = if (isPreNeeded) { GpuProjectExec(pre.toList, childPlans.head.convertIfNeeded())() } else { @@ -182,12 +169,6 @@ abstract class GpuBaseWindowExecMeta[WindowExecType <: SparkPlan] (windowExec: W input, getPartitionSpecs, getOrderSpecs) - } else if (allBoundedBatched) { - new GpuBatchedBoundedWindowExec( - fixedUpWindowOps, - partitionSpec.map(_.convertToGpu()), - orderSpec.map(_.convertToGpu().asInstanceOf[SortOrder]), - input)(getPartitionSpecs, getOrderSpecs) } else { new GpuWindowExec( fixedUpWindowOps, @@ -294,19 +275,19 @@ case class BatchedOps(running: Seq[NamedExpression], child: SparkPlan, cpuPartitionSpec: Seq[Expression], cpuOrderSpec: Seq[SortOrder]): GpuExec = - GpuBatchedBoundedWindowExec(getBoundedExpressionsWithTheRestAsPassthrough, - gpuPartitionSpec, - gpuOrderSpec, - child) - (cpuPartitionSpec, cpuOrderSpec) + new GpuBatchedBoundedWindowExec(getBoundedExpressionsWithTheRestAsPassthrough, + gpuPartitionSpec, + gpuOrderSpec, + child)(cpuPartitionSpec, cpuOrderSpec) - def getWindowExec( + def getWindowExec( gpuPartitionSpec: Seq[Expression], gpuOrderSpec: Seq[SortOrder], child: SparkPlan, cpuPartitionSpec: Seq[Expression], cpuOrderSpec: Seq[SortOrder]): GpuExec = { // The order of these matter so we can pass the output of the first through the second one + // TODO: Use a separate function for everything below the hasRunning check. if (hasRunning) { val runningExec = getRunningWindowExec(gpuPartitionSpec, gpuOrderSpec, child, cpuPartitionSpec, cpuOrderSpec) @@ -592,8 +573,11 @@ object GpuWindowExec { case _ => false } - def isBatchedFunc(func: Expression, spec: GpuWindowSpecDefinition): Boolean = - isBatchedRunningFunc(func, spec) || isBatchedUnboundedToUnboundedFunc(func, spec) + def isBatchedFunc(func: Expression, spec: GpuWindowSpecDefinition): Boolean = { + isBatchedRunningFunc(func, spec) || + isBatchedUnboundedToUnboundedFunc(func, spec) || + isBoundedRowsWindowAndBatchable(spec) + } def splitBatchedOps(windowOps: Seq[NamedExpression]): BatchedOps = { val running = ArrayBuffer[NamedExpression]() @@ -620,7 +604,7 @@ object GpuWindowExec { throw new IllegalArgumentException( s"Found unexpected expression $other in window exec ${other.getClass}") } - BatchedOps(running.toSeq, doublePass.toSeq, passThrough.toSeq) + BatchedOps(running.toSeq, doublePass.toSeq, batchedBounded, passThrough.toSeq) } } From cb4586628a34289c5e82f6e8bc69086665afc8c8 Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 20 Nov 2023 14:27:28 -0800 Subject: [PATCH 03/23] First swipe at new version. Compiling. Yet to test. Signed-off-by: MithunR --- .../rapids/GpuBatchedBoundedWindowExec.scala | 178 +++++++++++++++++- .../nvidia/spark/rapids/GpuWindowExec.scala | 48 ++++- 2 files changed, 214 insertions(+), 12 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala index bbe1986d718..7b1b368a68f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala @@ -16,9 +16,8 @@ package com.nvidia.spark.rapids -import ai.rapids.cudf.NvtxColor +import ai.rapids.cudf.{ColumnVector => CudfColumnVector, NvtxColor, Table => CudfTable} import com.nvidia.spark.rapids.Arm.withResource -import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, SortOrder} @@ -32,6 +31,8 @@ class GpuBatchedBoundedWindowIterator( override val boundPartitionSpec: Seq[GpuExpression], override val boundOrderSpec: Seq[SortOrder], val outputTypes: Array[DataType], + maxPreceding: Int, + maxFollowing: Int, numOutputBatches: GpuMetric, numOutputRows: GpuMetric, opTime: GpuMetric) extends Iterator[ColumnarBatch] with BasicWindowCalc { @@ -39,10 +40,14 @@ class GpuBatchedBoundedWindowIterator( override def isRunningBatched: Boolean = false // Not "Running Window" optimized. // This is strictly for batching. - override def hasNext: Boolean = onDeck.isDefined || input.hasNext + override def hasNext: Boolean = numUnprocessedInCache > 0 || input.hasNext - var onDeck: Option[SpillableColumnarBatch] = None + var cached: Option[Array[CudfColumnVector]] = None // For processing with the next batch. + // TODO: Rename numUnprocessedInCache to numIncomplete. + private var numUnprocessedInCache: Int = 0 // numRows at the bottom not processed completely. + private var numPrecedingRowsAdded: Int = 0 // numRows at the top, added for preceding context. + /* override def next(): ColumnarBatch = { val cbSpillable = onDeck match { case Some(x) => @@ -55,6 +60,7 @@ class GpuBatchedBoundedWindowIterator( withResource(cbSpillable.getColumnarBatch()) { cb => withResource(new NvtxWithMetrics("window", NvtxColor.CYAN, opTime)) { _ => val ret = withResource(computeBasicWindow(cb)) { cols => + println(s"CALEB: cols size: ${cols.length}, num-rows == ${cols(0).getRowCount}") convertToBatch(outputTypes, cols) } numOutputBatches += 1 @@ -64,11 +70,161 @@ class GpuBatchedBoundedWindowIterator( } } } + */ - def getNext(): SpillableColumnarBatch = { - SpillableColumnarBatch(input.next(), SpillPriorities.ACTIVE_BATCHING_PRIORITY) + var inputTypes: Option[Array[DataType]] = None + + // TODO: Move into getNextInputBatch. + private def concatenateColumns(cached: Array[CudfColumnVector], + freshBatchTable: CudfTable) + : Array[CudfColumnVector] = { + + if (cached.length != freshBatchTable.getNumberOfColumns) { + throw new IllegalArgumentException("Expected the same number of columns " + + "in input batch and cached batch.") + } + cached.zipWithIndex.map { case (cachedCol, idx) => + CudfColumnVector.concatenate(cachedCol, freshBatchTable.getColumn(idx)) + } + } + + // TODO: Consider returning ColumnarBatch instead. + private def getNextInputBatch: SpillableColumnarBatch = { + + def optionallySetInputTypes(inputCB: ColumnarBatch): Unit = { + if (inputTypes.isEmpty) { + inputTypes = Some(GpuColumnVector.extractTypes(inputCB)) + } + } + + // Reads fresh batch from iterator, initializes input data-types if necessary. + def getFreshInputBatch: ColumnarBatch = { + val fresh_batch = input.next() + optionallySetInputTypes(fresh_batch) + fresh_batch + } + + // Clears cached column vectors, after consumption. + def clearCached(): Unit = { + cached.foreach(_.foreach(_.close)) + cached = None + } + + // Either cached has unprocessed rows, or input.hasNext(). + if (input.hasNext) { + if (cached.isDefined) { + // Cached input AND new input rows exist. Return concat-ed rows. + withResource(getFreshInputBatch) { freshBatchCB => + withResource(GpuColumnVector.from(freshBatchCB)) { freshBatchTable => + val concat = concatenateColumns(cached.get, freshBatchTable) + clearCached() + SpillableColumnarBatch(convertToBatch(inputTypes.get, concat), + SpillPriorities.ACTIVE_BATCHING_PRIORITY) + } + } + } else { + // No cached input available. Return fresh input rows, only. + SpillableColumnarBatch(getFreshInputBatch, + SpillPriorities.ACTIVE_BATCHING_PRIORITY) + } + } + else { + // No fresh input available. Return cached input. + val cachedCB = convertToBatch(inputTypes.get, cached.get) + clearCached() + SpillableColumnarBatch(cachedCB, + SpillPriorities.ACTIVE_BATCHING_PRIORITY) + } + } + + /** + * Helper to trim specified number of rows off the top and bottom, + * of all specified columns. + */ + private def trim(columns: Array[CudfColumnVector], + offTheTop: Int, + offTheBottom: Int): Array[CudfColumnVector] = { + + def checkValidSizes(col: CudfColumnVector): Unit = + if (offTheTop >= col.getRowCount || + offTheBottom >= col.getRowCount || + (offTheTop + offTheBottom) > col.getRowCount) { + throw new IllegalArgumentException(s"Cannot trim column of size ${col.getRowCount} by " + + s"$offTheTop rows at the top, and $offTheBottom rows at the bottom.") + } + + columns.map{ col => + checkValidSizes(col) + col.subVector(offTheTop, col.getRowCount.toInt - offTheBottom) + } } + private def resetInputCache(newCache: Option[Array[CudfColumnVector]], + newUnprocessed: Int, // TODO: Not required here. Already set prior. + newPrecedingAdded: Int): Unit= { + cached.foreach(_.foreach(_.close)) + cached = newCache + numUnprocessedInCache = newUnprocessed + numPrecedingRowsAdded = newPrecedingAdded + } + + override def next(): ColumnarBatch = { + var outputBatch: ColumnarBatch = null + while (outputBatch == null && hasNext) { // TODO: && input.hasNext? Simply hasNext? + withResource(getNextInputBatch) { inputCbSpillable => + withResource(inputCbSpillable.getColumnarBatch()) { inputCB => + withResource(new NvtxWithMetrics("window", NvtxColor.CYAN, opTime)) { _ => + withResource(computeBasicWindow(inputCB)) { outputCols => + val inputRowCount = inputCB.numRows() + + // TODO: Move this to the end of getNextInputBatch, when resetting. + // Cleared after getNextInputBatch + // cached.foreach(_.foreach(_.close)) + // cached = None + + val noMoreInput = !input.hasNext + + numUnprocessedInCache = if (noMoreInput) { + // If there are no more input rows expected, + // this is the last output batch. + 0 + } else { + // More input rows expected. The last `maxFollowing` rows can't be finalized. + // Cannot exceed `inputRowCount`. + maxFollowing min inputRowCount + } + + // Trim output. Remove numPrecedingRowsAdded from the top, + // and numUnprocessedInCache from the bottom. + // TODO: Optimize. If no rows can be output, skip calling the kernel. + + if (numPrecedingRowsAdded + numUnprocessedInCache >= inputRowCount) { + println("Not enough rows! Cannot output a batch.") + } + else { + val trimmedOutputCols = trim(outputCols, + numPrecedingRowsAdded, numUnprocessedInCache) + outputBatch = convertToBatch(outputTypes, trimmedOutputCols) + } + + // Min 0, Max inputRowCount + // numPrecedingRowsAdded = (numUnprocessedInCache + maxFollowing) max inputRowCount + numPrecedingRowsAdded = (numUnprocessedInCache + maxPreceding) min inputRowCount + val inputCols = Range(0, inputCB.numCols()).map { + inputCB.column(_).asInstanceOf[GpuColumnVector].getBase + }.toArray + + val newCached = trim(inputCols, inputRowCount - numPrecedingRowsAdded, 0) + resetInputCache(Some(newCached), numUnprocessedInCache, numPrecedingRowsAdded) + } + } + } + } + } + numOutputBatches += 1 + numOutputRows += outputBatch.numRows() + outputBatch + } } /// Window Exec used exclusively for batching bounded window functions. @@ -78,13 +234,16 @@ class GpuBatchedBoundedWindowExec( override val gpuOrderSpec: Seq[SortOrder], override val child: SparkPlan)( override val cpuPartitionSpec: Seq[Expression], - override val cpuOrderSpec: Seq[SortOrder] + override val cpuOrderSpec: Seq[SortOrder], + maxPreceding: Integer, + maxFollowing: Integer ) extends GpuWindowExec(windowOps, gpuPartitionSpec, gpuOrderSpec, child)(cpuPartitionSpec, cpuOrderSpec) { - override def otherCopyArgs: Seq[AnyRef] = cpuPartitionSpec :: cpuOrderSpec :: Nil + override def otherCopyArgs: Seq[AnyRef] = + cpuPartitionSpec :: cpuOrderSpec :: maxPreceding :: maxFollowing :: Nil override def childrenCoalesceGoal: Seq[CoalesceGoal] = Seq(outputBatching) @@ -105,7 +264,8 @@ class GpuBatchedBoundedWindowExec( child.executeColumnar().mapPartitions { iter => new GpuBatchedBoundedWindowIterator(iter, boundWindowOps, boundPartitionSpec, - boundOrderSpec, output.map(_.dataType).toArray, numOutputBatches, numOutputRows, opTime) + boundOrderSpec, output.map(_.dataType).toArray, maxPreceding, maxFollowing, + numOutputBatches, numOutputRows, opTime) } } } \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index 7b48eb9e28e..0ae5980a19f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -246,6 +246,22 @@ case class BatchedOps(running: Seq[NamedExpression], def getBoundedExpressionsWithTheRestAsPassthrough: Seq[NamedExpression] = passThrough ++ bounded ++ (unboundedToUnbounded ++ running).map(_.toAttribute) + def getMaxBoundedPrecedingAndFollowing: (Int, Int) = { + // All bounded window expressions should have window bound window specs. + val boundedWindowSpecs = bounded.map{ + case GpuAlias(GpuWindowExpression(_, spec), _) => spec + case other => throw new IllegalArgumentException("Expected a window-expression " + + s" found $other") + } + val precedingAndFollowing = boundedWindowSpecs.map( + GpuWindowExec.getBoundedWindowPrecedingAndFollowing + ) +// (precedingAndFollowing.map{ case (prec, _) => prec }.max, +// precedingAndFollowing.map{ case (_, foll) => foll }.max) + (precedingAndFollowing.map{ _._1 }.max, + precedingAndFollowing.map{ _._2 }.max) + } + private def getRunningWindowExec( gpuPartitionSpec: Seq[Expression], gpuOrderSpec: Seq[SortOrder], @@ -274,13 +290,16 @@ case class BatchedOps(running: Seq[NamedExpression], gpuOrderSpec: Seq[SortOrder], child: SparkPlan, cpuPartitionSpec: Seq[Expression], - cpuOrderSpec: Seq[SortOrder]): GpuExec = + cpuOrderSpec: Seq[SortOrder]): GpuExec = { + val (prec@_, foll@_) = getMaxBoundedPrecedingAndFollowing new GpuBatchedBoundedWindowExec(getBoundedExpressionsWithTheRestAsPassthrough, gpuPartitionSpec, gpuOrderSpec, - child)(cpuPartitionSpec, cpuOrderSpec) + child/*, prec, foll*/)( + cpuPartitionSpec, cpuOrderSpec, prec, foll) + } - def getWindowExec( + def getWindowExec( gpuPartitionSpec: Seq[Expression], gpuOrderSpec: Seq[SortOrder], child: SparkPlan, @@ -526,6 +545,7 @@ object GpuWindowExec { /** * Checks whether the window spec is both ROWS-based and bounded. * Window functions of this spec can possibly still be batched. + * TODO: Check if this can be rewritten with getBoundedWindowPrecedingAndFollowing(). */ def isBoundedRowsWindowAndBatchable(spec: GpuWindowSpecDefinition): Boolean = spec match { case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( @@ -547,6 +567,28 @@ object GpuWindowExec { case _ => false } + def getBoundedWindowPrecedingAndFollowing(spec: GpuWindowSpecDefinition): (Int, Int) = + spec match { + case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( + RowFrame, + GpuLiteral(prec: Int, _), + GpuLiteral(foll: Int, _))) => (prec, foll) + case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( + RowFrame, + GpuSpecialFrameBoundary(CurrentRow), + GpuLiteral(foll: Int, _))) => (0, foll) + case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( + RowFrame, + GpuLiteral(prec: Int,_), + GpuSpecialFrameBoundary(CurrentRow))) => (prec, 0) + case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( + RowFrame, + GpuSpecialFrameBoundary(CurrentRow), + GpuSpecialFrameBoundary(CurrentRow))) => (0, 0) + case _ => throw new IllegalArgumentException("Expected bounded ROWS spec, " + + s"found $spec") // (null, null) // Can't reach here. + } + def isUnboundedToUnboundedWindow(spec: GpuWindowSpecDefinition): Boolean = spec match { case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame(_, GpuSpecialFrameBoundary(UnboundedPreceding), From c3cb1a45efb035b0c0ddbfa5554a2cf4840c3b4d Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 21 Nov 2023 21:42:49 -0800 Subject: [PATCH 04/23] Mostly working, but buggy. Losing some rows in the output. --- .../rapids/GpuBatchedBoundedWindowExec.scala | 30 ------------------- .../nvidia/spark/rapids/GpuWindowExec.scala | 4 +-- 2 files changed, 1 insertion(+), 33 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala index 7b1b368a68f..b2edbb1fd2d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala @@ -47,31 +47,6 @@ class GpuBatchedBoundedWindowIterator( private var numUnprocessedInCache: Int = 0 // numRows at the bottom not processed completely. private var numPrecedingRowsAdded: Int = 0 // numRows at the top, added for preceding context. - /* - override def next(): ColumnarBatch = { - val cbSpillable = onDeck match { - case Some(x) => - onDeck = None - x - case _ => - getNext() - } - withRetryNoSplit(cbSpillable) { _ => - withResource(cbSpillable.getColumnarBatch()) { cb => - withResource(new NvtxWithMetrics("window", NvtxColor.CYAN, opTime)) { _ => - val ret = withResource(computeBasicWindow(cb)) { cols => - println(s"CALEB: cols size: ${cols.length}, num-rows == ${cols(0).getRowCount}") - convertToBatch(outputTypes, cols) - } - numOutputBatches += 1 - numOutputRows += ret.numRows() - ret - } - } - } - } - */ - var inputTypes: Option[Array[DataType]] = None // TODO: Move into getNextInputBatch. @@ -177,11 +152,6 @@ class GpuBatchedBoundedWindowIterator( withResource(computeBasicWindow(inputCB)) { outputCols => val inputRowCount = inputCB.numRows() - // TODO: Move this to the end of getNextInputBatch, when resetting. - // Cleared after getNextInputBatch - // cached.foreach(_.foreach(_.close)) - // cached = None - val noMoreInput = !input.hasNext numUnprocessedInCache = if (noMoreInput) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index 0ae5980a19f..1fc80d1269f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -256,9 +256,7 @@ case class BatchedOps(running: Seq[NamedExpression], val precedingAndFollowing = boundedWindowSpecs.map( GpuWindowExec.getBoundedWindowPrecedingAndFollowing ) -// (precedingAndFollowing.map{ case (prec, _) => prec }.max, -// precedingAndFollowing.map{ case (_, foll) => foll }.max) - (precedingAndFollowing.map{ _._1 }.max, + (precedingAndFollowing.map{ p => Math.abs(p._1) }.max, // Only Negative values supported. precedingAndFollowing.map{ _._2 }.max) } From 1230fed8d54ad8bec49e7de64f652a1aebd41ac6 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 1 Dec 2023 10:48:27 -0800 Subject: [PATCH 05/23] Fixed up the math. Looks to be working at 150M. --- .../spark/rapids/GpuBatchedBoundedWindowExec.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala index b2edbb1fd2d..f374f863179 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala @@ -178,13 +178,16 @@ class GpuBatchedBoundedWindowIterator( } // Min 0, Max inputRowCount - // numPrecedingRowsAdded = (numUnprocessedInCache + maxFollowing) max inputRowCount - numPrecedingRowsAdded = (numUnprocessedInCache + maxPreceding) min inputRowCount + // TODO: The following seems wrong. numPrecedingRowsAdded can't include numUnProcessed +// numPrecedingRowsAdded = (numUnprocessedInCache + maxPreceding) min inputRowCount + numPrecedingRowsAdded = maxPreceding min (inputRowCount - numUnprocessedInCache) val inputCols = Range(0, inputCB.numCols()).map { inputCB.column(_).asInstanceOf[GpuColumnVector].getBase }.toArray - val newCached = trim(inputCols, inputRowCount - numPrecedingRowsAdded, 0) + val newCached = trim(inputCols, + inputRowCount - (numPrecedingRowsAdded + numUnprocessedInCache), + 0) resetInputCache(Some(newCached), numUnprocessedInCache, numPrecedingRowsAdded) } } From 159dc62f27d29e842504390f4568af57159dd516 Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 4 Dec 2023 11:38:32 -0800 Subject: [PATCH 06/23] Minor refactor/cleanup. --- .../rapids/GpuBatchedBoundedWindowExec.scala | 40 +++++++++---------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala index f374f863179..602d918d477 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala @@ -49,23 +49,15 @@ class GpuBatchedBoundedWindowIterator( var inputTypes: Option[Array[DataType]] = None - // TODO: Move into getNextInputBatch. - private def concatenateColumns(cached: Array[CudfColumnVector], - freshBatchTable: CudfTable) - : Array[CudfColumnVector] = { - - if (cached.length != freshBatchTable.getNumberOfColumns) { - throw new IllegalArgumentException("Expected the same number of columns " + - "in input batch and cached batch.") - } - cached.zipWithIndex.map { case (cachedCol, idx) => - CudfColumnVector.concatenate(cachedCol, freshBatchTable.getColumn(idx)) - } + // Clears cached column vectors, after consumption. + def clearCached(): Unit = { + cached.foreach(_.foreach(_.close)) + cached = None } - // TODO: Consider returning ColumnarBatch instead. private def getNextInputBatch: SpillableColumnarBatch = { - + // Sets column batch types using the types cached from the + // first input column read. def optionallySetInputTypes(inputCB: ColumnarBatch): Unit = { if (inputTypes.isEmpty) { inputTypes = Some(GpuColumnVector.extractTypes(inputCB)) @@ -79,10 +71,17 @@ class GpuBatchedBoundedWindowIterator( fresh_batch } - // Clears cached column vectors, after consumption. - def clearCached(): Unit = { - cached.foreach(_.foreach(_.close)) - cached = None + def concatenateColumns(cached: Array[CudfColumnVector], + freshBatchTable: CudfTable) + : Array[CudfColumnVector] = { + + if (cached.length != freshBatchTable.getNumberOfColumns) { + throw new IllegalArgumentException("Expected the same number of columns " + + "in input batch and cached batch.") + } + cached.zipWithIndex.map { case (cachedCol, idx) => + CudfColumnVector.concatenate(cachedCol, freshBatchTable.getColumn(idx)) + } } // Either cached has unprocessed rows, or input.hasNext(). @@ -145,7 +144,7 @@ class GpuBatchedBoundedWindowIterator( override def next(): ColumnarBatch = { var outputBatch: ColumnarBatch = null - while (outputBatch == null && hasNext) { // TODO: && input.hasNext? Simply hasNext? + while (outputBatch == null && hasNext) { withResource(getNextInputBatch) { inputCbSpillable => withResource(inputCbSpillable.getColumnarBatch()) { inputCB => withResource(new NvtxWithMetrics("window", NvtxColor.CYAN, opTime)) { _ => @@ -177,9 +176,6 @@ class GpuBatchedBoundedWindowIterator( outputBatch = convertToBatch(outputTypes, trimmedOutputCols) } - // Min 0, Max inputRowCount - // TODO: The following seems wrong. numPrecedingRowsAdded can't include numUnProcessed -// numPrecedingRowsAdded = (numUnprocessedInCache + maxPreceding) min inputRowCount numPrecedingRowsAdded = maxPreceding min (inputRowCount - numUnprocessedInCache) val inputCols = Range(0, inputCB.numCols()).map { inputCB.column(_).asInstanceOf[GpuColumnVector].getBase From 392e7c60ffa140505a8e489bfe88b7185e2908eb Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 4 Dec 2023 14:34:00 -0800 Subject: [PATCH 07/23] Clearing cache on task completion. --- .../spark/rapids/GpuBatchedBoundedWindowExec.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala index 602d918d477..cf710b9d15e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala @@ -18,7 +18,9 @@ package com.nvidia.spark.rapids import ai.rapids.cudf.{ColumnVector => CudfColumnVector, NvtxColor, Table => CudfTable} import com.nvidia.spark.rapids.Arm.withResource +import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion +import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, SortOrder} import org.apache.spark.sql.execution.SparkPlan @@ -47,10 +49,16 @@ class GpuBatchedBoundedWindowIterator( private var numUnprocessedInCache: Int = 0 // numRows at the bottom not processed completely. private var numPrecedingRowsAdded: Int = 0 // numRows at the top, added for preceding context. + Option(TaskContext.get()).foreach { tc => + onTaskCompletion(tc) { + clearCached() + } + } + var inputTypes: Option[Array[DataType]] = None // Clears cached column vectors, after consumption. - def clearCached(): Unit = { + private def clearCached(): Unit = { cached.foreach(_.foreach(_.close)) cached = None } From 5dbbdd26ef647d6118c70393965b5635b29464cd Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 4 Dec 2023 20:18:54 -0800 Subject: [PATCH 08/23] Fixed leak from trim(). --- .../rapids/GpuBatchedBoundedWindowExec.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala index cf710b9d15e..794cdd9e489 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala @@ -21,6 +21,7 @@ import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, SortOrder} import org.apache.spark.sql.execution.SparkPlan @@ -37,7 +38,7 @@ class GpuBatchedBoundedWindowIterator( maxFollowing: Int, numOutputBatches: GpuMetric, numOutputRows: GpuMetric, - opTime: GpuMetric) extends Iterator[ColumnarBatch] with BasicWindowCalc { + opTime: GpuMetric) extends Iterator[ColumnarBatch] with BasicWindowCalc with Logging { override def isRunningBatched: Boolean = false // Not "Running Window" optimized. // This is strictly for batching. @@ -45,7 +46,7 @@ class GpuBatchedBoundedWindowIterator( override def hasNext: Boolean = numUnprocessedInCache > 0 || input.hasNext var cached: Option[Array[CudfColumnVector]] = None // For processing with the next batch. - // TODO: Rename numUnprocessedInCache to numIncomplete. + private var numUnprocessedInCache: Int = 0 // numRows at the bottom not processed completely. private var numPrecedingRowsAdded: Int = 0 // numRows at the top, added for preceding context. @@ -176,12 +177,14 @@ class GpuBatchedBoundedWindowIterator( // TODO: Optimize. If no rows can be output, skip calling the kernel. if (numPrecedingRowsAdded + numUnprocessedInCache >= inputRowCount) { - println("Not enough rows! Cannot output a batch.") + logWarning("Not enough rows! Cannot output a batch.") } else { - val trimmedOutputCols = trim(outputCols, - numPrecedingRowsAdded, numUnprocessedInCache) - outputBatch = convertToBatch(outputTypes, trimmedOutputCols) + outputBatch = withResource( + trim(outputCols, + numPrecedingRowsAdded, numUnprocessedInCache)) { trimmed => + convertToBatch(outputTypes, trimmed) + } } numPrecedingRowsAdded = maxPreceding min (inputRowCount - numUnprocessedInCache) From d00747ff04f4c6f3c54a59f1c9e15ac38429c439 Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 4 Dec 2023 20:22:35 -0800 Subject: [PATCH 09/23] Document onTaskCompletion. --- .../com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala index 794cdd9e489..777ce4f232e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala @@ -50,6 +50,7 @@ class GpuBatchedBoundedWindowIterator( private var numUnprocessedInCache: Int = 0 // numRows at the bottom not processed completely. private var numPrecedingRowsAdded: Int = 0 // numRows at the top, added for preceding context. + // Register handler to clean up cache when task completes. Option(TaskContext.get()).foreach { tc => onTaskCompletion(tc) { clearCached() From b5bd065b75fd422e8615a85ec04177f8ef076b3a Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 4 Dec 2023 21:30:18 -0800 Subject: [PATCH 10/23] Optimization: Skip window kernel if no output for current batch. --- .../rapids/GpuBatchedBoundedWindowExec.scala | 72 +++++++++---------- 1 file changed, 34 insertions(+), 38 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala index 777ce4f232e..1cbc5645e2f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala @@ -57,6 +57,7 @@ class GpuBatchedBoundedWindowIterator( } } + // Caches input column schema on first read. var inputTypes: Option[Array[DataType]] = None // Clears cached column vectors, after consumption. @@ -144,11 +145,9 @@ class GpuBatchedBoundedWindowIterator( } private def resetInputCache(newCache: Option[Array[CudfColumnVector]], - newUnprocessed: Int, // TODO: Not required here. Already set prior. newPrecedingAdded: Int): Unit= { cached.foreach(_.foreach(_.close)) cached = newCache - numUnprocessedInCache = newUnprocessed numPrecedingRowsAdded = newPrecedingAdded } @@ -157,48 +156,45 @@ class GpuBatchedBoundedWindowIterator( while (outputBatch == null && hasNext) { withResource(getNextInputBatch) { inputCbSpillable => withResource(inputCbSpillable.getColumnarBatch()) { inputCB => - withResource(new NvtxWithMetrics("window", NvtxColor.CYAN, opTime)) { _ => - withResource(computeBasicWindow(inputCB)) { outputCols => - val inputRowCount = inputCB.numRows() - - val noMoreInput = !input.hasNext - - numUnprocessedInCache = if (noMoreInput) { - // If there are no more input rows expected, - // this is the last output batch. - 0 - } else { - // More input rows expected. The last `maxFollowing` rows can't be finalized. - // Cannot exceed `inputRowCount`. - maxFollowing min inputRowCount - } - // Trim output. Remove numPrecedingRowsAdded from the top, - // and numUnprocessedInCache from the bottom. - // TODO: Optimize. If no rows can be output, skip calling the kernel. + val inputRowCount = inputCB.numRows() + val noMoreInput = !input.hasNext + numUnprocessedInCache = if (noMoreInput) { + // If there are no more input rows expected, + // this is the last output batch. + // Consider all rows in the batch as processed. + 0 + } else { + // More input rows expected. The last `maxFollowing` rows can't be finalized. + // Cannot exceed `inputRowCount`. + maxFollowing min inputRowCount + } - if (numPrecedingRowsAdded + numUnprocessedInCache >= inputRowCount) { - logWarning("Not enough rows! Cannot output a batch.") - } - else { - outputBatch = withResource( - trim(outputCols, - numPrecedingRowsAdded, numUnprocessedInCache)) { trimmed => - convertToBatch(outputTypes, trimmed) + if (numPrecedingRowsAdded + numUnprocessedInCache >= inputRowCount) { + // No point calling windowing kernel: the results will simply be ignored. + logWarning("Not enough rows! Cannot output a batch.") + } else { + withResource(new NvtxWithMetrics("window", NvtxColor.CYAN, opTime)) { _ => + withResource(computeBasicWindow(inputCB)) { outputCols => + outputBatch = withResource( + trim(outputCols, + numPrecedingRowsAdded, numUnprocessedInCache)) { trimmed => + convertToBatch(outputTypes, trimmed) } } - - numPrecedingRowsAdded = maxPreceding min (inputRowCount - numUnprocessedInCache) - val inputCols = Range(0, inputCB.numCols()).map { - inputCB.column(_).asInstanceOf[GpuColumnVector].getBase - }.toArray - - val newCached = trim(inputCols, - inputRowCount - (numPrecedingRowsAdded + numUnprocessedInCache), - 0) - resetInputCache(Some(newCached), numUnprocessedInCache, numPrecedingRowsAdded) } } + + // Compute new cache using current input. + numPrecedingRowsAdded = maxPreceding min (inputRowCount - numUnprocessedInCache) + val inputCols = Range(0, inputCB.numCols()).map { + inputCB.column(_).asInstanceOf[GpuColumnVector].getBase + }.toArray + + val newCached = trim(inputCols, + inputRowCount - (numPrecedingRowsAdded + numUnprocessedInCache), + 0) + resetInputCache(Some(newCached), numPrecedingRowsAdded) } } } From 94d9bc42c41bcb83a09902166c901f4ec2fa8da4 Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 5 Dec 2023 09:57:55 -0800 Subject: [PATCH 11/23] Removed commented code, prints. --- .../main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index 1fc80d1269f..c4cb98c85bf 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -293,8 +293,7 @@ case class BatchedOps(running: Seq[NamedExpression], new GpuBatchedBoundedWindowExec(getBoundedExpressionsWithTheRestAsPassthrough, gpuPartitionSpec, gpuOrderSpec, - child/*, prec, foll*/)( - cpuPartitionSpec, cpuOrderSpec, prec, foll) + child)(cpuPartitionSpec, cpuOrderSpec, prec, foll) } def getWindowExec( @@ -543,7 +542,6 @@ object GpuWindowExec { /** * Checks whether the window spec is both ROWS-based and bounded. * Window functions of this spec can possibly still be batched. - * TODO: Check if this can be rewritten with getBoundedWindowPrecedingAndFollowing(). */ def isBoundedRowsWindowAndBatchable(spec: GpuWindowSpecDefinition): Boolean = spec match { case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( From 60245fecb3dc978451cb71d5ed27db1f756329e5 Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 5 Dec 2023 14:57:17 -0800 Subject: [PATCH 12/23] Switched to exposing negative minPreceding. Also built safety guards to disable optimization for very large window extents. --- .../rapids/GpuBatchedBoundedWindowExec.scala | 14 ++-- .../nvidia/spark/rapids/GpuWindowExec.scala | 74 ++++++++++++------- .../com/nvidia/spark/rapids/RapidsConf.scala | 9 +++ 3 files changed, 63 insertions(+), 34 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala index 1cbc5645e2f..ac334cdb069 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala @@ -34,7 +34,7 @@ class GpuBatchedBoundedWindowIterator( override val boundPartitionSpec: Seq[GpuExpression], override val boundOrderSpec: Seq[SortOrder], val outputTypes: Array[DataType], - maxPreceding: Int, + minPreceding: Int, maxFollowing: Int, numOutputBatches: GpuMetric, numOutputRows: GpuMetric, @@ -83,7 +83,7 @@ class GpuBatchedBoundedWindowIterator( } def concatenateColumns(cached: Array[CudfColumnVector], - freshBatchTable: CudfTable) + freshBatchTable: CudfTable) : Array[CudfColumnVector] = { if (cached.length != freshBatchTable.getNumberOfColumns) { @@ -167,6 +167,7 @@ class GpuBatchedBoundedWindowIterator( } else { // More input rows expected. The last `maxFollowing` rows can't be finalized. // Cannot exceed `inputRowCount`. + // TODO: Account for maxFollowing < 0 (e.g. LAG()) => numUnprocessedInCache = 0. maxFollowing min inputRowCount } @@ -186,7 +187,8 @@ class GpuBatchedBoundedWindowIterator( } // Compute new cache using current input. - numPrecedingRowsAdded = maxPreceding min (inputRowCount - numUnprocessedInCache) + // TODO: Account for minPreceding >0 (e.g. LEAD()) => numPrecedingRowsAdded = 0. + numPrecedingRowsAdded = Math.abs(minPreceding) min (inputRowCount - numUnprocessedInCache) val inputCols = Range(0, inputCB.numCols()).map { inputCB.column(_).asInstanceOf[GpuColumnVector].getBase }.toArray @@ -212,7 +214,7 @@ class GpuBatchedBoundedWindowExec( override val child: SparkPlan)( override val cpuPartitionSpec: Seq[Expression], override val cpuOrderSpec: Seq[SortOrder], - maxPreceding: Integer, + minPreceding: Integer, maxFollowing: Integer ) extends GpuWindowExec(windowOps, gpuPartitionSpec, @@ -220,7 +222,7 @@ class GpuBatchedBoundedWindowExec( child)(cpuPartitionSpec, cpuOrderSpec) { override def otherCopyArgs: Seq[AnyRef] = - cpuPartitionSpec :: cpuOrderSpec :: maxPreceding :: maxFollowing :: Nil + cpuPartitionSpec :: cpuOrderSpec :: minPreceding :: maxFollowing :: Nil override def childrenCoalesceGoal: Seq[CoalesceGoal] = Seq(outputBatching) @@ -241,7 +243,7 @@ class GpuBatchedBoundedWindowExec( child.executeColumnar().mapPartitions { iter => new GpuBatchedBoundedWindowIterator(iter, boundWindowOps, boundPartitionSpec, - boundOrderSpec, output.map(_.dataType).toArray, maxPreceding, maxFollowing, + boundOrderSpec, output.map(_.dataType).toArray, minPreceding, maxFollowing, numOutputBatches, numOutputRows, opTime) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index c4cb98c85bf..527a3301e56 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -144,7 +144,7 @@ abstract class GpuBaseWindowExecMeta[WindowExecType <: SparkPlan] (windowExec: W val allBatched = fixedUpWindowOps.forall { case GpuAlias(GpuWindowExpression(func, spec), _) => - GpuWindowExec.isBatchedFunc(func, spec) + GpuWindowExec.isBatchedFunc(func, spec, conf) case GpuAlias(_: AttributeReference, _) | _: AttributeReference => // We allow pure result columns for running windows true @@ -162,7 +162,7 @@ abstract class GpuBaseWindowExecMeta[WindowExecType <: SparkPlan] (windowExec: W } val windowExpr = if (allBatched) { - val batchedOps = GpuWindowExec.splitBatchedOps(fixedUpWindowOps) + val batchedOps = GpuWindowExec.splitBatchedOps(fixedUpWindowOps, conf) batchedOps.getWindowExec( partitionSpec.map(_.convertToGpu()), orderSpec.map(_.convertToGpu().asInstanceOf[SortOrder]), @@ -246,7 +246,7 @@ case class BatchedOps(running: Seq[NamedExpression], def getBoundedExpressionsWithTheRestAsPassthrough: Seq[NamedExpression] = passThrough ++ bounded ++ (unboundedToUnbounded ++ running).map(_.toAttribute) - def getMaxBoundedPrecedingAndFollowing: (Int, Int) = { + def getMinPrecedingMaxFollowingForBoundedWindows: (Int, Int) = { // All bounded window expressions should have window bound window specs. val boundedWindowSpecs = bounded.map{ case GpuAlias(GpuWindowExpression(_, spec), _) => spec @@ -256,7 +256,7 @@ case class BatchedOps(running: Seq[NamedExpression], val precedingAndFollowing = boundedWindowSpecs.map( GpuWindowExec.getBoundedWindowPrecedingAndFollowing ) - (precedingAndFollowing.map{ p => Math.abs(p._1) }.max, // Only Negative values supported. + (precedingAndFollowing.map{ _._1 }.min, // Only non-positive (>=0) values supported. precedingAndFollowing.map{ _._2 }.max) } @@ -289,7 +289,7 @@ case class BatchedOps(running: Seq[NamedExpression], child: SparkPlan, cpuPartitionSpec: Seq[Expression], cpuOrderSpec: Seq[SortOrder]): GpuExec = { - val (prec@_, foll@_) = getMaxBoundedPrecedingAndFollowing + val (prec@_, foll@_) = getMinPrecedingMaxFollowingForBoundedWindows new GpuBatchedBoundedWindowExec(getBoundedExpressionsWithTheRestAsPassthrough, gpuPartitionSpec, gpuOrderSpec, @@ -542,25 +542,40 @@ object GpuWindowExec { /** * Checks whether the window spec is both ROWS-based and bounded. * Window functions of this spec can possibly still be batched. + * TODO: (future): Change here to support +ve preceding, -ve following. */ - def isBoundedRowsWindowAndBatchable(spec: GpuWindowSpecDefinition): Boolean = spec match { - case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( - RowFrame, - GpuLiteral(prec: Int, _), - GpuLiteral(foll: Int, _))) => prec <= 0 && foll >= 0 - case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( - RowFrame, - GpuSpecialFrameBoundary(CurrentRow), - GpuLiteral(foll: Int, _))) => foll >= 0 - case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( - RowFrame, - GpuLiteral(prec: Int,_), - GpuSpecialFrameBoundary(CurrentRow))) => prec <= 0 - case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( - RowFrame, - GpuSpecialFrameBoundary(CurrentRow), - GpuSpecialFrameBoundary(CurrentRow))) => true - case _ => false + private def isBoundedRowsWindowAndBatchable(spec: GpuWindowSpecDefinition, + conf: RapidsConf): Boolean = { + + def precInPermissibleRange(prec: Int) = + prec <= 0 && Math.abs(prec) <= conf.boundedRowsWindowMaxExtent + + def follInPermissibleRange(foll: Int) = + foll >= 0 && Math.abs(foll) <= conf.boundedRowsWindowMaxExtent + + spec match { + + case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( + RowFrame, + GpuLiteral(prec: Int, _), + GpuLiteral(foll: Int, _))) => + precInPermissibleRange(prec) && follInPermissibleRange(foll) + case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( + RowFrame, + GpuSpecialFrameBoundary(CurrentRow), + GpuLiteral(foll: Int, _))) => + follInPermissibleRange(foll) + case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( + RowFrame, + GpuLiteral(prec: Int, _), + GpuSpecialFrameBoundary(CurrentRow))) => + precInPermissibleRange(prec) + case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( + RowFrame, + GpuSpecialFrameBoundary(CurrentRow), + GpuSpecialFrameBoundary(CurrentRow))) => true + case _ => false + } } def getBoundedWindowPrecedingAndFollowing(spec: GpuWindowSpecDefinition): (Int, Int) = @@ -582,7 +597,7 @@ object GpuWindowExec { GpuSpecialFrameBoundary(CurrentRow), GpuSpecialFrameBoundary(CurrentRow))) => (0, 0) case _ => throw new IllegalArgumentException("Expected bounded ROWS spec, " + - s"found $spec") // (null, null) // Can't reach here. + s"found $spec") // Can't reach here. } def isUnboundedToUnboundedWindow(spec: GpuWindowSpecDefinition): Boolean = spec match { @@ -611,13 +626,16 @@ object GpuWindowExec { case _ => false } - def isBatchedFunc(func: Expression, spec: GpuWindowSpecDefinition): Boolean = { + def isBatchedFunc(func: Expression, + spec: GpuWindowSpecDefinition, + conf: RapidsConf): Boolean = { isBatchedRunningFunc(func, spec) || isBatchedUnboundedToUnboundedFunc(func, spec) || - isBoundedRowsWindowAndBatchable(spec) + isBoundedRowsWindowAndBatchable(spec, conf) } - def splitBatchedOps(windowOps: Seq[NamedExpression]): BatchedOps = { + def splitBatchedOps(windowOps: Seq[NamedExpression], + conf: RapidsConf): BatchedOps = { val running = ArrayBuffer[NamedExpression]() val doublePass = ArrayBuffer[NamedExpression]() val batchedBounded = ArrayBuffer[NamedExpression]() @@ -628,7 +646,7 @@ object GpuWindowExec { running.append(expr) } else if (isBatchedUnboundedToUnboundedFunc(func, spec)) { doublePass.append(expr) - } else if (isBoundedRowsWindowAndBatchable(spec)) { + } else if (isBoundedRowsWindowAndBatchable(spec, conf)) { batchedBounded.append(expr) } else { throw new IllegalArgumentException( diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 82fb5cdb4b9..a04219e7778 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1340,6 +1340,13 @@ object RapidsConf { .booleanConf .createWithDefault(true) + val BATCHED_BOUNDED_ROW_WINDOW_MAX_EXTENT: ConfEntryWithDefault[Integer] = + conf("spark.rapids.sql.window.batched.bounded.row.extent") + .doc("Max value for bounded row window preceding/following extents " + + "permissible for the window to be evaluated in batched mode") + .integerConf + .createWithDefault(value = 100) + val ENABLE_SINGLE_PASS_PARTIAL_SORT_AGG: ConfEntryWithDefault[Boolean] = conf("spark.rapids.sql.agg.singlePassPartialSortEnabled") .doc("Enable or disable a single pass partial sort optimization where if a heuristic " + @@ -2709,6 +2716,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isRangeWindowDecimalEnabled: Boolean = get(ENABLE_RANGE_WINDOW_DECIMAL) + lazy val boundedRowsWindowMaxExtent: Int = get(BATCHED_BOUNDED_ROW_WINDOW_MAX_EXTENT) + lazy val allowSinglePassPartialSortAgg: Boolean = get(ENABLE_SINGLE_PASS_PARTIAL_SORT_AGG) lazy val forceSinglePassPartialSortAgg: Boolean = get(FORCE_SINGLE_PASS_PARTIAL_SORT_AGG) From 682afdc44f689357c9677edc6688eb00012e942e Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 5 Dec 2023 21:42:32 -0800 Subject: [PATCH 13/23] Removed incorrect error message. --- .../com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala index ac334cdb069..5d25a0742a4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala @@ -131,9 +131,7 @@ class GpuBatchedBoundedWindowIterator( offTheBottom: Int): Array[CudfColumnVector] = { def checkValidSizes(col: CudfColumnVector): Unit = - if (offTheTop >= col.getRowCount || - offTheBottom >= col.getRowCount || - (offTheTop + offTheBottom) > col.getRowCount) { + if ((offTheTop + offTheBottom) > col.getRowCount) { throw new IllegalArgumentException(s"Cannot trim column of size ${col.getRowCount} by " + s"$offTheTop rows at the top, and $offTheBottom rows at the bottom.") } From 476708013b9a31d0d9cd4b9a54d0e526ecb1fa4c Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 5 Dec 2023 21:43:12 -0800 Subject: [PATCH 14/23] Tests for varying finite window combinations. --- .../src/main/python/window_function_test.py | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index d416c8d4c46..226ca754ae3 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -1807,6 +1807,43 @@ def test_window_aggs_for_negative_rows_unpartitioned(data_gen, batch_size): conf=conf) +@ignore_order(local=True) +@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches +@pytest.mark.parametrize('data_gen', [ + _grpkey_short_with_nulls, + _grpkey_int_with_nulls, + _grpkey_long_with_nulls, + _grpkey_date_with_nulls, +], ids=idfn) +def test_window_aggs_for_batched_finite_row_windows(data_gen, batch_size): + conf = {'spark.rapids.sql.batchSizeBytes': batch_size} + assert_gpu_and_cpu_are_equal_sql( + lambda spark: gen_df(spark, data_gen, length=2048), + 'window_agg_table', + 'select ' + ' count(1) over ' + ' (partition by a order by b,c asc ' + ' rows between CURRENT ROW and 100 following) as count_1_asc, ' + ' count(c) over ' + ' (partition by a order by b,c asc ' + ' rows between 100 PRECEDING AND CURRENT ROW) as count_b_asc, ' + ' sum(c) over ' + ' (partition by a order by b,c asc ' + ' rows between 1 preceding and 3 following) as sum_c_asc, ' + ' avg(c) over ' + ' (partition by a order by b,c asc ' + ' rows between 10 preceding and 30 following) as avg_b_asc, ' + ' max(c) over ' + ' (partition by a order by b,c desc ' + ' rows between 1 preceding and 3 following) as max_b_desc, ' + ' min(c) over ' + ' (partition by a order by b,c asc ' + ' rows between 1 preceding and 3 following) as min_b_asc ' + 'from window_agg_table ', + validate_execs_in_gpu_plan=['GpuBatchedBoundedWindowExec'], + conf=conf) + + def test_lru_cache_datagen(): # log cache info at the end of integration tests, not related to window functions info = gen_df_help.cache_info() From fa5ab16d415179063db3784ad0101b976a35bbcd Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 5 Dec 2023 22:04:43 -0800 Subject: [PATCH 15/23] Tests for unpartitioned cases. Plus, some minor reformatting. --- .../src/main/python/window_function_test.py | 73 +++++++++++++------ 1 file changed, 51 insertions(+), 22 deletions(-) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 226ca754ae3..d1e71aa7f09 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -1808,38 +1808,67 @@ def test_window_aggs_for_negative_rows_unpartitioned(data_gen, batch_size): @ignore_order(local=True) -@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches +@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) @pytest.mark.parametrize('data_gen', [ _grpkey_short_with_nulls, _grpkey_int_with_nulls, _grpkey_long_with_nulls, _grpkey_date_with_nulls, ], ids=idfn) -def test_window_aggs_for_batched_finite_row_windows(data_gen, batch_size): +def test_window_aggs_for_batched_finite_row_windows_partitioned(data_gen, batch_size): conf = {'spark.rapids.sql.batchSizeBytes': batch_size} assert_gpu_and_cpu_are_equal_sql( lambda spark: gen_df(spark, data_gen, length=2048), 'window_agg_table', - 'select ' - ' count(1) over ' - ' (partition by a order by b,c asc ' - ' rows between CURRENT ROW and 100 following) as count_1_asc, ' - ' count(c) over ' - ' (partition by a order by b,c asc ' - ' rows between 100 PRECEDING AND CURRENT ROW) as count_b_asc, ' - ' sum(c) over ' - ' (partition by a order by b,c asc ' - ' rows between 1 preceding and 3 following) as sum_c_asc, ' - ' avg(c) over ' - ' (partition by a order by b,c asc ' - ' rows between 10 preceding and 30 following) as avg_b_asc, ' - ' max(c) over ' - ' (partition by a order by b,c desc ' - ' rows between 1 preceding and 3 following) as max_b_desc, ' - ' min(c) over ' - ' (partition by a order by b,c asc ' - ' rows between 1 preceding and 3 following) as min_b_asc ' - 'from window_agg_table ', + """ + SELECT + COUNT(1) OVER (PARTITION BY a ORDER BY b,c ASC + ROWS BETWEEN CURRENT ROW AND 100 FOLLOWING) AS count_1_asc, + COUNT(c) OVER (PARTITION BY a ORDER BY b,c ASC + ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) AS count_b_asc, + SUM(c) OVER (PARTITION BY a ORDER BY b,c ASC + ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS sum_c_asc, + AVG(c) OVER (PARTITION BY a ORDER BY b,c ASC + ROWS BETWEEN 10 PRECEDING AND 30 FOLLOWING) AS avg_b_asc, + MAX(c) OVER (PARTITION BY a ORDER BY b,c DESC + ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS max_b_desc, + MIN(c) OVER (PARTITION BY a ORDER BY b,c ASC + ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS min_b_asc + FROM window_agg_table + """, + validate_execs_in_gpu_plan=['GpuBatchedBoundedWindowExec'], + conf=conf) + + +@ignore_order(local=True) +@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) +@pytest.mark.parametrize('data_gen', [ + _grpkey_short_with_nulls, + _grpkey_int_with_nulls, + _grpkey_long_with_nulls, + _grpkey_date_with_nulls, +], ids=idfn) +def test_window_aggs_for_batched_finite_row_windows_unpartitioned(data_gen, batch_size): + conf = {'spark.rapids.sql.batchSizeBytes': batch_size} + assert_gpu_and_cpu_are_equal_sql( + lambda spark: gen_df(spark, data_gen, length=2048), + 'window_agg_table', + """ + SELECT + COUNT(1) OVER (ORDER BY b,c,a ASC + ROWS BETWEEN CURRENT ROW AND 100 FOLLOWING) AS count_1_asc, + COUNT(c) OVER (PARTITION BY a ORDER BY b,c,a ASC + ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) AS count_b_asc, + SUM(c) OVER (PARTITION BY a ORDER BY b,c,a ASC + ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS sum_c_asc, + AVG(c) OVER (PARTITION BY a ORDER BY b,c,a ASC + ROWS BETWEEN 10 PRECEDING AND 30 FOLLOWING) AS avg_b_asc, + MAX(c) OVER (PARTITION BY a ORDER BY b,c,a DESC + ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS max_b_desc, + MIN(c) OVER (PARTITION BY a ORDER BY b,c,a ASC + ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS min_b_asc + FROM window_agg_table + """, validate_execs_in_gpu_plan=['GpuBatchedBoundedWindowExec'], conf=conf) From 3f50224b573430d771cc1f29e5bc30d264f037a9 Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 6 Dec 2023 11:04:49 -0800 Subject: [PATCH 16/23] Fixed leak in concat. --- .../spark/rapids/GpuBatchedBoundedWindowExec.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala index 5d25a0742a4..ea5ecf6b145 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala @@ -101,10 +101,11 @@ class GpuBatchedBoundedWindowIterator( // Cached input AND new input rows exist. Return concat-ed rows. withResource(getFreshInputBatch) { freshBatchCB => withResource(GpuColumnVector.from(freshBatchCB)) { freshBatchTable => - val concat = concatenateColumns(cached.get, freshBatchTable) - clearCached() - SpillableColumnarBatch(convertToBatch(inputTypes.get, concat), - SpillPriorities.ACTIVE_BATCHING_PRIORITY) + withResource(concatenateColumns(cached.get, freshBatchTable)) { concat => + clearCached() + SpillableColumnarBatch(convertToBatch(inputTypes.get, concat), + SpillPriorities.ACTIVE_BATCHING_PRIORITY) + } } } } else { From e08cfa2b155160cca4411aa482294ecef03cb8d4 Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 6 Dec 2023 12:52:52 -0800 Subject: [PATCH 17/23] Test that large extents fall back to GpuWindowExec. Signed-off-by: MithunR --- .../src/main/python/window_function_test.py | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index d1e71aa7f09..8e20926a77b 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -1873,6 +1873,45 @@ def test_window_aggs_for_batched_finite_row_windows_unpartitioned(data_gen, batc conf=conf) +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', [_grpkey_int_with_nulls,], ids=idfn) +def test_window_aggs_for_batched_finite_row_windows_fallback(data_gen): + """ + This test is to verify that batching is disabled for bounded windows if + the window extents exceed the window-extents specified in the RAPIDS conf. + """ + + # Query with window extent = { 200 PRECEDING, 200 FOLLOWING }. + query = """ + SELECT + COUNT(1) OVER (PARTITION BY a ORDER BY b,c ASC + ROWS BETWEEN 200 PRECEDING AND 200 FOLLOWING) AS count_1_asc + FROM window_agg_table + """ + + def get_conf_with_extent(extent): + return {'spark.rapids.sql.batchSizeBytes': '1000', + 'spark.rapids.sql.window.batched.bounded.row.extent': extent} + + def assert_query_runs_on(exec, conf): + assert_gpu_and_cpu_are_equal_sql( + lambda spark: gen_df(spark, data_gen, length=2048), + 'window_agg_table', + query, + validate_execs_in_gpu_plan=[exec], + conf=conf) + + # Check that with max window extent set to 100, + # query runs without batching, i.e. `GpuWindowExec`. + conf_100 = get_conf_with_extent(100) + assert_query_runs_on(exec='GpuWindowExec', conf=conf_100) + + # Check that with max window extent set to 200, + # query runs *with* batching, i.e. `GpuBatchedBoundedWindowExec`. + conf_200 = get_conf_with_extent(200) + assert_query_runs_on(exec='GpuBatchedBoundedWindowExec', conf=conf_200) + + def test_lru_cache_datagen(): # log cache info at the end of integration tests, not related to window functions info = gen_df_help.cache_info() From e0581c034bacf6a0b4ca156243ede54d9f65b650 Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 6 Dec 2023 13:03:03 -0800 Subject: [PATCH 18/23] Fix build break with Scala 2.13. --- .../src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index 527a3301e56..13bc6298d69 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -660,7 +660,7 @@ object GpuWindowExec { throw new IllegalArgumentException( s"Found unexpected expression $other in window exec ${other.getClass}") } - BatchedOps(running.toSeq, doublePass.toSeq, batchedBounded, passThrough.toSeq) + BatchedOps(running.toSeq, doublePass.toSeq, batchedBounded.toSeq, passThrough.toSeq) } } From 0ba8be6af3f1a7d3a9c46c81870515b41daa9327 Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 6 Dec 2023 18:11:49 -0800 Subject: [PATCH 19/23] Support for negative offsets. This now allows for `LEAD()`, `LAG()`, and regular window functions with negative values for `preceding`,`following` window bounds. --- .../src/main/python/window_function_test.py | 28 +++++++++++++------ .../rapids/GpuBatchedBoundedWindowExec.scala | 16 +++++++++-- .../nvidia/spark/rapids/GpuWindowExec.scala | 15 ++++------ 3 files changed, 39 insertions(+), 20 deletions(-) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 8e20926a77b..0f251a57e46 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -1825,15 +1825,21 @@ def test_window_aggs_for_batched_finite_row_windows_partitioned(data_gen, batch_ COUNT(1) OVER (PARTITION BY a ORDER BY b,c ASC ROWS BETWEEN CURRENT ROW AND 100 FOLLOWING) AS count_1_asc, COUNT(c) OVER (PARTITION BY a ORDER BY b,c ASC - ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) AS count_b_asc, + ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) AS count_c_asc, + COUNT(c) OVER (PARTITION BY a ORDER BY b,c ASC + ROWS BETWEEN -50 PRECEDING AND 100 FOLLOWING) AS count_c_negative, + COUNT(1) OVER (PARTITION BY a ORDER BY b,c ASC + ROWS BETWEEN 50 PRECEDING AND -10 FOLLOWING) AS count_1_negative, SUM(c) OVER (PARTITION BY a ORDER BY b,c ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS sum_c_asc, AVG(c) OVER (PARTITION BY a ORDER BY b,c ASC - ROWS BETWEEN 10 PRECEDING AND 30 FOLLOWING) AS avg_b_asc, + ROWS BETWEEN 10 PRECEDING AND 30 FOLLOWING) AS avg_c_asc, MAX(c) OVER (PARTITION BY a ORDER BY b,c DESC - ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS max_b_desc, + ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS max_c_desc, MIN(c) OVER (PARTITION BY a ORDER BY b,c ASC - ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS min_b_asc + ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS min_c_asc, + LAG(c, 30) OVER (PARTITION BY a ORDER BY b,c ASC) AS lag_c_30_asc, + LEAD(c, 40) OVER (PARTITION BY a ORDER BY b,c ASC) AS lead_c_40_asc FROM window_agg_table """, validate_execs_in_gpu_plan=['GpuBatchedBoundedWindowExec'], @@ -1858,15 +1864,21 @@ def test_window_aggs_for_batched_finite_row_windows_unpartitioned(data_gen, batc COUNT(1) OVER (ORDER BY b,c,a ASC ROWS BETWEEN CURRENT ROW AND 100 FOLLOWING) AS count_1_asc, COUNT(c) OVER (PARTITION BY a ORDER BY b,c,a ASC - ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) AS count_b_asc, + ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) AS count_c_asc, + COUNT(c) OVER (PARTITION BY a ORDER BY b,c,a ASC + ROWS BETWEEN -50 PRECEDING AND 100 FOLLOWING) AS count_c_negative, + COUNT(1) OVER (PARTITION BY a ORDER BY b,c,a ASC + ROWS BETWEEN 50 PRECEDING AND -10 FOLLOWING) AS count_1_negative, SUM(c) OVER (PARTITION BY a ORDER BY b,c,a ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS sum_c_asc, AVG(c) OVER (PARTITION BY a ORDER BY b,c,a ASC - ROWS BETWEEN 10 PRECEDING AND 30 FOLLOWING) AS avg_b_asc, + ROWS BETWEEN 10 PRECEDING AND 30 FOLLOWING) AS avg_c_asc, MAX(c) OVER (PARTITION BY a ORDER BY b,c,a DESC - ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS max_b_desc, + ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS max_c_desc, MIN(c) OVER (PARTITION BY a ORDER BY b,c,a ASC - ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS min_b_asc + ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS min_c_asc, + LAG(c, 6) OVER (PARTITION BY a ORDER BY b,c,a ASC) AS lag_c_6, + LEAD(c,4) OVER (PARTITION BY a ORDER BY b,c,a ASC) AS lead_c_4 FROM window_agg_table """, validate_execs_in_gpu_plan=['GpuBatchedBoundedWindowExec'], diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala index ea5ecf6b145..e2141ef3ff8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala @@ -167,7 +167,13 @@ class GpuBatchedBoundedWindowIterator( // More input rows expected. The last `maxFollowing` rows can't be finalized. // Cannot exceed `inputRowCount`. // TODO: Account for maxFollowing < 0 (e.g. LAG()) => numUnprocessedInCache = 0. - maxFollowing min inputRowCount + if (maxFollowing < 0) { // E.g. LAG(3) => [ preceding=-3, following=-3 ] + // -ve following => No need to wait for more following rows. + // All "following" context is already available in the current batch. + 0 + } else { + maxFollowing min inputRowCount + } } if (numPrecedingRowsAdded + numUnprocessedInCache >= inputRowCount) { @@ -187,7 +193,13 @@ class GpuBatchedBoundedWindowIterator( // Compute new cache using current input. // TODO: Account for minPreceding >0 (e.g. LEAD()) => numPrecedingRowsAdded = 0. - numPrecedingRowsAdded = Math.abs(minPreceding) min (inputRowCount - numUnprocessedInCache) + numPrecedingRowsAdded = if (minPreceding > 0) { // E.g. LEAD(3) => [prec=3, foll=3] + // preceding > 0 => No "preceding" rows need be carried forward. + // Only the rows that need to be recomputed. + 0 + } else { + Math.abs(minPreceding) min (inputRowCount - numUnprocessedInCache) + } val inputCols = Range(0, inputCB.numCols()).map { inputCB.column(_).asInstanceOf[GpuColumnVector].getBase }.toArray diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index 13bc6298d69..c94c8ef6da5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -303,7 +303,6 @@ case class BatchedOps(running: Seq[NamedExpression], cpuPartitionSpec: Seq[Expression], cpuOrderSpec: Seq[SortOrder]): GpuExec = { // The order of these matter so we can pass the output of the first through the second one - // TODO: Use a separate function for everything below the hasRunning check. if (hasRunning) { val runningExec = getRunningWindowExec(gpuPartitionSpec, gpuOrderSpec, child, cpuPartitionSpec, cpuOrderSpec) @@ -542,16 +541,12 @@ object GpuWindowExec { /** * Checks whether the window spec is both ROWS-based and bounded. * Window functions of this spec can possibly still be batched. - * TODO: (future): Change here to support +ve preceding, -ve following. */ private def isBoundedRowsWindowAndBatchable(spec: GpuWindowSpecDefinition, conf: RapidsConf): Boolean = { - def precInPermissibleRange(prec: Int) = - prec <= 0 && Math.abs(prec) <= conf.boundedRowsWindowMaxExtent - - def follInPermissibleRange(foll: Int) = - foll >= 0 && Math.abs(foll) <= conf.boundedRowsWindowMaxExtent + def inPermissibleRange(bounds: Int) = + Math.abs(bounds) <= conf.boundedRowsWindowMaxExtent spec match { @@ -559,17 +554,17 @@ object GpuWindowExec { RowFrame, GpuLiteral(prec: Int, _), GpuLiteral(foll: Int, _))) => - precInPermissibleRange(prec) && follInPermissibleRange(foll) + inPermissibleRange(prec) && inPermissibleRange(foll) case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( RowFrame, GpuSpecialFrameBoundary(CurrentRow), GpuLiteral(foll: Int, _))) => - follInPermissibleRange(foll) + inPermissibleRange(foll) case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( RowFrame, GpuLiteral(prec: Int, _), GpuSpecialFrameBoundary(CurrentRow))) => - precInPermissibleRange(prec) + inPermissibleRange(prec) case GpuWindowSpecDefinition(_, _, GpuSpecifiedWindowFrame( RowFrame, GpuSpecialFrameBoundary(CurrentRow), From b5fda09ac9ab65af10f6c966d0809184332f79ab Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 6 Dec 2023 18:35:28 -0800 Subject: [PATCH 20/23] Removed erroneous batching. This commit fixes the batching. The new exec should not have to receive batched input. --- .../nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala index e2141ef3ff8..5f31009ed41 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala @@ -235,13 +235,9 @@ class GpuBatchedBoundedWindowExec( override def otherCopyArgs: Seq[AnyRef] = cpuPartitionSpec :: cpuOrderSpec :: minPreceding :: maxFollowing :: Nil - override def childrenCoalesceGoal: Seq[CoalesceGoal] = Seq(outputBatching) + override def childrenCoalesceGoal: Seq[CoalesceGoal] = Seq.fill(children.size)(null) - override def outputBatching: CoalesceGoal = if (gpuPartitionSpec.isEmpty) { - RequireSingleBatch - } else { - BatchedByKey(gpuPartitionOrdering)(cpuPartitionOrdering) - } + override def outputBatching: CoalesceGoal = null override protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] = { val numOutputBatches = gpuLongMetric(GpuMetric.NUM_OUTPUT_BATCHES) From f9de13a1b1f5d1407b6ff965269767d47d04cbb9 Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 7 Dec 2023 10:53:09 -0800 Subject: [PATCH 21/23] Config changes: 1. Renamed config. '.extent' to '.max'. 2. Fixed documentation for said config. 3. Removed TODOs that were already handled. --- .../src/main/python/window_function_test.py | 2 +- .../spark/rapids/GpuBatchedBoundedWindowExec.scala | 2 -- .../scala/com/nvidia/spark/rapids/GpuWindowExec.scala | 2 +- .../scala/com/nvidia/spark/rapids/RapidsConf.scala | 10 ++++++---- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 0f251a57e46..bbb97f46282 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -1903,7 +1903,7 @@ def test_window_aggs_for_batched_finite_row_windows_fallback(data_gen): def get_conf_with_extent(extent): return {'spark.rapids.sql.batchSizeBytes': '1000', - 'spark.rapids.sql.window.batched.bounded.row.extent': extent} + 'spark.rapids.sql.window.batched.bounded.row.max': extent} def assert_query_runs_on(exec, conf): assert_gpu_and_cpu_are_equal_sql( diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala index 5f31009ed41..0c288210dbe 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchedBoundedWindowExec.scala @@ -166,7 +166,6 @@ class GpuBatchedBoundedWindowIterator( } else { // More input rows expected. The last `maxFollowing` rows can't be finalized. // Cannot exceed `inputRowCount`. - // TODO: Account for maxFollowing < 0 (e.g. LAG()) => numUnprocessedInCache = 0. if (maxFollowing < 0) { // E.g. LAG(3) => [ preceding=-3, following=-3 ] // -ve following => No need to wait for more following rows. // All "following" context is already available in the current batch. @@ -192,7 +191,6 @@ class GpuBatchedBoundedWindowIterator( } // Compute new cache using current input. - // TODO: Account for minPreceding >0 (e.g. LEAD()) => numPrecedingRowsAdded = 0. numPrecedingRowsAdded = if (minPreceding > 0) { // E.g. LEAD(3) => [prec=3, foll=3] // preceding > 0 => No "preceding" rows need be carried forward. // Only the rows that need to be recomputed. diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index c94c8ef6da5..7ef90723107 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -546,7 +546,7 @@ object GpuWindowExec { conf: RapidsConf): Boolean = { def inPermissibleRange(bounds: Int) = - Math.abs(bounds) <= conf.boundedRowsWindowMaxExtent + Math.abs(bounds) <= conf.batchedBoundedRowsWindowMax spec match { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index a04219e7778..b3c086a955a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1340,10 +1340,12 @@ object RapidsConf { .booleanConf .createWithDefault(true) - val BATCHED_BOUNDED_ROW_WINDOW_MAX_EXTENT: ConfEntryWithDefault[Integer] = - conf("spark.rapids.sql.window.batched.bounded.row.extent") + val BATCHED_BOUNDED_ROW_WINDOW_MAX: ConfEntryWithDefault[Integer] = + conf("spark.rapids.sql.window.batched.bounded.row.max") .doc("Max value for bounded row window preceding/following extents " + - "permissible for the window to be evaluated in batched mode") + "permissible for the window to be evaluated in batched mode. This value affects " + + "both the preceding and following bounds, potentially doubling the window size " + + "permitted for batched execution") .integerConf .createWithDefault(value = 100) @@ -2716,7 +2718,7 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isRangeWindowDecimalEnabled: Boolean = get(ENABLE_RANGE_WINDOW_DECIMAL) - lazy val boundedRowsWindowMaxExtent: Int = get(BATCHED_BOUNDED_ROW_WINDOW_MAX_EXTENT) + lazy val batchedBoundedRowsWindowMax: Int = get(BATCHED_BOUNDED_ROW_WINDOW_MAX) lazy val allowSinglePassPartialSortAgg: Boolean = get(ENABLE_SINGLE_PASS_PARTIAL_SORT_AGG) From 4a5f5c559942e3bdc999118850169b1c12594c3c Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 7 Dec 2023 11:19:07 -0800 Subject: [PATCH 22/23] Docs update for batched row window config. --- docs/additional-functionality/advanced_configs.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/additional-functionality/advanced_configs.md b/docs/additional-functionality/advanced_configs.md index 089a9cb9b27..5cf56dd0500 100644 --- a/docs/additional-functionality/advanced_configs.md +++ b/docs/additional-functionality/advanced_configs.md @@ -142,6 +142,7 @@ Name | Description | Default Value | Applicable at spark.rapids.sql.stableSort.enabled|Enable or disable stable sorting. Apache Spark's sorting is typically a stable sort, but sort stability cannot be guaranteed in distributed work loads because the order in which upstream data arrives to a task is not guaranteed. Sort stability then only matters when reading and sorting data from a file using a single task/partition. Because of limitations in the plugin when you enable stable sorting all of the data for a single task will be combined into a single batch before sorting. This currently disables spilling from GPU memory if the data size is too large.|false|Runtime spark.rapids.sql.suppressPlanningFailure|Option to fallback an individual query to CPU if an unexpected condition prevents the query plan from being converted to a GPU-enabled one. Note this is different from a normal CPU fallback for a yet-to-be-supported Spark SQL feature. If this happens the error should be reported and investigated as a GitHub issue.|false|Runtime spark.rapids.sql.variableFloatAgg.enabled|Spark assumes that all operations produce the exact same result each time. This is not true for some floating point aggregations, which can produce slightly different results on the GPU as the aggregation is done in parallel. This can enable those operations if you know the query is only computing it once.|true|Runtime +spark.rapids.sql.window.batched.bounded.row.max|Max value for bounded row window preceding/following extents permissible for the window to be evaluated in batched mode. This value affects both the preceding and following bounds, potentially doubling the window size permitted for batched execution|100|Runtime spark.rapids.sql.window.range.byte.enabled|When the order-by column of a range based window is byte type and the range boundary calculated for a value has overflow, CPU and GPU will get the different results. When set to false disables the range window acceleration for the byte type order-by column|false|Runtime spark.rapids.sql.window.range.decimal.enabled|When set to false, this disables the range window acceleration for the DECIMAL type order-by column|true|Runtime spark.rapids.sql.window.range.double.enabled|When set to false, this disables the range window acceleration for the double type order-by column|true|Runtime From 5d475caad6dc05847077450ca521b3d5be5ecf6a Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 13 Dec 2023 11:05:08 -0800 Subject: [PATCH 23/23] Fixed output column order. This fixes the empty output problem. --- .../src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index 7ef90723107..6de2eb1f2de 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -179,6 +179,8 @@ abstract class GpuBaseWindowExecMeta[WindowExecType <: SparkPlan] (windowExec: W if (isPostNeeded) { GpuProjectExec(post.toList, windowExpr)() + } else if (windowExpr.output != windowExec.output) { + GpuProjectExec(windowExec.output.toList, windowExpr)() } else { windowExpr }