Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
fix: limit unprocessed message queue size separately to message size (#234)
Browse files Browse the repository at this point in the history

* fix: limit unprocessed message queue size separately to message size

It's possible to receive lots of small messages in one buffer that can
be larger than the max message size, so limit the unprocessed message
queue size separately from the max message size.

* chore: add tests

* chore: pass option to decoder

* chore: PR comment

Co-authored-by: Marin Petrunić <[email protected]>

Co-authored-by: Marin Petrunić <[email protected]>
  • Loading branch information
achingbrain and mpetrunic committed Nov 24, 2022
1 parent 31d3938 commit 2297856
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 9 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Creates a factory that can be used to create new muxers.
`options` is an optional `Object` that may have the following properties:

- `maxMsgSize` - a number that defines how large mplex data messages can be in bytes, if messages are larger than this they will be sent as multiple messages (default: 1048576 - e.g. 1MB)
- `maxUnprocessedMessageQueueSize` - a number that limits the size of the unprocessed input buffer (default: 4194304 - e.g. 4MB)
- `maxInboundStreams` - a number that defines how many incoming streams are allowed per connection (default: 1024)
- `maxOutboundStreams` - a number that defines how many outgoing streams are allowed per connection (default: 1024)
- `maxStreamBufferSize` - a number that defines how large the message buffer is allowed to grow (default: 1024 \* 1024 \* 4 - e.g. 4MB)
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@
"it-drain": "^2.0.0",
"it-foreach": "^1.0.0",
"it-map": "^2.0.0",
"it-to-buffer": "^3.0.0",
"p-defer": "^4.0.0",
"random-int": "^3.0.0",
"typescript": "^4.7.4"
Expand Down
24 changes: 18 additions & 6 deletions src/decode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { Source } from 'it-stream-types'
import type { Message } from './message-types.js'

export const MAX_MSG_SIZE = 1 << 20 // 1MB
export const MAX_MSG_QUEUE_SIZE = 4 << 20 // 4MB

interface MessageHeader {
id: number
Expand All @@ -16,11 +17,13 @@ class Decoder {
private readonly _buffer: Uint8ArrayList
private _headerInfo: MessageHeader | null
private readonly _maxMessageSize: number
private readonly _maxUnprocessedMessageQueueSize: number

constructor (maxMessageSize: number = MAX_MSG_SIZE) {
constructor (maxMessageSize: number = MAX_MSG_SIZE, maxUnprocessedMessageQueueSize: number = MAX_MSG_QUEUE_SIZE) {
this._buffer = new Uint8ArrayList()
this._headerInfo = null
this._maxMessageSize = maxMessageSize
this._maxUnprocessedMessageQueueSize = maxUnprocessedMessageQueueSize
}

write (chunk: Uint8Array) {
Expand All @@ -30,8 +33,8 @@ class Decoder {

this._buffer.append(chunk)

if (this._buffer.byteLength > this._maxMessageSize) {
throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' })
if (this._buffer.byteLength > this._maxUnprocessedMessageQueueSize) {
throw Object.assign(new Error('unprocessed message queue size too large!'), { code: 'ERR_MSG_QUEUE_TOO_BIG' })
}

const msgs: Message[] = []
Expand All @@ -40,7 +43,11 @@ class Decoder {
if (this._headerInfo == null) {
try {
this._headerInfo = this._decodeHeader(this._buffer)
} catch (_) {
} catch (err: any) {
if (err.code === 'ERR_MSG_TOO_BIG') {
throw err
}

break // We haven't received enough data yet
}
}
Expand Down Expand Up @@ -90,6 +97,11 @@ class Decoder {
throw new Error(`Invalid type received: ${type}`)
}

// test message type varint + data length
if (length > this._maxMessageSize) {
throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' })
}

// @ts-expect-error h is a number not a CODE
return { id: h >> 3, type, offset: offset + end, length }
}
Expand Down Expand Up @@ -128,9 +140,9 @@ function readVarInt (buf: Uint8ArrayList, offset: number = 0) {
/**
* Decode a chunk and yield an _array_ of decoded messages
*/
export function decode (maxMessageSize: number = MAX_MSG_SIZE) {
export function decode (maxMessageSize: number = MAX_MSG_SIZE, maxUnprocessedMessageQueueSize: number = MAX_MSG_QUEUE_SIZE) {
return async function * decodeMessages (source: Source<Uint8Array>): Source<Message> {
const decoder = new Decoder(maxMessageSize)
const decoder = new Decoder(maxMessageSize, maxUnprocessedMessageQueueSize)

for await (const chunk of source) {
const msgs = decoder.write(chunk)
Expand Down
12 changes: 11 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,20 @@ export interface MplexInit {
/**
* The maximum size of message that can be sent in one go in bytes.
* Messages larger than this will be split into multiple smaller
* messages (default: 1MB)
* messages. If we receive a message larger than this an error will
* be thrown and the connection closed. (default: 1MB)
*/
maxMsgSize?: number

/**
* Constrains the size of the unprocessed message queue buffer.
* Before messages are deserialized, the raw bytes are buffered to ensure
* we have the complete message to deserialize. If the queue gets longer
* than this value an error will be thrown and the connection closed.
* (default: 4MB)
*/
maxUnprocessedMessageQueueSize?: number

/**
* The maximum number of multiplexed streams that can be open at any
* one time. A request to open more than this will have a stream
Expand Down
2 changes: 1 addition & 1 deletion src/mplex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ export class MplexStreamMuxer implements StreamMuxer {
try {
await pipe(
source,
decode(this._init.maxMsgSize),
decode(this._init.maxMsgSize, this._init.maxUnprocessedMessageQueueSize),
async source => {
for await (const msg of source) {
await this._handleIncoming(msg)
Expand Down
65 changes: 64 additions & 1 deletion test/restrict-size.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { Message, MessageTypes } from '../src/message-types.js'
import { encode } from '../src/encode.js'
import { decode } from '../src/decode.js'
import { Uint8ArrayList } from 'uint8arraylist'
import toBuffer from 'it-to-buffer'

describe('restrict size', () => {
it('should throw when size is too big', async () => {
Expand All @@ -36,9 +37,10 @@ describe('restrict size', () => {
)
} catch (err: any) {
expect(err).to.have.property('code', 'ERR_MSG_TOO_BIG')
expect(output).to.have.length(2)
expect(output).to.have.length(3)
expect(output[0]).to.deep.equal(input[0])
expect(output[1]).to.deep.equal(input[1])
expect(output[2]).to.deep.equal(input[2])
return
}
throw new Error('did not restrict size')
Expand All @@ -59,4 +61,65 @@ describe('restrict size', () => {
)
expect(output).to.deep.equal(input)
})

it('should throw when unprocessed message queue size is too big', async () => {
const maxMessageSize = 32
const maxUnprocessedMessageQueueSize = 64

const input: Message[] = [
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) }
]

const output: Message[] = []

try {
await pipe(
input,
encode,
async function * (source) {
// make one big buffer
yield toBuffer(source)
},
decode(maxMessageSize, maxUnprocessedMessageQueueSize),
(source) => each(source, chunk => {
output.push(chunk)
}),
async (source) => await drain(source)
)
} catch (err: any) {
expect(err).to.have.property('code', 'ERR_MSG_QUEUE_TOO_BIG')
expect(output).to.have.length(0)
return
}
throw new Error('did not restrict size')
})

it('should throw when unprocessed message queue size is too big because of garbage', async () => {
const maxMessageSize = 32
const maxUnprocessedMessageQueueSize = 64
const input = randomBytes(maxUnprocessedMessageQueueSize + 1)
const output: Message[] = []

try {
await pipe(
[input],
decode(maxMessageSize, maxUnprocessedMessageQueueSize),
(source) => each(source, chunk => {
output.push(chunk)
}),
async (source) => await drain(source)
)
} catch (err: any) {
expect(err).to.have.property('code', 'ERR_MSG_QUEUE_TOO_BIG')
expect(output).to.have.length(0)
return
}
throw new Error('did not restrict size')
})
})

0 comments on commit 2297856

Please sign in to comment.