From a7c6a93c6717a073bd8677a714565c91515290f2 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Mon, 18 Dec 2023 06:55:46 +0000 Subject: [PATCH] fix: graceful close of optimistic selection with early data (#2318) When we are doing optimistic protocol selection and we send a small amount of data then immediately close the stream, if necessary wait until we have finished sending protocol+data before allowing the underlying stream to close. --- packages/multistream-select/package.json | 1 + packages/multistream-select/src/select.ts | 30 ++++++++++++-- .../test/integration.spec.ts | 41 ++++++++++++++++--- 3 files changed, 64 insertions(+), 8 deletions(-) diff --git a/packages/multistream-select/package.json b/packages/multistream-select/package.json index 673d0d6dfd..44f8c4ac71 100644 --- a/packages/multistream-select/package.json +++ b/packages/multistream-select/package.json @@ -63,6 +63,7 @@ "it-pipe": "^3.0.1", "it-stream-types": "^2.0.1", "p-defer": "^4.0.0", + "race-signal": "^1.0.2", "uint8-varint": "^2.0.2", "uint8arraylist": "^2.4.3", "uint8arrays": "^5.0.0" diff --git a/packages/multistream-select/src/select.ts b/packages/multistream-select/src/select.ts index 4abc2ca438..48e8cb3cf7 100644 --- a/packages/multistream-select/src/select.ts +++ b/packages/multistream-select/src/select.ts @@ -1,6 +1,7 @@ import { CodeError } from '@libp2p/interface' import { lpStream } from 'it-length-prefixed-stream' import pDefer from 'p-defer' +import { raceSignal } from 'race-signal' import * as varint from 'uint8-varint' import { Uint8ArrayList } from 'uint8arraylist' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' @@ -181,6 +182,12 @@ function optimisticSelect (stream: Stream, protoco sentProtocol = true sendingProtocol = false doneSendingProtocol.resolve() + + // read the negotiation response but don't block more sending + negotiate() + .catch(err => { + options.log.error('could not finish optimistic protocol negotiation of %s', protocol, err) + }) } else { yield buf } @@ -321,9 +328,26 @@ function optimisticSelect (stream: Stream, protoco const originalClose = stream.close.bind(stream) stream.close = async (opts) => { - // the stream is being closed, don't try to negotiate a protocol if we - // haven't already - if (!negotiated) { + // if we are in the process of negotiation, let it finish before closing + // because we may have unsent early data + const tasks = [] + + if (sendingProtocol) { + tasks.push(doneSendingProtocol.promise) + } + + if (readingProtocol) { + tasks.push(doneReadingProtocol.promise) + } + + if (tasks.length > 0) { + // let the in-flight protocol negotiation finish gracefully + await raceSignal( + Promise.all(tasks), + opts?.signal + ) + } else { + // no protocol negotiation attempt has occurred so don't start one negotiated = true negotiating = false doneNegotiating.resolve() diff --git a/packages/multistream-select/test/integration.spec.ts b/packages/multistream-select/test/integration.spec.ts index 479f798ad3..729439083d 100644 --- a/packages/multistream-select/test/integration.spec.ts +++ b/packages/multistream-select/test/integration.spec.ts @@ -88,7 +88,7 @@ describe('Dialer and Listener integration', () => { expect(new Uint8ArrayList(...output[0]).slice()).to.eql(new Uint8ArrayList(...input).slice()) }) - it('should handle and lazySelect', async () => { + it('should handle and optimistically select', async () => { const protocol = '/echo/1.0.0' const pair = duplexPair() @@ -113,7 +113,7 @@ describe('Dialer and Listener integration', () => { expect(new Uint8ArrayList(...dialerOut).slice()).to.eql(new Uint8ArrayList(...input).slice()) }) - it('should handle and lazySelect that fails', async () => { + it('should handle and optimistically select that fails', async () => { const protocol = '/echo/1.0.0' const otherProtocol = '/echo/2.0.0' const pair = duplexPair() @@ -134,7 +134,7 @@ describe('Dialer and Listener integration', () => { .to.eventually.be.rejected.with.property('code', 'ERR_UNSUPPORTED_PROTOCOL') }) - it('should handle and lazySelect only by reading', async () => { + it('should handle and optimistically select only by reading', async () => { const protocol = '/echo/1.0.0' const pair = duplexPair() @@ -162,7 +162,38 @@ describe('Dialer and Listener integration', () => { expect(new Uint8ArrayList(...dialerOut).slice()).to.eql(new Uint8ArrayList(...input).slice()) }) - it('should handle and lazySelect only by reading that fails', async () => { + it('should handle and optimistically select only by writing', async () => { + const protocol = '/echo/1.0.0' + const pair = duplexPair() + + const dialerSelection = await mss.select(pair[0], [protocol], { + log: logger('mss:dialer') + }) + expect(dialerSelection.protocol).to.equal(protocol) + + // ensure stream is usable after selection + const input = [randomBytes(10), randomBytes(64), randomBytes(3)] + + const [listenerOut] = await Promise.all([ + // the listener handles the incoming stream + mss.handle(pair[1], protocol, { + log: logger('mss:listener') + }).then(async result => { + // the listener reads from the incoming stream + return pipe(result.stream, async source => all(source)) + }), + Promise.resolve().then(async () => { + // the dialer just writes to the stream + await pair[0].sink(async function * () { + yield * input + }()) + }) + ]) + + expect(new Uint8ArrayList(...listenerOut).slice()).to.eql(new Uint8ArrayList(...input).slice()) + }) + + it('should handle and optimistically select only by reading that fails', async () => { const protocol = '/echo/1.0.0' const otherProtocol = '/echo/2.0.0' const pair = duplexPair() @@ -183,7 +214,7 @@ describe('Dialer and Listener integration', () => { .to.eventually.be.rejected.with.property('code', 'ERR_UNSUPPORTED_PROTOCOL') }) - it('should abort an unhandled lazySelect', async () => { + it('should abort an unhandled optimistically select', async () => { const protocol = '/echo/1.0.0' const pair = duplexPair()