diff --git a/README.md b/README.md index 8a162d1f26..64e5806600 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,7 @@ const WS = require('libp2p-websockets') const spdy = require('libp2p-spdy') const secio = require('libp2p-secio') const MulticastDNS = require('libp2p-mdns') +const DHT = require('libp2p-kad-dht') class Node extends libp2p { constructor (peerInfo, peerBook, options) { @@ -95,7 +96,9 @@ class Node extends libp2p { }, discovery: [ new MulticastDNS(peerInfo, 'your-identifier') - ] + ], + // DHT is passed as its own enabling PeerRouting, ContentRouting and DHT itself components + dht: DHT } super(modules, peerInfo, peerBook, options) @@ -144,6 +147,36 @@ class Node extends libp2p { `callback` is a function with the following `function (err) {}` signature, where `err` is an Error in case stopping the node fails. +#### `libp2p.peerRouting.findPeer(id, callback)` + +> Looks up for multiaddrs of a peer in the DHT + +- `id`: instance of [PeerId][] + +#### `libp2p.contentRouting.findProviders(key, timeout, callback)` + +- `key`: Buffer +- `timeout`: Number miliseconds + +#### `libp2p.contentRouting.provide(key, timeout, callback)` + +- `key`: Buffer +- `timeout`: Number miliseconds + +#### `libp2p.dht.put(key, value, callback)` + +- `key`: Buffer +- `value`: Buffer + +#### `libp2p.dht.get(key, callback)` + +- `key`: Buffer + +#### `libp2p.dht.getMany(key, nVals, callback)` + +- `key`: Buffer +- `nVals`: Number + #### `libp2p.handle(protocol, handlerFunc [, matchFunc])` > Handle new protocol diff --git a/src/index.js b/src/index.js index f120645281..fddef7883d 100644 --- a/src/index.js +++ b/src/index.js @@ -1,15 +1,19 @@ 'use strict' +const EventEmitter = require('events').EventEmitter +const assert = require('assert') + +const setImmediate = require('async/setImmediate') +const each = require('async/each') +const series = require('async/series') + +const Ping = require('libp2p-ping') const Swarm = require('libp2p-swarm') const PeerId = require('peer-id') const PeerInfo = require('peer-info') -const mafmt = require('mafmt') const PeerBook = require('peer-book') +const mafmt = require('mafmt') const multiaddr = require('multiaddr') -const EventEmitter = require('events').EventEmitter -const assert = require('assert') -const Ping = require('libp2p-ping') -const setImmediate = require('async/setImmediate') exports = module.exports @@ -32,9 +36,7 @@ class Node extends EventEmitter { if (this.modules.connection.muxer) { let muxers = this.modules.connection.muxer muxers = Array.isArray(muxers) ? muxers : [muxers] - muxers.forEach((muxer) => { - this.swarm.connection.addStreamMuxer(muxer) - }) + muxers.forEach((muxer) => this.swarm.connection.addStreamMuxer(muxer)) // If muxer exists, we can use Identify this.swarm.connection.reuse() @@ -73,9 +75,49 @@ class Node extends EventEmitter { // Mount default protocols Ping.mount(this.swarm) - // Not fully implemented in js-libp2p yet - this.routing = undefined - this.records = undefined + // dht provided components (peerRouting, contentRouting, dht) + if (_modules.DHT) { + this._dht = new this.modules.DHT(this, 20, _options.DHT && _options.DHT.datastore) + } + + this.peerRouting = { + findPeer: (id, callback) => { + assert(this._dht, 'DHT is not available') + + this._dht.findPeer(id, callback) + } + } + + this.contentRouting = { + findProviders: (key, timeout, callback) => { + assert(this._dht, 'DHT is not available') + + this._dht.findProviders(key, timeout, callback) + }, + provide: (key, callback) => { + assert(this._dht, 'DHT is not available') + + this._dht.provide(key, callback) + } + } + + this.dht = { + put: (key, value, callback) => { + assert(this._dht, 'DHT is not available') + + this._dht.put(key, value, callback) + }, + get: (key, callback) => { + assert(this._dht, 'DHT is not available') + + this._dht.get(key, callback) + }, + getMany (key, nVals, callback) { + assert(this._dht, 'DHT is not available') + + this._dht.getMany(key, nVals, callback) + } + } } /* @@ -117,24 +159,30 @@ class Node extends EventEmitter { } }) - this.swarm.listen((err) => { - if (err) { - return callback(err) - } - if (ws) { - this.swarm.transport.add(ws.tag || ws.constructor.name, ws) + series([ + (cb) => this.swarm.listen(cb), + (cb) => { + // listeners on, libp2p is on + this.isOnline = true + + if (ws) { + // always add dialing on websockets + this.swarm.transport.add(ws.tag || ws.constructor.name, ws) + } + + // all transports need to be setup before discover starts + if (this.modules.discovery) { + return each(this.modules.discovery, (d, cb) => d.start(cb), cb) + } + cb() + }, + (cb) => { + if (this._dht) { + return this._dht.start(cb) + } + cb() } - - this.isOnline = true - - if (this.modules.discovery) { - this.modules.discovery.forEach((discovery) => { - setImmediate(() => discovery.start(() => {})) - }) - } - - callback() - }) + ], callback) } /* @@ -149,7 +197,15 @@ class Node extends EventEmitter { }) } - this.swarm.close(callback) + series([ + (cb) => { + if (this._dht) { + return this._dht.stop(cb) + } + cb() + }, + (cb) => this.swarm.close(cb) + ], callback) } isOn () { @@ -158,8 +214,13 @@ class Node extends EventEmitter { ping (peer, callback) { assert(this.isOn(), OFFLINE_ERROR_MESSAGE) - const peerInfo = this._getPeerInfo(peer) - callback(null, new Ping(this.swarm, peerInfo)) + this._getPeerInfo(peer, (err, peerInfo) => { + if (err) { + return callback(err) + } + + callback(null, new Ping(this.swarm, peerInfo)) + }) } dial (peer, protocol, callback) { @@ -170,27 +231,31 @@ class Node extends EventEmitter { protocol = undefined } - let peerInfo - try { - peerInfo = this._getPeerInfo(peer) - } catch (err) { - return callback(err) - } - - this.swarm.dial(peerInfo, protocol, (err, conn) => { + this._getPeerInfo(peer, (err, peerInfo) => { if (err) { return callback(err) } - this.peerBook.put(peerInfo) - callback(null, conn) + + this.swarm.dial(peerInfo, protocol, (err, conn) => { + if (err) { + return callback(err) + } + this.peerBook.put(peerInfo) + callback(null, conn) + }) }) } hangUp (peer, callback) { assert(this.isOn(), OFFLINE_ERROR_MESSAGE) - const peerInfo = this._getPeerInfo(peer) - this.swarm.hangUp(peerInfo, callback) + this._getPeerInfo(peer, (err, peerInfo) => { + if (err) { + return callback(err) + } + + this.swarm.hangUp(peerInfo, callback) + }) } handle (protocol, handlerFunc, matchFunc) { @@ -204,10 +269,12 @@ class Node extends EventEmitter { /* * Helper method to check the data type of peer and convert it to PeerInfo */ - _getPeerInfo (peer) { + _getPeerInfo (peer, callback) { let p + // PeerInfo if (PeerInfo.isPeerInfo(peer)) { p = peer + // Multiaddr instance (not string) } else if (multiaddr.isMultiaddr(peer)) { const peerIdB58Str = peer.getPeerId() try { @@ -216,19 +283,19 @@ class Node extends EventEmitter { p = new PeerInfo(PeerId.createFromB58String(peerIdB58Str)) } p.multiaddrs.add(peer) + // PeerId } else if (PeerId.isPeerId(peer)) { const peerIdB58Str = peer.toB58String() try { p = this.peerBook.get(peerIdB58Str) } catch (err) { - // TODO this is where PeerRouting comes into place - throw new Error('No knowledge about: ' + peerIdB58Str) + return this.peerRouting.findPeer(peer, callback) } } else { - throw new Error('peer type not recognized') + return setImmediate(() => callback(new Error('peer type not recognized'))) } - return p + setImmediate(() => callback(null, p)) } }