Skip to content

Commit

Permalink
fix: graceful close of optimistic selection with early data (#2318)
Browse files Browse the repository at this point in the history
When we are doing optimistic protocol selection and we send a small
amount of data then immediately close the stream, if necessary wait
until we have finished sending protocol+data before allowing the
underlying stream to close.
  • Loading branch information
achingbrain authored Dec 18, 2023
1 parent 9eff7ef commit a7c6a93
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 8 deletions.
1 change: 1 addition & 0 deletions packages/multistream-select/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
"it-pipe": "^3.0.1",
"it-stream-types": "^2.0.1",
"p-defer": "^4.0.0",
"race-signal": "^1.0.2",
"uint8-varint": "^2.0.2",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^5.0.0"
Expand Down
30 changes: 27 additions & 3 deletions packages/multistream-select/src/select.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { CodeError } from '@libp2p/interface'
import { lpStream } from 'it-length-prefixed-stream'
import pDefer from 'p-defer'
import { raceSignal } from 'race-signal'
import * as varint from 'uint8-varint'
import { Uint8ArrayList } from 'uint8arraylist'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
Expand Down Expand Up @@ -181,6 +182,12 @@ function optimisticSelect <Stream extends SelectStream> (stream: Stream, protoco
sentProtocol = true
sendingProtocol = false
doneSendingProtocol.resolve()

// read the negotiation response but don't block more sending
negotiate()
.catch(err => {
options.log.error('could not finish optimistic protocol negotiation of %s', protocol, err)
})
} else {
yield buf
}
Expand Down Expand Up @@ -321,9 +328,26 @@ function optimisticSelect <Stream extends SelectStream> (stream: Stream, protoco
const originalClose = stream.close.bind(stream)

stream.close = async (opts) => {
// the stream is being closed, don't try to negotiate a protocol if we
// haven't already
if (!negotiated) {
// if we are in the process of negotiation, let it finish before closing
// because we may have unsent early data
const tasks = []

if (sendingProtocol) {
tasks.push(doneSendingProtocol.promise)
}

if (readingProtocol) {
tasks.push(doneReadingProtocol.promise)
}

if (tasks.length > 0) {
// let the in-flight protocol negotiation finish gracefully
await raceSignal(
Promise.all(tasks),
opts?.signal
)
} else {
// no protocol negotiation attempt has occurred so don't start one
negotiated = true
negotiating = false
doneNegotiating.resolve()
Expand Down
41 changes: 36 additions & 5 deletions packages/multistream-select/test/integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ describe('Dialer and Listener integration', () => {
expect(new Uint8ArrayList(...output[0]).slice()).to.eql(new Uint8ArrayList(...input).slice())
})

it('should handle and lazySelect', async () => {
it('should handle and optimistically select', async () => {
const protocol = '/echo/1.0.0'
const pair = duplexPair<Uint8ArrayList | Uint8Array>()

Expand All @@ -113,7 +113,7 @@ describe('Dialer and Listener integration', () => {
expect(new Uint8ArrayList(...dialerOut).slice()).to.eql(new Uint8ArrayList(...input).slice())
})

it('should handle and lazySelect that fails', async () => {
it('should handle and optimistically select that fails', async () => {
const protocol = '/echo/1.0.0'
const otherProtocol = '/echo/2.0.0'
const pair = duplexPair<Uint8ArrayList | Uint8Array>()
Expand All @@ -134,7 +134,7 @@ describe('Dialer and Listener integration', () => {
.to.eventually.be.rejected.with.property('code', 'ERR_UNSUPPORTED_PROTOCOL')
})

it('should handle and lazySelect only by reading', async () => {
it('should handle and optimistically select only by reading', async () => {
const protocol = '/echo/1.0.0'
const pair = duplexPair<Uint8ArrayList | Uint8Array>()

Expand Down Expand Up @@ -162,7 +162,38 @@ describe('Dialer and Listener integration', () => {
expect(new Uint8ArrayList(...dialerOut).slice()).to.eql(new Uint8ArrayList(...input).slice())
})

it('should handle and lazySelect only by reading that fails', async () => {
it('should handle and optimistically select only by writing', async () => {
const protocol = '/echo/1.0.0'
const pair = duplexPair<Uint8ArrayList | Uint8Array>()

const dialerSelection = await mss.select(pair[0], [protocol], {
log: logger('mss:dialer')
})
expect(dialerSelection.protocol).to.equal(protocol)

// ensure stream is usable after selection
const input = [randomBytes(10), randomBytes(64), randomBytes(3)]

const [listenerOut] = await Promise.all([
// the listener handles the incoming stream
mss.handle(pair[1], protocol, {
log: logger('mss:listener')
}).then(async result => {
// the listener reads from the incoming stream
return pipe(result.stream, async source => all(source))
}),
Promise.resolve().then(async () => {
// the dialer just writes to the stream
await pair[0].sink(async function * () {
yield * input
}())
})
])

expect(new Uint8ArrayList(...listenerOut).slice()).to.eql(new Uint8ArrayList(...input).slice())
})

it('should handle and optimistically select only by reading that fails', async () => {
const protocol = '/echo/1.0.0'
const otherProtocol = '/echo/2.0.0'
const pair = duplexPair<Uint8ArrayList | Uint8Array>()
Expand All @@ -183,7 +214,7 @@ describe('Dialer and Listener integration', () => {
.to.eventually.be.rejected.with.property('code', 'ERR_UNSUPPORTED_PROTOCOL')
})

it('should abort an unhandled lazySelect', async () => {
it('should abort an unhandled optimistically select', async () => {
const protocol = '/echo/1.0.0'
const pair = duplexPair<Uint8ArrayList | Uint8Array>()

Expand Down

0 comments on commit a7c6a93

Please sign in to comment.