Skip to content

Commit

Permalink
Fire readComplete() event to Stream channels (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nashatyrev authored Feb 25, 2021
1 parent 5b7d9ba commit ed2a5f3
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ abstract class AbstractMuxHandler<TData>() :
private val activeFuture = CompletableFuture<Void>()
private var closed = false
protected abstract val inboundInitializer: MuxChannelInitializer<TData>
private val pendingReadComplete = mutableSetOf<MuxId>()

override fun handlerAdded(ctx: ChannelHandlerContext) {
super.handlerAdded(ctx)
Expand Down Expand Up @@ -54,13 +55,19 @@ abstract class AbstractMuxHandler<TData>() :

protected fun childRead(id: MuxId, msg: TData) {
val child = streamMap[id] ?: throw ConnectionClosedException("Channel with id $id not opened")
pendingReadComplete += id
child.pipeline().fireChannelRead(msg)
}

override fun channelReadComplete(ctx: ChannelHandlerContext) {
pendingReadComplete.forEach { streamMap[it]?.pipeline()?.fireChannelReadComplete() }
pendingReadComplete.clear()
}

abstract fun onChildWrite(child: MuxChannel<TData>, data: TData): Boolean

protected fun onRemoteOpen(id: MuxId) {
val initializer = inboundInitializer ?: throw InternalErrorException("Illegal state: inbound stream handler is not set up yet")
val initializer = inboundInitializer
val child = createChild(
id,
initializer,
Expand Down
37 changes: 37 additions & 0 deletions src/test/kotlin/io/libp2p/mux/MultiplexHandlerTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.DefaultChannelId
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertThrows
Expand Down Expand Up @@ -83,6 +84,40 @@ class MultiplexHandlerTest {
assertEquals("66", childHandlers[0].inboundMessages.last())
}

@Test
fun `test that readComplete event is fired to child channel`() {
openStream(12)

assertThat(childHandlers[0].readCompleteEventCount).isZero()

writeStream(12, "22")

assertThat(childHandlers[0].readCompleteEventCount).isEqualTo(1)

writeStream(12, "23")

assertThat(childHandlers[0].readCompleteEventCount).isEqualTo(2)
}

@Test
fun `test that readComplete event is fired to reading channels only`() {
openStream(12)
openStream(13)

assertThat(childHandlers[0].readCompleteEventCount).isZero()
assertThat(childHandlers[1].readCompleteEventCount).isZero()

writeStream(12, "22")

assertThat(childHandlers[0].readCompleteEventCount).isEqualTo(1)
assertThat(childHandlers[1].readCompleteEventCount).isEqualTo(0)

writeStream(13, "23")

assertThat(childHandlers[0].readCompleteEventCount).isEqualTo(1)
assertThat(childHandlers[1].readCompleteEventCount).isEqualTo(1)
}

@Test
fun twoStreamsInterleaved() {
openStream(12)
Expand Down Expand Up @@ -222,6 +257,7 @@ class MultiplexHandlerTest {
class TestHandler : ChannelInboundHandlerAdapter() {
val inboundMessages = mutableListOf<String>()
var ctx: ChannelHandlerContext? = null
var readCompleteEventCount = 0

override fun channelInactive(ctx: ChannelHandlerContext?) {
println("MultiplexHandlerTest.channelInactive")
Expand All @@ -246,6 +282,7 @@ class MultiplexHandlerTest {
}

override fun channelReadComplete(ctx: ChannelHandlerContext?) {
readCompleteEventCount++
println("MultiplexHandlerTest.channelReadComplete")
}

Expand Down

0 comments on commit ed2a5f3

Please sign in to comment.