diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index cf4324578ea87..a9763cfe04539 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -111,8 +111,12 @@ private[streaming] class ReceivedBlockTracker(
    */
   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
     if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
+      // We explicitly create an ArrayBuffer here because at least as of Scala 2.11 and 2.12
+      // a mutable.Queue fails serialization with a StackOverflow error if it has more than
+      // a few thousand elements. So we explicitly allocate a collection for serialization which
+      // we know doesn't have this issue. (See SPARK-26734).
       val streamIdToBlocks = streamIds.map { streamId =>
-        (streamId, getReceivedBlockQueue(streamId).clone())
+        (streamId, mutable.ArrayBuffer(getReceivedBlockQueue(streamId).clone(): _*))
       }.toMap
       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index bdaef94949159..8800f1c91b20a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -96,6 +96,27 @@ class ReceivedBlockTrackerSuite
     receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
   }
 
+  test("block addition, and block to batch allocation with many blocks") {
+    val receivedBlockTracker = createTracker()
+    receivedBlockTracker.isWriteAheadLogEnabled should be (true)
+
+    val blockInfos = generateBlockInfos(100000)
+    blockInfos.map(receivedBlockTracker.addBlock)
+    receivedBlockTracker.allocateBlocksToBatch(1)
+
+    receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+    receivedBlockTracker.hasUnallocatedReceivedBlocks should be (false)
+    receivedBlockTracker.getBlocksOfBatch(1) shouldEqual Map(streamId -> blockInfos)
+    receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos
+
+    val expectedWrittenData1 = blockInfos.map(BlockAdditionEvent) :+
+      BatchAllocationEvent(1, AllocatedBlocks(Map(streamId -> blockInfos)))
+    getWrittenLogData() shouldEqual expectedWrittenData1
+    getWriteAheadLogFiles() should have size 1
+
+    receivedBlockTracker.stop()
+  }
+
   test("recovery with write ahead logs should remove only allocated blocks from received queue") {
     val manualClock = new ManualClock
     val batchTime = manualClock.getTimeMillis()
@@ -362,8 +383,8 @@ class ReceivedBlockTrackerSuite
   }
 
   /** Generate blocks infos using random ids */
-  def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
-    List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
+  def generateBlockInfos(blockCount: Int = 5): Seq[ReceivedBlockInfo] = {
+    List.fill(blockCount)(ReceivedBlockInfo(streamId, Some(0L), None,
       BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
   }
 
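
For reviewers who want to reproduce the underlying failure outside of Spark, the sketch below is a minimal, standalone illustration of the issue the main-code comment describes. It is not part of the patch: the object name `QueueSerializationDemo`, the helper `roundTrip`, and the element count are illustrative, and the exact overflow threshold depends on the JVM stack size (`-Xss`). On Scala 2.11/2.12, `mutable.Queue` is backed by a linked structure whose nodes Java serialization walks recursively, one stack frame per element; copying into a `mutable.ArrayBuffer` first, as `allocateBlocksToBatch` now does, serializes a flat backing array instead and avoids the deep recursion.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable

object QueueSerializationDemo {
  // Serialize with plain Java serialization and return the number of bytes
  // written, similar in spirit to how the tracker serializes events into
  // the write-ahead log.
  private def roundTrip(obj: AnyRef): Int = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    try oos.writeObject(obj) finally oos.close()
    bos.size()
  }

  def main(args: Array[String]): Unit = {
    val queue = mutable.Queue(1 to 100000: _*)

    // On Scala 2.11/2.12 this typically throws StackOverflowError, because
    // the queue's linked nodes are serialized via recursion.
    try {
      roundTrip(queue)
      println("Queue serialized fine (expected on Scala 2.13+)")
    } catch {
      case _: StackOverflowError =>
        println("mutable.Queue overflowed the stack during serialization")
    }

    // Copying into an ArrayBuffer first -- the same workaround the patch
    // applies in allocateBlocksToBatch -- serializes iteratively.
    println(s"ArrayBuffer: ${roundTrip(mutable.ArrayBuffer(queue: _*))} bytes")
  }
}
```

This also motivates why the fix copies the cloned queue rather than changing the tracker's internal data structure: the queue semantics are still what the tracker wants for FIFO block handling, and only the snapshot that crosses the serialization boundary needs to be a flat collection.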