Skip to content

Commit

Permalink
feat: support all type of records (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos authored Nov 22, 2018
1 parent bf1787c commit 6c52248
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 21 deletions.
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@
"dependencies": {
"assert": "^1.4.1",
"base32.js": "~0.1.0",
"debug": "^3.1.0",
"debug": "^4.1.0",
"err-code": "^1.1.2",
"interface-datastore": "~0.4.2",
"libp2p-record": "~0.6.0"
"interface-datastore": "~0.6.0"
},
"devDependencies": {
"aegir": "^17.1.0",
Expand All @@ -46,7 +45,8 @@
"dirty-chai": "^2.0.1",
"ipfs": "~0.33.1",
"ipfsd-ctl": "~0.40.0",
"sinon": "^7.0.0"
"libp2p-record": "~0.6.1",
"sinon": "^7.1.1"
},
"contributors": [
"Vasco Santos <[email protected]>",
Expand Down
22 changes: 5 additions & 17 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use strict'

const { Record } = require('libp2p-record')
const { Key } = require('interface-datastore')
const { encodeBase32 } = require('./utils')

Expand All @@ -21,7 +20,7 @@ class DatastorePubsub {
* @param {Object} validator - validator functions.
* @param {function(record, peerId, callback)} validator.validate - function to validate a record.
* @param {function(received, current, callback)} validator.select - function to select the newest between two records.
* @param {function(key, callback)} subscriptionKeyFn - function to manipulate the key topic received before processing it.
* @param {function(key, callback)} subscriptionKeyFn - optional function to manipulate the key topic received before processing it.
* @memberof DatastorePubsub
*/
constructor (pubsub, datastore, peerId, validator, subscriptionKeyFn) {
Expand Down Expand Up @@ -208,17 +207,8 @@ class DatastorePubsub {

// Verify if the record received through pubsub is valid and better than the one currently stored
_isBetter (key, val, callback) {
let receivedRecord

try {
receivedRecord = Record.deserialize(val)
} catch (err) {
log.error(err)
return callback(err)
}

// validate received record
this._validateRecord(receivedRecord.value, key, (err, valid) => {
this._validateRecord(val, key, (err, valid) => {
// If not valid, it is not better than the one currently available
if (err || !valid) {
const errMsg = 'record received through pubsub is not valid'
Expand All @@ -230,21 +220,19 @@ class DatastorePubsub {
// Get Local record
const dsKey = new Key(key)

this._getLocal(dsKey.toBuffer(), (err, res) => {
this._getLocal(dsKey.toBuffer(), (err, currentRecord) => {
// if the old one is invalid, the new one is *always* better
if (err) {
return callback(null, true)
}

// if the same record, do not need to store
if (res.equals(val)) {
if (currentRecord.equals(val)) {
return callback(null, false)
}

const currentRecord = Record.deserialize(res)

// verify if the received record should replace the current one
this._selectRecord(receivedRecord.value, currentRecord.value, callback)
this._selectRecord(val, currentRecord, callback)
})
})
}
Expand Down
42 changes: 42 additions & 0 deletions test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,48 @@ describe('datastore-pubsub', function () {
})
})

it('should validate if record content is the same', function (done) {
const customValidator = {
validate: (data, peerId, callback) => {
const receivedRecord = Record.deserialize(data)

expect(receivedRecord.value.toString()).to.equal(value) // validator should deserialize correctly
callback(null, receivedRecord.value.toString() === value)
},
select: (receivedRecod, currentRecord, callback) => {
callback(null, 0)
}
}
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
const topic = `/${keyRef}`
let receivedMessage = false

function messageHandler () {
receivedMessage = true
}

dsPubsubB.get(key, (err, res) => {
expect(err).to.exist()
expect(res).to.not.exist() // no value available, but subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
// get from datastore
(cb) => dsPubsubB.get(key, cb)
], (err, res) => {
expect(err).to.not.exist()
expect(res[4]).to.exist()
done()
})
})
})

it('should put correctly to daemon A and daemon B should receive it as it tried to get it first and subscribed it', function (done) {
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator)
Expand Down

0 comments on commit 6c52248

Please sign in to comment.