Skip to content

Commit

Permalink
chore: add peer manager config to builder (#1816)
Browse files Browse the repository at this point in the history
  • Loading branch information
alrevuelta authored Jun 23, 2023
1 parent 4573e8c commit 71c4ac1
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 42 deletions.
1 change: 1 addition & 0 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ proc initNode(conf: WakuNodeConf,
agentString = some(conf.agentString)
)
builder.withWakuDiscv5(wakuDiscv5.get(nil))
builder.withPeerManagerConfig(maxRelayPeers = some(conf.maxRelayPeers.int))

node = ? builder.build().mapErr(proc (err: string): string = "failed to create waku node instance: " & err)

Expand Down
5 changes: 5 additions & 0 deletions apps/wakunode2/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ type
defaultValue: 50
name: "max-connections" }: uint16

maxRelayPeers* {.
desc: "Maximum allowed number of relay peers."
defaultValue: 50
name: "max-relay-peers" }: uint16

peerStoreCapacity* {.
desc: "Maximum stored peers in the peerstore."
name: "peer-store-capacity" }: Option[int]
Expand Down
5 changes: 5 additions & 0 deletions tests/v2/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ procSuite "Peer Manager":
.withMaxConnections(5)
.build(),
maxFailedAttempts = 1,
maxRelayPeers = 5,
storage = nil)

# Create 15 peers and add them to the peerstore
Expand Down Expand Up @@ -659,6 +660,7 @@ procSuite "Peer Manager":
initialBackoffInSec = 1, # with InitialBackoffInSec = 1 backoffs are: 1, 2, 4, 8secs.
backoffFactor = 2,
maxFailedAttempts = 10,
maxRelayPeers = 5,
storage = nil)
var p1: PeerId
require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1")
Expand Down Expand Up @@ -707,6 +709,7 @@ procSuite "Peer Manager":
.withPeerStore(10)
.withMaxConnections(5)
.build(),
maxRelayPeers = 5,
maxFailedAttempts = 150,
storage = nil)

Expand All @@ -718,6 +721,7 @@ procSuite "Peer Manager":
.withMaxConnections(5)
.build(),
maxFailedAttempts = 10,
maxRelayPeers = 5,
storage = nil)

let pm = PeerManager.new(
Expand All @@ -726,6 +730,7 @@ procSuite "Peer Manager":
.withMaxConnections(5)
.build(),
maxFailedAttempts = 5,
maxRelayPeers = 5,
storage = nil)

asyncTest "colocationLimit is enforced by pruneConnsByIp()":
Expand Down
2 changes: 1 addition & 1 deletion tests/v2/testlib/wakunode.nim
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
secureKey = if secureKey != "": some(secureKey) else: none(string),
secureCert = if secureCert != "": some(secureCert) else: none(string),
agentString = agentString,

)
builder.withWakuDiscv5(wakuDiscv5.get(nil))

Expand Down
62 changes: 48 additions & 14 deletions waku/v2/node/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import
chronicles,
libp2p/crypto/crypto,
libp2p/builders,
libp2p/nameresolving/nameresolver
libp2p/nameresolving/nameresolver,
libp2p/transports/wstransport
import
../waku_enr,
../waku_discv5,
./config,
./peer_manager,
./waku_node
./waku_node,
./waku_switch


type
Expand All @@ -30,7 +32,9 @@ type
# Peer storage and peer manager
peerStorage: Option[PeerStorage]
peerStorageCapacity: Option[int]
peerManager: Option[PeerManager]

# Peer manager config
maxRelayPeers: Option[int]

# Libp2p switch
switchMaxConnections: Option[int]
Expand Down Expand Up @@ -104,8 +108,10 @@ proc withPeerStorage*(builder: var WakuNodeBuilder, peerStorage: PeerStorage, ca

builder.peerStorageCapacity = capacity

proc withPeerManager*(builder: var WakuNodeBuilder, peerManager: PeerManager) =
builder.peerManager = some(peerManager)
proc withPeerManagerConfig*(builder: var WakuNodeBuilder,
maxRelayPeers = none(int)) =
builder.maxRelayPeers = maxRelayPeers



## Waku switch
Expand Down Expand Up @@ -149,22 +155,50 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
if builder.netConfig.isNone():
return err("network configuration is required")

# fallbck to max connections if not set
var maxRelayPeers: int
if builder.maxRelayPeers.isNone():
maxRelayPeers = builder.switchMaxConnections.get(builders.MaxConnections)
else:
maxRelayPeers = builder.maxRelayPeers.get()

var switch: Switch
try:
switch = newWakuSwitch(
privKey = builder.nodekey,
address = builder.netConfig.get().hostAddress,
wsAddress = builder.netConfig.get().wsHostAddress,
transportFlags = {ServerFlags.ReuseAddr},
rng = rng,
maxConnections = builder.switchMaxConnections.get(builders.MaxConnections),
wssEnabled = builder.netConfig.get().wssEnabled,
secureKeyPath = builder.switchSslSecureKey.get(""),
secureCertPath = builder.switchSslSecureCert.get(""),
nameResolver = builder.switchNameResolver.get(nil),
sendSignedPeerRecord = builder.switchSendSignedPeerRecord.get(false),
agentString = builder.switchAgentString,
peerStoreCapacity = builder.peerStorageCapacity,
services = @[Service(getAutonatService(rng))],
)
except CatchableError:
return err("failed to create switch: " & getCurrentExceptionMsg())

let peerManager = PeerManager.new(
switch = switch,
storage = builder.peerStorage.get(nil),
maxRelayPeers = maxRelayPeers,
)

var node: WakuNode
try:
node = WakuNode.new(
rng = rng,
nodeKey = builder.nodeKey.get(),
netConfig = builder.netConfig.get(),
enr = builder.record,
peerStorage = builder.peerStorage.get(nil),
peerStoreCapacity = builder.peerStorageCapacity,
maxConnections = builder.switchMaxConnections.get(builders.MaxConnections),
nameResolver = builder.switchNameResolver.get(nil),
agentString = builder.switchAgentString,
secureKey = builder.switchSslSecureKey.get(""),
secureCert = builder.switchSslSecureCert.get(""),
sendSignedPeerRecord = builder.switchSendSignedPeerRecord.get(false),
switch = switch,
wakuDiscv5 = builder.wakuDiscv5,
peerManager = peerManager,
rng = rng,
)
except Exception:
return err("failed to build WakuNode instance: " & getCurrentExceptionMsg())
Expand Down
16 changes: 15 additions & 1 deletion waku/v2/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type
maxFailedAttempts*: int
storage: PeerStorage
serviceSlots*: Table[string, RemotePeerInfo]
maxRelayPeers*: int
outPeersTarget*: int
ipTable*: Table[string, seq[PeerId]]
colocationLimit*: int
Expand Down Expand Up @@ -333,6 +334,7 @@ proc updateIpTable*(pm: PeerManager) =

proc new*(T: type PeerManager,
switch: Switch,
maxRelayPeers: int = 50,
storage: PeerStorage = nil,
initialBackoffInSec = InitialBackoffInSec,
backoffFactor = BackoffFactor,
Expand All @@ -347,6 +349,17 @@ proc new*(T: type PeerManager,
maxConnections = maxConnections
raise newException(Defect, "Max number of connections can't be greater than PeerManager capacity")

if maxRelayPeers > maxConnections:
error "Max number of relay peers can't be greater the max amount of connections",
maxConnections = maxConnections,
maxRelayPeers = maxRelayPeers
raise newException(Defect, "Max number of relay peers can't be greater the max amount of connections")

if maxRelayPeers == maxConnections:
warn "Max number of relay peers is equal to max amount of connections, peer wont be contribute to service peers",
maxConnections = maxConnections,
maxRelayPeers = maxRelayPeers

# attempt to calculate max backoff to prevent potential overflows or unreasonably high values
let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, maxFailedAttempts)
if backoff.weeks() > 1:
Expand All @@ -361,7 +374,8 @@ proc new*(T: type PeerManager,
backoffFactor: backoffFactor,
outPeersTarget: max(maxConnections div 2, 10),
maxFailedAttempts: maxFailedAttempts,
colocationLimit: colocationLimit)
colocationLimit: colocationLimit,
maxRelayPeers: maxRelayPeers)

proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
onConnEvent(pm, peerId, event)
Expand Down
29 changes: 3 additions & 26 deletions waku/v2/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -159,39 +159,16 @@ proc new*(T: type WakuNode,
nodeKey: crypto.PrivateKey,
netConfig: NetConfig,
enr: Option[enr.Record],
peerStorage: PeerStorage = nil,
maxConnections = builders.MaxConnections,
secureKey: string = "",
secureCert: string = "",
nameResolver: NameResolver = nil,
sendSignedPeerRecord = false,
switch: Switch,
wakuDiscv5 = none(WakuDiscoveryV5),
agentString = none(string), # defaults to nim-libp2p version
peerStoreCapacity = none(int), # defaults to 1.25 maxConnections
peerManager: PeerManager,
# TODO: make this argument required after tests are updated
rng: ref HmacDrbgContext = crypto.newRng()
): T {.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} =
## Creates a Waku Node instance.

info "Initializing networking", addrs= $netConfig.announcedAddresses

let switch = newWakuSwitch(
some(nodekey),
address = netConfig.hostAddress,
wsAddress = netConfig.wsHostAddress,
transportFlags = {ServerFlags.ReuseAddr},
rng = rng,
maxConnections = maxConnections,
wssEnabled = netConfig.wssEnabled,
secureKeyPath = secureKey,
secureCertPath = secureCert,
nameResolver = nameResolver,
sendSignedPeerRecord = sendSignedPeerRecord,
agentString = agentString,
peerStoreCapacity = peerStoreCapacity,
services = @[Service(getAutonatService(rng))],
)

let enr =
if enr.isNone():
let nodeEnrRes = getEnr(netConfig, wakuDiscv5, nodekey)
Expand All @@ -203,7 +180,7 @@ proc new*(T: type WakuNode,
else: enr.get()

return WakuNode(
peerManager: PeerManager.new(switch, peerStorage),
peerManager: peerManager,
switch: switch,
rng: rng,
enr: enr,
Expand Down

0 comments on commit 71c4ac1

Please sign in to comment.