Skip to content

Commit

Permalink
fix: improve dial queue and parallel dials (libp2p#315)
Browse files Browse the repository at this point in the history
* feat: allow dialer queues to do many requests to a peer

* fix: parallel dials and validate cancelled conns

* feat: make dial timeout configurable

* fix: allow already connected peers to dial immediately

* refactor: add dial timeout to consts file

* fix: keep better track of in progress queues

* refactor: make dials race
  • Loading branch information
jacobheun authored Mar 28, 2019
1 parent 57146a0 commit fcbcccc
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 50 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ const sw = new switch(peerInfo , peerBook [, options])
If defined, `options` should be an object with the following keys and respective values:

- `blacklistTTL`: - number of ms a peer should not be dialable to after it errors. Defaults to `120000`(120 seconds)
- `maxParallelDials` - number of concurrent dials the switch should allow. Defaults to `50`
- `maxParallelDials`: - number of concurrent dials the switch should allow. Defaults to `50`
- `dialTimeout`: - number of ms a dial to a peer should be allowed to run. Defaults to `30000` (30 seconds)
- `stats`: an object with the following keys and respective values:
- `maxOldPeersRetention`: maximum old peers retention. For when peers disconnect and keeping the stats around in case they reconnect. Defaults to `100`.
- `computeThrottleMaxQueueSize`: maximum queue size to perform stats computation throttling. Defaults to `1000`.
Expand Down
1 change: 1 addition & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

module.exports = {
BLACK_LIST_TTL: 120e3, // How long before an errored peer can be dialed again
DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take
MAX_PARALLEL_DIALS: 50 // Maximum allowed concurrent dials
}
10 changes: 3 additions & 7 deletions src/dialer/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class Queue {
* @constructor
* @param {string} peerId
* @param {Switch} _switch
* @param {function} onStopped Called when the queue stops
* @param {function(string)} onStopped Called when the queue stops
*/
constructor (peerId, _switch, onStopped) {
this.id = peerId
Expand All @@ -78,20 +78,16 @@ class Queue {
}

/**
* Adds the dial request to the queue and starts the
* queue if it is stopped
* Adds the dial request to the queue. The queue is not automatically started
* @param {string} protocol
* @param {boolean} useFSM If callback should use a ConnectionFSM instead
* @param {function(Error, Connection)} callback
* @returns {boolean} whether or not the queue has been started
*/
add (protocol, useFSM, callback) {
if (!this.isDialAllowed()) {
nextTick(callback, ERR_BLACKLISTED())
return false
}
this._queue.push({ protocol, useFSM, callback })
return this.start()
}

/**
Expand Down Expand Up @@ -133,7 +129,7 @@ class Queue {
if (this.isRunning) {
log('stopping dial queue to %s', this.id)
this.isRunning = false
this.onStopped()
this.onStopped(this.id)
}
}

Expand Down
51 changes: 29 additions & 22 deletions src/dialer/queueManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

const once = require('once')
const Queue = require('./queue')
const { DIAL_ABORTED } = require('../errors')
const noop = () => {}

class DialQueueManager {
Expand All @@ -11,10 +10,10 @@ class DialQueueManager {
* @param {Switch} _switch
*/
constructor (_switch) {
this._queue = []
this._queue = new Set()
this._dialingQueues = new Set()
this._queues = {}
this.switch = _switch
this.dials = 0
}

/**
Expand All @@ -24,11 +23,8 @@ class DialQueueManager {
* This causes the entire DialerQueue to be drained
*/
abort () {
// Abort items in the general queue
while (this._queue.length > 0) {
let dial = this._queue.shift()
dial.callback(DIAL_ABORTED())
}
// Clear the general queue
this._queue.clear()

// Abort the individual peer queues
const queues = Object.values(this._queues)
Expand All @@ -46,29 +42,39 @@ class DialQueueManager {
add ({ peerInfo, protocol, useFSM, callback }) {
callback = callback ? once(callback) : noop

// If the target queue is currently running, just add the dial
// directly to it. This acts as a crude priority lane for multiple
// calls to a peer.
// Add the dial to its respective queue
const targetQueue = this.getQueue(peerInfo)
if (targetQueue.isRunning) {
targetQueue.add(protocol, useFSM, callback)
targetQueue.add(protocol, useFSM, callback)

// If we're already connected to the peer, start the queue now
// While it might cause queues to go over the max parallel amount,
// it avoids blocking peers we're already connected to
if (peerInfo.isConnected()) {
targetQueue.start()
return
}

this._queue.push({ peerInfo, protocol, useFSM, callback })
// Add the id to the general queue set if the queue isn't running
// and if the queue is allowed to dial
if (!targetQueue.isRunning && targetQueue.isDialAllowed()) {
this._queue.add(targetQueue.id)
}

this.run()
}

/**
* Will execute up to `MAX_PARALLEL_DIALS` dials
*/
run () {
if (this.dials < this.switch.dialer.MAX_PARALLEL_DIALS && this._queue.length > 0) {
let { peerInfo, protocol, useFSM, callback } = this._queue.shift()
let dialQueue = this.getQueue(peerInfo)
if (dialQueue.add(protocol, useFSM, callback)) {
this.dials++
}
if (this._dialingQueues.size < this.switch.dialer.MAX_PARALLEL_DIALS && this._queue.size > 0) {
let nextQueue = this._queue.values().next()
if (nextQueue.done) return

this._queue.delete(nextQueue.value)
let targetQueue = this._queues[nextQueue.value]
this._dialingQueues.add(targetQueue.id)
targetQueue.start()
}
}

Expand All @@ -84,9 +90,10 @@ class DialQueueManager {
* A handler for when dialing queues stop. This will trigger
* `run()` in order to keep the queue processing.
* @private
* @param {string} id peer id of the queue that stopped
*/
_onQueueStopped () {
this.dials--
_onQueueStopped (id) {
this._dialingQueues.delete(id)
this.run()
}

Expand Down
21 changes: 12 additions & 9 deletions src/limit-dialer/index.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
'use strict'

const tryEach = require('async/tryEach')
const race = require('async/race')
const debug = require('debug')
const once = require('once')

const log = debug('libp2p:switch:dialer')

const DialQueue = require('./queue')
const { CONNECTION_FAILED } = require('../errors')

/**
* Track dials per peer and limited them.
Expand Down Expand Up @@ -42,19 +43,21 @@ class LimitDialer {

let errors = []
const tasks = addrs.map((m) => {
return (cb) => this.dialSingle(peer, transport, m, token, (err, result) => {
if (err) {
errors.push(err)
return cb(err)
return (cb) => this.dialSingle(peer, transport, m, token, (err, res) => {
if (res) return cb(null, res)

errors.push(err || CONNECTION_FAILED())

if (errors.length === tasks.length) {
cb(errors)
}
return cb(null, result)
})
})

tryEach(tasks, (_, result) => {
if (result && result.conn) {
race(tasks, (_, successfulDial) => {
if (successfulDial) {
log('dialMany:success')
return callback(null, result)
return callback(null, successfulDial)
}

log('dialMany:error')
Expand Down
4 changes: 2 additions & 2 deletions src/limit-dialer/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ class DialQueue {
pull(empty(), conn)
// If we can close the connection, do it
if (typeof conn.close === 'function') {
return conn.close((_) => callback(null, { cancel: true }))
return conn.close((_) => callback(null))
}
return callback(null, { cancel: true })
return callback(null)
}

// one is enough
Expand Down
7 changes: 2 additions & 5 deletions src/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,19 @@ const debug = require('debug')
const log = debug('libp2p:switch:transport')

const LimitDialer = require('./limit-dialer')
const { DIAL_TIMEOUT } = require('./constants')

// number of concurrent outbound dials to make per peer, same as go-libp2p-swtch
const defaultPerPeerRateLimit = 8

// the amount of time a single dial has to succeed
// TODO this should be exposed as a option
const dialTimeout = 30 * 1000

/**
* Manages the transports for the switch. This simplifies dialing and listening across
* multiple transports.
*/
class TransportManager {
constructor (_switch) {
this.switch = _switch
this.dialer = new LimitDialer(defaultPerPeerRateLimit, dialTimeout)
this.dialer = new LimitDialer(defaultPerPeerRateLimit, this.switch._options.dialTimeout || DIAL_TIMEOUT)
}

/**
Expand Down
13 changes: 9 additions & 4 deletions test/limit-dialer.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

const chai = require('chai')
chai.use(require('dirty-chai'))
chai.use(require('chai-checkmark'))
const expect = chai.expect
const multiaddr = require('multiaddr')
const pull = require('pull-stream')
Expand Down Expand Up @@ -52,13 +53,15 @@ describe('LimitDialer', () => {
it('two success', (done) => {
const dialer = new LimitDialer(2, 10)

expect(2).checks(done)

// mock transport
const t1 = {
dial (addr, cb) {
const as = addr.toString()
if (as.match(/191/)) {
setImmediate(() => cb(new Error('fail')))
return {}
return null
} else if (as.match(/192/)) {
setTimeout(cb, 2)
return {
Expand All @@ -69,7 +72,10 @@ describe('LimitDialer', () => {
setTimeout(cb, 8)
return {
source: pull.values([2]),
sink: pull.drain()
sink: pull.onEnd((err) => {
// Verify the unused connection gets closed
expect(err).to.not.exist().mark()
})
}
}
}
Expand All @@ -83,8 +89,7 @@ describe('LimitDialer', () => {
conn,
pull.collect((err, res) => {
expect(err).to.not.exist()
expect(res).to.be.eql([1])
done()
expect(res).to.be.eql([1]).mark()
})
)
})
Expand Down

0 comments on commit fcbcccc

Please sign in to comment.