Skip to content

Commit

Permalink
Use PrivateMethodTester on check that delegate stream is closed
Browse files Browse the repository at this point in the history
  • Loading branch information
massie committed Jun 18, 2015
1 parent 4ea1712 commit a011bfa
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -319,10 +319,10 @@ final class ShuffleBlockFetcherIterator(
* Note: the delegate parameter is private[storage] to make it available to tests.
*/
private class BufferReleasingInputStream(
private[storage] val delegate: InputStream,
iterator: ShuffleBlockFetcherIterator)
private val delegate: InputStream,
private val iterator: ShuffleBlockFetcherIterator)
extends InputStream {
private var closed = false
private[this] var closed = false

override def read(): Int = delegate.read()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ import org.mockito.Matchers.{any, eq => meq}
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.PrivateMethodTester

import org.apache.spark.{SparkFunSuite, TaskContextImpl}
import org.apache.spark.network._
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.shuffle.BlockFetchingListener


class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite {
class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodTester {
// Some of the tests are quite tricky because we are testing the cleanup behavior
// in the presence of faults.

Expand Down Expand Up @@ -113,13 +114,15 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite {
// Note: ShuffleBlockFetcherIterator wraps input streams in a BufferReleasingInputStream
val wrappedInputStream = inputStream.get.asInstanceOf[BufferReleasingInputStream]
verify(mockBuf, times(0)).release()
verify(wrappedInputStream.delegate, times(0)).close()
val delegateAccess = PrivateMethod[InputStream]('delegate)

verify(wrappedInputStream.invokePrivate(delegateAccess()), times(0)).close()
wrappedInputStream.close()
verify(mockBuf, times(1)).release()
verify(wrappedInputStream.delegate, times(1)).close()
verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
wrappedInputStream.close() // close should be idempotent
verify(mockBuf, times(1)).release()
verify(wrappedInputStream.delegate, times(1)).close()
verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
}

// 3 local blocks, and 2 remote blocks
Expand Down

0 comments on commit a011bfa

Please sign in to comment.