Skip to content

Commit

Permalink
fix: wait for gossipsub heartbeat before sending message (#94)
Browse files Browse the repository at this point in the history
Only wait for the sending peer to see the remote in their topic subscriber
list but also wait a little bit longer for gossipsub to rebalance the
mesh to increase the chances of the publisher actually sending a pubsub
message to the subscriber.
  • Loading branch information
achingbrain committed Feb 28, 2023
1 parent 3b14ad0 commit dffdff0
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 26 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
"@libp2p/interface-peer-id": "^2.0.1",
"@libp2p/interface-peer-info": "^1.0.7",
"@multiformats/multiaddr": "^11.4.0",
"delay": "^5.0.0",
"it-all": "^2.0.0",
"it-first": "^2.0.0",
"it-handshake": "^4.1.2",
Expand Down
4 changes: 2 additions & 2 deletions src/pubsub/floodsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { expect } from 'aegir/chai'
import type { Daemon, DaemonFactory, NodeType, SpawnOptions } from '../index.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import first from 'it-first'
import { waitForBothSubscribed } from './utils.js'
import { waitForSubscribed } from './utils.js'

export function floodsubTests (factory: DaemonFactory): void {
const nodeTypes: NodeType[] = ['js', 'go']
Expand Down Expand Up @@ -61,7 +61,7 @@ function runFloodsubTests (factory: DaemonFactory, optionsA: SpawnOptions, optio
}

const publisher = async (): Promise<void> => {
await waitForBothSubscribed(topic, peerA, peerB)
await waitForSubscribed(topic, peerA, peerB)
await peerA.client.pubsub.publish(topic, data)
}

Expand Down
4 changes: 2 additions & 2 deletions src/pubsub/gossipsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { expect } from 'aegir/chai'
import type { Daemon, DaemonFactory, NodeType, SpawnOptions } from '../index.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import first from 'it-first'
import { waitForBothSubscribed } from './utils.js'
import { waitForSubscribed } from './utils.js'

export function gossipsubTests (factory: DaemonFactory): void {
const nodeTypes: NodeType[] = ['js', 'go']
Expand Down Expand Up @@ -61,7 +61,7 @@ function runGossipsubTests (factory: DaemonFactory, optionsA: SpawnOptions, opti
}

const publisher = async (): Promise<void> => {
await waitForBothSubscribed(topic, peerA, peerB)
await waitForSubscribed(topic, peerA, peerB)
await peerA.client.pubsub.publish(topic, data)
}

Expand Down
4 changes: 2 additions & 2 deletions src/pubsub/hybrid.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { expect } from 'aegir/chai'
import type { Daemon, DaemonFactory, NodeType, SpawnOptions } from '../index.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import first from 'it-first'
import { waitForBothSubscribed } from './utils.js'
import { waitForSubscribed } from './utils.js'

export function hybridTests (factory: DaemonFactory): void {
const nodeTypes: NodeType[] = ['js', 'go']
Expand Down Expand Up @@ -61,7 +61,7 @@ function runHybridTests (factory: DaemonFactory, optionsA: SpawnOptions, options
}

const publisher = async (): Promise<void> => {
await waitForBothSubscribed(topic, peerA, peerB)
await waitForSubscribed(topic, peerA, peerB)
await peerA.client.pubsub.publish(topic, data)
}

Expand Down
36 changes: 16 additions & 20 deletions src/pubsub/utils.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,22 @@
import pWaitFor from 'p-wait-for'
import type { Daemon } from '..'
import delay from 'delay'
import type { Daemon } from '../index.js'

export async function waitForBothSubscribed (topic: string, a: Daemon, b: Daemon): Promise<void> {
await a.client.pubsub.subscribe(topic)
await b.client.pubsub.subscribe(topic)

const idA = await a.client.identify()
/**
* Wait for daemon a to see daemon b in it's subscriber list
* for the passed topic
*/
export async function waitForSubscribed (topic: string, a: Daemon, b: Daemon): Promise<void> {
const idB = await b.client.identify()

// wait for subscription stream
await Promise.all([
await pWaitFor(async () => {
const peers = await a.client.pubsub.getSubscribers(topic)
return peers.map(p => p.toString()).includes(idB.peerId.toString())
}, {
interval: 500
}),
await pWaitFor(async () => {
const peers = await b.client.pubsub.getSubscribers(topic)
return peers.map(p => p.toString()).includes(idA.peerId.toString())
}, {
interval: 500
})
])
await pWaitFor(async () => {
const peers = await a.client.pubsub.getSubscribers(topic)
return peers.map(p => p.toString()).includes(idB.peerId.toString())
}, {
interval: 500
})

// wait for the gossipsub heartbeat to rebalance the mesh
await delay(2000)
}

0 comments on commit dffdff0

Please sign in to comment.