Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: yield uint8arraylists #65

Merged
merged 1 commit into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@
"docs": "aegir docs"
},
"dependencies": {
"@libp2p/interface": "^0.1.0",
"@libp2p/logger": "^3.0.0",
"@libp2p/interface": "^1.0.0",
"@libp2p/utils": "^5.0.0",
"get-iterator": "^2.0.1",
"it-foreach": "^2.0.3",
"it-pipe": "^3.0.1",
Expand All @@ -178,8 +178,9 @@
},
"devDependencies": {
"@dapplion/benchmark": "^0.2.4",
"@libp2p/interface-compliance-tests": "^4.0.0",
"@libp2p/mplex": "^9.0.0",
"@libp2p/interface-compliance-tests": "^5.0.0",
"@libp2p/logger": "^4.0.0",
"@libp2p/mplex": "^10.0.0",
"aegir": "^41.1.10",
"it-drain": "^3.0.2",
"it-pair": "^2.0.6",
Expand Down
9 changes: 0 additions & 9 deletions src/config.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,8 @@
import { CodeError } from '@libp2p/interface/errors'
import { logger, type Logger } from '@libp2p/logger'
import { ERR_INVALID_CONFIG, INITIAL_STREAM_WINDOW, MAX_STREAM_WINDOW } from './constants.js'

// TOOD use config items or delete them
export interface Config {
/**
* Used to control the log destination
*
* It can be disabled by explicitly setting to `undefined`
*/
log?: Logger

/**
* Used to do periodic keep alive messages using a ping.
*/
Expand Down Expand Up @@ -55,7 +47,6 @@ export interface Config {
}

export const defaultConfig: Config = {
log: logger('libp2p:yamux'),
enableKeepAlive: true,
keepAliveInterval: 30_000,
maxInboundStreams: 1_000,
Expand Down
2 changes: 1 addition & 1 deletion src/decode.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CodeError } from '@libp2p/interface/errors'
import { CodeError } from '@libp2p/interface'
import { Uint8ArrayList } from 'uint8arraylist'
import { ERR_DECODE_INVALID_VERSION, ERR_DECODE_IN_PROGRESS } from './constants.js'
import { type FrameHeader, FrameType, HEADER_LENGTH, YAMUX_VERSION } from './frame.js'
Expand Down
10 changes: 7 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,15 @@

import { Yamux } from './muxer.js'
import type { YamuxMuxerInit } from './muxer.js'
import type { StreamMuxerFactory } from '@libp2p/interface/stream-muxer'
import type { ComponentLogger, StreamMuxerFactory } from '@libp2p/interface'

export { GoAwayCode, type FrameHeader, type FrameType } from './frame.js'
export type { YamuxMuxerInit }

export function yamux (init: YamuxMuxerInit = {}): () => StreamMuxerFactory {
return () => new Yamux(init)
export interface YamuxMuxerComponents {
logger: ComponentLogger
}

export function yamux (init: YamuxMuxerInit = {}): (components: YamuxMuxerComponents) => StreamMuxerFactory {
return (components) => new Yamux(components, init)
}
34 changes: 18 additions & 16 deletions src/muxer.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
import { CodeError } from '@libp2p/interface/errors'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger, type Logger } from '@libp2p/logger'
import { CodeError, setMaxListeners } from '@libp2p/interface'
import { getIterator } from 'get-iterator'
import { pushable, type Pushable } from 'it-pushable'
import { Uint8ArrayList } from 'uint8arraylist'
import { type Config, defaultConfig, verifyConfig } from './config.js'
import { ERR_BOTH_CLIENTS, ERR_INVALID_FRAME, ERR_MAX_OUTBOUND_STREAMS_EXCEEDED, ERR_MUXER_LOCAL_CLOSED, ERR_MUXER_REMOTE_CLOSED, ERR_NOT_MATCHING_PING, ERR_STREAM_ALREADY_EXISTS, ERR_UNREQUESTED_PING, PROTOCOL_ERRORS } from './constants.js'
import { Decoder } from './decode.js'
import { encodeHeader } from './encode.js'
import { Flag, type FrameHeader, FrameType, GoAwayCode } from './frame.js'
import { StreamState, YamuxStream } from './stream.js'
import type { AbortOptions } from '@libp2p/interface'
import type { Stream } from '@libp2p/interface/connection'
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface/stream-muxer'
import type { YamuxMuxerComponents } from './index.js'
import type { AbortOptions, ComponentLogger, Logger, Stream, StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface'
import type { Sink, Source } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

const YAMUX_PROTOCOL_ID = '/yamux/1.0.0'
const CLOSE_TIMEOUT = 500
Expand All @@ -23,14 +20,16 @@ export interface YamuxMuxerInit extends StreamMuxerInit, Partial<Config> {

export class Yamux implements StreamMuxerFactory {
protocol = YAMUX_PROTOCOL_ID
private readonly _components: YamuxMuxerComponents
private readonly _init: YamuxMuxerInit

constructor (init: YamuxMuxerInit = {}) {
constructor (components: YamuxMuxerComponents, init: YamuxMuxerInit = {}) {
this._components = components
this._init = init
}

createStreamMuxer (init?: YamuxMuxerInit): YamuxMuxer {
return new YamuxMuxer({
return new YamuxMuxer(this._components, {
...this._init,
...init
})
Expand All @@ -43,11 +42,12 @@ export interface CloseOptions extends AbortOptions {

export class YamuxMuxer implements StreamMuxer {
protocol = YAMUX_PROTOCOL_ID
source: Pushable<Uint8Array>
source: Pushable<Uint8ArrayList | Uint8Array>
sink: Sink<Source<Uint8ArrayList | Uint8Array>, Promise<void>>

private readonly config: Config
private readonly log?: Logger
private readonly logger: ComponentLogger

/** Used to close the muxer from either the sink or source */
private readonly closeController: AbortController
Expand Down Expand Up @@ -78,10 +78,11 @@ export class YamuxMuxer implements StreamMuxer {
private readonly onIncomingStream?: (stream: Stream) => void
private readonly onStreamEnd?: (stream: Stream) => void

constructor (init: YamuxMuxerInit) {
constructor (components: YamuxMuxerComponents, init: YamuxMuxerInit) {
this.client = init.direction === 'outbound'
this.config = { ...defaultConfig, ...init }
this.log = this.config.log
this.logger = components.logger
this.log = this.logger.forComponent('libp2p:yamux')
verifyConfig(this.config)

this.closeController = new AbortController()
Expand Down Expand Up @@ -363,7 +364,7 @@ export class YamuxMuxer implements StreamMuxer {
this.closeStream(id)
this.onStreamEnd?.(stream)
},
log: logger(`libp2p:yamux:${direction}:${id}`),
log: this.logger.forComponent(`libp2p:yamux:${direction}:${id}`),
config: this.config,
getRTT: this.getRTT.bind(this)
})
Expand Down Expand Up @@ -554,14 +555,15 @@ export class YamuxMuxer implements StreamMuxer {
this.onIncomingStream?.(stream)
}

private sendFrame (header: FrameHeader, data?: Uint8Array): void {
private sendFrame (header: FrameHeader, data?: Uint8ArrayList): void {
this.log?.trace('sending frame %o', header)
if (header.type === FrameType.Data) {
if (data === undefined) {
throw new CodeError('invalid frame', ERR_INVALID_FRAME)
}
this.source.push(encodeHeader(header))
this.source.push(data)
this.source.push(
new Uint8ArrayList(encodeHeader(header), data)
)
} else {
this.source.push(encodeHeader(header))
}
Expand Down
10 changes: 5 additions & 5 deletions src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CodeError } from '@libp2p/interface/errors'
import { AbstractStream, type AbstractStreamInit } from '@libp2p/interface/stream-muxer/stream'
import { CodeError } from '@libp2p/interface'
import { AbstractStream, type AbstractStreamInit } from '@libp2p/utils/abstract-stream'
import each from 'it-foreach'
import { ERR_RECV_WINDOW_EXCEEDED, ERR_STREAM_ABORT, INITIAL_STREAM_WINDOW } from './constants.js'
import { Flag, type FrameHeader, FrameType, HEADER_LENGTH } from './frame.js'
Expand All @@ -17,7 +17,7 @@ export enum StreamState {

export interface YamuxStreamInit extends AbstractStreamInit {
name?: string
sendFrame(header: FrameHeader, body?: Uint8Array): void
sendFrame(header: FrameHeader, body?: Uint8ArrayList): void
getRTT(): number
config: Config
state: StreamState
Expand Down Expand Up @@ -49,7 +49,7 @@ export class YamuxStream extends AbstractStream {
private epochStart: number
private readonly getRTT: () => number

private readonly sendFrame: (header: FrameHeader, body?: Uint8Array) => void
private readonly sendFrame: (header: FrameHeader, body?: Uint8ArrayList) => void

constructor (init: YamuxStreamInit) {
super({
Expand Down Expand Up @@ -115,7 +115,7 @@ export class YamuxStream extends AbstractStream {
flag: flags,
streamID: this._id,
length: toSend
}, buf.subarray(0, toSend))
}, buf.sublist(0, toSend))

this.sendWindowCapacity -= toSend

Expand Down
5 changes: 4 additions & 1 deletion test/compliance.spec.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
/* eslint-env mocha */

import tests from '@libp2p/interface-compliance-tests/stream-muxer'
import { defaultLogger } from '@libp2p/logger'
import { TestYamux } from './util.js'

describe('compliance', () => {
tests({
async setup () {
return new TestYamux({})
return new TestYamux({
logger: defaultLogger()
})
},
async teardown () {}
})
Expand Down
16 changes: 10 additions & 6 deletions test/mplex.util.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import { defaultLogger } from '@libp2p/logger'
import { mplex } from '@libp2p/mplex'
import { duplexPair } from 'it-pair/duplex'
import { pipe } from 'it-pipe'
import { type Uint8ArrayList } from 'uint8arraylist'
import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interface/stream-muxer'
import type { Source, Transform } from 'it-stream-types'

const factory = mplex()()
const factory = mplex()({
logger: defaultLogger()
})

export function testYamuxMuxer (name: string, client: boolean, conf: StreamMuxerInit = {}): StreamMuxer {
return factory.createStreamMuxer({
Expand Down Expand Up @@ -54,14 +58,14 @@ export function testClientServer (conf: StreamMuxerInit = {}): {
unpauseWrite(): void
}
} {
const pair = duplexPair<Uint8Array>()
const pair = duplexPair<Uint8Array | Uint8ArrayList>()
const client = testYamuxMuxer('libp2p:mplex:client', true, conf)
const server = testYamuxMuxer('libp2p:mplex:server', false, conf)

const clientReadTransform = pauseableTransform<Uint8Array>()
const clientWriteTransform = pauseableTransform<Uint8Array>()
const serverReadTransform = pauseableTransform<Uint8Array>()
const serverWriteTransform = pauseableTransform<Uint8Array>()
const clientReadTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()
const clientWriteTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()
const serverReadTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()
const serverWriteTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()

void pipe(pair[0], clientReadTransform.transform, client, clientWriteTransform.transform, pair[0])
void pipe(pair[1], serverReadTransform.transform, server, serverWriteTransform.transform, pair[1])
Expand Down
5 changes: 3 additions & 2 deletions test/muxer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import { expect } from 'aegir/chai'
import { duplexPair } from 'it-pair/duplex'
import { pipe } from 'it-pipe'
import { type Uint8ArrayList } from 'uint8arraylist'
import { ERR_MUXER_LOCAL_CLOSED } from '../src/constants.js'
import { sleep, testClientServer, testYamuxMuxer, type YamuxFixture } from './util.js'

Expand All @@ -29,7 +30,7 @@ describe('muxer', () => {
})

it('test client<->client', async () => {
const pair = duplexPair<Uint8Array>()
const pair = duplexPair<Uint8Array | Uint8ArrayList>()
const client1 = testYamuxMuxer('libp2p:yamux:1', true)
const client2 = testYamuxMuxer('libp2p:yamux:2', true)
void pipe(pair[0], client1, pair[0])
Expand All @@ -44,7 +45,7 @@ describe('muxer', () => {
})

it('test server<->server', async () => {
const pair = duplexPair<Uint8Array>()
const pair = duplexPair<Uint8Array | Uint8ArrayList>()
const client1 = testYamuxMuxer('libp2p:yamux:1', false)
const client2 = testYamuxMuxer('libp2p:yamux:2', false)
void pipe(pair[0], client1, pair[0])
Expand Down
24 changes: 13 additions & 11 deletions test/util.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { logger } from '@libp2p/logger'
import { prefixLogger } from '@libp2p/logger'
import { duplexPair } from 'it-pair/duplex'
import { pipe } from 'it-pipe'
import { type Uint8ArrayList } from 'uint8arraylist'
import { Yamux, YamuxMuxer, type YamuxMuxerInit } from '../src/muxer.js'
import type { Config } from '../src/config.js'
import type { Source, Transform } from 'it-stream-types'
Expand Down Expand Up @@ -28,16 +29,17 @@ export const testConf: Partial<Config> = {
export class TestYamux extends Yamux {
createStreamMuxer (init?: YamuxMuxerInit): YamuxMuxer {
const client = isClient()
return super.createStreamMuxer({ ...testConf, ...init, direction: client ? 'outbound' : 'inbound', log: logger(`libp2p:yamux${client ? 1 : 2}`) })
return super.createStreamMuxer({ ...testConf, ...init, direction: client ? 'outbound' : 'inbound' })
}
}

export function testYamuxMuxer (name: string, client: boolean, conf: YamuxMuxerInit = {}): YamuxMuxer {
return new YamuxMuxer({
logger: prefixLogger(name)
}, {
...testConf,
...conf,
direction: client ? 'outbound' : 'inbound',
log: logger(name)
direction: client ? 'outbound' : 'inbound'
})
}

Expand Down Expand Up @@ -79,14 +81,14 @@ export function testClientServer (conf: YamuxMuxerInit = {}): {
client: YamuxFixture
server: YamuxFixture
} {
const pair = duplexPair<Uint8Array>()
const client = testYamuxMuxer('libp2p:yamux:client', true, conf)
const server = testYamuxMuxer('libp2p:yamux:server', false, conf)
const pair = duplexPair<Uint8Array | Uint8ArrayList>()
const client = testYamuxMuxer('client', true, conf)
const server = testYamuxMuxer('server', false, conf)

const clientReadTransform = pauseableTransform<Uint8Array>()
const clientWriteTransform = pauseableTransform<Uint8Array>()
const serverReadTransform = pauseableTransform<Uint8Array>()
const serverWriteTransform = pauseableTransform<Uint8Array>()
const clientReadTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()
const clientWriteTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()
const serverReadTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()
const serverWriteTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()

void pipe(pair[0], clientReadTransform.transform, client, clientWriteTransform.transform, pair[0])
void pipe(pair[1], serverReadTransform.transform, server, serverWriteTransform.transform, pair[1])
Expand Down