Skip to content

Commit

Permalink
address exception leaking in send channel builder
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoferrer committed Mar 6, 2019
1 parent 25ad202 commit 5b8dfbe
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ internal fun <RespT> CoroutineScope.newSendChannelFromObserver(
capacity: Int = 1
): SendChannel<RespT> =
actor<RespT>(
context = Dispatchers.Unconfined,
context = observer.exceptionHandler + Dispatchers.Unconfined,
capacity = capacity,
start = CoroutineStart.LAZY
) {
Expand All @@ -45,6 +45,7 @@ internal fun <RespT> CoroutineScope.newSendChannelFromObserver(
invokeOnClose(observer.completionHandler)
}


internal fun <ReqT, RespT> CoroutineScope.newManagedServerResponseChannel(
responseObserver: ServerCallStreamObserver<RespT>,
isMessagePreloaded: AtomicBoolean,
Expand Down Expand Up @@ -76,12 +77,17 @@ internal fun CoroutineScope.bindScopeCancellationToCall(call: ClientCall<*, *>){
}
}

internal val StreamObserver<*>.exceptionHandler: CoroutineExceptionHandler
get() = CoroutineExceptionHandler { _, e ->
runBlocking{ onError(e.toRpcException()) }
}

internal val StreamObserver<*>.completionHandler: CompletionHandler
get() = {
// If the call was cancelled already
// the stream observer will throw
runCatching {
if(it != null)
if (it != null)
onError(it.toRpcException()) else
onCompleted()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import kotlin.test.*
class NewSendChannelFromObserverTests {

@Test
fun `Test channel send to observer success`() = runBlocking {
fun `Channel send to observer success`() = runBlocking {

val observer = mockk<StreamObserver<Int>>().apply {
every { onNext(allAny()) } just Runs
Expand All @@ -50,36 +50,94 @@ class NewSendChannelFromObserverTests {
}

verify(exactly = 3) { observer.onNext(allAny()) }
verify(exactly = 1) { observer.onCompleted() }
}

@Test
fun `Test channel close with error`() = runBlocking {
fun `Channel close with error`() = runBlocking {

val statusException = Status.INVALID_ARGUMENT.asException()
val observer = mockk<StreamObserver<String>>().apply {
every { onNext(allAny()) } just Runs
every { onError(statusException) } just Runs
}

GlobalScope.newSendChannelFromObserver(observer).apply {
val channel = GlobalScope.newSendChannelFromObserver(observer).apply {
send("")
close(statusException)
}

assert(channel.isClosedForSend){ "Channel should be closed for send" }
verify(exactly = 1) { observer.onNext(allAny()) }
verify(atLeast = 1) { observer.onError(statusException) }
verify(exactly = 0) { observer.onCompleted() }
}


@Test
fun `Channel is closed when scope is cancelled normally`() {

val observer = mockk<StreamObserver<String>>().apply {
every { onNext(allAny()) } just Runs
every { onError(any()) } just Runs
}

lateinit var channel: SendChannel<String>
runBlocking {
launch {
launch(start = CoroutineStart.UNDISPATCHED) {
channel = newSendChannelFromObserver(observer).apply {
send("")
}
}
cancel()
}
}

assert(channel.isClosedForSend){ "Channel should be closed for send" }
verify(exactly = 1) { observer.onNext(allAny()) }
verify(exactly = 1) { observer.onError(any()) }
verify(exactly = 0) { observer.onCompleted() }
}

@Test
fun `Channel is closed when scope is cancelled exceptionally`() {

val observer = mockk<StreamObserver<String>>().apply {
every { onNext(allAny()) } just Runs
every { onError(any()) } just Runs
}

lateinit var channel: SendChannel<String>
assertFailsWith(IllegalStateException::class,"cancel"){
runBlocking {
launch {
channel = newSendChannelFromObserver(observer).apply {
send("")
}
}
launch {
error("cancel")
}
}
}

assert(channel.isClosedForSend){ "Channel should be closed for send" }
verify(exactly = 1) { observer.onNext(allAny()) }
verify(exactly = 1) { observer.onError(statusException) }
verify(exactly = 1) { observer.onError(any()) }
verify(exactly = 0) { observer.onCompleted() }
}

@Test
fun `Test channel close when observer onNext error `() = runBlocking {
fun `Channel close when observer onNext error `() = runBlocking {

val statusException = Status.UNKNOWN.asException()
val statusException = Status.INVALID_ARGUMENT.asException()
val observer = mockk<StreamObserver<String>>().apply {
every { onNext(allAny()) } throws statusException
every { onError(statusException) } just Runs
}

GlobalScope.newSendChannelFromObserver(observer).apply {
val channel = GlobalScope.newSendChannelFromObserver(observer).apply {

val send1Result = runCatching { send("") }
assertTrue(send1Result.isSuccess, "Error during observer.onNext should not fail channel.send")
Expand All @@ -90,9 +148,10 @@ class NewSendChannelFromObserverTests {
assertEquals(statusException, send2Result.exceptionOrNull())
}

assert(channel.isClosedForSend){ "Channel should be closed for send" }
verify(exactly = 1) { observer.onNext(allAny()) }
verify(exactly = 1) { observer.onError(statusException) }
verify(inverse = true) { observer.onCompleted() }
verify(atLeast = 1) { observer.onError(statusException) }
verify(exactly = 0) { observer.onCompleted() }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,10 @@ class ClientCallBidiStreamingTests {
val stub = rpcSpy.stub

setupServerHandlerSuccess()
val (requestChannel, responseChannel) = stub
.clientCallBidiStreaming(methodDescriptor)

lateinit var requestChannel: SendChannel<HelloRequest>
lateinit var responseChannel: ReceiveChannel<HelloReply>
runBlocking(Dispatchers.Default) {
val callChannel = stub
.withCoroutineContext()
.clientCallBidiStreaming(methodDescriptor)
requestChannel = callChannel.requestChannel
responseChannel = callChannel.responseChannel
launch {
repeat(3){
requestChannel.send(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ class ClientCallClientStreamingTests {

private val methodDescriptor = GreeterGrpc.getSayHelloClientStreamingMethod()
private val service = spyk(object : GreeterGrpc.GreeterImplBase() {})
private val noopExceptionHandler = CoroutineExceptionHandler{ _, _ -> /**NOOP**/ }

inner class RpcSpy{
val stub: GreeterGrpc.GreeterStub
Expand Down Expand Up @@ -159,7 +158,6 @@ class ClientCallClientStreamingTests {
setupServerHandlerError()

val (requestChannel, response) = stub
.withCoroutineContext(noopExceptionHandler)
.clientCallClientStreaming(methodDescriptor)

var requestsSent = 0
Expand Down

0 comments on commit 5b8dfbe

Please sign in to comment.