Skip to content

Commit

Permalink
feat(networking): prune peers from same ip beyond colocation limit (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
alrevuelta authored May 31, 2023
1 parent ffac776 commit 047d1cf
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 31 deletions.
37 changes: 37 additions & 0 deletions tests/v2/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -727,3 +727,40 @@ procSuite "Peer Manager":
.build(),
maxFailedAttempts = 5,
storage = nil)

asyncTest "colocationLimit is enforced by pruneConnsByIp()":
  # Checks that pruneConnsByIp() disconnects and forgets peers sharing one
  # observed ip until at most `colocationLimit` of them remain.

  # Create 5 nodes
  let nodes = toSeq(0..<5).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

  # Start them and mount relay (the only protocol this test needs)
  await allFutures(nodes.mapIt(it.start()))
  await allFutures(nodes.mapIt(it.mountRelay()))

  let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())

  # nodes[0] is the node under test; keep a handle to its peer manager
  let pm = nodes[0].peerManager

  # 2 in connections (nodes 1 and 2 dial node 0)
  discard await nodes[1].peerManager.connectRelay(pInfos[0])
  discard await nodes[2].peerManager.connectRelay(pInfos[0])

  # 2 out connections (node 0 dials nodes 3 and 4)
  discard await pm.connectRelay(pInfos[3])
  discard await pm.connectRelay(pInfos[4])

  # force max 1 conn per ip
  pm.colocationLimit = 1
  pm.updateIpTable()

  # table is updated and we have 4 conns (2in 2out)
  check:
    pm.ipTable["127.0.0.1"].len == 4
    pm.switch.connManager.getConnections().len == 4
    pm.peerStore.peers().len == 4

  await pm.pruneConnsByIp()

  # peers are pruned, max 1 conn per ip
  pm.updateIpTable()
  check:
    pm.ipTable["127.0.0.1"].len == 1
    pm.switch.connManager.getConnections().len == 1
    pm.peerStore.peers().len == 1

  # stop every node so nothing started here outlives this test
  await allFutures(nodes.mapIt(it.stop()))
109 changes: 78 additions & 31 deletions waku/v2/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import
chronicles,
metrics,
libp2p/multistream,
libp2p/muxers/muxer
libp2p/muxers/muxer,
libp2p/nameresolving/nameresolver
import
../../../common/nimchronos,
../../waku_core,
Expand Down Expand Up @@ -54,11 +55,14 @@ const
# How often the peer store is pruned
PrunePeerStoreInterval = chronos.minutes(5)

# How often the peer store is updated with metrics
UpdateMetricsInterval = chronos.seconds(15)
# How often metrics and logs are shown/updated
LogAndMetricsInterval = chronos.seconds(60)

# How often to log peer manager metrics
LogSummaryInterval = chronos.seconds(60)
# Prune by ip interval
PruneByIpInterval = chronos.seconds(30)

# Max peers that we allow from the same IP
ColocationLimit = 5

type
PeerManager* = ref object of RootObj
Expand All @@ -70,6 +74,8 @@ type
storage: PeerStorage
serviceSlots*: Table[string, RemotePeerInfo]
outPeersTarget*: int
ipTable*: Table[string, seq[PeerId]]
colocationLimit*: int
started: bool

proc protocolMatcher*(codec: string): Matcher =
Expand Down Expand Up @@ -278,44 +284,60 @@ proc canBeConnected*(pm: PeerManager,
# Initialisation #
##################

# currently disabled. note that peer connection state connected/disconnected
# cant be tracked using this handler when more than one conn is allowed and
# when using autonat. eg if a peer has 2 conns and one is disconnected we cant
# assume that the peer is disconnected, because the other one might still be active.
# note that even with maxconn = 1, autonat forces more than one connection.
# called when a connection i) is created or ii) is closed
proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
  ## Connection-level hook. Both event kinds are currently ignored; the
  ## branches are kept as explicit placeholders.
  case event.kind
  of ConnEventKind.Connected:
    # the connection direction is available as `event.incoming` if needed
    discard
  of ConnEventKind.Disconnected:
    discard

# called when a peer i) first connects to us ii) disconnects all connections from us
proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
  ## Peer-level hook. Records the peer's direction and connectedness in the
  ## peer store and, when persistent storage is configured, persists the
  ## updated entry together with the current unix time.
  ##
  ## Joined -> Connected, with direction derived from `event.initiator`.
  ## Left   -> CanConnect, with UnknownDirection.
  var direction: PeerDirection
  var connectedness: Connectedness

  if event.kind == PeerEventKind.Joined:
    direction = if event.initiator: Outbound else: Inbound
    connectedness = Connected
  elif event.kind == PeerEventKind.Left:
    direction = UnknownDirection
    connectedness = CanConnect
  else:
    # neither a join nor a leave: do not overwrite the books with the
    # zero-initialised defaults of `direction`/`connectedness`
    return

  pm.peerStore[ConnectionBook][peerId] = connectedness
  pm.peerStore[DirectionBook][peerId] = direction
  if not pm.storage.isNil:
    pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), connectedness, getTime().toUnix)

proc updateIpTable*(pm: PeerManager) =
  ## Rebuilds `pm.ipTable` from the connections currently held by the
  ## switch's connection manager. Each key is the hostname taken from a
  ## peer's observed address; each value lists the peer ids seen under it.
  ## Peers with no connection entry or no observed address are left out.

  # start from an empty table
  pm.ipTable = initTable[string, seq[PeerId]]()

  # group peer ids by observed ip, covering both inbound and outbound conns
  for remotePeer, muxers in pm.switch.connManager.getConnections():
    if muxers.len > 0:
      # restricting this to inbound peers only is a possible future change:
      #if muxers[0].connection.transportDir != In:
      #  continue

      # only the first connection is inspected, i.e. one physical
      # connection per peer is assumed
      let seenAddr = muxers[0].connection.observedAddr
      if seenAddr.isSome:
        # TODO: think if circuit relay ips should be handled differently
        let host = seenAddr.get.getHostname()
        pm.ipTable.mgetOrPut(host, newSeq[PeerId]()).add(remotePeer)


if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
return

proc new*(T: type PeerManager,
switch: Switch,
storage: PeerStorage = nil,
initialBackoffInSec = InitialBackoffInSec,
backoffFactor = BackoffFactor,
maxFailedAttempts = MaxFailedAttempts,): PeerManager =
maxFailedAttempts = MaxFailedAttempts,
colocationLimit = ColocationLimit,): PeerManager =

let capacity = switch.peerStore.capacity
let maxConnections = switch.connManager.inSema.size
Expand All @@ -338,7 +360,8 @@ proc new*(T: type PeerManager,
initialBackoffInSec: initialBackoffInSec,
backoffFactor: backoffFactor,
outPeersTarget: max(maxConnections div 10, 10),
maxFailedAttempts: maxFailedAttempts)
maxFailedAttempts: maxFailedAttempts,
colocationLimit: colocationLimit)

proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
onConnEvent(pm, peerId, event)
Expand All @@ -360,6 +383,7 @@ proc new*(T: type PeerManager,
pm.peerStore[AddressBook].addHandler(peerStoreChanged)

pm.serviceSlots = initTable[string, RemotePeerInfo]()
pm.ipTable = initTable[string, seq[PeerId]]()

if not storage.isNil():
debug "found persistent peer storage"
Expand Down Expand Up @@ -520,6 +544,23 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
for p in inRelayPeers[0..<connsToPrune]:
await pm.switch.disconnect(p)

proc pruneConnsByIp*(pm: PeerManager) {.async.} =
  ## Prunes connections based on ip colocation, allowing no more than
  ## `pm.colocationLimit` connected peers per observed ip. Connections of
  ## either direction count, since `updateIpTable` does not filter by
  ## direction. Every pruned peer is disconnected and then deleted from
  ## the peer store.

  # update the table tracking ip and the connected peers
  pm.updateIpTable()

  # a negative limit is treated as 0 so the slice below stays in bounds
  let limit = max(pm.colocationLimit, 0)

  # Select every peer to prune before the first `await`. `updateIpTable` is
  # public and replaces `pm.ipTable`, so the table must not be iterated
  # while this proc is suspended.
  var toPrune: seq[tuple[ip: string, peerId: PeerId]]
  for ip, peersInIp in pm.ipTable.pairs:
    if peersInIp.len > limit:
      let connsToPrune = peersInIp.len - limit
      for peerId in peersInIp[0..<connsToPrune]:
        toPrune.add((ip: ip, peerId: peerId))

  # trigger disconnections based on colocationLimit
  for entry in toPrune:
    debug "Pruning connection due to ip colocation", peerId = entry.peerId, ip = entry.ip
    await pm.switch.disconnect(entry.peerId)
    pm.peerStore.delete(entry.peerId)


proc connectToRelayPeers*(pm: PeerManager) {.async.} =
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
Expand Down Expand Up @@ -603,6 +644,12 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
debug "No peer found for protocol", protocol=proto
return none(RemotePeerInfo)

proc pruneConnsByIpLoop(pm: PeerManager) {.async.} =
  ## Background task: while the manager is marked as started, runs one
  ## ip-colocation pruning pass and then sleeps for `PruneByIpInterval`.
  debug "Starting prune peer by ip loop"
  while pm.started:
    await pruneConnsByIp(pm)
    await PruneByIpInterval.sleepAsync()


# Prunes peers from peerstore to remove old/stale ones
proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
debug "Starting prune peerstore loop"
Expand All @@ -617,8 +664,9 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
await pm.connectToRelayPeers()
await sleepAsync(ConnectivityLoopInterval)

proc logSummary*(pm: PeerManager) {.async.} =
heartbeat "Log peer manager summary", LogSummaryInterval:
proc logAndMetrics(pm: PeerManager) {.async.} =
heartbeat "Scheduling log and metrics run", LogAndMetricsInterval:
# log metrics
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
Expand All @@ -634,8 +682,7 @@ proc logSummary*(pm: PeerManager) {.async.} =
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len

proc updateMetrics(pm: PeerManager) {.async.} =
heartbeat "Scheduling updateMetrics run", UpdateMetricsInterval:
# update prometheus metrics
for proto in pm.peerStore.getWakuProtos():
let (protoConnsIn, protoConnsOut) = pm.connectedPeers(proto)
let (protoStreamsIn, protoStreamsOut) = pm.getNumStreams(proto)
Expand All @@ -646,10 +693,10 @@ proc updateMetrics(pm: PeerManager) {.async.} =

proc start*(pm: PeerManager) =
  ## Marks the peer manager as started and spawns its background tasks:
  ## relay connectivity, peer store pruning, ip-colocation pruning and the
  ## combined log/metrics heartbeat.
  pm.started = true
  asyncSpawn pm.relayConnectivityLoop()
  asyncSpawn pm.prunePeerStoreLoop()
  asyncSpawn pm.pruneConnsByIpLoop()
  # logAndMetrics covers both the summary log and the prometheus update, so
  # the former updateMetrics()/logSummary() tasks are not spawned separately
  asyncSpawn pm.logAndMetrics()


proc stop*(pm: PeerManager) =
  ## Clears the `started` flag. Loops guarded by `while pm.started` stop
  ## re-running once they next evaluate that condition.
  pm.started = false

0 comments on commit 047d1cf

Please sign in to comment.