Skip to content

Commit

Permalink
[SPARK-12106][STREAMING][FLAKY-TEST] BatchedWAL test transiently flak…
Browse files Browse the repository at this point in the history
…y when Jenkins load is high

We need to make sure that the last entry is indeed the last entry in the queue.

Author: Burak Yavuz <[email protected]>

Closes apache#10110 from brkyvz/batch-wal-test-fix.
  • Loading branch information
brkyvz authored and tdas committed Dec 7, 2015
1 parent 80a824d commit 6fd9e70
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,12 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp
var segment: WriteAheadLogRecordHandle = null
if (buffer.length > 0) {
logDebug(s"Batched ${buffer.length} records for Write Ahead Log write")
// threads may not be able to add items in order by time
val sortedByTime = buffer.sortBy(_.time)
// We take the latest record for the timestamp. Please refer to the class Javadoc for
// detailed explanation
val time = buffer.last.time
segment = wrappedLog.write(aggregate(buffer), time)
val time = sortedByTime.last.time
segment = wrappedLog.write(aggregate(sortedByTime), time)
}
buffer.foreach(_.promise.success(segment))
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
p
}

test("BatchedWriteAheadLog - name log with aggregated entries with the timestamp of last entry") {
test("BatchedWriteAheadLog - name log with the highest timestamp of aggregated entries") {
val blockingWal = new BlockingWriteAheadLog(wal, walHandle)
val batchedWal = new BatchedWriteAheadLog(blockingWal, sparkConf)

Expand All @@ -500,8 +500,14 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
// rest of the records will be batched while it takes time for 3 to get written
writeAsync(batchedWal, event2, 5L)
writeAsync(batchedWal, event3, 8L)
writeAsync(batchedWal, event4, 12L)
writeAsync(batchedWal, event5, 10L)
// we would like event 5 to be written before event 4 in order to test that they get
// sorted before being aggregated
writeAsync(batchedWal, event5, 12L)
eventually(timeout(1 second)) {
assert(blockingWal.isBlocked)
assert(batchedWal.invokePrivate(queueLength()) === 3)
}
writeAsync(batchedWal, event4, 10L)
eventually(timeout(1 second)) {
assert(walBatchingThreadPool.getActiveCount === 5)
assert(batchedWal.invokePrivate(queueLength()) === 4)
Expand All @@ -517,7 +523,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
// the file name should be the timestamp of the last record, as events should be naturally
// in order of timestamp, and we need the last element.
val bufferCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer])
verify(wal, times(1)).write(bufferCaptor.capture(), meq(10L))
verify(wal, times(1)).write(bufferCaptor.capture(), meq(12L))
val records = BatchedWriteAheadLog.deaggregate(bufferCaptor.getValue).map(byteBufferToString)
assert(records.toSet === queuedEvents)
}
Expand Down

0 comments on commit 6fd9e70

Please sign in to comment.