Skip to content

Commit

Permalink
[SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocat…
Browse files Browse the repository at this point in the history
…eBlocksToBatch
  • Loading branch information
gaborgsomogyi committed May 25, 2018
1 parent 64fad0b commit 2d35dfa
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,13 @@ private[streaming] class ReceivedBlockTracker(
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
val streamIdToBlocks = streamIds.map { streamId =>
(streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
(streamId, getReceivedBlockQueue(streamId).clone())
}.toMap
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
streamIds.foreach {
getReceivedBlockQueue(_).clear()
}
timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
lastAllocatedBatchTime = batchTime
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import scala.language.{implicitConversions, postfixOps}
import scala.util.Random

import org.apache.hadoop.conf.Configuration
import org.mockito.Matchers.any
import org.mockito.Mockito.{doThrow, reset, spy}
import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.concurrent.Eventually._

import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
Expand Down Expand Up @@ -115,6 +117,50 @@ class ReceivedBlockTrackerSuite
tracker2.stop()
}

// Regression test for SPARK-23991: when writing the BatchAllocationEvent to the write ahead log
// fails, the received blocks must remain in the unallocated queue so that a later allocation
// can still pick them up.
test("block allocation to batch should not loose blocks from received queue") {
  val tracker1 = createTracker(createSpyTracker = true)
  tracker1.isWriteAheadLogEnabled should be (true)
  tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty

  // Add blocks (called only for the side effect, hence foreach rather than map)
  val blockInfos = generateBlockInfos()
  blockInfos.foreach(tracker1.addBlock)
  tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos

  // Make the WAL write of the allocation event fail and verify that the allocation fails too.
  // The blocks must stay in the received queue when the WAL write fails.
  doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
    .when(tracker1).writeToLog(any(classOf[BatchAllocationEvent]))
  // Use intercept instead of try / assert(false) / catch RuntimeException: the exception thrown
  // by a failing ScalaTest assert is itself a RuntimeException, so the catch clause would
  // swallow it and the test would pass even if allocateBlocksToBatch did not throw at all.
  intercept[RuntimeException] {
    tracker1.allocateBlocksToBatch(1)
  }
  tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
  tracker1.getBlocksOfBatch(1) shouldEqual Map.empty
  tracker1.getBlocksOfBatchAndStream(1, streamId) shouldEqual Seq.empty

  // Remove the stubbing, allocate the blocks to a batch and verify that all of them
  // have been allocated
  reset(tracker1)
  tracker1.allocateBlocksToBatch(2)
  tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
  tracker1.hasUnallocatedReceivedBlocks should be (false)
  tracker1.getBlocksOfBatch(2) shouldEqual Map(streamId -> blockInfos)
  tracker1.getBlocksOfBatchAndStream(2, streamId) shouldEqual blockInfos

  tracker1.stop()

  // Recover from the WAL and verify that the recovered state matches the state above
  val tracker2 = createTracker(recoverFromWriteAheadLog = true)
  tracker2.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
  tracker2.hasUnallocatedReceivedBlocks should be (false)
  tracker2.getBlocksOfBatch(2) shouldEqual Map(streamId -> blockInfos)
  tracker2.getBlocksOfBatchAndStream(2, streamId) shouldEqual blockInfos
  tracker2.stop()
}

test("recovery and cleanup with write ahead logs") {
val manualClock = new ManualClock
// Set the time increment level to twice the rotation interval so that every increment creates
Expand Down Expand Up @@ -308,12 +354,16 @@ class ReceivedBlockTrackerSuite
* want to control time by manually incrementing it to test log clean.
*/
def createTracker(
    createSpyTracker: Boolean = false,
    setCheckpointDir: Boolean = true,
    recoverFromWriteAheadLog: Boolean = false,
    clock: Clock = new SystemClock): ReceivedBlockTracker = {
  // Only pass a checkpoint directory to the tracker when the caller asks for one.
  val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
  val realTracker = new ReceivedBlockTracker(
    conf, hadoopConf, Seq(streamId), clock, recoverFromWriteAheadLog, cpDirOption)
  // Optionally wrap the tracker in a Mockito spy so that tests can stub individual methods
  // (for example to make a write fail) while all other calls reach the real instance.
  val tracker = if (createSpyTracker) spy(realTracker) else realTracker
  // Register the tracker that is actually handed out (the spy, when one was requested).
  // NOTE(review): presumably the suite uses this collection to stop trackers on cleanup - confirm.
  allReceivedBlockTrackers += tracker
  tracker
}
Expand Down

0 comments on commit 2d35dfa

Please sign in to comment.