Skip to content

Commit

Permalink
[SPARK-26734][STREAMING] Fix StackOverflowError with large block queue
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

SPARK-23991 introduced a bug in `ReceivedBlockTracker#allocateBlocksToBatch`: when the queue holds more than a few thousand blocks, serializing it throws a StackOverflowError. This change copies the result of the new `clone` operation on the queue into an `ArrayBuffer`, so that the fix in SPARK-23991 is preserved but the serialized data comes from an `ArrayBuffer`, which doesn't have the serialization problems that `mutable.Queue` has.

## How was this patch tested?

A unit test was added.

Closes #23716 from rlodge/SPARK-26734.

Authored-by: Ross Lodge <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit 8427e9b)
Signed-off-by: Sean Owen <[email protected]>
  • Loading branch information
rlodge authored and srowen committed Feb 6, 2019
1 parent 9c78669 commit 38ade42
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,12 @@ private[streaming] class ReceivedBlockTracker(
*/
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
// We explicitly create an ArrayBuffer here because at least as of Scala 2.11 and 2.12
// a mutable.Queue fails serialization with a StackOverflow error if it has more than
// a few thousand elements. So we explicitly allocate a collection for serialization which
// we know doesn't have this issue. (See SPARK-26734).
val streamIdToBlocks = streamIds.map { streamId =>
(streamId, getReceivedBlockQueue(streamId).clone())
(streamId, mutable.ArrayBuffer(getReceivedBlockQueue(streamId).clone(): _*))
}.toMap
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,27 @@ class ReceivedBlockTrackerSuite
receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
}

// Regression test for SPARK-26734: allocating a very large number of queued blocks to a batch
// must succeed and must record the allocation event in the write ahead log.
test("block addition, and block to batch allocation with many blocks") {
  val receivedBlockTracker = createTracker()
  receivedBlockTracker.isWriteAheadLogEnabled should be (true)

  // Use far more blocks than the "few thousand" at which serializing a mutable.Queue fails.
  val blockInfos = generateBlockInfos(100000)
  // addBlock is called only for its side effect, so iterate with foreach rather than building
  // a discarded collection of results with map.
  blockInfos.foreach(receivedBlockTracker.addBlock)
  receivedBlockTracker.allocateBlocksToBatch(1)

  // After allocation, no block may remain unallocated and every block must belong to batch 1.
  receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
  receivedBlockTracker.hasUnallocatedReceivedBlocks should be (false)
  receivedBlockTracker.getBlocksOfBatch(1) shouldEqual Map(streamId -> blockInfos)
  receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos

  // The log must hold one addition event per block followed by a single allocation event.
  val expectedWrittenData1 = blockInfos.map(BlockAdditionEvent) :+
    BatchAllocationEvent(1, AllocatedBlocks(Map(streamId -> blockInfos)))
  getWrittenLogData() shouldEqual expectedWrittenData1
  getWriteAheadLogFiles() should have size 1

  receivedBlockTracker.stop()
}

test("recovery with write ahead logs should remove only allocated blocks from received queue") {
val manualClock = new ManualClock
val batchTime = manualClock.getTimeMillis()
Expand Down Expand Up @@ -362,8 +383,8 @@ class ReceivedBlockTrackerSuite
}

/** Generate blocks infos using random ids */
def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
def generateBlockInfos(blockCount: Int = 5): Seq[ReceivedBlockInfo] = {
  // Builds `blockCount` block infos for `streamId`, each with a freshly drawn random block id.
  // The id is drawn from [0, Int.MaxValue) instead of math.abs(Random.nextInt), because
  // math.abs(Int.MinValue) is still negative and would leak a negative id.
  List.fill(blockCount) {
    val blockId = StreamBlockId(streamId, Random.nextInt(Int.MaxValue))
    ReceivedBlockInfo(streamId, Some(0L), None,
      BlockManagerBasedStoreResult(blockId, Some(0L)))
  }
}

Expand Down

0 comments on commit 38ade42

Please sign in to comment.