Skip to content

Commit

Permalink
Read unsigned ints in yamux decoder for length and stream id
Browse files Browse the repository at this point in the history
  • Loading branch information
ianopolous committed May 19, 2023
1 parent 1df8489 commit dd78586
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 18 deletions.
2 changes: 1 addition & 1 deletion libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxFrame.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import io.netty.buffer.Unpooled
* @param flag the flag value for this frame.
* @param data the data segment.
*/
class YamuxFrame(val id: MuxId, val type: Int, val flags: Int, val lenData: Int, val data: ByteBuf? = null) :
class YamuxFrame(val id: MuxId, val type: Int, val flags: Int, val lenData: Long, val data: ByteBuf? = null) :
DefaultByteBufHolder(data ?: Unpooled.EMPTY_BUFFER) {

override fun toString(): String {
Expand Down
18 changes: 6 additions & 12 deletions libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxFrameCodec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class YamuxFrameCodec(
out.writeByte(msg.type)
out.writeShort(msg.flags)
out.writeInt(msg.id.id.toInt())
out.writeInt(msg.data?.readableBytes() ?: msg.lenData)
out.writeInt(msg.data?.readableBytes() ?: msg.lenData.toInt())
out.writeBytes(msg.data ?: Unpooled.EMPTY_BUFFER)
}

Expand All @@ -48,19 +48,13 @@ class YamuxFrameCodec(
msg.readByte(); // version always 0
val type = msg.readUnsignedByte()
val flags = msg.readUnsignedShort()
val streamId = msg.readInt()
val lenData = msg.readInt()
val streamId = msg.readUnsignedInt()
val lenData = msg.readUnsignedInt()
if (type.toInt() != YamuxType.DATA) {
val yamuxFrame = YamuxFrame(MuxId(ctx.channel().id(), streamId.toLong(), isInitiator.xor(streamId % 2 == 1).not()), type.toInt(), flags, lenData)
val yamuxFrame = YamuxFrame(MuxId(ctx.channel().id(), streamId, isInitiator.xor(streamId.mod(2).equals(1)).not()), type.toInt(), flags, lenData)
out.add(yamuxFrame)
return
}
if (lenData < 0) {
// not enough data to read the frame length
// will wait for more ...
msg.readerIndex(readerIndex)
return
}
if (lenData > maxFrameDataLength) {
msg.skipBytes(msg.readableBytes())
throw ProtocolViolationException("Yamux frame is too large: $lenData")
Expand All @@ -71,9 +65,9 @@ class YamuxFrameCodec(
msg.readerIndex(readerIndex)
return
}
val data = msg.readSlice(lenData)
val data = msg.readSlice(lenData.toInt())
data.retain() // MessageToMessageCodec releases original buffer, but it needs to be relayed
val yamuxFrame = YamuxFrame(MuxId(ctx.channel().id(), streamId.toLong(), isInitiator.xor(streamId % 2 == 1).not()), type.toInt(), flags, lenData, data)
val yamuxFrame = YamuxFrame(MuxId(ctx.channel().id(), streamId, isInitiator.xor(streamId.mod(2).equals(1)).not()), type.toInt(), flags, lenData, data)
out.add(yamuxFrame)
}
}
Expand Down
10 changes: 5 additions & 5 deletions libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxHandler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,21 @@ open class YamuxHandler(
val ctx = getChannelHandlerContext()
val size = msg.lenData
handleFlags(msg)
if (size == 0)
if (size.toInt() == 0)
return
val newWindow = receiveWindow.addAndGet(-size)
val newWindow = receiveWindow.addAndGet(-size.toInt())
if (newWindow < INITIAL_WINDOW_SIZE / 2) {
val delta = INITIAL_WINDOW_SIZE / 2
receiveWindow.addAndGet(delta)
ctx.write(YamuxFrame(msg.id, YamuxType.WINDOW_UPDATE, 0, delta))
ctx.write(YamuxFrame(msg.id, YamuxType.WINDOW_UPDATE, 0, delta.toLong()))
ctx.flush()
}
childRead(msg.id, msg.data!!)
}

fun handleWindowUpdate(msg: YamuxFrame) {
handleFlags(msg)
val size = msg.lenData
val size = msg.lenData.toInt()
sendWindow.addAndGet(size)
lock.release()
}
Expand All @@ -105,7 +105,7 @@ open class YamuxHandler(
data.sliceMaxSize(minOf(maxFrameDataLength, sendWindow.get()))
.map { frameSliceBuf ->
sendWindow.addAndGet(-frameSliceBuf.readableBytes())
YamuxFrame(child.id, YamuxType.DATA, 0, frameSliceBuf.readableBytes(), frameSliceBuf)
YamuxFrame(child.id, YamuxType.DATA, 0, frameSliceBuf.readableBytes().toLong(), frameSliceBuf)
}.forEach { muxFrame ->
ctx.write(muxFrame)
}
Expand Down

0 comments on commit dd78586

Please sign in to comment.