Skip to content

Commit

Permalink
Refactor: test fixtures sort. Regression bug fix (#284)
Browse files Browse the repository at this point in the history
* Refactor: move test utility classes to testFixtures module
* Fix regression: MplexFrame should implement ReferenceCounted to release the underlying ByteBuf after encoding with MplexFrameCodec
* Add unit test reproducing the issue
  • Loading branch information
Nashatyrev committed May 25, 2023
1 parent 8971b31 commit 63869ca
Show file tree
Hide file tree
Showing 25 changed files with 33 additions and 16 deletions.
3 changes: 3 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ configure(
implementation("org.slf4j:slf4j-api")
implementation("com.github.multiformats:java-multibase:v1.1.1")

testFixturesImplementation("com.google.guava:guava")
testFixturesImplementation("org.slf4j:slf4j-api")

testImplementation("org.junit.jupiter:junit-jupiter")
testImplementation("org.junit.jupiter:junit-jupiter-params")
testImplementation("io.mockk:mockk")
Expand Down
5 changes: 4 additions & 1 deletion libp2p/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ dependencies {
implementation("org.bouncycastle:bcpkix-jdk15on")

testImplementation(project(":tools:schedulers"))
testImplementation("org.apache.logging.log4j:log4j-core")

testFixturesApi("org.apache.logging.log4j:log4j-core")
testFixturesImplementation(project(":tools:schedulers"))
testFixturesImplementation("io.netty:netty-transport-classes-epoll")
testFixturesImplementation("io.netty:netty-handler")
testFixturesImplementation("org.junit.jupiter:junit-jupiter-api")

jmhImplementation(project(":tools:schedulers"))
jmhImplementation("org.openjdk.jmh:jmh-core")
Expand Down
3 changes: 2 additions & 1 deletion libp2p/src/main/kotlin/io/libp2p/mux/mplex/MplexFrame.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package io.libp2p.mux.mplex

import io.libp2p.etc.util.netty.mux.MuxId
import io.netty.buffer.ByteBuf
import io.netty.buffer.DefaultByteBufHolder
import io.netty.buffer.Unpooled

/**
Expand All @@ -24,7 +25,7 @@ import io.netty.buffer.Unpooled
* @param data the data segment.
* @see [mplex documentation](https://github.com/libp2p/specs/tree/master/mplex#opening-a-new-stream)
*/
data class MplexFrame(val id: MuxId, val flag: MplexFlag, val data: ByteBuf) {
data class MplexFrame(val id: MuxId, val flag: MplexFlag, val data: ByteBuf) : DefaultByteBufHolder(data) {

companion object {
fun createDataFrame(id: MuxId, data: ByteBuf) =
Expand Down
31 changes: 22 additions & 9 deletions libp2p/src/test/kotlin/io/libp2p/mux/mplex/MplexFrameCodecTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,20 @@ class MplexFrameCodecTest {
)
}
val dummyId = DefaultChannelId.newInstance()
val maxFrameDataLength = 1024
val channel = EmbeddedChannel(MplexFrameCodec(maxFrameDataLength = maxFrameDataLength))

@Test
fun `check max frame size limit`() {
val channelLarge = EmbeddedChannel(MplexFrameCodec(maxFrameDataLength = 1024))

val mplexFrame = MplexFrame(
MuxId(dummyId, 777, true), MplexFlag.MessageInitiator,
ByteArray(1024).toByteBuf()
ByteArray(maxFrameDataLength).toByteBuf()
)

assertTrue(
channelLarge.writeOutbound(mplexFrame)
channel.writeOutbound(mplexFrame)
)
val largeFrameBytes = channelLarge.readOutbound<ByteBuf>()
val largeFrameBytes = channel.readOutbound<ByteBuf>()
val largeFrameBytesTrunc = largeFrameBytes.slice(0, largeFrameBytes.readableBytes() - 1)

val channelSmall = EmbeddedChannel(MplexFrameCodec(maxFrameDataLength = 128))
Expand All @@ -58,8 +58,6 @@ class MplexFrameCodecTest {
@ParameterizedTest
@MethodSource("splitIndexes")
fun testDecoder(sliceIdx: List<Int>) {
val channel = EmbeddedChannel(MplexFrameCodec())

val mplexFrames = arrayOf(
MplexFrame(MuxId(dummyId, 777, true), MplexFlag.MessageInitiator, "Hello-1".toByteArray().toByteBuf()),
MplexFrame(MuxId(dummyId, 888, true), MplexFlag.MessageInitiator, "Hello-2".toByteArray().toByteBuf()),
Expand Down Expand Up @@ -93,8 +91,6 @@ class MplexFrameCodecTest {

@Test
fun `test id initiator is inverted on decoding`() {
val channel = EmbeddedChannel(MplexFrameCodec())

val mplexFrames = arrayOf(
MplexFrame.createOpenFrame(MuxId(dummyId, 1, true)),
MplexFrame.createDataFrame(MuxId(dummyId, 2, true), "Hello-2".toByteArray().toByteBuf()),
Expand All @@ -118,4 +114,21 @@ class MplexFrameCodecTest {
assertEquals(mplexFrames[idx].flag, resFrame.flag)
}
}

@Test
fun `check the frame underlying buffer is released after send`() {
val frameDataBuf = "Hello-1".toByteArray().toByteBuf()

assertTrue(frameDataBuf.refCnt() == 1)

channel.writeOutbound(
MplexFrame(MuxId(dummyId, 777, true), MplexFlag.MessageInitiator, frameDataBuf)
)

val encodedFrame = channel.readOutbound<ByteBuf>()
// bytes are released after sending to the wire
encodedFrame.release()

assertTrue(frameDataBuf.refCnt() == 0)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ open class EchoProtocol : ProtocolHandler<EchoController>(Long.MAX_VALUE, Long.M
}
}

open inner class EchoInitiator(val ready: CompletableFuture<Void>) : ProtocolMessageHandler<ByteBuf>, EchoController {
open inner class EchoInitiator(val ready: CompletableFuture<Void>) :
ProtocolMessageHandler<ByteBuf>, EchoController {
lateinit var stream: Stream
var ret = CompletableFuture<String>()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import java.util.concurrent.CompletableFuture

// Utility class (aka sniffer) that just forwards TCP traffic back'n'forth to another TCP address and log it
Expand Down Expand Up @@ -67,8 +65,6 @@ class TCPProxy {
return future
}

@Test
@Disabled
fun run() {
start(11111, "localhost", 10000)
.channel().closeFuture().await()
Expand Down

0 comments on commit 63869ca

Please sign in to comment.