Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
Merge pull request #822 from ipfs/feat/dht
Browse files Browse the repository at this point in the history
Awesome DHT PART I
  • Loading branch information
daviddias committed May 19, 2017
2 parents 92f8ef9 + 860165c commit 2c6c838
Show file tree
Hide file tree
Showing 17 changed files with 428 additions and 96 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ const node = new IPFS({
EXPERIMENTAL: { // enable experimental features
pubsub: true,
sharding: true, // enable dir sharding
wrtcLinuxWindows: true // use unstable wrtc module on Linux or Windows with Node.js
wrtcLinuxWindows: true // use unstable wrtc module on Linux or Windows with Node.js,
dht: true // enable KadDHT, currently not interopable with go-ipfs
},
config: { // overload the default config
Addresses: {
Expand Down
40 changes: 20 additions & 20 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
"test:unit:node:http": "TEST=http npm run test:unit:node",
"test:unit:node:cli": "TEST=cli npm run test:unit:node",
"test:unit:browser": "gulp test:browser --dom",
"test:interop": "mocha -t 60000 test/interop",
"test:interop": "npm run test:interop:node",
"test:interop:node": "mocha -t 60000 test/interop/node.js",
"test:interop:browser": "mocha test/interop/browser.js",
"test:interop:browser": "mocha -t 60000 test/interop/browser.js",
"test:benchmark": "echo \"Error: no benchmarks yet\" && exit 1",
"test:benchmark:node": "echo \"Error: no benchmarks yet\" && exit 1",
"test:benchmark:node:core": "echo \"Error: no benchmarks yet\" && exit 1",
Expand Down Expand Up @@ -63,23 +63,23 @@
},
"homepage": "https://github.com/ipfs/js-ipfs#readme",
"devDependencies": {
"aegir": "^11.0.1",
"aegir": "^11.0.2",
"buffer-loader": "0.0.1",
"chai": "^3.5.0",
"delay": "^2.0.0",
"detect-node": "^2.0.3",
"dir-compare": "^1.3.0",
"dir-compare": "^1.4.0",
"dirty-chai": "^1.2.2",
"eslint-plugin-react": "^6.10.3",
"eslint-plugin-react": "^7.0.1",
"execa": "^0.6.3",
"expose-loader": "^0.7.3",
"form-data": "^2.1.2",
"form-data": "^2.1.4",
"gulp": "^3.9.1",
"interface-ipfs-core": "~0.27.0",
"interface-ipfs-core": "~0.27.2",
"ipfsd-ctl": "~0.20.0",
"left-pad": "^1.1.3",
"lodash": "^4.17.4",
"mocha": "^3.2.0",
"mocha": "^3.4.1",
"ncp": "^2.0.0",
"nexpect": "^0.5.0",
"pre-commit": "^1.2.2",
Expand All @@ -91,17 +91,17 @@
"transform-loader": "^0.2.4"
},
"dependencies": {
"async": "^2.3.0",
"bl": "^1.2.0",
"async": "^2.4.0",
"bl": "^1.2.1",
"boom": "^4.3.1",
"cids": "^0.5.0",
"debug": "^2.6.3",
"debug": "^2.6.8",
"fsm-event": "^2.1.0",
"glob": "^7.1.1",
"hapi": "^16.1.1",
"hapi-set-header": "^1.0.2",
"hoek": "^4.1.1",
"ipfs-api": "^14.0.0",
"ipfs-api": "^14.0.1",
"ipfs-bitswap": "~0.13.0",
"ipfs-block": "~0.6.0",
"ipfs-block-service": "~0.9.0",
Expand All @@ -111,10 +111,10 @@
"ipfs-unixfs-engine": "~0.19.1",
"ipld-resolver": "~0.11.0",
"isstream": "^0.1.2",
"joi": "^10.4.1",
"joi": "^10.5.0",
"libp2p-floodsub": "~0.9.4",
"libp2p-ipfs-browser": "~0.23.0",
"libp2p-ipfs-nodejs": "~0.23.0",
"libp2p-ipfs-browser": "~0.24.1",
"libp2p-ipfs-nodejs": "~0.25.2",
"lodash.flatmap": "^4.5.0",
"lodash.get": "^4.4.2",
"lodash.has": "^4.5.2",
Expand All @@ -132,21 +132,21 @@
"peer-info": "~0.9.2",
"promisify-es6": "^1.0.2",
"pull-file": "^1.0.0",
"pull-paramap": "^1.2.1",
"pull-pushable": "^2.0.1",
"pull-paramap": "^1.2.2",
"pull-pushable": "^2.1.1",
"pull-sort": "^1.0.0",
"pull-stream": "^3.5.0",
"pull-stream": "^3.6.0",
"pull-stream-to-stream": "^1.3.4",
"pull-zip": "^2.0.1",
"read-pkg-up": "^2.0.0",
"readable-stream": "1.1.14",
"safe-buffer": "^5.0.1",
"stream-to-pull-stream": "^1.7.2",
"tar-stream": "^1.5.2",
"tar-stream": "^1.5.4",
"temp": "^0.8.3",
"through2": "^2.0.3",
"update-notifier": "^2.1.0",
"yargs": "7.0.2"
"yargs": "8.0.1"
},
"contributors": [
"Andrew de Andrade <[email protected]>",
Expand Down
163 changes: 163 additions & 0 deletions src/core/components/dht.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
'use strict'

const promisify = require('promisify-es6')
const every = require('async/every')
const PeerId = require('peer-id')
const CID = require('cids')
const each = require('async/each')
// const bsplit = require('buffer-split')

module.exports = (self) => {
return {
/**
* Given a key, query the DHT for its best value.
*
* @param {Buffer} key
* @param {function(Error)} [callback]
* @returns {Promise|void}
*/
get: promisify((key, options, callback) => {
if (!Buffer.isBuffer(key)) {
return callback(new Error('Not valid key'))
}

if (typeof options === 'function') {
callback = options
options = {}
}

self._libp2pNode.dht.get(key, options.timeout, callback)
}),

/**
* Write a key/value pair to the DHT.
*
* Given a key of the form /foo/bar and a value of any
* form, this will write that value to the DHT with
* that key.
*
* @param {Buffer} key
* @param {Buffer} value
* @param {function(Error)} [callback]
* @returns {Promise|void}
*/
put: promisify((key, value, callback) => {
if (!Buffer.isBuffer(key)) {
return callback(new Error('Not valid key'))
}

self._libp2pNode.dht.put(key, value, callback)
}),

/**
* Find peers in the DHT that can provide a specific value, given a key.
*
* @param {CID} key - They key to find providers for.
* @param {function(Error, Array<PeerInfo>)} [callback]
* @returns {Promise<PeerInfo>|void}
*/
findprovs: promisify((key, callback) => {
if (typeof key === 'string') {
key = new CID(key)
}

self._libp2pNode.contentRouting.findProviders(key, callback)
}),

/**
* Query the DHT for all multiaddresses associated with a `PeerId`.
*
* @param {PeerId} peer - The id of the peer to search for.
* @param {function(Error, Array<Multiaddr>)} [callback]
* @returns {Promise<Array<Multiaddr>>|void}
*/
findpeer: promisify((peer, callback) => {
if (typeof peer === 'string') {
peer = PeerId.createFromB58String(peer)
}

self._libp2pNode.peerRouting.findPeer(peer, (err, info) => {
if (err) {
return callback(err)
}

// convert to go-ipfs return value, we need to revisit
// this. For now will just conform.
const goResult = [
{
Responses: [{
ID: info.id.toB58String(),
Addresses: info.multiaddrs.toArray().map((a) => a.toString())
}]
}
]

callback(null, goResult)
})
}),

/**
* Announce to the network that we are providing given values.
*
* @param {CID|Array<CID>} keys - The keys that should be announced.
* @param {Object} [options={}]
* @param {bool} [options.recursive=false] - Provide not only the given object but also all objects linked from it.
* @param {function(Error)} [callback]
* @returns {Promise|void}
*/
provide: promisify((keys, options, callback) => {
if (!Array.isArray(keys)) {
keys = [keys]
}
if (typeof options === 'function') {
callback = options
options = {}
}

// ensure blocks are actually local
every(keys, (key, cb) => {
self._repo.blockstore.has(key, cb)
}, (err, has) => {
if (err) {
return callback(err)
}
/* TODO reconsider this. go-ipfs provides anyway
if (!has) {
return callback(new Error('Not all blocks exist locally, can not provide'))
}
*/

if (options.recursive) {
// TODO: Implement recursive providing
} else {
each(keys, (cid, cb) => {
self._libp2pNode.contentRouting.provide(cid, cb)
}, callback)
}
})
}),

/**
* Find the closest peers to a given `PeerId`, by querying the DHT.
*
* @param {PeerId} peer - The `PeerId` to run the query agains.
* @param {function(Error, Array<PeerId>)} [callback]
* @returns {Promise<Array<PeerId>>|void}
*/
query: promisify((peerId, callback) => {
if (typeof peerId === 'string') {
peerId = PeerId.createFromB58String(peerId)
}

// TODO expose this method in peerRouting
self._libp2pNode._dht.getClosestPeers(peerId.toBytes(), (err, peerIds) => {
if (err) {
return callback(err)
}
callback(null, peerIds.map((id) => {
return { ID: id.toB58String() }
}))
})
})
}
}
1 change: 1 addition & 0 deletions src/core/components/id.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ module.exports = function id (self) {
addresses: self._peerInfo.multiaddrs
.toArray()
.map((ma) => ma.toString())
.filter((ma) => ma.indexOf('ipfs') >= 0)
.sort(),
agentVersion: 'js-ipfs',
protocolVersion: '9000'
Expand Down
1 change: 1 addition & 0 deletions src/core/components/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ exports.ping = require('./ping')
exports.files = require('./files')
exports.bitswap = require('./bitswap')
exports.pubsub = require('./pubsub')
exports.dht = require('./dht')
24 changes: 13 additions & 11 deletions src/core/components/libp2p.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,23 @@ module.exports = function libp2p (self) {
const options = {
mdns: get(config, 'Discovery.MDNS.Enabled'),
webRTCStar: get(config, 'Discovery.webRTCStar.Enabled'),
bootstrap: get(config, 'Bootstrap')
bootstrap: get(config, 'Bootstrap'),
dht: self._options.EXPERIMENTAL.dht
}

self._libp2pNode = new Node(self._peerInfo, self._peerInfoBook, options)

self._libp2pNode.on('peer:discovery', (peerInfo) => {
if (self.isOnline()) {
self._peerInfoBook.put(peerInfo)
self._libp2pNode.dial(peerInfo, () => {})
}
})

self._libp2pNode.on('peer:connect', (peerInfo) => {
self._peerInfoBook.put(peerInfo)
})

self._libp2pNode.start((err) => {
if (err) {
return callback(err)
Expand All @@ -31,16 +43,6 @@ module.exports = function libp2p (self) {
console.log('Swarm listening on', ma.toString())
})

self._libp2pNode.on('peer:discovery', (peerInfo) => {
if (self.isOnline()) {
self._peerInfoBook.put(peerInfo)
self._libp2pNode.dial(peerInfo, () => {})
}
})
self._libp2pNode.on('peer:connect', (peerInfo) => {
self._peerInfoBook.put(peerInfo)
})

callback()
})
}
Expand Down
5 changes: 5 additions & 0 deletions src/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,18 @@ class IPFS extends EventEmitter {
this.bitswap = components.bitswap(this)
this.ping = components.ping(this)
this.pubsub = components.pubsub(this)
this.dht = components.dht(this)

if (this._options.EXPERIMENTAL.pubsub) {
this.log('EXPERIMENTAL pubsub is enabled')
}
if (this._options.EXPERIMENTAL.sharding) {
this.log('EXPERIMENTAL sharding is enabled')
}
if (this._options.EXPERIMENTAL.dht) {
this.log('EXPERIMENTAL Kademlia DHT is enabled')
}

this.state = require('./state')(this)

boot(this)
Expand Down
22 changes: 22 additions & 0 deletions test/core/interface/dht.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/* eslint-env mocha */

'use strict'

/*
const test = require('interface-ipfs-core')
const IPFSFactory = require('../../utils/ipfs-factory-instance')
let factory
const common = {
setup: function (callback) {
factory = new IPFSFactory()
callback(null, factory)
},
teardown: function (callback) {
factory.dismantle(callback)
}
}
test.dht(common)
*/
1 change: 1 addition & 0 deletions test/core/interface/interface.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ describe('interface-ipfs-core tests', () => {
if (isNode) {
require('./swarm')
require('./pubsub')
require('./dht')
}
})
Loading

0 comments on commit 2c6c838

Please sign in to comment.