Skip to content

Commit

Permalink
fix: crash during DHT query abort when reading is slow (#2225)
Browse files Browse the repository at this point in the history
If a DHT query is aborted during reading, the deferred promise can become rejected while nothing is `await`ing it.

Switch the implementation to use a `pushable` queue instead.

Fixes #2216
  • Loading branch information
achingbrain authored Nov 10, 2023
1 parent effcfaa commit c960eb6
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 71 deletions.
2 changes: 2 additions & 0 deletions packages/kad-dht/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
"it-merge": "^3.0.0",
"it-parallel": "^3.0.0",
"it-pipe": "^3.0.1",
"it-pushable": "^3.2.1",
"it-stream-types": "^2.0.1",
"it-take": "^3.0.1",
"multiformats": "^12.0.1",
Expand Down Expand Up @@ -107,6 +108,7 @@
"protons": "^7.0.2",
"sinon": "^17.0.0",
"ts-sinon": "^2.0.2",
"wherearewe": "^2.0.1",
"which": "^4.0.0"
},
"browser": {
Expand Down
73 changes: 2 additions & 71 deletions packages/kad-dht/src/query/query-path.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { CodeError } from '@libp2p/interface/errors'
import { anySignal } from 'any-signal'
import defer from 'p-defer'
import Queue from 'p-queue'
import { toString } from 'uint8arrays/to-string'
import { xor } from 'uint8arrays/xor'
import { convertPeerId, convertBuffer } from '../utils.js'
import { queryErrorEvent } from './events.js'
import { queueToGenerator } from './utils.js'
import type { CleanUpEvents } from './manager.js'
import type { QueryEvent, QueryOptions } from '../index.js'
import type { QueryFunc } from '../query/types.js'
Expand Down Expand Up @@ -182,73 +181,5 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator<Qu
queryPeer(startingPeer, await convertPeerId(startingPeer))

// yield results as they come in
yield * toGenerator(queue, signal, cleanUp, log)
}

async function * toGenerator (queue: Queue, signal: AbortSignal, cleanUp: TypedEventTarget<CleanUpEvents>, log: Logger): AsyncGenerator<QueryEvent, void, undefined> {
let deferred = defer()
let running = true
const results: QueryEvent[] = []

const cleanup = (): void => {
if (!running) {
return
}

log('clean up queue, results %d, queue size %d, pending tasks %d', results.length, queue.size, queue.pending)

running = false
queue.clear()
results.splice(0, results.length)
}

queue.on('completed', result => {
results.push(result)
deferred.resolve()
})
queue.on('error', err => {
log('queue error', err)
cleanup()
deferred.reject(err)
})
queue.on('idle', () => {
log('queue idle')
running = false
deferred.resolve()
})

// clear the queue and throw if the query is aborted
signal.addEventListener('abort', () => {
log('abort queue')
const wasRunning = running
cleanup()

if (wasRunning) {
deferred.reject(new CodeError('Query aborted', 'ERR_QUERY_ABORTED'))
}
})

// the user broke out of the loop early, ensure we resolve the deferred result
// promise and clear the queue of any remaining jobs
cleanUp.addEventListener('cleanup', () => {
cleanup()
deferred.resolve()
})

while (running) { // eslint-disable-line no-unmodified-loop-condition
await deferred.promise
deferred = defer()

// yield all available results
while (results.length > 0) {
const result = results.shift()

if (result != null) {
yield result
}
}
}

// yield any remaining results
yield * results
yield * queueToGenerator(queue, signal, cleanUp, log)
}
65 changes: 65 additions & 0 deletions packages/kad-dht/src/query/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { CodeError } from '@libp2p/interface/errors'
import { pushable } from 'it-pushable'
import type { CleanUpEvents } from './manager.js'
import type { QueryEvent } from '../index.js'
import type { TypedEventTarget } from '@libp2p/interface/events'
import type { Logger } from '@libp2p/logger'
import type Queue from 'p-queue'

export async function * queueToGenerator (queue: Queue, signal: AbortSignal, cleanUp: TypedEventTarget<CleanUpEvents>, log: Logger): AsyncGenerator<QueryEvent, void, undefined> {
const stream = pushable<QueryEvent>({
objectMode: true
})

const cleanup = (err?: Error): void => {
log('clean up queue, results %d, queue size %d, pending tasks %d', stream.readableLength, queue.size, queue.pending)
queue.clear()
stream.end(err)
}

const onQueueJobComplete = (result: QueryEvent): void => {
if (result != null) {
stream.push(result)
}
}

const onQueueError = (err: Error): void => {
log('queue error', err)
cleanup(err)
}

const onQueueIdle = (): void => {
log('queue idle')
cleanup()
}

// clear the queue and throw if the query is aborted
const onSignalAbort = (): void => {
log('abort queue')
cleanup(new CodeError('Query aborted', 'ERR_QUERY_ABORTED'))
}

// the user broke out of the loop early, ensure we resolve the deferred result
// promise and clear the queue of any remaining jobs
const onCleanUp = (): void => {
cleanup()
}

// add listeners
queue.on('completed', onQueueJobComplete)
queue.on('error', onQueueError)
queue.on('idle', onQueueIdle)
signal.addEventListener('abort', onSignalAbort)
cleanUp.addEventListener('cleanup', onCleanUp)

try {
yield * stream
} finally {
// remove listeners
queue.removeListener('completed', onQueueJobComplete)
queue.removeListener('error', onQueueError)
queue.removeListener('idle', onQueueIdle)
signal.removeEventListener('abort', onSignalAbort)
cleanUp.removeEventListener('cleanup', onCleanUp)
}
}
77 changes: 77 additions & 0 deletions packages/kad-dht/test/query/utils.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { TypedEventEmitter } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { expect } from 'aegir/chai'
import delay from 'delay'
import all from 'it-all'
import Queue from 'p-queue'
import Sinon from 'sinon'
import { isNode } from 'wherearewe'
import { queueToGenerator } from '../../src/query/utils.js'
import type { CleanUpEvents } from '../../src/query/manager.js'

describe('query utils', () => {
describe('queue to generator', () => {
it('converts a queue to a generator', async () => {
const queue = new Queue()
const controller = new AbortController()
const signal = controller.signal
const cleanUp = new TypedEventEmitter<CleanUpEvents>()
const log = logger('test-logger')

void queue.add(async () => {
await delay(10)
return true
})

const results = await all(queueToGenerator(queue, signal, cleanUp, log))

expect(results).to.deep.equal([true])
})

it('aborts during read', async () => {
const listener = Sinon.stub()

if (isNode) {
process.on('unhandledRejection', listener)
}

const queue = new Queue({
concurrency: 1
})
const controller = new AbortController()
const signal = controller.signal
const cleanUp = new TypedEventEmitter<CleanUpEvents>()
const log = logger('test-logger')

void queue.add(async () => {
await delay(10)
return 1
})
void queue.add(async () => {
await delay(10)
return 2
})

let count = 1

await expect((async () => {
for await (const result of queueToGenerator(queue, signal, cleanUp, log) as any) {
expect(result).to.equal(count)
count++

// get the first result
if (result === 1) {
// abort the queue
controller.abort()
}
}
})()).to.eventually.be.rejected
.with.property('code', 'ERR_QUERY_ABORTED')

if (isNode) {
process.removeListener('unhandledRejection', listener)
expect(listener.called).to.be.false('unhandled promise rejection detected')
}
})
})
})

0 comments on commit c960eb6

Please sign in to comment.