Skip to content

Commit

Permalink
feat: encode record-store keys in pubsub (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos authored Nov 29, 2018
1 parent 068ec27 commit 510bd2c
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 48 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
"homepage": "https://github.com/ipfs/js-datastore-pubsub#readme",
"dependencies": {
"assert": "^1.4.1",
"base32.js": "~0.1.0",
"debug": "^4.1.0",
"err-code": "^1.1.2",
"interface-datastore": "~0.6.0"
"interface-datastore": "~0.6.0",
"multibase": "~0.6.0"
},
"devDependencies": {
"aegir": "^17.1.0",
Expand Down
34 changes: 20 additions & 14 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const { Key } = require('interface-datastore')
const { encodeBase32 } = require('./utils')
const { encodeBase32, keyToTopic, topicToKey } = require('./utils')

const errcode = require('err-code')
const assert = require('assert')
Expand All @@ -24,19 +24,19 @@ class DatastorePubsub {
* @memberof DatastorePubsub
*/
constructor (pubsub, datastore, peerId, validator, subscriptionKeyFn) {
assert.equal(typeof validator, 'object', 'missing validator')
assert.equal(typeof validator.validate, 'function', 'missing validate function')
assert.equal(typeof validator.select, 'function', 'missing select function')
subscriptionKeyFn && assert.equal(typeof subscriptionKeyFn, 'function', 'invalid subscriptionKeyFn received')
assert.strictEqual(typeof validator, 'object', 'missing validator')
assert.strictEqual(typeof validator.validate, 'function', 'missing validate function')
assert.strictEqual(typeof validator.select, 'function', 'missing select function')
subscriptionKeyFn && assert.strictEqual(typeof subscriptionKeyFn, 'function', 'invalid subscriptionKeyFn received')

this._pubsub = pubsub
this._datastore = datastore
this._peerId = peerId
this._validator = validator
this._handleSubscriptionKeyFn = subscriptionKeyFn

// Bind _handleSubscription function, which is called by pubsub.
this._handleSubscription = this._handleSubscription.bind(this)
// Bind _onMessage function, which is called by pubsub.
this._onMessage = this._onMessage.bind(this)
}

/**
Expand All @@ -61,7 +61,7 @@ class DatastorePubsub {
return callback(errcode(new Error(errMsg), 'ERR_INVALID_VALUE_RECEIVED'))
}

const stringifiedTopic = key.toString()
const stringifiedTopic = keyToTopic(key)

log(`publish value for topic ${stringifiedTopic}`)

Expand All @@ -83,7 +83,7 @@ class DatastorePubsub {
return callback(errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY'))
}

const stringifiedTopic = key.toString()
const stringifiedTopic = keyToTopic(key)

this._pubsub.ls((err, res) => {
if (err) {
Expand All @@ -96,7 +96,7 @@ class DatastorePubsub {
}

// Subscribe
this._pubsub.subscribe(stringifiedTopic, this._handleSubscription, (err) => {
this._pubsub.subscribe(stringifiedTopic, this._onMessage, (err) => {
if (err) {
const errMsg = `cannot subscribe topic ${stringifiedTopic}`

Expand All @@ -116,9 +116,9 @@ class DatastorePubsub {
* @returns {void}
*/
unsubscribe (key) {
const stringifiedTopic = key.toString()
const stringifiedTopic = keyToTopic(key)

this._pubsub.unsubscribe(stringifiedTopic, this._handleSubscription)
this._pubsub.unsubscribe(stringifiedTopic, this._onMessage)
}

// Get record from local datastore
Expand Down Expand Up @@ -152,9 +152,15 @@ class DatastorePubsub {
}

// handles pubsub subscription messages
_handleSubscription (msg) {
_onMessage (msg) {
const { data, from, topicIDs } = msg
const key = topicIDs[0]
let key
try {
key = topicToKey(topicIDs[0])
} catch (err) {
log.error(err)
return
}

log(`message received for ${key} topic`)

Expand Down
29 changes: 26 additions & 3 deletions src/utils.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,31 @@
'use strict'

const base32 = require('base32.js')
const multibase = require('multibase')
const errcode = require('err-code')

const namespace = '/record/'
const base64urlCode = 'u' // base64url code from multibase

module.exports.encodeBase32 = (buf) => {
const enc = new base32.Encoder()
return enc.write(buf).finalize()
return multibase.encode('base32', buf).slice(1) // slice off multibase codec
}

// converts a binary record key to a pubsub topic key.
module.exports.keyToTopic = (key) => {
// Record-store keys are arbitrary binary. However, pubsub requires UTF-8 string topic IDs
// Encodes to "/record/base64url(key)"
const b64url = multibase.encode('base64url', key).slice(1).toString()

return `${namespace}${b64url}`
}

// converts a pubsub topic key to a binary record key.
module.exports.topicToKey = (topic) => {
if (topic.substring(0, namespace.length) !== namespace) {
throw errcode(new Error('topic received is not from a record'), 'ERR_TOPIC_IS_NOT_FROM_RECORD_NAMESPACE')
}

const key = `${base64urlCode}${topic.substring(namespace.length)}`

return multibase.decode(key).toString()
}
61 changes: 32 additions & 29 deletions test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const { Key } = require('interface-datastore')
const { Record } = require('libp2p-record')

const DatastorePubsub = require('../src')
const { keyToTopic } = require('../src/utils')
const { connect, waitFor, waitForPeerToSubscribe, spawnDaemon, stopDaemon } = require('./utils')

// Always returning the expected values
Expand Down Expand Up @@ -115,11 +116,12 @@ describe('datastore-pubsub', function () {

it('should subscribe the topic, but receive error as no entry is stored locally', function (done) {
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const subsTopic = keyToTopic(`/${keyRef}`)

pubsubA.ls((err, res) => {
expect(err).to.not.exist()
expect(res).to.exist()
expect(res).to.not.include(`/${keyRef}`) // not subscribed key reference yet
expect(res).to.not.include(subsTopic) // not subscribed key reference yet

dsPubsubA.get(key, (err) => {
expect(err).to.exist() // not locally stored record
Expand All @@ -128,7 +130,7 @@ describe('datastore-pubsub', function () {
pubsubA.ls((err, res) => {
expect(err).to.not.exist()
expect(res).to.exist()
expect(res).to.include(`/${keyRef}`) // subscribed key reference
expect(res).to.include(subsTopic) // subscribed key reference
done()
})
})
Expand All @@ -138,11 +140,12 @@ describe('datastore-pubsub', function () {
it('should put correctly to daemon A and daemon B should not receive it without subscribing', function (done) {
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator)
const subsTopic = keyToTopic(`/${keyRef}`)

pubsubB.ls((err, res) => {
expect(err).to.not.exist()
expect(res).to.exist()
expect(res).to.not.include(`/${keyRef}`) // not subscribed
expect(res).to.not.include(subsTopic) // not subscribed

dsPubsubA.put(key, serializedRecord, (err) => {
expect(err).to.not.exist()
Expand All @@ -169,7 +172,7 @@ describe('datastore-pubsub', function () {
}
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
let receivedMessage = false

function messageHandler () {
Expand All @@ -181,9 +184,9 @@ describe('datastore-pubsub', function () {
expect(res).to.not.exist() // no value available, but subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand All @@ -200,7 +203,7 @@ describe('datastore-pubsub', function () {
it('should put correctly to daemon A and daemon B should receive it as it tried to get it first and subscribed it', function (done) {
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
let receivedMessage = false

function messageHandler () {
Expand All @@ -210,16 +213,16 @@ describe('datastore-pubsub', function () {
pubsubB.ls((err, res) => {
expect(err).to.not.exist()
expect(res).to.exist()
expect(res).to.not.include(topic) // not subscribed
expect(res).to.not.include(subsTopic) // not subscribed

dsPubsubB.get(key, (err, res) => {
expect(err).to.exist()
expect(res).to.not.exist() // not value available, but subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand Down Expand Up @@ -300,7 +303,7 @@ describe('datastore-pubsub', function () {
}
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
let receivedMessage = false

function messageHandler () {
Expand All @@ -312,9 +315,9 @@ describe('datastore-pubsub', function () {
expect(res).to.not.exist() // not value available, but subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand Down Expand Up @@ -345,7 +348,7 @@ describe('datastore-pubsub', function () {

const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
let receivedMessage = false

function messageHandler () {
Expand All @@ -357,9 +360,9 @@ describe('datastore-pubsub', function () {
expect(res).to.not.exist() // not value available, but subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand Down Expand Up @@ -396,7 +399,7 @@ describe('datastore-pubsub', function () {

const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
let receivedMessage = false

function messageHandler () {
Expand All @@ -408,9 +411,9 @@ describe('datastore-pubsub', function () {
expect(res).to.not.exist() // not value available, but it is subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand All @@ -431,14 +434,14 @@ describe('datastore-pubsub', function () {
})
})

it('should subscribe the topic and after a message being received, discarde it using the subscriptionKeyFn', function (done) {
it('should subscribe the topic and after a message being received, discard it using the subscriptionKeyFn', function (done) {
const subscriptionKeyFn = (topic, callback) => {
expect(topic).to.equal(key.toString())
expect(topic).to.equal(`/${keyRef}`)
callback(new Error('DISCARD MESSAGE'))
}
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator, subscriptionKeyFn)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
let receivedMessage = false

function messageHandler () {
Expand All @@ -448,16 +451,16 @@ describe('datastore-pubsub', function () {
pubsubB.ls((err, res) => {
expect(err).to.not.exist()
expect(res).to.exist()
expect(res).to.not.include(topic) // not subscribed
expect(res).to.not.include(subsTopic) // not subscribed

dsPubsubB.get(key, (err, res) => {
expect(err).to.exist()
expect(res).to.not.exist() // not value available, but subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand All @@ -478,7 +481,7 @@ describe('datastore-pubsub', function () {
}
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator, subscriptionKeyFn)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
const keyNew = Buffer.from(`${key.toString()}new`)
let receivedMessage = false

Expand All @@ -489,16 +492,16 @@ describe('datastore-pubsub', function () {
pubsubB.ls((err, res) => {
expect(err).to.not.exist()
expect(res).to.exist()
expect(res).to.not.include(topic) // not subscribed
expect(res).to.not.include(subsTopic) // not subscribed

dsPubsubB.get(key, (err, res) => {
expect(err).to.exist()
expect(res).to.not.exist() // not value available, but subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand Down

0 comments on commit 510bd2c

Please sign in to comment.