From 6c52248d6bc2649c00e095e36a2772d19f9365e9 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Thu, 22 Nov 2018 14:43:05 +0000 Subject: [PATCH] feat: support all type of records (#12) --- package.json | 8 ++++---- src/index.js | 22 +++++----------------- test/index.spec.js | 42 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 21 deletions(-) diff --git a/package.json b/package.json index fe3d2ed..13254d5 100644 --- a/package.json +++ b/package.json @@ -34,10 +34,9 @@ "dependencies": { "assert": "^1.4.1", "base32.js": "~0.1.0", - "debug": "^3.1.0", + "debug": "^4.1.0", "err-code": "^1.1.2", - "interface-datastore": "~0.4.2", - "libp2p-record": "~0.6.0" + "interface-datastore": "~0.6.0" }, "devDependencies": { "aegir": "^17.1.0", @@ -46,7 +45,8 @@ "dirty-chai": "^2.0.1", "ipfs": "~0.33.1", "ipfsd-ctl": "~0.40.0", - "sinon": "^7.0.0" + "libp2p-record": "~0.6.1", + "sinon": "^7.1.1" }, "contributors": [ "Vasco Santos ", diff --git a/src/index.js b/src/index.js index 34494ba..440aa9a 100644 --- a/src/index.js +++ b/src/index.js @@ -1,6 +1,5 @@ 'use strict' -const { Record } = require('libp2p-record') const { Key } = require('interface-datastore') const { encodeBase32 } = require('./utils') @@ -21,7 +20,7 @@ class DatastorePubsub { * @param {Object} validator - validator functions. * @param {function(record, peerId, callback)} validator.validate - function to validate a record. * @param {function(received, current, callback)} validator.select - function to select the newest between two records. - * @param {function(key, callback)} subscriptionKeyFn - function to manipulate the key topic received before processing it. + * @param {function(key, callback)} subscriptionKeyFn - optional function to manipulate the key topic received before processing it. * @memberof DatastorePubsub */ constructor (pubsub, datastore, peerId, validator, subscriptionKeyFn) { @@ -208,17 +207,8 @@ class DatastorePubsub { // Verify if the record received through pubsub is valid and better than the one currently stored _isBetter (key, val, callback) { - let receivedRecord - - try { - receivedRecord = Record.deserialize(val) - } catch (err) { - log.error(err) - return callback(err) - } - // validate received record - this._validateRecord(receivedRecord.value, key, (err, valid) => { + this._validateRecord(val, key, (err, valid) => { // If not valid, it is not better than the one currently available if (err || !valid) { const errMsg = 'record received through pubsub is not valid' @@ -230,21 +220,19 @@ class DatastorePubsub { // Get Local record const dsKey = new Key(key) - this._getLocal(dsKey.toBuffer(), (err, res) => { + this._getLocal(dsKey.toBuffer(), (err, currentRecord) => { // if the old one is invalid, the new one is *always* better if (err) { return callback(null, true) } // if the same record, do not need to store - if (res.equals(val)) { + if (currentRecord.equals(val)) { return callback(null, false) } - const currentRecord = Record.deserialize(res) - // verify if the received record should replace the current one - this._selectRecord(receivedRecord.value, currentRecord.value, callback) + this._selectRecord(val, currentRecord, callback) }) }) } diff --git a/test/index.spec.js b/test/index.spec.js index 1b821ac..f44fd25 100644 --- a/test/index.spec.js +++ b/test/index.spec.js @@ -155,6 +155,48 @@ describe('datastore-pubsub', function () { }) }) + it('should validate if record content is the same', function (done) { + const customValidator = { + validate: (data, peerId, callback) => { + const receivedRecord = Record.deserialize(data) + + expect(receivedRecord.value.toString()).to.equal(value) // validator should deserialize correctly + callback(null, receivedRecord.value.toString() === value) + }, + select: (receivedRecod, currentRecord, callback) => { + callback(null, 0) + } + } + const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) + const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator) + const topic = `/${keyRef}` + let receivedMessage = false + + function messageHandler () { + receivedMessage = true + } + + dsPubsubB.get(key, (err, res) => { + expect(err).to.exist() + expect(res).to.not.exist() // no value available, but subscribed now + + series([ + (cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb), + // subscribe in order to understand when the message arrive to the node + (cb) => pubsubB.subscribe(topic, messageHandler, cb), + (cb) => dsPubsubA.put(key, serializedRecord, cb), + // wait until message arrives + (cb) => waitFor(() => receivedMessage === true, cb), + // get from datastore + (cb) => dsPubsubB.get(key, cb) + ], (err, res) => { + expect(err).to.not.exist() + expect(res[4]).to.exist() + done() + }) + }) + }) + it('should put correctly to daemon A and daemon B should receive it as it tried to get it first and subscribed it', function (done) { const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator)