From a8b9e827df12bc01375dc3fabe7ac1b24facda48 Mon Sep 17 00:00:00 2001 From: Jim Brennan Date: Fri, 8 Dec 2023 08:57:31 -0600 Subject: [PATCH 1/4] Add handling for CpuSplitAndRetryOOM in InternalRowToColumnarBatchIterator Signed-off-by: Jim Brennan --- .../InternalRowToColumnarBatchIterator.java | 204 +++++++++++------- .../rapids/AbstractGpuJoinIterator.scala | 4 +- .../spark/rapids/GpuRowToColumnarExec.scala | 5 +- .../spark/rapids/RmmRapidsRetryIterator.scala | 81 ++++--- .../nvidia/spark/rapids/WithRetrySuite.scala | 10 +- 5 files changed, 196 insertions(+), 108 deletions(-) diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/InternalRowToColumnarBatchIterator.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/InternalRowToColumnarBatchIterator.java index e52b0b64634..3bf8a296a78 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/InternalRowToColumnarBatchIterator.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/InternalRowToColumnarBatchIterator.java @@ -21,6 +21,7 @@ import java.util.NoSuchElementException; import java.util.Optional; +import com.nvidia.spark.Retryable; import scala.Option; import scala.Tuple2; import scala.collection.Iterator; @@ -54,8 +55,8 @@ public abstract class InternalRowToColumnarBatchIterator implements Iterator { protected final Iterator input; protected UnsafeRow pending = null; - protected final int numRowsEstimate; - protected final long dataLength; + protected int numRowsEstimate = 1; + protected final int sizePerRowEstimate; protected final DType[] rapidsTypes; protected final DataType[] outputTypes; protected final GpuMetric streamTime; @@ -74,10 +75,8 @@ protected InternalRowToColumnarBatchIterator( GpuMetric numOutputRows, GpuMetric numOutputBatches) { this.input = input; - int sizePerRowEstimate = CudfUnsafeRow.getRowSizeEstimate(schema); - numRowsEstimate = (int)Math.max(1, - Math.min(Integer.MAX_VALUE - 1, goal.targetSizeBytes() / sizePerRowEstimate)); - dataLength = ((long) sizePerRowEstimate) * numRowsEstimate; + sizePerRowEstimate = CudfUnsafeRow.getRowSizeEstimate(schema); + numRowsEstimate = calcNumRowsEstimate(goal.targetSizeBytes()); rapidsTypes = new DType[schema.length]; outputTypes = new DataType[schema.length]; @@ -92,6 +91,19 @@ protected InternalRowToColumnarBatchIterator( this.numOutputBatches = numOutputBatches; } + private int calcNumRowsEstimate(long targetBytes) { + return (int) Math.max(1, + Math.min(Integer.MAX_VALUE - 1, targetBytes / sizePerRowEstimate)); + } + private long calcDataLengthEstimate(int numRows) { + return ((long) sizePerRowEstimate) * numRows; + } + + private long calcOffsetLengthEstimate(int numRows) { + int BYTES_PER_OFFSET = DType.INT32.getSizeInBytes(); + return (long)(numRows + 1) * BYTES_PER_OFFSET; + } + @Override public boolean hasNext() { boolean ret = true; @@ -109,11 +121,13 @@ public ColumnarBatch next() { if (!hasNext()) { throw new NoSuchElementException(); } - final int BYTES_PER_OFFSET = DType.INT32.getSizeInBytes(); - long collectStart = System.nanoTime(); + long collectStart = System.nanoTime(); Tuple2 batchAndRange; + AutoCloseableTargetSize numRowsWrapper = + new AutoCloseableTargetSize(numRowsEstimate, 1); + Tuple2 bufsAndRows; // The row formatted data is stored as a column of lists of bytes. The current java CUDF APIs // don't do a great job from a performance standpoint with building this type of data structure @@ -121,75 +135,70 @@ public ColumnarBatch next() { // buffers. One will be for the byte data and the second will be for the offsets. 
We will then // write the data directly into those buffers using code generation in a child of this class. // that implements fillBatch. - HostMemoryBuffer db = - RmmRapidsRetryIterator.withRetryNoSplit( () -> { - return HostAlloc$.MODULE$.alloc(dataLength, true); - }); + bufsAndRows = + // Starting with initial num rows estimate, this retry block will + // recalculate the buffer sizes from the rows estimate, which is split + // in half if we get a split and retry oom, until we hit the min of 1 row. + RmmRapidsRetryIterator.withRetry(numRowsWrapper, + RmmRapidsRetryIterator.splitTargetSizeInHalfCpu(), (numRows) -> { + return allocBuffersWithRestore(numRows); + }).next(); + // Update our estimate for number of rows. + numRowsEstimate = (int) bufsAndRows._2.targetSize(); + long dataLength = calcDataLengthEstimate(numRowsEstimate); try ( - SpillableHostBuffer sdb = SpillableHostBuffer$.MODULE$.apply(db, db.getLength(), - SpillPriorities$.MODULE$.ACTIVE_ON_DECK_PRIORITY(), - RapidsBufferCatalog$.MODULE$.singleton()); + SpillableHostBuffer sdb = bufsAndRows._1[0]; + SpillableHostBuffer sob = bufsAndRows._1[1]; ) { - HostMemoryBuffer ob = - RmmRapidsRetryIterator.withRetryNoSplit( () -> { - return HostAlloc$.MODULE$.alloc( - ((long) numRowsEstimate + 1) * BYTES_PER_OFFSET, true); - }); - try ( - SpillableHostBuffer sob = SpillableHostBuffer$.MODULE$.apply(ob, ob.getLength(), - SpillPriorities$.MODULE$.ACTIVE_ON_DECK_PRIORITY(), - RapidsBufferCatalog$.MODULE$.singleton()); - ) { - // Fill in buffer under write lock for host buffers - int[] used = sdb.withHostBufferWriteLock( (dataBuffer) -> { - return sob.withHostBufferWriteLock( (offsetsBuffer) -> { - return fillBatch(dataBuffer, offsetsBuffer); - }); + // Fill in buffer under write lock for host buffers + int[] used = sdb.withHostBufferWriteLock( (dataBuffer) -> { + return sob.withHostBufferWriteLock( (offsetsBuffer) -> { + return fillBatch(dataBuffer, offsetsBuffer, dataLength, numRowsEstimate); }); - batchAndRange = sdb.withHostBufferReadOnly( (dataBuffer) -> { - return sob.withHostBufferReadOnly( (offsetsBuffer) -> { - int dataOffset = used[0]; - int currentRow = used[1]; - // We don't want to loop forever trying to copy nothing - assert (currentRow > 0); - if (numInputRows != null) { - numInputRows.add(currentRow); - } - if (numOutputRows != null) { - numOutputRows.add(currentRow); - } - if (numOutputBatches != null) { - numOutputBatches.add(1); - } - // Now that we have filled the buffers with the data, we need to turn them into a - // HostColumnVector and copy them to the device so the GPU can turn it into a Table. - // To do this we first need to make a HostColumnCoreVector for the data, and then - // put that into a HostColumnVector as its child. This the basics of building up - // a column of lists of bytes in CUDF but it is typically hidden behind the higer level - // APIs. - dataBuffer.incRefCount(); - offsetsBuffer.incRefCount(); - try (HostColumnVectorCore dataCv = - new HostColumnVectorCore(DType.INT8, dataOffset, Optional.of(0L), - dataBuffer, null, null, new ArrayList<>()); - HostColumnVector hostColumn = new HostColumnVector(DType.LIST, - currentRow, Optional.of(0L), null, null, - offsetsBuffer, Collections.singletonList(dataCv))) { - - long ct = System.nanoTime() - collectStart; - streamTime.add(ct); - - // Grab the semaphore because we are about to put data onto the GPU. 
- GpuSemaphore$.MODULE$.acquireIfNecessary(TaskContext.get()); - NvtxRange range = NvtxWithMetrics.apply("RowToColumnar: build", NvtxColor.GREEN, - Option.apply(opTime)); - ColumnVector devColumn = - RmmRapidsRetryIterator.withRetryNoSplit(hostColumn::copyToDevice); - return Tuple2.apply(makeSpillableBatch(devColumn), range); - } - }); + }); + batchAndRange = sdb.withHostBufferReadOnly( (dataBuffer) -> { + return sob.withHostBufferReadOnly( (offsetsBuffer) -> { + int dataOffset = used[0]; + int currentRow = used[1]; + // We don't want to loop forever trying to copy nothing + assert (currentRow > 0); + if (numInputRows != null) { + numInputRows.add(currentRow); + } + if (numOutputRows != null) { + numOutputRows.add(currentRow); + } + if (numOutputBatches != null) { + numOutputBatches.add(1); + } + // Now that we have filled the buffers with the data, we need to turn them into a + // HostColumnVector and copy them to the device so the GPU can turn it into a Table. + // To do this we first need to make a HostColumnCoreVector for the data, and then + // put that into a HostColumnVector as its child. This the basics of building up + // a column of lists of bytes in CUDF but it is typically hidden behind the higer level + // APIs. + dataBuffer.incRefCount(); + offsetsBuffer.incRefCount(); + try (HostColumnVectorCore dataCv = + new HostColumnVectorCore(DType.INT8, dataOffset, Optional.of(0L), + dataBuffer, null, null, new ArrayList<>()); + HostColumnVector hostColumn = new HostColumnVector(DType.LIST, + currentRow, Optional.of(0L), null, null, + offsetsBuffer, Collections.singletonList(dataCv))) { + + long ct = System.nanoTime() - collectStart; + streamTime.add(ct); + + // Grab the semaphore because we are about to put data onto the GPU. + GpuSemaphore$.MODULE$.acquireIfNecessary(TaskContext.get()); + NvtxRange range = NvtxWithMetrics.apply("RowToColumnar: build", NvtxColor.GREEN, + Option.apply(opTime)); + ColumnVector devColumn = + RmmRapidsRetryIterator.withRetryNoSplit(hostColumn::copyToDevice); + return Tuple2.apply(makeSpillableBatch(devColumn), range); + } }); - } + }); } try (NvtxRange ignored = batchAndRange._2; Table tab = @@ -202,6 +211,50 @@ public ColumnarBatch next() { } } + private Tuple2 + allocBuffers(HostMemoryBuffer[] hBufs, SpillableHostBuffer[] sBufs, + AutoCloseableTargetSize numRowsWrapper) { + long dataBytes = calcDataLengthEstimate((int) numRowsWrapper.targetSize()); + long offsetBytes = calcOffsetLengthEstimate((int) numRowsWrapper.targetSize()); + hBufs[0] = HostAlloc$.MODULE$.alloc(dataBytes, true); + sBufs[0] = SpillableHostBuffer$.MODULE$.apply(hBufs[0], hBufs[0].getLength(), + SpillPriorities$.MODULE$.ACTIVE_ON_DECK_PRIORITY(), + RapidsBufferCatalog$.MODULE$.singleton()); + hBufs[0] = null; // Was closed by spillable + hBufs[1] = HostAlloc$.MODULE$.alloc(offsetBytes, true); + sBufs[1] = SpillableHostBuffer$.MODULE$.apply(hBufs[1], hBufs[1].getLength(), + SpillPriorities$.MODULE$.ACTIVE_ON_DECK_PRIORITY(), + RapidsBufferCatalog$.MODULE$.singleton()); + hBufs[1] = null; // Was closed by spillable + return Tuple2.apply(sBufs, numRowsWrapper); + } + + private Tuple2 + allocBuffersWithRestore(AutoCloseableTargetSize numRows) { + HostMemoryBuffer[] hostBufs = new HostMemoryBuffer[]{null, null }; + SpillableHostBuffer[] spillableBufs = new SpillableHostBuffer[]{null, null}; + Retryable retryBufs = new Retryable() { + @Override + public void checkpoint() {} + @Override + public void restore() { + for (int i = 0; i < spillableBufs.length; i++) { + if (spillableBufs[i] != 
null) { + spillableBufs[i].close(); + spillableBufs[i] = null; + } + if (hostBufs[i] != null) { + hostBufs[i].close(); + hostBufs[i] = null; + } + } + } + }; + return RmmRapidsRetryIterator.withRestoreOnRetry(retryBufs, () -> { + return allocBuffers(hostBufs, spillableBufs, numRows); + }); + } + /** * Take our device column of encoded rows and turn it into a spillable columnar batch. * This allows us to go into a retry block and be able to roll back our work. @@ -244,8 +297,11 @@ protected Table convertFromRowsUnderRetry(ColumnarBatch cb) { * virtual function call per batch instead of one per row. * @param dataBuffer the data buffer to populate * @param offsetsBuffer the offsets buffer to populate + * @param dataLength the data length corresponding to the current rows estimate. + * @param numRows the number of rows we can fill * @return an array of ints where the first index is the amount of data in bytes copied into * the data buffer and the second index is the number of rows copied into the buffers. */ - public abstract int[] fillBatch(HostMemoryBuffer dataBuffer, HostMemoryBuffer offsetsBuffer); + public abstract int[] fillBatch(HostMemoryBuffer dataBuffer, HostMemoryBuffer offsetsBuffer, + long dataLength, int numRows); } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala index cee705d8f8e..37d636bf82e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids import ai.rapids.cudf.{GatherMap, NvtxColor, OutOfBoundsPolicy} import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.RapidsPluginImplicits._ -import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitTargetSizeInHalf, withRestoreOnRetry, withRetry} +import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitTargetSizeInHalfGpu, withRestoreOnRetry, withRetry} import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import org.apache.spark.TaskContext @@ -148,7 +148,7 @@ abstract class AbstractGpuJoinIterator( // less from the gatherer, but because the gatherer tracks how much is used, the // next call to this function will start in the right place. gather.checkpoint() - withRetry(targetSizeWrapper, splitTargetSizeInHalf) { attempt => + withRetry(targetSizeWrapper, splitTargetSizeInHalfGpu) { attempt => withRestoreOnRetry(gather) { val nextRows = JoinGatherer.getRowsInNextBatch(gather, attempt.targetSize) gather.gatherNext(nextRows) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala index 8586e4c3dbe..f399bdeb20f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala @@ -787,7 +787,8 @@ object GeneratedInternalRowToCudfRowIterator extends Logging { | // of a row at a time. 
| @Override | public int[] fillBatch(ai.rapids.cudf.HostMemoryBuffer dataBuffer, - | ai.rapids.cudf.HostMemoryBuffer offsetsBuffer) { + | ai.rapids.cudf.HostMemoryBuffer offsetsBuffer, + | long dataLength, int numRows) { | final long dataBaseAddress = dataBuffer.getAddress(); | final long endDataAddress = dataBaseAddress + dataLength; | @@ -820,7 +821,7 @@ object GeneratedInternalRowToCudfRowIterator extends Logging { | } else { | currentRow += 1; | dataOffset += numBytesUsedByRow; - | done = !(currentRow < numRowsEstimate && + | done = !(currentRow < numRows && | dataOffset < dataLength && | input.hasNext()); | } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RmmRapidsRetryIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RmmRapidsRetryIterator.scala index 78fe8c7b31d..4973e276bb8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RmmRapidsRetryIterator.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RmmRapidsRetryIterator.scala @@ -51,10 +51,10 @@ object RmmRapidsRetryIterator extends Logging { * `fn` must be idempotent: this is a requirement because we may call `fn` multiple times * while handling retries. * - * @param input an iterator of T + * @param input an iterator of T * @param splitPolicy a function that can split an item of type T into a Seq[T]. The split * function must close the item passed to it. - * @param fn the work to perform. Takes T and produces an output K + * @param fn the work to perform. Takes T and produces an output K * @tparam T element type that must be AutoCloseable (likely `SpillableColumnarBatch`) * @tparam K `fn` result type * @return an iterator of K @@ -84,10 +84,10 @@ object RmmRapidsRetryIterator extends Logging { * `fn` must be idempotent: this is a requirement because we may call `fn` multiple times * while handling retries. * - * @param input a single item T + * @param input a single item T * @param splitPolicy a function that can split an item of type T into a Seq[T]. The split * function must close the item passed to it. - * @param fn the work to perform. Takes T and produces an output K + * @param fn the work to perform. Takes T and produces an output K * @tparam T element type that must be AutoCloseable (likely `SpillableColumnarBatch`) * @tparam K `fn` result type * @return an iterator of K @@ -117,8 +117,8 @@ object RmmRapidsRetryIterator extends Logging { * `fn` must be idempotent: this is a requirement because we may call `fn` multiple times * while handling retries. * - * @param input a single item T - * @param fn the work to perform. Takes T and produces an output K + * @param input a single item T + * @param fn the work to perform. Takes T and produces an output K * @tparam T element type that must be AutoCloseable (likely `SpillableColumnarBatch`) * @tparam K `fn` result type * @return a single item of type K @@ -148,8 +148,8 @@ object RmmRapidsRetryIterator extends Logging { * `fn` must be idempotent: this is a requirement because we may call `fn` multiple times * while handling retries. * - * @param input a single item T - * @param fn the work to perform. Takes T and produces an output K + * @param input a single item T + * @param fn the work to perform. Takes T and produces an output K * @tparam T element type that must be AutoCloseable (likely `SpillableColumnarBatch`) * @tparam K `fn` result type * @return a single item of type K @@ -296,11 +296,12 @@ object RmmRapidsRetryIterator extends Logging { /** * AutoCloseable wrapper on Seq[T], returning a Seq[T] that can be closed. 
+ * * @param ts the Seq to wrap * @tparam T the type of the items in `ts` */ private case class AutoCloseableSeqInternal[T <: AutoCloseable](ts: Seq[T]) - extends Seq[T] with AutoCloseable{ + extends Seq[T] with AutoCloseable { override def close(): Unit = { ts.foreach(_.safeClose()) } @@ -315,18 +316,22 @@ object RmmRapidsRetryIterator extends Logging { /** * An iterator of a single item that is able to close if .next * has not been called on it. + * * @param ts the AutoCloseable item to close if this iterator hasn't been drained * @tparam T the type of `ts`, must be AutoCloseable */ private case class SingleItemAutoCloseableIteratorInternal[T <: AutoCloseable](ts: T) - extends Iterator[T] with AutoCloseable { + extends Iterator[T] with AutoCloseable { private var wasCalledSuccessfully = false + override def hasNext: Boolean = !wasCalledSuccessfully + override def next(): T = { wasCalledSuccessfully = true ts } + override def close(): Unit = { if (!wasCalledSuccessfully) { ts.close() @@ -353,6 +358,7 @@ object RmmRapidsRetryIterator extends Logging { * using as an input), usually by splitting a batch in half by number of rows, or * splitting a collection of batches into smaller collections to be attempted separately, * likely reducing GPU memory that needs to be manifested while calling `.next`. + * * @param isFromGpuOom true if the split happened because of a GPU OOM. Otherwise it was a * CPU off heap OOM. */ @@ -366,6 +372,7 @@ object RmmRapidsRetryIterator extends Logging { /** * A spliterator that doesn't take any inputs, hence it is "empty", and it doesn't know * how to split. It allows the caller to call the function `fn` once on `next`. + * * @param fn the work to perform. It is a function that takes nothing and produces K * @tparam K the resulting type */ @@ -413,8 +420,8 @@ object RmmRapidsRetryIterator extends Logging { * * @tparam T element type that must be AutoCloseable * @tparam K `fn` result type - * @param input an iterator of T - * @param fn a function that takes T and produces K + * @param input an iterator of T + * @param fn a function that takes T and produces K * @param splitPolicy a function that can split an item of type T into a Seq[T]. The split * function must close the item passed to it. */ @@ -626,7 +633,7 @@ object RmmRapidsRetryIterator extends Logging { throw lastException } } - // else another exception wrapped a retry. So we are going to try again + // else another exception wrapped a retry. So we are going to try again } } if (result.isEmpty) { @@ -682,22 +689,46 @@ object RmmRapidsRetryIterator extends Logging { } } + private def splitTargetSizeInHalfInternal( + target: AutoCloseableTargetSize, isGpu: Boolean): Seq[AutoCloseableTargetSize] = { + withResource(target) { _ => + val newTarget = target.targetSize / 2 + if (newTarget < target.minSize) { + if (isGpu) { + throw new GpuSplitAndRetryOOM( + s"GPU OutOfMemory: targetSize: ${target.targetSize} cannot be split further!" + + s" minimum: ${target.minSize}") + } else { + throw new CpuSplitAndRetryOOM( + s"CPU OutOfMemory: targetSize: ${target.targetSize} cannot be split further!" + + s" minimum: ${target.minSize}") + } + } + Seq(AutoCloseableTargetSize(newTarget, target.minSize)) + } + } + /** * A common split function for an AutoCloseableTargetSize, which just divides the target size * in half, and creates a seq with just one element representing the new target size. * @return a Seq[AutoCloseableTargetSize] with 1 element. + * @throws GpuSplitAndRetryOOM if it reaches the split limit. 
*/ - def splitTargetSizeInHalf: AutoCloseableTargetSize => Seq[AutoCloseableTargetSize] = + def splitTargetSizeInHalfGpu: AutoCloseableTargetSize => Seq[AutoCloseableTargetSize] = (target: AutoCloseableTargetSize) => { - withResource(target) { _ => - val newTarget = target.targetSize / 2 - if (newTarget < target.minSize) { - throw new GpuSplitAndRetryOOM( - s"GPU OutOfMemory: targetSize: ${target.targetSize} cannot be split further!" + - s" minimum: ${target.minSize}") - } - Seq(AutoCloseableTargetSize(newTarget, target.minSize)) - } + splitTargetSizeInHalfInternal(target, true) + } + + /** + * A common split function for an AutoCloseableTargetSize, which just divides the target size + * in half, and creates a seq with just one element representing the new target size. + * + * @return a Seq[AutoCloseableTargetSize] with 1 element. + * @throws CpuSplitAndRetryOOM if it reaches the split limit. + */ + def splitTargetSizeInHalfCpu: AutoCloseableTargetSize => Seq[AutoCloseableTargetSize] = + (target: AutoCloseableTargetSize) => { + splitTargetSizeInHalfInternal(target, false) } } @@ -705,8 +736,8 @@ object RmmRapidsRetryIterator extends Logging { * This is a wrapper that turns a target size into an autocloseable to allow it to be used * in withRetry blocks. It is intended to be used to help with cases where the split calculation * happens inside the retry block, and depends on the target size. On a `GpuSplitAndRetryOOM` or - * `CpuSplitAndRetryOOM`, a split policy like `splitTargetSizeInHalf` can be used to retry the - * block with a smaller target size. + * `CpuSplitAndRetryOOM`, a split policy like `splitTargetSizeInHalfGpu` or + * `splitTargetSizeInHalfCpu` can be used to retry the block with a smaller target size. */ case class AutoCloseableTargetSize(targetSize: Long, minSize: Long) extends AutoCloseable { override def close(): Unit = () diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/WithRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/WithRetrySuite.scala index 73618bd81ef..aa003c454f1 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/WithRetrySuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/WithRetrySuite.scala @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids import ai.rapids.cudf.{Rmm, RmmAllocationMode, RmmEventHandler, Table} import com.nvidia.spark.Retryable import com.nvidia.spark.rapids.Arm.withResource -import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitTargetSizeInHalf, withRestoreOnRetry, withRetry, withRetryNoSplit} +import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitTargetSizeInHalfGpu, withRestoreOnRetry, withRetry, withRetryNoSplit} import com.nvidia.spark.rapids.jni.{GpuRetryOOM, GpuSplitAndRetryOOM, RmmSpark} import org.mockito.Mockito._ import org.scalatest.BeforeAndAfterEach @@ -245,7 +245,7 @@ class WithRetrySuite } } - test("splitTargetSizeInHalf splits for AutoCloseableTargetSize") { + test("splitTargetSizeInHalfGpu splits for AutoCloseableTargetSize") { val initialValue = 20L val minValue = 5L val numSplits = 2 @@ -253,7 +253,7 @@ class WithRetrySuite var lastSplitSize = 0L val myTarget = AutoCloseableTargetSize(initialValue, minValue) try { - withRetry(myTarget, splitTargetSizeInHalf) { attempt => + withRetry(myTarget, splitTargetSizeInHalfGpu) { attempt => lastSplitSize = attempt.targetSize if (doThrow > 0) { doThrow = doThrow - 1 @@ -266,7 +266,7 @@ class WithRetrySuite } } - test("splitTargetSizeInHalf on AutoCloseableTargetSize throws if limit reached") { + test("splitTargetSizeInHalfGpu on 
AutoCloseableTargetSize throws if limit reached") { val initialValue = 20L val minValue = 5L val numSplits = 3 @@ -275,7 +275,7 @@ class WithRetrySuite val myTarget = AutoCloseableTargetSize(initialValue, minValue) try { assertThrows[GpuSplitAndRetryOOM] { - withRetry(myTarget, splitTargetSizeInHalf) { attempt => + withRetry(myTarget, splitTargetSizeInHalfGpu) { attempt => lastSplitSize = attempt.targetSize if (doThrow > 0) { doThrow = doThrow - 1 From f242884f20dd9b138a9c6afad10ae180890c4dd4 Mon Sep 17 00:00:00 2001 From: Jim Brennan Date: Fri, 8 Dec 2023 16:59:20 -0600 Subject: [PATCH 2/4] Workaround spill issue --- .../InternalRowToColumnarBatchIterator.java | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/InternalRowToColumnarBatchIterator.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/InternalRowToColumnarBatchIterator.java index 3bf8a296a78..492511f4714 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/InternalRowToColumnarBatchIterator.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/InternalRowToColumnarBatchIterator.java @@ -127,7 +127,7 @@ public ColumnarBatch next() { Tuple2 batchAndRange; AutoCloseableTargetSize numRowsWrapper = new AutoCloseableTargetSize(numRowsEstimate, 1); - Tuple2 bufsAndRows; + Tuple2 bufsAndNumRows; // The row formatted data is stored as a column of lists of bytes. The current java CUDF APIs // don't do a great job from a performance standpoint with building this type of data structure @@ -135,7 +135,7 @@ public ColumnarBatch next() { // buffers. One will be for the byte data and the second will be for the offsets. We will then // write the data directly into those buffers using code generation in a child of this class. // that implements fillBatch. - bufsAndRows = + bufsAndNumRows = // Starting with initial num rows estimate, this retry block will // recalculate the buffer sizes from the rows estimate, which is split // in half if we get a split and retry oom, until we hit the min of 1 row. @@ -143,21 +143,17 @@ public ColumnarBatch next() { RmmRapidsRetryIterator.splitTargetSizeInHalfCpu(), (numRows) -> { return allocBuffersWithRestore(numRows); }).next(); - // Update our estimate for number of rows. - numRowsEstimate = (int) bufsAndRows._2.targetSize(); + // Update our estimate for number of rows with the final size used to allocate the buffers. 
+ numRowsEstimate = (int) bufsAndNumRows._2.targetSize(); long dataLength = calcDataLengthEstimate(numRowsEstimate); try ( - SpillableHostBuffer sdb = bufsAndRows._1[0]; - SpillableHostBuffer sob = bufsAndRows._1[1]; + SpillableHostBuffer sdb = bufsAndNumRows._1[0]; + SpillableHostBuffer sob = bufsAndNumRows._1[1]; ) { // Fill in buffer under write lock for host buffers - int[] used = sdb.withHostBufferWriteLock( (dataBuffer) -> { + batchAndRange = sdb.withHostBufferWriteLock( (dataBuffer) -> { return sob.withHostBufferWriteLock( (offsetsBuffer) -> { - return fillBatch(dataBuffer, offsetsBuffer, dataLength, numRowsEstimate); - }); - }); - batchAndRange = sdb.withHostBufferReadOnly( (dataBuffer) -> { - return sob.withHostBufferReadOnly( (offsetsBuffer) -> { + int[] used = fillBatch(dataBuffer, offsetsBuffer, dataLength, numRowsEstimate); int dataOffset = used[0]; int currentRow = used[1]; // We don't want to loop forever trying to copy nothing From 15cef09cc6ef36642a78f22f8b85f2955d00161a Mon Sep 17 00:00:00 2001 From: Jim Brennan Date: Mon, 11 Dec 2023 09:07:37 -0600 Subject: [PATCH 3/4] Fix for targetSize in HostAlloc synchronous spill call and some cleanup --- .../spark/rapids/InternalRowToColumnarBatchIterator.java | 6 +++--- .../src/main/scala/com/nvidia/spark/rapids/HostAlloc.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/InternalRowToColumnarBatchIterator.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/InternalRowToColumnarBatchIterator.java index 492511f4714..4f8717c620b 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/InternalRowToColumnarBatchIterator.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/InternalRowToColumnarBatchIterator.java @@ -136,9 +136,9 @@ public ColumnarBatch next() { // write the data directly into those buffers using code generation in a child of this class. // that implements fillBatch. bufsAndNumRows = - // Starting with initial num rows estimate, this retry block will - // recalculate the buffer sizes from the rows estimate, which is split - // in half if we get a split and retry oom, until we hit the min of 1 row. + // Starting with initial num rows estimate, this retry block will recalculate the buffer + // sizes from the rows estimate, which is split in half if we get a split and retry oom, + // until we succeed or hit the min of 1 row. 
RmmRapidsRetryIterator.withRetry(numRowsWrapper, RmmRapidsRetryIterator.splitTargetSizeInHalfCpu(), (numRows) -> { return allocBuffersWithRestore(numRows); diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostAlloc.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostAlloc.scala index 7da626e21b0..cefdfa8766e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostAlloc.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostAlloc.scala @@ -144,7 +144,7 @@ private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with L logDebug(s"Targeting host store size of $targetSize bytes") // We could not make it work so try and spill enough to make it work val maybeAmountSpilled = - RapidsBufferCatalog.synchronousSpill(RapidsBufferCatalog.getHostStorage, allocSize) + RapidsBufferCatalog.synchronousSpill(RapidsBufferCatalog.getHostStorage, targetSize) maybeAmountSpilled.foreach { amountSpilled => logInfo(s"Spilled $amountSpilled bytes from the host store") } From b73fbd8f3938b11b13632787d13e707a35c76a1a Mon Sep 17 00:00:00 2001 From: Jim Brennan Date: Mon, 11 Dec 2023 14:59:43 -0600 Subject: [PATCH 4/4] Address review comments --- .../InternalRowToColumnarBatchIterator.java | 62 ++++++++++++------- 1 file changed, 38 insertions(+), 24 deletions(-) diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/InternalRowToColumnarBatchIterator.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/InternalRowToColumnarBatchIterator.java index 4f8717c620b..a1f878cb078 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/InternalRowToColumnarBatchIterator.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/InternalRowToColumnarBatchIterator.java @@ -92,9 +92,10 @@ protected InternalRowToColumnarBatchIterator( } private int calcNumRowsEstimate(long targetBytes) { - return (int) Math.max(1, - Math.min(Integer.MAX_VALUE - 1, targetBytes / sizePerRowEstimate)); + return Math.max(1, + Math.min(Integer.MAX_VALUE - 1, (int) (targetBytes / sizePerRowEstimate))); } + private long calcDataLengthEstimate(int numRows) { return ((long) sizePerRowEstimate) * numRows; } @@ -208,27 +209,43 @@ public ColumnarBatch next() { } private Tuple2 - allocBuffers(HostMemoryBuffer[] hBufs, SpillableHostBuffer[] sBufs, - AutoCloseableTargetSize numRowsWrapper) { - long dataBytes = calcDataLengthEstimate((int) numRowsWrapper.targetSize()); - long offsetBytes = calcOffsetLengthEstimate((int) numRowsWrapper.targetSize()); - hBufs[0] = HostAlloc$.MODULE$.alloc(dataBytes, true); - sBufs[0] = SpillableHostBuffer$.MODULE$.apply(hBufs[0], hBufs[0].getLength(), - SpillPriorities$.MODULE$.ACTIVE_ON_DECK_PRIORITY(), - RapidsBufferCatalog$.MODULE$.singleton()); - hBufs[0] = null; // Was closed by spillable - hBufs[1] = HostAlloc$.MODULE$.alloc(offsetBytes, true); - sBufs[1] = SpillableHostBuffer$.MODULE$.apply(hBufs[1], hBufs[1].getLength(), - SpillPriorities$.MODULE$.ACTIVE_ON_DECK_PRIORITY(), - RapidsBufferCatalog$.MODULE$.singleton()); - hBufs[1] = null; // Was closed by spillable - return Tuple2.apply(sBufs, numRowsWrapper); + allocBuffers(SpillableHostBuffer[] sBufs, AutoCloseableTargetSize numRowsWrapper) { + HostMemoryBuffer[] hBufs = new HostMemoryBuffer[]{ null, null }; + try { + long dataBytes = calcDataLengthEstimate((int) numRowsWrapper.targetSize()); + long offsetBytes = calcOffsetLengthEstimate((int) numRowsWrapper.targetSize()); + hBufs[0] = HostAlloc$.MODULE$.alloc(dataBytes, true); + sBufs[0] = SpillableHostBuffer$.MODULE$.apply(hBufs[0], 
hBufs[0].getLength(), + SpillPriorities$.MODULE$.ACTIVE_ON_DECK_PRIORITY(), + RapidsBufferCatalog$.MODULE$.singleton()); + hBufs[0] = null; // Was closed by spillable + hBufs[1] = HostAlloc$.MODULE$.alloc(offsetBytes, true); + sBufs[1] = SpillableHostBuffer$.MODULE$.apply(hBufs[1], hBufs[1].getLength(), + SpillPriorities$.MODULE$.ACTIVE_ON_DECK_PRIORITY(), + RapidsBufferCatalog$.MODULE$.singleton()); + hBufs[1] = null; // Was closed by spillable + return Tuple2.apply(sBufs, numRowsWrapper); + } finally { + // Make sure host buffers are always closed + for (int i = 0; i < hBufs.length; i++) { + if (hBufs[i] != null) { + hBufs[i].close(); + hBufs[i] = null; + } + } + // If the second spillable buffer is null, we must have thrown, + // so we need to close the first one in case this is not a retry exception. + // Restore on retry is handled by the caller. + if ((sBufs[1] == null) && (sBufs[0] != null)) { + sBufs[0].close(); + sBufs[0] = null; + } + } } private Tuple2 allocBuffersWithRestore(AutoCloseableTargetSize numRows) { - HostMemoryBuffer[] hostBufs = new HostMemoryBuffer[]{null, null }; - SpillableHostBuffer[] spillableBufs = new SpillableHostBuffer[]{null, null}; + SpillableHostBuffer[] spillableBufs = new SpillableHostBuffer[]{ null, null}; Retryable retryBufs = new Retryable() { @Override public void checkpoint() {} @@ -239,15 +256,12 @@ public void restore() { spillableBufs[i].close(); spillableBufs[i] = null; } - if (hostBufs[i] != null) { - hostBufs[i].close(); - hostBufs[i] = null; - } } } }; + return RmmRapidsRetryIterator.withRestoreOnRetry(retryBufs, () -> { - return allocBuffers(hostBufs, spillableBufs, numRows); + return allocBuffers(spillableBufs, numRows); }); }
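
For reference, the pattern these commits introduce is to wrap the row-count estimate in an
AutoCloseableTargetSize and run the host-buffer allocation under withRetry with the new
splitTargetSizeInHalfCpu policy, so a CpuSplitAndRetryOOM halves the row target (down to a
minimum of 1 row) instead of failing the allocation outright. Below is a minimal Scala sketch
of that pattern, not part of the patches themselves; it uses only the API visible in this diff,
and buildWithCpuRetry / buildBatch are hypothetical names used purely for illustration:

    import com.nvidia.spark.rapids.{AutoCloseableTargetSize, RmmRapidsRetryIterator}
    import com.nvidia.spark.rapids.RmmRapidsRetryIterator.splitTargetSizeInHalfCpu

    // Hypothetical helper: run `buildBatch` under the CPU split-and-retry policy.
    // `buildBatch` stands in for whatever work sizes host buffers for the given row target.
    def buildWithCpuRetry[K](initialRowsEstimate: Long)(buildBatch: Long => K): K = {
      // minSize = 1 row: once the target cannot be halved without going below this,
      // the split policy rethrows CpuSplitAndRetryOOM instead of splitting again.
      val rowsTarget = AutoCloseableTargetSize(initialRowsEstimate, 1)
      RmmRapidsRetryIterator.withRetry(rowsTarget, splitTargetSizeInHalfCpu) { attempt =>
        // Each attempt sees the current (possibly halved) row target.
        buildBatch(attempt.targetSize)
      }.next()
    }

A plain retry OOM reruns the block with the same target, while a split-and-retry OOM halves it,
which is the same behavior the Java code in InternalRowToColumnarBatchIterator relies on when it
calls withRetry(numRowsWrapper, splitTargetSizeInHalfCpu(), ...).next().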