[SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch

When blocks are allocated to a batch and the WAL write fails, the blocks have already been removed from the received block queue. This causes data loss, because the next allocation attempt will not find those blocks in the queue.

In this PR, blocks are removed from the received queue only if the WAL write succeeded.

Additional unit test.
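To make the behavioral change concrete: the fix defers emptying the per-stream received-block queue until the WAL write has succeeded. Below is a minimal sketch of that pattern on a plain Scala queue; receivedQueue and walWrite are illustrative stand-ins, not the actual tracker API.

    import scala.collection.mutable

    // Illustrative stand-ins for the tracker's per-stream queue and the WAL write.
    val receivedQueue = mutable.Queue("block-1", "block-2")
    def walWrite(blocks: Seq[String]): Boolean = false   // simulate a failing WAL write

    // Old behavior: dequeueAll(x => true) drained the queue before the WAL write,
    // so a failure at that point had already lost the blocks.
    // New behavior: snapshot the queue and empty it only after a successful write.
    val snapshot = receivedQueue.clone()
    if (walWrite(snapshot.toSeq)) {
      receivedQueue.clear()          // allocation is durably logged, safe to drop
    } else {
      // WAL write failed: blocks stay in receivedQueue for the next allocation attempt
    }
    assert(receivedQueue.nonEmpty)   // nothing was lost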

Author: Gabor Somogyi <[email protected]>

Closes apache#21430 from gaborgsomogyi/SPARK-23991.

Change-Id: I5ead84f0233f0c95e6d9f2854ac2ff6906f6b341
gaborgsomogyi authored and jerryshao committed May 29, 2018
1 parent 23db600 commit aca65c6
Showing 2 changed files with 47 additions and 3 deletions.
@@ -112,10 +112,11 @@ private[streaming] class ReceivedBlockTracker(
   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
     if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
       val streamIdToBlocks = streamIds.map { streamId =>
-        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
+        (streamId, getReceivedBlockQueue(streamId).clone())
       }.toMap
       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
+        streamIds.foreach(getReceivedBlockQueue(_).clear())
         timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
         lastAllocatedBatchTime = batchTime
       } else {
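A note on the change above: dequeueAll(x => true) is destructive, it removes every element from the per-stream queue at the moment the blocks are gathered, before writeToLog has had a chance to fail. clone() takes a snapshot instead, and the added streamIds.foreach(getReceivedBlockQueue(_).clear()) runs only in the branch where writeToLog returned true, so a failed BatchAllocationEvent write leaves the received queues untouched for the next allocation attempt.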
@@ -26,10 +26,12 @@ import scala.language.{implicitConversions, postfixOps}
 import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{doThrow, reset, spy}
 import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.concurrent.Eventually._
 
-import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
@@ -115,6 +117,47 @@ class ReceivedBlockTrackerSuite
     tracker2.stop()
   }
 
test("block allocation to batch should not loose blocks from received queue") {
val tracker1 = spy(createTracker())
tracker1.isWriteAheadLogEnabled should be (true)
tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty

// Add blocks
val blockInfos = generateBlockInfos()
blockInfos.map(tracker1.addBlock)
tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos

// Try to allocate the blocks to a batch and verify that it's failing
// The blocks should stay in the received queue when WAL write failing
doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
.when(tracker1).writeToLog(any(classOf[BatchAllocationEvent]))
val errMsg = intercept[RuntimeException] {
tracker1.allocateBlocksToBatch(1)
}
assert(errMsg.getMessage === "Not able to write BatchAllocationEvent")
tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
tracker1.getBlocksOfBatch(1) shouldEqual Map.empty
tracker1.getBlocksOfBatchAndStream(1, streamId) shouldEqual Seq.empty

// Allocate the blocks to a batch and verify that all of them have been allocated
reset(tracker1)
tracker1.allocateBlocksToBatch(2)
tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
tracker1.hasUnallocatedReceivedBlocks should be (false)
tracker1.getBlocksOfBatch(2) shouldEqual Map(streamId -> blockInfos)
tracker1.getBlocksOfBatchAndStream(2, streamId) shouldEqual blockInfos

tracker1.stop()

// Recover from WAL to see the correctness
val tracker2 = createTracker(recoverFromWriteAheadLog = true)
tracker2.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
tracker2.hasUnallocatedReceivedBlocks should be (false)
tracker2.getBlocksOfBatch(2) shouldEqual Map(streamId -> blockInfos)
tracker2.getBlocksOfBatchAndStream(2, streamId) shouldEqual blockInfos
tracker2.stop()
}

test("recovery and cleanup with write ahead logs") {
val manualClock = new ManualClock
// Set the time increment level to twice the rotation interval so that every increment creates
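For readers unfamiliar with the Mockito calls in the new test: spy() wraps a real object so that selected methods can be stubbed while everything else keeps its real behavior, doThrow(...).when(spiedObject).method(...) installs a stub without invoking the real method, and reset() removes all stubs, which is why allocateBlocksToBatch(2) above goes through the real writeToLog after reset(tracker1). A minimal, self-contained sketch follows; the Writer class is purely illustrative.

    import org.mockito.Mockito.{doThrow, reset, spy}

    class Writer {
      def write(s: String): Boolean = true   // real behavior
    }

    val w = spy(new Writer)
    doThrow(new RuntimeException("boom")).when(w).write("x")   // stub only this call
    // w.write("x") now throws RuntimeException("boom"); other calls stay real.
    reset(w)                                                    // drop all stubs
    assert(w.write("x"))                                        // real method runs again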
@@ -312,7 +355,7 @@ class ReceivedBlockTrackerSuite
       recoverFromWriteAheadLog: Boolean = false,
       clock: Clock = new SystemClock): ReceivedBlockTracker = {
     val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
-    val tracker = new ReceivedBlockTracker(
+    var tracker = new ReceivedBlockTracker(
       conf, hadoopConf, Seq(streamId), clock, recoverFromWriteAheadLog, cpDirOption)
     allReceivedBlockTrackers += tracker
     tracker
