Skip to content

Commit

Permalink
Customizable gossipsub backoff on unsubscribe (#665)
Browse files Browse the repository at this point in the history
* Customizable gossipsub backoff on unsubscribe
* change default to 5s
  • Loading branch information
Menduist authored Dec 2, 2021
1 parent c19b966 commit 6893bd9
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 6 deletions.
7 changes: 5 additions & 2 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
GossipSubParams(
explicit: true,
pruneBackoff: 1.minutes,
unsubcribeBackoff: 5.seconds,
floodPublish: true,
gossipFactor: 0.25,
d: GossipSubD,
Expand Down Expand Up @@ -77,6 +78,8 @@ proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
err("gossipsub: dOut parameter error, Number of outbound connections to keep in the mesh. Must be less than D_lo and at most D/2")
elif parameters.gossipThreshold >= 0:
err("gossipsub: gossipThreshold parameter error, Must be < 0")
elif parameters.unsubcribeBackoff.seconds <= 0:
err("gossipsub: unsubcribeBackoff parameter error, Must be > 0 seconds")
elif parameters.publishThreshold >= parameters.gossipThreshold:
err("gossipsub: publishThreshold parameter error, Must be < gossipThreshold")
elif parameters.graylistThreshold >= parameters.publishThreshold:
Expand Down Expand Up @@ -413,11 +416,11 @@ method onTopicSubscription*(g: GossipSub, topic: string, subscribed: bool) =
prune: @[ControlPrune(
topicID: topic,
peers: g.peerExchangeList(topic),
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
backoff: g.parameters.unsubcribeBackoff.seconds.uint64)])))
g.broadcast(mpeers, msg)

for peer in mpeers:
g.pruned(peer, topic)
g.pruned(peer, topic, backoff = some(g.parameters.unsubcribeBackoff))

g.mesh.del(topic)

Expand Down
15 changes: 12 additions & 3 deletions libp2p/protocols/pubsub/gossipsub/behavior.nim
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,20 @@ proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [Defect].} =

trace "grafted", peer=p, topic

proc pruned*(g: GossipSub, p: PubSubPeer, topic: string, setBackoff: bool = true) {.raises: [Defect].} =
proc pruned*(g: GossipSub,
p: PubSubPeer,
topic: string,
setBackoff: bool = true,
backoff = none(Duration)) {.raises: [Defect].} =
if setBackoff:
let backoff = Moment.fromNow(g.parameters.pruneBackoff)
let
backoffDuration =
if isSome(backoff): backoff.get()
else: g.parameters.pruneBackoff
backoffMoment = Moment.fromNow(backoffDuration)

g.backingOff
.mgetOrPut(topic, initTable[PeerID, Moment]())[p.peerId] = backoff
.mgetOrPut(topic, initTable[PeerID, Moment]())[p.peerId] = backoffMoment

g.peerStats.withValue(p.peerId, stats):
stats.topicInfos.withValue(topic, info):
Expand Down
1 change: 1 addition & 0 deletions libp2p/protocols/pubsub/gossipsub/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type
GossipSubParams* = object
explicit*: bool
pruneBackoff*: Duration
unsubcribeBackoff*: Duration
floodPublish*: bool
gossipFactor*: float64
d*: int
Expand Down
64 changes: 64 additions & 0 deletions tests/pubsub/testgossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,70 @@ suite "GossipSub":

await allFuturesThrowing(nodesFut.concat())

asyncTest "GossipSub unsub - resub faster than backoff":
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
handlerFut.complete(true)

let
nodes = generateNodes(2, gossip = true)

# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)

# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))

await subscribeNodes(nodes)

nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)

var subs: seq[Future[void]]
subs &= waitSub(nodes[1], nodes[0], "foobar")
subs &= waitSub(nodes[0], nodes[1], "foobar")

await allFuturesThrowing(subs)

nodes[0].unsubscribe("foobar", handler)
nodes[0].subscribe("foobar", handler)

# regular backoff is 60 seconds, so we must not wait that long
await (waitSub(nodes[0], nodes[1], "foobar") and waitSub(nodes[1], nodes[0], "foobar")).wait(30.seconds)

var validatorFut = newFuture[bool]()
proc validator(topic: string,
message: Message):
Future[ValidationResult] {.async.} =
check topic == "foobar"
validatorFut.complete(true)
result = ValidationResult.Accept

nodes[1].addValidator("foobar", validator)
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1

check (await validatorFut) and (await handlerFut)

await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)

await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)

await allFuturesThrowing(nodesFut.concat())

asyncTest "e2e - GossipSub should add remote peer topic subscriptions":
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard
Expand Down
2 changes: 1 addition & 1 deletion tests/pubsub/utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ proc generateNodes*(
msgIdProvider = msgIdProvider,
anonymize = anonymize,
maxMessageSize = maxMessageSize,
parameters = (var p = GossipSubParams.init(); p.floodPublish = false; p.historyLength = 20; p.historyGossip = 20; p))
parameters = (var p = GossipSubParams.init(); p.floodPublish = false; p.historyLength = 20; p.historyGossip = 20; p.unsubcribeBackoff = 1.seconds; p))
# set some testing params, to enable scores
g.topicParams.mgetOrPut("foobar", TopicParams.init()).topicWeight = 1.0
g.topicParams.mgetOrPut("foo", TopicParams.init()).topicWeight = 1.0
Expand Down

0 comments on commit 6893bd9

Please sign in to comment.