From 510bd2c633bd169939627e520ac2b755f62f1455 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Thu, 29 Nov 2018 11:24:01 +0000 Subject: [PATCH] feat: encode record-store keys in pubsub (#9) --- package.json | 4 +-- src/index.js | 34 +++++++++++++++----------- src/utils.js | 29 +++++++++++++++++++--- test/index.spec.js | 61 ++++++++++++++++++++++++---------------------- 4 files changed, 80 insertions(+), 48 deletions(-) diff --git a/package.json b/package.json index d14457e..81dd21a 100644 --- a/package.json +++ b/package.json @@ -33,10 +33,10 @@ "homepage": "https://github.com/ipfs/js-datastore-pubsub#readme", "dependencies": { "assert": "^1.4.1", - "base32.js": "~0.1.0", "debug": "^4.1.0", "err-code": "^1.1.2", - "interface-datastore": "~0.6.0" + "interface-datastore": "~0.6.0", + "multibase": "~0.6.0" }, "devDependencies": { "aegir": "^17.1.0", diff --git a/src/index.js b/src/index.js index 440aa9a..6ab2bd6 100644 --- a/src/index.js +++ b/src/index.js @@ -1,7 +1,7 @@ 'use strict' const { Key } = require('interface-datastore') -const { encodeBase32 } = require('./utils') +const { encodeBase32, keyToTopic, topicToKey } = require('./utils') const errcode = require('err-code') const assert = require('assert') @@ -24,10 +24,10 @@ class DatastorePubsub { * @memberof DatastorePubsub */ constructor (pubsub, datastore, peerId, validator, subscriptionKeyFn) { - assert.equal(typeof validator, 'object', 'missing validator') - assert.equal(typeof validator.validate, 'function', 'missing validate function') - assert.equal(typeof validator.select, 'function', 'missing select function') - subscriptionKeyFn && assert.equal(typeof subscriptionKeyFn, 'function', 'invalid subscriptionKeyFn received') + assert.strictEqual(typeof validator, 'object', 'missing validator') + assert.strictEqual(typeof validator.validate, 'function', 'missing validate function') + assert.strictEqual(typeof validator.select, 'function', 'missing select function') + subscriptionKeyFn && assert.strictEqual(typeof subscriptionKeyFn, 'function', 'invalid subscriptionKeyFn received') this._pubsub = pubsub this._datastore = datastore @@ -35,8 +35,8 @@ class DatastorePubsub { this._validator = validator this._handleSubscriptionKeyFn = subscriptionKeyFn - // Bind _handleSubscription function, which is called by pubsub. - this._handleSubscription = this._handleSubscription.bind(this) + // Bind _onMessage function, which is called by pubsub. + this._onMessage = this._onMessage.bind(this) } /** @@ -61,7 +61,7 @@ class DatastorePubsub { return callback(errcode(new Error(errMsg), 'ERR_INVALID_VALUE_RECEIVED')) } - const stringifiedTopic = key.toString() + const stringifiedTopic = keyToTopic(key) log(`publish value for topic ${stringifiedTopic}`) @@ -83,7 +83,7 @@ class DatastorePubsub { return callback(errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY')) } - const stringifiedTopic = key.toString() + const stringifiedTopic = keyToTopic(key) this._pubsub.ls((err, res) => { if (err) { @@ -96,7 +96,7 @@ class DatastorePubsub { } // Subscribe - this._pubsub.subscribe(stringifiedTopic, this._handleSubscription, (err) => { + this._pubsub.subscribe(stringifiedTopic, this._onMessage, (err) => { if (err) { const errMsg = `cannot subscribe topic ${stringifiedTopic}` @@ -116,9 +116,9 @@ class DatastorePubsub { * @returns {void} */ unsubscribe (key) { - const stringifiedTopic = key.toString() + const stringifiedTopic = keyToTopic(key) - this._pubsub.unsubscribe(stringifiedTopic, this._handleSubscription) + this._pubsub.unsubscribe(stringifiedTopic, this._onMessage) } // Get record from local datastore @@ -152,9 +152,15 @@ class DatastorePubsub { } // handles pubsub subscription messages - _handleSubscription (msg) { + _onMessage (msg) { const { data, from, topicIDs } = msg - const key = topicIDs[0] + let key + try { + key = topicToKey(topicIDs[0]) + } catch (err) { + log.error(err) + return + } log(`message received for ${key} topic`) diff --git a/src/utils.js b/src/utils.js index 8c0cd27..4e7ea76 100644 --- a/src/utils.js +++ b/src/utils.js @@ -1,8 +1,31 @@ 'use strict' -const base32 = require('base32.js') +const multibase = require('multibase') +const errcode = require('err-code') + +const namespace = '/record/' +const base64urlCode = 'u' // base64url code from multibase module.exports.encodeBase32 = (buf) => { - const enc = new base32.Encoder() - return enc.write(buf).finalize() + return multibase.encode('base32', buf).slice(1) // slice off multibase codec +} + +// converts a binary record key to a pubsub topic key. +module.exports.keyToTopic = (key) => { + // Record-store keys are arbitrary binary. However, pubsub requires UTF-8 string topic IDs + // Encodes to "/record/base64url(key)" + const b64url = multibase.encode('base64url', key).slice(1).toString() + + return `${namespace}${b64url}` +} + +// converts a pubsub topic key to a binary record key. +module.exports.topicToKey = (topic) => { + if (topic.substring(0, namespace.length) !== namespace) { + throw errcode(new Error('topic received is not from a record'), 'ERR_TOPIC_IS_NOT_FROM_RECORD_NAMESPACE') + } + + const key = `${base64urlCode}${topic.substring(namespace.length)}` + + return multibase.decode(key).toString() } diff --git a/test/index.spec.js b/test/index.spec.js index f44fd25..b76434a 100644 --- a/test/index.spec.js +++ b/test/index.spec.js @@ -15,6 +15,7 @@ const { Key } = require('interface-datastore') const { Record } = require('libp2p-record') const DatastorePubsub = require('../src') +const { keyToTopic } = require('../src/utils') const { connect, waitFor, waitForPeerToSubscribe, spawnDaemon, stopDaemon } = require('./utils') // Always returning the expected values @@ -115,11 +116,12 @@ describe('datastore-pubsub', function () { it('should subscribe the topic, but receive error as no entry is stored locally', function (done) { const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) + const subsTopic = keyToTopic(`/${keyRef}`) pubsubA.ls((err, res) => { expect(err).to.not.exist() expect(res).to.exist() - expect(res).to.not.include(`/${keyRef}`) // not subscribed key reference yet + expect(res).to.not.include(subsTopic) // not subscribed key reference yet dsPubsubA.get(key, (err) => { expect(err).to.exist() // not locally stored record @@ -128,7 +130,7 @@ describe('datastore-pubsub', function () { pubsubA.ls((err, res) => { expect(err).to.not.exist() expect(res).to.exist() - expect(res).to.include(`/${keyRef}`) // subscribed key reference + expect(res).to.include(subsTopic) // subscribed key reference done() }) }) @@ -138,11 +140,12 @@ describe('datastore-pubsub', function () { it('should put correctly to daemon A and daemon B should not receive it without subscribing', function (done) { const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator) + const subsTopic = keyToTopic(`/${keyRef}`) pubsubB.ls((err, res) => { expect(err).to.not.exist() expect(res).to.exist() - expect(res).to.not.include(`/${keyRef}`) // not subscribed + expect(res).to.not.include(subsTopic) // not subscribed dsPubsubA.put(key, serializedRecord, (err) => { expect(err).to.not.exist() @@ -169,7 +172,7 @@ describe('datastore-pubsub', function () { } const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator) - const topic = `/${keyRef}` + const subsTopic = keyToTopic(`/${keyRef}`) let receivedMessage = false function messageHandler () { @@ -181,9 +184,9 @@ describe('datastore-pubsub', function () { expect(res).to.not.exist() // no value available, but subscribed now series([ - (cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb), + (cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb), // subscribe in order to understand when the message arrive to the node - (cb) => pubsubB.subscribe(topic, messageHandler, cb), + (cb) => pubsubB.subscribe(subsTopic, messageHandler, cb), (cb) => dsPubsubA.put(key, serializedRecord, cb), // wait until message arrives (cb) => waitFor(() => receivedMessage === true, cb), @@ -200,7 +203,7 @@ describe('datastore-pubsub', function () { it('should put correctly to daemon A and daemon B should receive it as it tried to get it first and subscribed it', function (done) { const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator) - const topic = `/${keyRef}` + const subsTopic = keyToTopic(`/${keyRef}`) let receivedMessage = false function messageHandler () { @@ -210,16 +213,16 @@ describe('datastore-pubsub', function () { pubsubB.ls((err, res) => { expect(err).to.not.exist() expect(res).to.exist() - expect(res).to.not.include(topic) // not subscribed + expect(res).to.not.include(subsTopic) // not subscribed dsPubsubB.get(key, (err, res) => { expect(err).to.exist() expect(res).to.not.exist() // not value available, but subscribed now series([ - (cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb), + (cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb), // subscribe in order to understand when the message arrive to the node - (cb) => pubsubB.subscribe(topic, messageHandler, cb), + (cb) => pubsubB.subscribe(subsTopic, messageHandler, cb), (cb) => dsPubsubA.put(key, serializedRecord, cb), // wait until message arrives (cb) => waitFor(() => receivedMessage === true, cb), @@ -300,7 +303,7 @@ describe('datastore-pubsub', function () { } const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator) - const topic = `/${keyRef}` + const subsTopic = keyToTopic(`/${keyRef}`) let receivedMessage = false function messageHandler () { @@ -312,9 +315,9 @@ describe('datastore-pubsub', function () { expect(res).to.not.exist() // not value available, but subscribed now series([ - (cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb), + (cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb), // subscribe in order to understand when the message arrive to the node - (cb) => pubsubB.subscribe(topic, messageHandler, cb), + (cb) => pubsubB.subscribe(subsTopic, messageHandler, cb), (cb) => dsPubsubA.put(key, serializedRecord, cb), // wait until message arrives (cb) => waitFor(() => receivedMessage === true, cb), @@ -345,7 +348,7 @@ describe('datastore-pubsub', function () { const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator) - const topic = `/${keyRef}` + const subsTopic = keyToTopic(`/${keyRef}`) let receivedMessage = false function messageHandler () { @@ -357,9 +360,9 @@ describe('datastore-pubsub', function () { expect(res).to.not.exist() // not value available, but subscribed now series([ - (cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb), + (cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb), // subscribe in order to understand when the message arrive to the node - (cb) => pubsubB.subscribe(topic, messageHandler, cb), + (cb) => pubsubB.subscribe(subsTopic, messageHandler, cb), (cb) => dsPubsubA.put(key, serializedRecord, cb), // wait until message arrives (cb) => waitFor(() => receivedMessage === true, cb), @@ -396,7 +399,7 @@ describe('datastore-pubsub', function () { const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator) - const topic = `/${keyRef}` + const subsTopic = keyToTopic(`/${keyRef}`) let receivedMessage = false function messageHandler () { @@ -408,9 +411,9 @@ describe('datastore-pubsub', function () { expect(res).to.not.exist() // not value available, but it is subscribed now series([ - (cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb), + (cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb), // subscribe in order to understand when the message arrive to the node - (cb) => pubsubB.subscribe(topic, messageHandler, cb), + (cb) => pubsubB.subscribe(subsTopic, messageHandler, cb), (cb) => dsPubsubA.put(key, serializedRecord, cb), // wait until message arrives (cb) => waitFor(() => receivedMessage === true, cb), @@ -431,14 +434,14 @@ describe('datastore-pubsub', function () { }) }) - it('should subscribe the topic and after a message being received, discarde it using the subscriptionKeyFn', function (done) { + it('should subscribe the topic and after a message being received, discard it using the subscriptionKeyFn', function (done) { const subscriptionKeyFn = (topic, callback) => { - expect(topic).to.equal(key.toString()) + expect(topic).to.equal(`/${keyRef}`) callback(new Error('DISCARD MESSAGE')) } const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator, subscriptionKeyFn) - const topic = `/${keyRef}` + const subsTopic = keyToTopic(`/${keyRef}`) let receivedMessage = false function messageHandler () { @@ -448,16 +451,16 @@ describe('datastore-pubsub', function () { pubsubB.ls((err, res) => { expect(err).to.not.exist() expect(res).to.exist() - expect(res).to.not.include(topic) // not subscribed + expect(res).to.not.include(subsTopic) // not subscribed dsPubsubB.get(key, (err, res) => { expect(err).to.exist() expect(res).to.not.exist() // not value available, but subscribed now series([ - (cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb), + (cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb), // subscribe in order to understand when the message arrive to the node - (cb) => pubsubB.subscribe(topic, messageHandler, cb), + (cb) => pubsubB.subscribe(subsTopic, messageHandler, cb), (cb) => dsPubsubA.put(key, serializedRecord, cb), // wait until message arrives (cb) => waitFor(() => receivedMessage === true, cb), @@ -478,7 +481,7 @@ describe('datastore-pubsub', function () { } const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator, subscriptionKeyFn) - const topic = `/${keyRef}` + const subsTopic = keyToTopic(`/${keyRef}`) const keyNew = Buffer.from(`${key.toString()}new`) let receivedMessage = false @@ -489,16 +492,16 @@ describe('datastore-pubsub', function () { pubsubB.ls((err, res) => { expect(err).to.not.exist() expect(res).to.exist() - expect(res).to.not.include(topic) // not subscribed + expect(res).to.not.include(subsTopic) // not subscribed dsPubsubB.get(key, (err, res) => { expect(err).to.exist() expect(res).to.not.exist() // not value available, but subscribed now series([ - (cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb), + (cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb), // subscribe in order to understand when the message arrive to the node - (cb) => pubsubB.subscribe(topic, messageHandler, cb), + (cb) => pubsubB.subscribe(subsTopic, messageHandler, cb), (cb) => dsPubsubA.put(key, serializedRecord, cb), // wait until message arrives (cb) => waitFor(() => receivedMessage === true, cb),