Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: limit relay connections below max conns #1813

Merged
merged 4 commits into from
Jul 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ proc initNode(conf: WakuNodeConf,
sendSignedPeerRecord = conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled
agentString = some(conf.agentString)
)
builder.withPeerManagerConfig(maxRelayPeers = some(conf.maxRelayPeers.int))
builder.withPeerManagerConfig(maxRelayPeers = conf.maxRelayPeers)

node = ? builder.build().mapErr(proc (err: string): string = "failed to create waku node instance: " & err)

Expand Down
3 changes: 1 addition & 2 deletions apps/wakunode2/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ type

maxRelayPeers* {.
desc: "Maximum allowed number of relay peers."
defaultValue: 50
name: "max-relay-peers" }: uint16
name: "max-relay-peers" }: Option[int]

peerStoreCapacity* {.
desc: "Maximum stored peers in the peerstore."
Expand Down
10 changes: 5 additions & 5 deletions tests/v2/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ procSuite "Peer Manager":
.withMaxConnections(5)
.build(),
maxFailedAttempts = 1,
maxRelayPeers = 5,
maxRelayPeers = some(5),
storage = nil)

# Create 15 peers and add them to the peerstore
Expand Down Expand Up @@ -660,7 +660,7 @@ procSuite "Peer Manager":
initialBackoffInSec = 1, # with InitialBackoffInSec = 1 backoffs are: 1, 2, 4, 8secs.
backoffFactor = 2,
maxFailedAttempts = 10,
maxRelayPeers = 5,
maxRelayPeers = some(5),
storage = nil)
var p1: PeerId
require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1")
Expand Down Expand Up @@ -709,7 +709,7 @@ procSuite "Peer Manager":
.withPeerStore(10)
.withMaxConnections(5)
.build(),
maxRelayPeers = 5,
maxRelayPeers = some(5),
maxFailedAttempts = 150,
storage = nil)

Expand All @@ -721,7 +721,7 @@ procSuite "Peer Manager":
.withMaxConnections(5)
.build(),
maxFailedAttempts = 10,
maxRelayPeers = 5,
maxRelayPeers = some(5),
storage = nil)

let pm = PeerManager.new(
Expand All @@ -730,7 +730,7 @@ procSuite "Peer Manager":
.withMaxConnections(5)
.build(),
maxFailedAttempts = 5,
maxRelayPeers = 5,
maxRelayPeers = some(5),
storage = nil)

asyncTest "colocationLimit is enforced by pruneConnsByIp()":
Expand Down
9 changes: 1 addition & 8 deletions waku/v2/node/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,6 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
if builder.record.isNone():
return err("node record is required")

# fall back to max connections if not set
var maxRelayPeers: int
if builder.maxRelayPeers.isNone():
maxRelayPeers = builder.switchMaxConnections.get(builders.MaxConnections)
else:
maxRelayPeers = builder.maxRelayPeers.get()

var switch: Switch
try:
switch = newWakuSwitch(
Expand All @@ -176,7 +169,7 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
let peerManager = PeerManager.new(
switch = switch,
storage = builder.peerStorage.get(nil),
maxRelayPeers = maxRelayPeers,
maxRelayPeers = builder.maxRelayPeers,
)

var node: WakuNode
Expand Down
63 changes: 34 additions & 29 deletions waku/v2/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const
PrunePeerStoreInterval = chronos.minutes(5)

# How often metrics and logs are shown/updated
LogAndMetricsInterval = chronos.seconds(60)
LogAndMetricsInterval = chronos.minutes(3)

# Max peers that we allow from the same IP
ColocationLimit = 5
Expand All @@ -71,7 +71,8 @@ type
storage: PeerStorage
serviceSlots*: Table[string, RemotePeerInfo]
maxRelayPeers*: int
outPeersTarget*: int
outRelayPeersTarget: int
inRelayPeersTarget: int
ipTable*: Table[string, seq[PeerId]]
colocationLimit*: int
started: bool
Expand Down Expand Up @@ -343,7 +344,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =

proc new*(T: type PeerManager,
switch: Switch,
maxRelayPeers: int = 50,
maxRelayPeers: Option[int] = none(int),
storage: PeerStorage = nil,
initialBackoffInSec = InitialBackoffInSec,
backoffFactor = BackoffFactor,
Expand All @@ -358,16 +359,22 @@ proc new*(T: type PeerManager,
maxConnections = maxConnections
raise newException(Defect, "Max number of connections can't be greater than PeerManager capacity")

if maxRelayPeers > maxConnections:
error "Max number of relay peers can't be greater the max amount of connections",
maxConnections = maxConnections,
maxRelayPeers = maxRelayPeers
raise newException(Defect, "Max number of relay peers can't be greater the max amount of connections")

if maxRelayPeers == maxConnections:
warn "Max number of relay peers is equal to max amount of connections, peer wont be contribute to service peers",
maxConnections = maxConnections,
maxRelayPeers = maxRelayPeers
var maxRelayPeersValue = 0
if maxRelayPeers.isSome():
if maxRelayPeers.get() > maxConnections:
error "Max number of relay peers can't be greater than the max amount of connections",
maxConnections = maxConnections,
maxRelayPeers = maxRelayPeers.get()
raise newException(Defect, "Max number of relay peers can't be greater than the max amount of connections")

if maxRelayPeers.get() == maxConnections:
warn "Max number of relay peers is equal to max amount of connections, peer won't be contributing to service peers",
maxConnections = maxConnections,
maxRelayPeers = maxRelayPeers.get()
maxRelayPeersValue = maxRelayPeers.get()
else:
# By default, reserve 20% of connections for service peers
maxRelayPeersValue = maxConnections - (maxConnections div 5)

# attempt to calculate max backoff to prevent potential overflows or unreasonably high values
let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, maxFailedAttempts)
Expand All @@ -376,15 +383,18 @@ proc new*(T: type PeerManager,
maxBackoff=backoff
raise newException(Defect, "Max backoff time can't be over 1 week")

let outRelayPeersTarget = max(maxRelayPeersValue div 3, 10)

let pm = PeerManager(switch: switch,
peerStore: switch.peerStore,
storage: storage,
initialBackoffInSec: initialBackoffInSec,
backoffFactor: backoffFactor,
outPeersTarget: max(maxConnections div 2, 10),
outRelayPeersTarget: outRelayPeersTarget,
inRelayPeersTarget: maxRelayPeersValue - outRelayPeersTarget,
maxRelayPeers: maxRelayPeersValue,
maxFailedAttempts: maxFailedAttempts,
colocationLimit: colocationLimit,
maxRelayPeers: maxRelayPeers)
colocationLimit: colocationLimit)

proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
onConnEvent(pm, peerId, event)
Expand Down Expand Up @@ -571,16 +581,12 @@ proc connectToRelayPeers*(pm: PeerManager) {.async.} =
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
let inPeersTarget = maxConnections - pm.outPeersTarget
let inPeersTarget = maxConnections - pm.outRelayPeersTarget

if inRelayPeers.len > inPeersTarget:
await pm.pruneInRelayConns(inRelayPeers.len-inPeersTarget)

if outRelayPeers.len >= pm.outPeersTarget:
return
if inRelayPeers.len > pm.inRelayPeersTarget:
await pm.pruneInRelayConns(inRelayPeers.len - pm.inRelayPeersTarget)

# Leave some room for service peers
if totalRelayPeers >= (maxConnections - 5):
if outRelayPeers.len >= pm.outRelayPeersTarget:
return

let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
Expand Down Expand Up @@ -670,15 +676,14 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
let inPeersTarget = maxConnections - pm.outPeersTarget
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
let totalConnections = pm.switch.connManager.getConnections().len

info "Relay peer connections",
inRelayConns = $inRelayPeers.len & "/" & $inPeersTarget,
outRelayConns = $outRelayPeers.len & "/" & $pm.outPeersTarget,
totalRelayConns = totalRelayPeers,
maxConnections = maxConnections,
inRelayConns = $inRelayPeers.len & "/" & $pm.inRelayPeersTarget,
outRelayConns = $outRelayPeers.len & "/" & $pm.outRelayPeersTarget,
totalConnections = $totalConnections & "/" & $maxConnections,
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len

Expand Down