From 9a48fd302e5bc11a01125bc9fb02c08bb35d8a64 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Sat, 25 Nov 2023 14:08:57 +0000 Subject: [PATCH] feat!: yield uint8arraylists The general pattern of stream muxers is to yield protocol stream data framed by some additional metadata - stream id, flags, etc. The frame data can be prepended/appended to the protocol stream data by using a `Uint8ArrayList` instead of a `Uint8Array`, this removes the need to copy the protocol data into a new `Uint8Array` for every frame. The new `@libp2p/interface` version allows muxers to emit `Uint8ArrayList`s as well as `Uint8Array`s so we can send protocol stream data to a transport in a no-copy operation. --- package.json | 9 +++++---- src/config.ts | 9 --------- src/decode.ts | 2 +- src/index.ts | 10 +++++++--- src/muxer.ts | 34 ++++++++++++++++++---------------- src/stream.ts | 10 +++++----- test/compliance.spec.ts | 5 ++++- test/mplex.util.ts | 16 ++++++++++------ test/muxer.spec.ts | 5 +++-- test/util.ts | 24 +++++++++++++----------- 10 files changed, 66 insertions(+), 58 deletions(-) diff --git a/package.json b/package.json index 7334ece..1085a6c 100644 --- a/package.json +++ b/package.json @@ -168,8 +168,8 @@ "docs": "aegir docs" }, "dependencies": { - "@libp2p/interface": "^0.1.0", - "@libp2p/logger": "^3.0.0", + "@libp2p/interface": "^1.0.0", + "@libp2p/utils": "^5.0.0", "get-iterator": "^2.0.1", "it-foreach": "^2.0.3", "it-pipe": "^3.0.1", @@ -178,8 +178,9 @@ }, "devDependencies": { "@dapplion/benchmark": "^0.2.4", - "@libp2p/interface-compliance-tests": "^4.0.0", - "@libp2p/mplex": "^9.0.0", + "@libp2p/interface-compliance-tests": "^5.0.0", + "@libp2p/logger": "^4.0.0", + "@libp2p/mplex": "^10.0.0", "aegir": "^41.1.10", "it-drain": "^3.0.2", "it-pair": "^2.0.6", diff --git a/src/config.ts b/src/config.ts index 887e461..5c682d5 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,16 +1,8 @@ import { CodeError } from '@libp2p/interface/errors' -import { logger, type Logger } from '@libp2p/logger' import { ERR_INVALID_CONFIG, INITIAL_STREAM_WINDOW, MAX_STREAM_WINDOW } from './constants.js' // TOOD use config items or delete them export interface Config { - /** - * Used to control the log destination - * - * It can be disabled by explicitly setting to `undefined` - */ - log?: Logger - /** * Used to do periodic keep alive messages using a ping. */ @@ -55,7 +47,6 @@ export interface Config { } export const defaultConfig: Config = { - log: logger('libp2p:yamux'), enableKeepAlive: true, keepAliveInterval: 30_000, maxInboundStreams: 1_000, diff --git a/src/decode.ts b/src/decode.ts index 36b4bbe..56440b9 100644 --- a/src/decode.ts +++ b/src/decode.ts @@ -1,4 +1,4 @@ -import { CodeError } from '@libp2p/interface/errors' +import { CodeError } from '@libp2p/interface' import { Uint8ArrayList } from 'uint8arraylist' import { ERR_DECODE_INVALID_VERSION, ERR_DECODE_IN_PROGRESS } from './constants.js' import { type FrameHeader, FrameType, HEADER_LENGTH, YAMUX_VERSION } from './frame.js' diff --git a/src/index.ts b/src/index.ts index 401e8e1..cd9d78d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -79,11 +79,15 @@ import { Yamux } from './muxer.js' import type { YamuxMuxerInit } from './muxer.js' -import type { StreamMuxerFactory } from '@libp2p/interface/stream-muxer' +import type { ComponentLogger, StreamMuxerFactory } from '@libp2p/interface' export { GoAwayCode, type FrameHeader, type FrameType } from './frame.js' export type { YamuxMuxerInit } -export function yamux (init: YamuxMuxerInit = {}): () => StreamMuxerFactory { - return () => new Yamux(init) +export interface YamuxMuxerComponents { + logger: ComponentLogger +} + +export function yamux (init: YamuxMuxerInit = {}): (components: YamuxMuxerComponents) => StreamMuxerFactory { + return (components) => new Yamux(components, init) } diff --git a/src/muxer.ts b/src/muxer.ts index f409016..df4e792 100644 --- a/src/muxer.ts +++ b/src/muxer.ts @@ -1,19 +1,16 @@ -import { CodeError } from '@libp2p/interface/errors' -import { setMaxListeners } from '@libp2p/interface/events' -import { logger, type Logger } from '@libp2p/logger' +import { CodeError, setMaxListeners } from '@libp2p/interface' import { getIterator } from 'get-iterator' import { pushable, type Pushable } from 'it-pushable' +import { Uint8ArrayList } from 'uint8arraylist' import { type Config, defaultConfig, verifyConfig } from './config.js' import { ERR_BOTH_CLIENTS, ERR_INVALID_FRAME, ERR_MAX_OUTBOUND_STREAMS_EXCEEDED, ERR_MUXER_LOCAL_CLOSED, ERR_MUXER_REMOTE_CLOSED, ERR_NOT_MATCHING_PING, ERR_STREAM_ALREADY_EXISTS, ERR_UNREQUESTED_PING, PROTOCOL_ERRORS } from './constants.js' import { Decoder } from './decode.js' import { encodeHeader } from './encode.js' import { Flag, type FrameHeader, FrameType, GoAwayCode } from './frame.js' import { StreamState, YamuxStream } from './stream.js' -import type { AbortOptions } from '@libp2p/interface' -import type { Stream } from '@libp2p/interface/connection' -import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface/stream-muxer' +import type { YamuxMuxerComponents } from './index.js' +import type { AbortOptions, ComponentLogger, Logger, Stream, StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface' import type { Sink, Source } from 'it-stream-types' -import type { Uint8ArrayList } from 'uint8arraylist' const YAMUX_PROTOCOL_ID = '/yamux/1.0.0' const CLOSE_TIMEOUT = 500 @@ -23,14 +20,16 @@ export interface YamuxMuxerInit extends StreamMuxerInit, Partial { export class Yamux implements StreamMuxerFactory { protocol = YAMUX_PROTOCOL_ID + private readonly _components: YamuxMuxerComponents private readonly _init: YamuxMuxerInit - constructor (init: YamuxMuxerInit = {}) { + constructor (components: YamuxMuxerComponents, init: YamuxMuxerInit = {}) { + this._components = components this._init = init } createStreamMuxer (init?: YamuxMuxerInit): YamuxMuxer { - return new YamuxMuxer({ + return new YamuxMuxer(this._components, { ...this._init, ...init }) @@ -43,11 +42,12 @@ export interface CloseOptions extends AbortOptions { export class YamuxMuxer implements StreamMuxer { protocol = YAMUX_PROTOCOL_ID - source: Pushable + source: Pushable sink: Sink, Promise> private readonly config: Config private readonly log?: Logger + private readonly logger: ComponentLogger /** Used to close the muxer from either the sink or source */ private readonly closeController: AbortController @@ -78,10 +78,11 @@ export class YamuxMuxer implements StreamMuxer { private readonly onIncomingStream?: (stream: Stream) => void private readonly onStreamEnd?: (stream: Stream) => void - constructor (init: YamuxMuxerInit) { + constructor (components: YamuxMuxerComponents, init: YamuxMuxerInit) { this.client = init.direction === 'outbound' this.config = { ...defaultConfig, ...init } - this.log = this.config.log + this.logger = components.logger + this.log = this.logger.forComponent('libp2p:yamux') verifyConfig(this.config) this.closeController = new AbortController() @@ -363,7 +364,7 @@ export class YamuxMuxer implements StreamMuxer { this.closeStream(id) this.onStreamEnd?.(stream) }, - log: logger(`libp2p:yamux:${direction}:${id}`), + log: this.logger.forComponent(`libp2p:yamux:${direction}:${id}`), config: this.config, getRTT: this.getRTT.bind(this) }) @@ -554,14 +555,15 @@ export class YamuxMuxer implements StreamMuxer { this.onIncomingStream?.(stream) } - private sendFrame (header: FrameHeader, data?: Uint8Array): void { + private sendFrame (header: FrameHeader, data?: Uint8ArrayList): void { this.log?.trace('sending frame %o', header) if (header.type === FrameType.Data) { if (data === undefined) { throw new CodeError('invalid frame', ERR_INVALID_FRAME) } - this.source.push(encodeHeader(header)) - this.source.push(data) + this.source.push( + new Uint8ArrayList(encodeHeader(header), data) + ) } else { this.source.push(encodeHeader(header)) } diff --git a/src/stream.ts b/src/stream.ts index 6223072..3297673 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,5 +1,5 @@ -import { CodeError } from '@libp2p/interface/errors' -import { AbstractStream, type AbstractStreamInit } from '@libp2p/interface/stream-muxer/stream' +import { CodeError } from '@libp2p/interface' +import { AbstractStream, type AbstractStreamInit } from '@libp2p/utils/abstract-stream' import each from 'it-foreach' import { ERR_RECV_WINDOW_EXCEEDED, ERR_STREAM_ABORT, INITIAL_STREAM_WINDOW } from './constants.js' import { Flag, type FrameHeader, FrameType, HEADER_LENGTH } from './frame.js' @@ -17,7 +17,7 @@ export enum StreamState { export interface YamuxStreamInit extends AbstractStreamInit { name?: string - sendFrame(header: FrameHeader, body?: Uint8Array): void + sendFrame(header: FrameHeader, body?: Uint8ArrayList): void getRTT(): number config: Config state: StreamState @@ -49,7 +49,7 @@ export class YamuxStream extends AbstractStream { private epochStart: number private readonly getRTT: () => number - private readonly sendFrame: (header: FrameHeader, body?: Uint8Array) => void + private readonly sendFrame: (header: FrameHeader, body?: Uint8ArrayList) => void constructor (init: YamuxStreamInit) { super({ @@ -115,7 +115,7 @@ export class YamuxStream extends AbstractStream { flag: flags, streamID: this._id, length: toSend - }, buf.subarray(0, toSend)) + }, buf.sublist(0, toSend)) this.sendWindowCapacity -= toSend diff --git a/test/compliance.spec.ts b/test/compliance.spec.ts index 4ec47dc..088f492 100644 --- a/test/compliance.spec.ts +++ b/test/compliance.spec.ts @@ -1,12 +1,15 @@ /* eslint-env mocha */ import tests from '@libp2p/interface-compliance-tests/stream-muxer' +import { defaultLogger } from '@libp2p/logger' import { TestYamux } from './util.js' describe('compliance', () => { tests({ async setup () { - return new TestYamux({}) + return new TestYamux({ + logger: defaultLogger() + }) }, async teardown () {} }) diff --git a/test/mplex.util.ts b/test/mplex.util.ts index 26a2478..21877bb 100644 --- a/test/mplex.util.ts +++ b/test/mplex.util.ts @@ -1,10 +1,14 @@ +import { defaultLogger } from '@libp2p/logger' import { mplex } from '@libp2p/mplex' import { duplexPair } from 'it-pair/duplex' import { pipe } from 'it-pipe' +import { type Uint8ArrayList } from 'uint8arraylist' import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interface/stream-muxer' import type { Source, Transform } from 'it-stream-types' -const factory = mplex()() +const factory = mplex()({ + logger: defaultLogger() +}) export function testYamuxMuxer (name: string, client: boolean, conf: StreamMuxerInit = {}): StreamMuxer { return factory.createStreamMuxer({ @@ -54,14 +58,14 @@ export function testClientServer (conf: StreamMuxerInit = {}): { unpauseWrite(): void } } { - const pair = duplexPair() + const pair = duplexPair() const client = testYamuxMuxer('libp2p:mplex:client', true, conf) const server = testYamuxMuxer('libp2p:mplex:server', false, conf) - const clientReadTransform = pauseableTransform() - const clientWriteTransform = pauseableTransform() - const serverReadTransform = pauseableTransform() - const serverWriteTransform = pauseableTransform() + const clientReadTransform = pauseableTransform() + const clientWriteTransform = pauseableTransform() + const serverReadTransform = pauseableTransform() + const serverWriteTransform = pauseableTransform() void pipe(pair[0], clientReadTransform.transform, client, clientWriteTransform.transform, pair[0]) void pipe(pair[1], serverReadTransform.transform, server, serverWriteTransform.transform, pair[1]) diff --git a/test/muxer.spec.ts b/test/muxer.spec.ts index 0171258..4ba2164 100644 --- a/test/muxer.spec.ts +++ b/test/muxer.spec.ts @@ -3,6 +3,7 @@ import { expect } from 'aegir/chai' import { duplexPair } from 'it-pair/duplex' import { pipe } from 'it-pipe' +import { type Uint8ArrayList } from 'uint8arraylist' import { ERR_MUXER_LOCAL_CLOSED } from '../src/constants.js' import { sleep, testClientServer, testYamuxMuxer, type YamuxFixture } from './util.js' @@ -29,7 +30,7 @@ describe('muxer', () => { }) it('test client<->client', async () => { - const pair = duplexPair() + const pair = duplexPair() const client1 = testYamuxMuxer('libp2p:yamux:1', true) const client2 = testYamuxMuxer('libp2p:yamux:2', true) void pipe(pair[0], client1, pair[0]) @@ -44,7 +45,7 @@ describe('muxer', () => { }) it('test server<->server', async () => { - const pair = duplexPair() + const pair = duplexPair() const client1 = testYamuxMuxer('libp2p:yamux:1', false) const client2 = testYamuxMuxer('libp2p:yamux:2', false) void pipe(pair[0], client1, pair[0]) diff --git a/test/util.ts b/test/util.ts index 62aa744..34a7741 100644 --- a/test/util.ts +++ b/test/util.ts @@ -1,6 +1,7 @@ -import { logger } from '@libp2p/logger' +import { prefixLogger } from '@libp2p/logger' import { duplexPair } from 'it-pair/duplex' import { pipe } from 'it-pipe' +import { type Uint8ArrayList } from 'uint8arraylist' import { Yamux, YamuxMuxer, type YamuxMuxerInit } from '../src/muxer.js' import type { Config } from '../src/config.js' import type { Source, Transform } from 'it-stream-types' @@ -28,16 +29,17 @@ export const testConf: Partial = { export class TestYamux extends Yamux { createStreamMuxer (init?: YamuxMuxerInit): YamuxMuxer { const client = isClient() - return super.createStreamMuxer({ ...testConf, ...init, direction: client ? 'outbound' : 'inbound', log: logger(`libp2p:yamux${client ? 1 : 2}`) }) + return super.createStreamMuxer({ ...testConf, ...init, direction: client ? 'outbound' : 'inbound' }) } } export function testYamuxMuxer (name: string, client: boolean, conf: YamuxMuxerInit = {}): YamuxMuxer { return new YamuxMuxer({ + logger: prefixLogger(name) + }, { ...testConf, ...conf, - direction: client ? 'outbound' : 'inbound', - log: logger(name) + direction: client ? 'outbound' : 'inbound' }) } @@ -79,14 +81,14 @@ export function testClientServer (conf: YamuxMuxerInit = {}): { client: YamuxFixture server: YamuxFixture } { - const pair = duplexPair() - const client = testYamuxMuxer('libp2p:yamux:client', true, conf) - const server = testYamuxMuxer('libp2p:yamux:server', false, conf) + const pair = duplexPair() + const client = testYamuxMuxer('client', true, conf) + const server = testYamuxMuxer('server', false, conf) - const clientReadTransform = pauseableTransform() - const clientWriteTransform = pauseableTransform() - const serverReadTransform = pauseableTransform() - const serverWriteTransform = pauseableTransform() + const clientReadTransform = pauseableTransform() + const clientWriteTransform = pauseableTransform() + const serverReadTransform = pauseableTransform() + const serverWriteTransform = pauseableTransform() void pipe(pair[0], clientReadTransform.transform, client, clientWriteTransform.transform, pair[0]) void pipe(pair[1], serverReadTransform.transform, server, serverWriteTransform.transform, pair[1])