From 3f04b3d5f4cf56782caa4b1069a7657f93d67024 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 22 Nov 2022 08:28:29 +0000 Subject: [PATCH 1/2] fix: yield single buffers Messages are serialized to multiple buffers, intsead of yield each buffer one by one, create single buffers that contain the whole serialized message. This greatly improves transport performance as writing one big buffer is a lot faster than writing lots of small buffers to network sockets etc. Before: ``` testing 0.40.x-mplex sender 3276811 messages 17 invocations sender 6553636 bufs 17 b 24197 ms 105 MB in 32 B chunks in 24170ms ``` After: ``` testing 0.40.x-mplex sender 3276811 messages 1638408 invocations 1638411 bufs 68 b 8626 ms 105 MB in 32 B chunks in 8611ms ``` --- src/encode.ts | 22 +++++++++++----------- src/mplex.ts | 2 +- src/stream.ts | 25 ++++++++----------------- 3 files changed, 20 insertions(+), 29 deletions(-) diff --git a/src/encode.ts b/src/encode.ts index 17564e1..d2e9eb4 100644 --- a/src/encode.ts +++ b/src/encode.ts @@ -1,5 +1,6 @@ import type { Source } from 'it-stream-types' import varint from 'varint' +import { Uint8ArrayList } from 'uint8arraylist' import { allocUnsafe } from './alloc-unsafe.js' import { Message, MessageTypes } from './message-types.js' @@ -17,7 +18,7 @@ class Encoder { /** * Encodes the given message and returns it and its header */ - write (msg: Message): Uint8Array[] { + write (msg: Message, list: Uint8ArrayList): void { const pool = this._pool let offset = this._poolOffset @@ -41,16 +42,11 @@ class Encoder { this._poolOffset = offset } + list.append(header) + if ((msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) && msg.data != null) { - return [ - header, - ...(msg.data instanceof Uint8Array ? [msg.data] : msg.data) - ] + list.append(msg.data) } - - return [ - header - ] } } @@ -61,12 +57,16 @@ const encoder = new Encoder() */ export async function * encode (source: Source) { for await (const msg of source) { + const list = new Uint8ArrayList() + if (Array.isArray(msg)) { for (const m of msg) { - yield * encoder.write(m) + encoder.write(m, list) } } else { - yield * encoder.write(msg) + encoder.write(msg, list) } + + yield list.subarray() } } diff --git a/src/mplex.ts b/src/mplex.ts index 2d44172..0a359db 100644 --- a/src/mplex.ts +++ b/src/mplex.ts @@ -156,7 +156,7 @@ export class MplexStreamMuxer implements StreamMuxer { _newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map }) { const { id, name, type, registry } = options - log('new %s stream %s %s', type, id) + log('new %s stream %s', type, id) if (type === 'initiator' && this._streams.initiators.size === (this._init.maxOutboundStreams ?? MAX_STREAMS_OUTBOUND_STREAMS_PER_CONNECTION)) { throw errCode(new Error('Too many outbound streams open'), 'ERR_TOO_MANY_OUTBOUND_STREAMS') diff --git a/src/stream.ts b/src/stream.ts index bfc29cf..f049217 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -171,24 +171,15 @@ export function createStream (options: Options): MplexStream { send({ id, type: InitiatorMessageTypes.NEW_STREAM, data: new Uint8ArrayList(uint8ArrayFromString(streamName)) }) } - const uint8ArrayList = new Uint8ArrayList() - - for await (const data of source) { - if (data.length <= maxMsgSize) { - send({ id, type: Types.MESSAGE, data: data instanceof Uint8ArrayList ? data : new Uint8ArrayList(data) }) - } else { - uint8ArrayList.append(data) - - while (uint8ArrayList.length !== 0) { - // eslint-disable-next-line max-depth - if (uint8ArrayList.length <= maxMsgSize) { - send({ id, type: Types.MESSAGE, data: uint8ArrayList.sublist() }) - uint8ArrayList.consume(uint8ArrayList.length) - break - } - send({ id, type: Types.MESSAGE, data: uint8ArrayList.sublist(0, maxMsgSize) }) - uint8ArrayList.consume(maxMsgSize) + for await (let data of source) { + while (data.length > 0) { + if (data.length <= maxMsgSize) { + send({ id, type: Types.MESSAGE, data: data instanceof Uint8Array ? new Uint8ArrayList(data) : data }) + break } + data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data + send({ id, type: Types.MESSAGE, data: data.sublist(0, maxMsgSize) }) + data.consume(maxMsgSize) } } } catch (err: any) { From 1af558619d910ba5bc20921649cadfd7be62e5d8 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 24 Nov 2022 07:42:56 +0000 Subject: [PATCH 2/2] chore: update comment --- src/encode.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/encode.ts b/src/encode.ts index d2e9eb4..f2c726a 100644 --- a/src/encode.ts +++ b/src/encode.ts @@ -16,7 +16,7 @@ class Encoder { } /** - * Encodes the given message and returns it and its header + * Encodes the given message and adds it to the passed list */ write (msg: Message, list: Uint8ArrayList): void { const pool = this._pool