diff --git a/package.json b/package.json index af0b7a6..dd3fec4 100644 --- a/package.json +++ b/package.json @@ -153,8 +153,8 @@ "varint": "^6.0.0" }, "devDependencies": { - "@libp2p/interface-compliance-tests": "^1.1.32", - "@libp2p/interfaces": "^1.3.31", + "@libp2p/interface-compliance-tests": "^2.0.1", + "@libp2p/interfaces": "^2.0.1", "@types/varint": "^6.0.0", "aegir": "^37.0.10", "cborg": "^1.8.1", diff --git a/src/mplex.ts b/src/mplex.ts index 9ad0acf..3eefb5d 100644 --- a/src/mplex.ts +++ b/src/mplex.ts @@ -9,6 +9,7 @@ import { createStream } from './stream.js' import { toString as uint8ArrayToString } from 'uint8arrays' import { trackedMap } from '@libp2p/tracked-map' import { logger } from '@libp2p/logger' +import errCode from 'err-code' import type { Components } from '@libp2p/interfaces/components' import type { Sink } from 'it-stream-types' import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer' @@ -130,6 +131,10 @@ export class MplexStreamMuxer implements StreamMuxer { } const send = (msg: Message) => { + if (!registry.has(id)) { + throw errCode(new Error('the stream is not in the muxer registry, it may have already been closed'), 'ERR_STREAM_DOESNT_EXIST') + } + if (log.enabled) { log.trace('%s stream %s send', type, id, printMessage(msg)) } @@ -196,10 +201,18 @@ export class MplexStreamMuxer implements StreamMuxer { const { initiators, receivers } = this._streams // Abort all the things! for (const s of initiators.values()) { - s.abort(err) + if (err != null) { + s.abort(err) + } else { + s.close() + } } for (const s of receivers.values()) { - s.abort(err) + if (err != null) { + s.abort(err) + } else { + s.close() + } } } const source = pushableV({ onEnd }) @@ -241,14 +254,17 @@ export class MplexStreamMuxer implements StreamMuxer { switch (type) { case MessageTypes.MESSAGE_INITIATOR: case MessageTypes.MESSAGE_RECEIVER: + // We got data from the remote, push it into our local stream stream.source.push(message.data.slice()) break case MessageTypes.CLOSE_INITIATOR: case MessageTypes.CLOSE_RECEIVER: - stream.close() + // We should expect no more data from the remote, stop reading + stream.closeRead() break case MessageTypes.RESET_INITIATOR: case MessageTypes.RESET_RECEIVER: + // Stop reading and writing to the stream immediately stream.reset() break default: diff --git a/src/stream.ts b/src/stream.ts index ea0a371..f2b81a4 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -16,6 +16,7 @@ const log = logger('libp2p:mplex:stream') const ERR_MPLEX_STREAM_RESET = 'ERR_MPLEX_STREAM_RESET' const ERR_MPLEX_STREAM_ABORT = 'ERR_MPLEX_STREAM_ABORT' +const ERR_MPLEX_SINK_ENDED = 'ERR_MPLEX_SINK_ENDED' export interface Options { id: number @@ -31,6 +32,7 @@ export function createStream (options: Options): MplexStream { const abortController = new AbortController() const resetController = new AbortController() + const closeController = new AbortController() const Types = type === 'initiator' ? InitiatorMessageTypes : ReceiverMessageTypes const externalId = type === 'initiator' ? (`i${id}`) : `r${id}` const streamName = `${name == null ? id : name}` @@ -49,7 +51,7 @@ export function createStream (options: Options): MplexStream { } sourceEnded = true - log.trace('%s stream %s source end', type, streamName, err) + log.trace('%s stream %s source end - err: %o', type, streamName, err) if (err != null && endErr == null) { endErr = err @@ -85,19 +87,54 @@ export function createStream (options: Options): MplexStream { } } - const stream = { - // Close for reading + const stream: MplexStream = { + // Close for both Reading and Writing close: () => { + log.trace('%s stream %s close', type, streamName) + + stream.closeRead() + stream.closeWrite() + }, + + // Close for reading + closeRead: () => { + log.trace('%s stream %s closeRead', type, streamName) + + if (sourceEnded) { + return + } + stream.source.end() }, + + // Close for writing + closeWrite: () => { + log.trace('%s stream %s closeWrite', type, streamName) + + if (sinkEnded) { + return + } + + closeController.abort() + + try { + send({ id, type: Types.CLOSE }) + } catch (err) { + log.trace('%s stream %s error sending close', type, name, err) + } + + onSinkEnd() + }, + // Close for reading and writing (local error) - abort: (err?: Error) => { + abort: (err: Error) => { log.trace('%s stream %s abort', type, streamName, err) // End the source with the passed error stream.source.end(err) abortController.abort() onSinkEnd(err) }, + // Close immediately for reading and writing (remote error) reset: () => { const err = errCode(new Error('stream reset'), ERR_MPLEX_STREAM_RESET) @@ -105,10 +142,16 @@ export function createStream (options: Options): MplexStream { stream.source.end(err) onSinkEnd(err) }, + sink: async (source: Source) => { + if (sinkEnded) { + throw errCode(new Error('stream closed for writing'), ERR_MPLEX_SINK_ENDED) + } + source = abortableSource(source, anySignal([ abortController.signal, - resetController.signal + resetController.signal, + closeController.signal ])) try { @@ -135,6 +178,10 @@ export function createStream (options: Options): MplexStream { } } catch (err: any) { if (err.type === 'aborted' && err.message === 'The operation was aborted') { + if (closeController.signal.aborted) { + return + } + if (resetController.signal.aborted) { err.message = 'stream reset' err.code = ERR_MPLEX_STREAM_RESET @@ -171,10 +218,13 @@ export function createStream (options: Options): MplexStream { onSinkEnd() }, + source: pushable({ onEnd: onSourceEnd }), + timeline, + id: externalId } diff --git a/test/stream.spec.ts b/test/stream.spec.ts index 232d7a2..580de96 100644 --- a/test/stream.spec.ts +++ b/test/stream.spec.ts @@ -94,7 +94,7 @@ async function streamPair (n: number, onInitiatorMessage?: onMessage, onReceiver // when the initiator sends a CLOSE message, we call close if (msg.type === MessageTypes.CLOSE_INITIATOR) { - receiver.close() + void receiver.closeRead() } // when the initiator sends a RESET message, we call close @@ -114,7 +114,7 @@ async function streamPair (n: number, onInitiatorMessage?: onMessage, onReceiver // when the receiver sends a CLOSE message, we call close if (msg.type === MessageTypes.CLOSE_RECEIVER) { - initiator.close() + void initiator.close() } // when the receiver sends a RESET message, we call close