Skip to content

Commit

Permalink
refactor(networking): use addServicePeer where needed + add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
alrevuelta committed Feb 21, 2023
1 parent f7584df commit e9ede91
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 137 deletions.
7 changes: 4 additions & 3 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import libp2p/[switch, # manage transports, a single entry poi
nameresolving/dnsresolver]# define DNS resolution
import
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_lightpush,
../../waku/v2/protocol/waku_lightpush/rpc,
../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_store,
Expand Down Expand Up @@ -489,7 +490,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
echo "Connecting to storenode: " & $(storenode.get())

node.mountStoreClient()
node.setStorePeer(storenode.get())
node.peerManager.addServicePeer(storenode.get(), WakuStoreCodec)

proc storeHandler(response: HistoryResponse) {.gcsafe.} =
for msg in response.messages:
Expand All @@ -509,13 +510,13 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
await mountLightPush(node)

node.mountLightPushClient()
node.setLightPushPeer(conf.lightpushnode)
node.peerManager.addServicePeer(parseRemotePeerInfo(conf.lightpushnode), WakuLightpushCodec)

if conf.filternode != "":
await node.mountFilter()
await node.mountFilterClient()

node.setFilterPeer(parseRemotePeerInfo(conf.filternode))
node.peerManager.addServicePeer(parseRemotePeerInfo(conf.filternode), WakuFilterCodec)

proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe.} =
trace "Hit filter handler", contentTopic=msg.contentTopic
Expand Down
10 changes: 8 additions & 2 deletions apps/chat2bridge/chat2bridge.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import
libp2p/errors,
../../../waku/v2/protocol/waku_message,
../../../waku/v2/node/waku_node,
../../../waku/v2/utils/peers,
../../../waku/v2/node/peer_manager,
../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_store,
# Chat 2 imports
../chat2/chat2,
# Common cli config
Expand Down Expand Up @@ -281,10 +285,12 @@ when isMainModule:
waitFor connectToNodes(bridge.nodev2, conf.staticnodes)

if conf.storenode != "":
setStorePeer(bridge.nodev2, conf.storenode)
let storePeer = parseRemotePeerInfo(conf.storenode)
bridge.nodev2.peerManager.addServicePeer(storePeer, WakuStoreCodec)

if conf.filternode != "":
setFilterPeer(bridge.nodev2, conf.filternode)
let filterPeer = parseRemotePeerInfo(conf.filternode)
bridge.nodev2.peerManager.addServicePeer(filterPeer, WakuFilterCodec)

if conf.rpc:
let ta = initTAddress(conf.rpcAddress,
Expand Down
8 changes: 6 additions & 2 deletions apps/wakubridge/wakubridge.nim
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import
libp2p/nameresolving/nameresolver,
../../waku/v2/utils/time,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_filter,
../../waku/v2/node/message_cache,
../../waku/v2/node/waku_node,
../../waku/v2/node/peer_manager,
Expand Down Expand Up @@ -428,11 +430,13 @@ when isMainModule:

if conf.storenode != "":
mountStoreClient(bridge.nodev2)
setStorePeer(bridge.nodev2, conf.storenode)
let storeNode = parseRemotePeerInfo(conf.storenode)
bridge.nodev2.peerManager.addServicePeer(storeNode, WakuStoreCodec)

if conf.filternode != "":
waitFor mountFilterClient(bridge.nodev2)
setFilterPeer(bridge.nodev2, conf.filternode)
let filterNode = parseRemotePeerInfo(conf.filternode)
bridge.nodev2.peerManager.addServicePeer(filterNode, WakuFilterCodec)

if conf.rpc:
let ta = initTAddress(conf.rpcAddress,
Expand Down
26 changes: 13 additions & 13 deletions tests/v2/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,9 @@ procSuite "Peer Manager":
await node.mountSwap()
node.mountStoreClient()

node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo())

node.setStorePeer(storePeer.toRemotePeerInfo())
node.setFilterPeer(filterPeer.toRemotePeerInfo())
node.peerManager.addServicePeer(swapPeer.toRemotePeerInfo(), WakuSwapCodec)
node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec)
node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuFilterCodec)

# Check peers were successfully added to peer manager
check:
Expand Down Expand Up @@ -127,7 +126,7 @@ procSuite "Peer Manager":
await allFutures(nodes.mapIt(it.mountRelay()))

# Test default connectedness for new peers
nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec)
nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo())
check:
# No information about node2's connectedness
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == NotConnected
Expand Down Expand Up @@ -160,7 +159,7 @@ procSuite "Peer Manager":

await nodes[0].start()
await nodes[0].mountRelay()
nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec)
nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo())

# Set a low backoff to speed up test: 2, 4, 8, 16
nodes[0].peerManager.initialBackoffInSec = 2
Expand Down Expand Up @@ -236,7 +235,8 @@ procSuite "Peer Manager":
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected

await node3.mountRelay() # This should trigger a reconnect
await node3.mountRelay()
await node3.peerManager.connectToRelayPeers()

check:
# Reconnected to node2 after "restart"
Expand Down Expand Up @@ -313,12 +313,12 @@ procSuite "Peer Manager":
let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())

# Add all peers (but self) to node 0
nodes[0].peerManager.addPeer(peerInfos[1], WakuRelayCodec)
nodes[0].peerManager.addPeer(peerInfos[2], WakuRelayCodec)
nodes[0].peerManager.addPeer(peerInfos[3], WakuRelayCodec)
nodes[0].peerManager.addPeer(peerInfos[1])
nodes[0].peerManager.addPeer(peerInfos[2])
nodes[0].peerManager.addPeer(peerInfos[3])

# Attempt to connect to all known peers supporting a given protocol
await nodes[0].peerManager.reconnectPeers(WakuRelayCodec, protocolMatcher(WakuRelayCodec))
# Connect to relay peers
await nodes[0].peerManager.connectToRelayPeers()

check:
# Peerstore track all three peers
Expand Down Expand Up @@ -512,7 +512,7 @@ procSuite "Peer Manager":

# Create 15 peers and add them to the peerstore
let peers = toSeq(1..15).mapIt(parseRemotePeerInfo("/ip4/0.0.0.0/tcp/0/p2p/" & $PeerId.random().get()))
for p in peers: pm.addPeer(p, "")
for p in peers: pm.addPeer(p)

# Check that we have 15 peers in the peerstore
check:
Expand Down
8 changes: 6 additions & 2 deletions tests/v2/wakunode_jsonrpc/test_jsonrpc_admin.nim
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,12 @@ procSuite "Waku v2 JSON-RPC API - Admin":
filterPeer = PeerInfo.new(generateEcdsaKey(), @[locationAddr])
storePeer = PeerInfo.new(generateEcdsaKey(), @[locationAddr])

node.setStorePeer(storePeer.toRemotePeerInfo())
node.setFilterPeer(filterPeer.toRemotePeerInfo())
node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuFilterCodec)
node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec)

# Mock that we connected in the past so Identify populated this
node.peerManager.peerStore[ProtoBook][filterPeer.peerId] = @[WakuFilterCodec]
node.peerManager.peerStore[ProtoBook][storePeer.peerId] = @[WakuStoreCodec]

let response = await client.get_waku_v2_admin_v1_peers()

Expand Down
3 changes: 2 additions & 1 deletion tests/v2/wakunode_jsonrpc/test_jsonrpc_filter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import
../../../waku/v2/node/jsonrpc/filter/handlers as filter_api,
../../../waku/v2/node/jsonrpc/filter/client as filter_api_client,
../../../waku/v2/protocol/waku_message,
../../../waku/v2/protocol/waku_filter,
../../../waku/v2/protocol/waku_filter/rpc,
../../../waku/v2/protocol/waku_filter/client,
../../../waku/v2/utils/peers,
Expand All @@ -40,7 +41,7 @@ procSuite "Waku v2 JSON-RPC API - Filter":
await node1.mountFilter()
await node2.mountFilterClient()

node2.setFilterPeer(node1.peerInfo.toRemotePeerInfo())
node2.peerManager.addServicePeer(node1.peerInfo.toRemotePeerInfo(), WakuFilterCodec)

# RPC server setup
let
Expand Down
3 changes: 2 additions & 1 deletion tests/v2/wakunode_jsonrpc/test_jsonrpc_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import
../../../waku/v2/protocol/waku_message,
../../../waku/v2/protocol/waku_archive,
../../../waku/v2/protocol/waku_archive/driver/queue_driver,
../../../waku/v2/protocol/waku_store,
../../../waku/v2/protocol/waku_store/rpc,
../../../waku/v2/utils/peers,
../../../waku/v2/utils/time,
Expand Down Expand Up @@ -66,7 +67,7 @@ procSuite "Waku v2 JSON-RPC API - Store":
var listenSwitch = newStandardSwitch(some(key))
await listenSwitch.start()

node.setStorePeer(listenSwitch.peerInfo.toRemotePeerInfo())
node.peerManager.addServicePeer(listenSwitch.peerInfo.toRemotePeerInfo(), WakuStoreCodec)

listenSwitch.mount(node.wakuRelay)
listenSwitch.mount(node.wakuStore)
Expand Down
102 changes: 56 additions & 46 deletions waku/v2/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ else:


import
std/[options, sets, sequtils, times],
std/[options, sets, sequtils, times, strutils],
chronos,
chronicles,
metrics,
Expand All @@ -24,6 +24,7 @@ declarePublicCounter waku_node_conns_initiated, "Number of connections initiated
declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"]
declarePublicGauge waku_connected_peers, "Number of connected peers per direction: inbound|outbound", ["direction"]
declarePublicGauge waku_peer_store_size, "Number of peers managed by the peer store"
declarePublicGauge waku_service_peers, "Configured waku service peers protocol/peerId", labels = ["protocol", "peerId"]

logScope:
topics = "waku node peer_manager"
Expand Down Expand Up @@ -61,6 +62,16 @@ type
serviceSlots*: Table[string, RemotePeerInfo]
started: bool

proc protocolMatcher*(codec: string): Matcher =
  ## Builds a `Matcher` that accepts any protocol id beginning with `codec`.
  ## E.g. for the codec `/vac/waku/filter/2.0.0` the returned matcher accepts
  ## `/vac/waku/filter/2.0.0`, `/vac/waku/filter/2.0.0-beta3`,
  ## `/vac/waku/filter/2.0.0-actualnonsense`, and any other suffixed variant.
  result = proc(proto: string): bool {.gcsafe.} =
    # Plain prefix comparison: anything appended after the codec is tolerated.
    proto.startsWith(codec)

####################
# Helper functions #
####################
Expand Down Expand Up @@ -244,7 +255,7 @@ proc new*(T: type PeerManager,
# Manager interface #
#####################

proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo) =
# Adds peer to manager for the specified protocol

if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
Expand All @@ -260,14 +271,11 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
# Peer already managed
return

debug "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs, proto = proto
trace "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs

pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs
pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey

# TODO: Remove this once service slots is ready
pm.peerStore[ProtoBook][remotePeerInfo.peerId] = pm.peerStore[ProtoBook][remotePeerInfo.peerId] & proto

# Add peer to storage. Entry will subsequently be updated with connectedness information
if not pm.storage.isNil:
pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected)
Expand All @@ -279,23 +287,23 @@ proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: str
return

info "Adding peer to service slots", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], service = proto
waku_service_peers.set(1, labelValues = [$proto, $remotePeerInfo.addrs[0]])

# Set peer for service slot
pm.serviceSlots[proto] = remotePeerInfo

# TODO: Remove proto once fully refactored
pm.addPeer(remotePeerInfo, proto)
pm.addPeer(remotePeerInfo)

proc reconnectPeers*(pm: PeerManager,
proto: string,
protocolMatcher: Matcher,
backoff: chronos.Duration = chronos.seconds(0)) {.async.} =
## Reconnect to peers registered for this protocol. This will update connectedness.
## Especially useful to resume connections from persistent storage after a restart.

debug "Reconnecting peers", proto=proto

for storedInfo in pm.peerStore.peers(protocolMatcher):
# Proto is not persisted, we need to iterate over all peers.
for storedInfo in pm.peerStore.peers(protocolMatcher(proto)):
# Check that the peer can be connected
if storedInfo.connectedness == CannotConnect:
debug "Not reconnecting to unreachable or non-existing peer", peerId=storedInfo.peerId
Expand Down Expand Up @@ -332,10 +340,11 @@ proc dialPeer*(pm: PeerManager,
# Dial a given peer and add it to the list of known peers
# TODO: check peer validity and score before continuing. Limit number of peers to be managed.

# First add dialed peer info to peer store, if it does not exist yet...
# First add dialed peer info to peer store, if it does not exist yet..
# TODO: nim libp2p peerstore already adds them
if not pm.peerStore.hasPeer(remotePeerInfo.peerId, proto):
trace "Adding newly dialed peer to manager", peerId= $remotePeerInfo.peerId, address= $remotePeerInfo.addrs[0], proto= proto
pm.addPeer(remotePeerInfo, proto)
pm.addPeer(remotePeerInfo)

return await pm.dialPeer(remotePeerInfo.peerId,remotePeerInfo.addrs, proto, dialTimeout, source)

Expand Down Expand Up @@ -380,38 +389,32 @@ proc connectToNodes*(pm: PeerManager,
# later.
await sleepAsync(chronos.seconds(5))

# Ensures a healthy amount of connected relay peers
proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
debug "Starting relay connectivity loop"
while pm.started:

let maxConnections = pm.switch.connManager.inSema.size
let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len
let numOutPeers = pm.switch.connectedPeers(lpstream.Direction.Out).len
let numConPeers = numInPeers + numOutPeers
proc connectToRelayPeers*(pm: PeerManager) {.async.} =
let maxConnections = pm.switch.connManager.inSema.size
let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len
let numOutPeers = pm.switch.connectedPeers(lpstream.Direction.Out).len
let numConPeers = numInPeers + numOutPeers

# TODO: Enforce a given in/out peers ratio
# TODO: Enforce a given in/out peers ratio

# Leave some room for service peers
if numConPeers >= (maxConnections - 5):
await sleepAsync(ConnectivityLoopInterval)
continue

let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.peerStore.canBeConnected(it.peerId,
pm.initialBackoffInSec,
pm.backoffFactor))
let numPeersToConnect = min(min(maxConnections - numConPeers, outsideBackoffPeers.len), MaxParalelDials)
# Leave some room for service peers
if numConPeers >= (maxConnections - 5):
return

info "Relay connectivity loop",
connectedPeers = numConPeers,
targetConnectedPeers = maxConnections,
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len
# TODO: Track only relay connections (nwaku/issues/1566)
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.peerStore.canBeConnected(it.peerId,
pm.initialBackoffInSec,
pm.backoffFactor))
let numPeersToConnect = min(min(maxConnections - numConPeers, outsideBackoffPeers.len), MaxParalelDials)

await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect], WakuRelayCodec)
info "Relay peer connections",
connectedPeers = numConPeers,
targetConnectedPeers = maxConnections,
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len

await sleepAsync(ConnectivityLoopInterval)
await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect], WakuRelayCodec)

proc prunePeerStore*(pm: PeerManager) =
let numPeers = toSeq(pm.peerStore[AddressBook].book.keys).len
Expand Down Expand Up @@ -447,13 +450,6 @@ proc prunePeerStore*(pm: PeerManager) =
capacity = capacity,
pruned = pruned


proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
while pm.started:
pm.prunePeerStore()
await sleepAsync(PrunePeerStoreInterval)


proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
debug "Selecting peer from peerstore", protocol=proto

Expand Down Expand Up @@ -481,6 +477,20 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
debug "No peer found for protocol", protocol=proto
return none(RemotePeerInfo)

# Prunes peers from peerstore to remove old/stale ones
proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
  ## Background loop: periodically invokes `prunePeerStore` for as long as
  ## `pm.started` is true. Spawned once from `start` (not exported).
  debug "Starting prune peerstore loop"
  while pm.started:
    pm.prunePeerStore()
    # Yield to the dispatcher between pruning rounds; PrunePeerStoreInterval
    # governs the cadence.
    await sleepAsync(PrunePeerStoreInterval)

# Ensures a healthy amount of connected relay peers
proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
  ## Background loop: repeatedly calls `connectToRelayPeers` to top up relay
  ## connections for as long as `pm.started` is true. Spawned from `start`.
  debug "Starting relay connectivity loop"
  while pm.started:
    await pm.connectToRelayPeers()
    # Sleep between rounds so connection attempts are paced by
    # ConnectivityLoopInterval rather than running hot.
    await sleepAsync(ConnectivityLoopInterval)

proc start*(pm: PeerManager) =
pm.started = true
asyncSpawn pm.relayConnectivityLoop()
Expand Down
Loading

0 comments on commit e9ede91

Please sign in to comment.