Skip to content

Commit

Permalink
docs: update message filtering example (#1362)
Browse files Browse the repository at this point in the history
Updates the example to use the new pubsub `addEventListener`-style API along with the README.

Also updates the test to actually test that the relevant messages were received.

Fixes #1288
  • Loading branch information
achingbrain committed Aug 30, 2022
1 parent 1f38ab7 commit 0e7096d
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 69 deletions.
53 changes: 36 additions & 17 deletions examples/pubsub/message-filtering/1.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,42 @@ const createNode = async () => {
await node2.dial(node3.peerId)

//subscribe
node1.pubsub.addEventListener(topic, (evt) => {
node1.pubsub.addEventListener('message', (evt) => {
if (evt.detail.topic !== topic) {
return
}

// Will not receive own published messages by default
console.log(`node1 received: ${uint8ArrayToString(evt.detail.data)}`)
})
node1.pubsub.subscribe(topic)

node2.pubsub.addEventListener(topic, (evt) => {
node2.pubsub.addEventListener('message', (evt) => {
if (evt.detail.topic !== topic) {
return
}

console.log(`node2 received: ${uint8ArrayToString(evt.detail.data)}`)
})
node2.pubsub.subscribe(topic)

node3.pubsub.addEventListener('message', (evt) => {
if (evt.detail.topic !== topic) {
return
}

node3.pubsub.addEventListener(topic, (evt) => {
console.log(`node3 received: ${uint8ArrayToString(evt.detail.data)}`)
})
node3.pubsub.subscribe(topic)

// wait for subscriptions to propagate
await delay(1000)

const validateFruit = (msgTopic, msg) => {
const fruit = uint8ArrayToString(msg.data)
const validFruit = ['banana', 'apple', 'orange']

// car is not a fruit !
if (!validFruit.includes(fruit)) {
throw new Error('no valid fruit received')
}
Expand All @@ -68,18 +86,19 @@ const createNode = async () => {
node2.pubsub.topicValidators.set(topic, validateFruit)
node3.pubsub.topicValidators.set(topic, validateFruit)

// node1 publishes "fruits" every five seconds
var count = 0;
const myFruits = ['banana', 'apple', 'car', 'orange'];
// car is not a fruit !
setInterval(() => {
console.log('############## fruit ' + myFruits[count] + ' ##############')
node1.pubsub.publish(topic, uint8ArrayFromString(myFruits[count])).catch(err => {
console.info(err)
})
count++
if (count == myFruits.length) {
count = 0
}
}, 5000)
// node1 publishes "fruits"
for (const fruit of ['banana', 'apple', 'car', 'orange']) {
console.log('############## fruit ' + fruit + ' ##############')
await node1.pubsub.publish(topic, uint8ArrayFromString(fruit))
}

// wait a few seconds for messages to be received
await delay(5000)
console.log('############## all messages sent ##############')
})()

async function delay (ms) {
await new Promise((resolve) => {
setTimeout(() => resolve(), ms)
})
}
37 changes: 20 additions & 17 deletions examples/pubsub/message-filtering/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,30 @@ Now we' can subscribe to the fruit topic and log incoming messages.
```JavaScript
const topic = 'fruit'

node1.pubsub.on(topic, (msg) => {
node1.pubsub.addEventListener('message', (msg) => {
if (msg.detail.topic !== topic) {
return
}

console.log(`node1 received: ${uint8ArrayToString(msg.data)}`)
})
await node1.pubsub.subscribe(topic)

node2.pubsub.on(topic, (msg) => {
node2.pubsub.addEventListener('message', (msg) => {
if (msg.detail.topic !== topic) {
return
}

console.log(`node2 received: ${uint8ArrayToString(msg.data)}`)
})
await node2.pubsub.subscribe(topic)

node3.pubsub.on(topic, (msg) => {
console.log(`node3 received: ${uint8ArrayToString(msg.data)}`)
node3.pubsub.addEventListener('message', (msg) => {
if (msg.detail.topic !== topic) {
return
}

console.log(`node3 received: ${uint8ArrayToString(msg.data)}`)
})
await node3.pubsub.subscribe(topic)
```
Expand All @@ -83,19 +95,10 @@ node3.pubsub.topicValidators.set(topic, validateFruit)
In this example, node one has an outdated version of the system, or is a malicious node. When it tries to publish fruit, the messages are re-shared and all the nodes share the message. However, when it tries to publish a vehicle the message is not re-shared.

```JavaScript
var count = 0;
const myFruits = ['banana', 'apple', 'car', 'orange'];

setInterval(() => {
console.log('############## fruit ' + myFruits[count] + ' ##############')
node1.pubsub.publish(topic, new TextEncoder().encode(myFruits[count])).catch(err => {
console.error(err)
})
count++
if (count == myFruits.length) {
count = 0
}
}, 5000)
for (const fruit of ['banana', 'apple', 'car', 'orange']) {
console.log('############## fruit ' + fruit + ' ##############')
await node1.pubsub.publish(topic, uint8ArrayFromString(fruit))
}
```

Result
Expand Down
53 changes: 18 additions & 35 deletions examples/pubsub/message-filtering/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,11 @@ import { fileURLToPath } from 'url'

const __dirname = path.dirname(fileURLToPath(import.meta.url))

const stdout = [
{
topic: 'banana',
messageCount: 2
},
{
topic: 'apple',
messageCount: 2
},
{
topic: 'car',
messageCount: 0
},
{
topic: 'orange',
messageCount: 2
},
]
// holds messages received by peers
const messages = {}

export async function test () {
const defer = pDefer()
let topicCount = 0
let topicMessageCount = 0

process.stdout.write('message-filtering/1.js\n')

Expand All @@ -38,26 +20,27 @@ export async function test () {
})

proc.all.on('data', async (data) => {
// End
if (topicCount === stdout.length) {
defer.resolve()
proc.all.removeAllListeners('data')
}

process.stdout.write(data)
const line = uint8ArrayToString(data)

if (stdout[topicCount] && line.includes(stdout[topicCount].topic)) {
// Validate previous number of messages
if (topicCount > 0 && topicMessageCount > stdout[topicCount - 1].messageCount) {
defer.reject()
throw new Error(`topic ${stdout[topicCount - 1].topic} had ${topicMessageCount} messages instead of ${stdout[topicCount - 1].messageCount}`)
// End
if (line.includes('all messages sent')) {
if (messages.car > 0) {
defer.reject(new Error('Message validation failed - peers failed to filter car messages'))
}

for (const fruit of ['banana', 'apple', 'orange']) {
if (messages[fruit] !== 2) {
defer.reject(new Error(`Not enough ${fruit} messages - received ${messages[fruit] ?? 0}, expected 2`))
}
}

topicCount++
topicMessageCount = 0
} else {
topicMessageCount++
defer.resolve()
}

if (line.includes('received:')) {
const fruit = line.split('received:')[1].trim()
messages[fruit] = (messages[fruit] ?? 0) + 1
}
})

Expand Down

0 comments on commit 0e7096d

Please sign in to comment.