From 8427e9ba5cae28233d1bdc54208b46889b83a821 Mon Sep 17 00:00:00 2001
From: Ross Lodge
Date: Wed, 6 Feb 2019 08:43:40 -0800
Subject: [PATCH] [SPARK-26734][STREAMING] Fix StackOverflowError with large
 block queue

## What changes were proposed in this pull request?

SPARK-23991 introduced a bug in `ReceivedBlockTracker#allocateBlocksToBatch`: when more than a few thousand blocks are in the queue, serializing the queue throws a StackOverflowError. This change copies the cloned queue into a `mutable.ArrayBuffer`, so the fix from SPARK-23991 is preserved but the serialized data comes from an ArrayBuffer, which doesn't have the serialization problems that `mutable.Queue` has.

## How was this patch tested?

A unit test was added.

Closes #23716 from rlodge/SPARK-26734.

Authored-by: Ross Lodge
Signed-off-by: Sean Owen
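For context on why the ArrayBuffer copy helps, below is a minimal standalone sketch of the failure mode. It is not part of this patch: the `QueueSerializationSketch` object and `serialize` helper are illustrative only, and the 100000-element size just mirrors the block count used in the new unit test. On Scala 2.11 and 2.12 a `mutable.Queue` is backed by a linked list, and plain Java serialization appears to walk its nodes recursively, so a long queue overflows the stack while an `ArrayBuffer` holding the same elements serializes fine.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable

object QueueSerializationSketch {
  // Illustrative helper: serialize an object with plain Java serialization,
  // the same kind of serialization the write-ahead log path exercises.
  private def serialize(obj: AnyRef): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(obj) finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val queue = mutable.Queue((1 to 100000): _*)

    // Uncommenting this line throws StackOverflowError on Scala 2.11/2.12,
    // since the queue's linked-list nodes are serialized one recursive
    // writeObject call at a time:
    // serialize(queue)

    // Copying the elements into an ArrayBuffer first, as the patch does with
    // the cloned per-stream queues, serializes without deep recursion.
    serialize(mutable.ArrayBuffer(queue: _*))
    println("ArrayBuffer copy serialized successfully")
  }
}
```

Note the patch keeps the `clone()` introduced by SPARK-23991, so the live queue is still snapshotted under the lock; only the type of the snapshot that gets serialized changes.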
---
 .../scheduler/ReceivedBlockTracker.scala      |  6 ++++-
 .../streaming/ReceivedBlockTrackerSuite.scala | 25 +++++++++++++++++--
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index cf4324578ea87..a9763cfe04539 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -111,8 +111,12 @@ private[streaming] class ReceivedBlockTracker(
    */
   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
     if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
+      // We explicitly create an ArrayBuffer here because at least as of Scala 2.11 and 2.12
+      // a mutable.Queue fails serialization with a StackOverflow error if it has more than
+      // a few thousand elements. So we explicitly allocate a collection for serialization which
+      // we know doesn't have this issue. (See SPARK-26734).
       val streamIdToBlocks = streamIds.map { streamId =>
-        (streamId, getReceivedBlockQueue(streamId).clone())
+        (streamId, mutable.ArrayBuffer(getReceivedBlockQueue(streamId).clone(): _*))
       }.toMap
       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index bdaef94949159..8800f1c91b20a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -96,6 +96,27 @@ class ReceivedBlockTrackerSuite
     receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
   }
 
+  test("block addition, and block to batch allocation with many blocks") {
+    val receivedBlockTracker = createTracker()
+    receivedBlockTracker.isWriteAheadLogEnabled should be (true)
+
+    val blockInfos = generateBlockInfos(100000)
+    blockInfos.map(receivedBlockTracker.addBlock)
+    receivedBlockTracker.allocateBlocksToBatch(1)
+
+    receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+    receivedBlockTracker.hasUnallocatedReceivedBlocks should be (false)
+    receivedBlockTracker.getBlocksOfBatch(1) shouldEqual Map(streamId -> blockInfos)
+    receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos
+
+    val expectedWrittenData1 = blockInfos.map(BlockAdditionEvent) :+
+      BatchAllocationEvent(1, AllocatedBlocks(Map(streamId -> blockInfos)))
+    getWrittenLogData() shouldEqual expectedWrittenData1
+    getWriteAheadLogFiles() should have size 1
+
+    receivedBlockTracker.stop()
+  }
+
   test("recovery with write ahead logs should remove only allocated blocks from received queue") {
     val manualClock = new ManualClock
     val batchTime = manualClock.getTimeMillis()
@@ -362,8 +383,8 @@ class ReceivedBlockTrackerSuite
   }
 
   /** Generate blocks infos using random ids */
-  def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
-    List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
+  def generateBlockInfos(blockCount: Int = 5): Seq[ReceivedBlockInfo] = {
+    List.fill(blockCount)(ReceivedBlockInfo(streamId, Some(0L), None,
       BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
   }
 