diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index 89e9c435dc..29c518c140 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -33,20 +33,32 @@ import ./testlib/waku2 procSuite "Peer Manager": - asyncTest "Peer dialing works": + asyncTest "connectRelay() works": + # Create 2 nodes + let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) + await allFutures(nodes.mapIt(it.start())) + + let connOk = await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo()) + check: + connOk == true + nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[1].peerInfo.peerId) + nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connectedness.Connected + + asyncTest "dialPeer() works": # Create 2 nodes let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) await allFutures(nodes.mapIt(it.start())) await allFutures(nodes.mapIt(it.mountRelay())) + await allFutures(nodes.mapIt(it.mountFilter())) # Dial node2 from node1 - let conn = (await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec)).get() - + let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterCodec) # Check connection check: - conn.activity - conn.peerId == nodes[1].peerInfo.peerId + conn.isSome() + conn.get.activity + conn.get.peerId == nodes[1].peerInfo.peerId # Check that node2 is being managed in node1 check: @@ -58,23 +70,25 @@ procSuite "Peer Manager": await allFutures(nodes.mapIt(it.stop())) - asyncTest "Dialing fails gracefully": - # Create 2 nodes + asyncTest "dialPeer() fails gracefully": + # Create 2 nodes and start them let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) + await allFutures(nodes.mapIt(it.start())) + await allFutures(nodes.mapIt(it.mountRelay())) - 
await nodes[0].start() - await nodes[0].mountRelay() - - # Purposefully don't start node2 + let nonExistentPeer = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/1000/p2p/16Uiu2HAmL5okWopX7NqZWBUKVqW8iUxCEmd5GMHLVPwCgzYzQv3e") - # Dial node2 from node1 - let connOpt = await nodes[1].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 2.seconds) + # Dial non-existent peer from node1 + let conn1 = await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuFilterCodec) + check: + conn1.isNone() - # Check connection failed gracefully + # Dial peer not supporting given protocol + let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterCodec) check: - connOpt.isNone() + conn2.isNone() - await nodes[0].stop() + await allFutures(nodes.mapIt(it.stop())) asyncTest "Adding, selecting and filtering peers work": let @@ -120,9 +134,7 @@ procSuite "Peer Manager": # Create 2 nodes let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) - await nodes[0].start() - # Do not start node2 - + await allFutures(nodes.mapIt(it.start())) await allFutures(nodes.mapIt(it.mountRelay())) # Test default connectedness for new peers @@ -131,16 +143,17 @@ procSuite "Peer Manager": # No information about node2's connectedness nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == NotConnected - # Purposefully don't start node2 - # Attempt dialing node2 from node1 - discard await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 2.seconds) + # Failed connection + let nonExistentPeer = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/1000/p2p/16Uiu2HAmL5okWopX7NqZWBUKVqW8iUxCEmd5GMHLVPwCgzYzQv3e") + require: + (await nodes[0].peerManager.connectRelay(nonExistentPeer)) == false check: # Cannot connect to node2 - nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == CannotConnect + 
nodes[0].peerManager.peerStore.connectedness(nonExistentPeer.peerId) == CannotConnect # Successful connection - await nodes[1].start() - discard await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 2.seconds) + require: + (await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo())) == true check: # Currently connected to node2 nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connected @@ -157,28 +170,31 @@ procSuite "Peer Manager": # Create 2 nodes let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) - await nodes[0].start() - await nodes[0].mountRelay() - nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo()) + await allFutures(nodes.mapIt(it.start())) + await allFutures(nodes.mapIt(it.mountRelay())) + + let nonExistentPeer = parseRemotePeerInfo("/ip4/0.0.0.0/tcp/1000/p2p/16Uiu2HAmL5okWopX7NqZWBUKVqW8iUxCEmd5GMHLVPwCgzYzQv3e") + + nodes[0].peerManager.addPeer(nonExistentPeer) # Set a low backoff to speed up test: 2, 4, 8, 16 nodes[0].peerManager.initialBackoffInSec = 2 nodes[0].peerManager.backoffFactor = 2 - # node2 is not started, so dialing will fail - let conn1 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 1.seconds) + # Try to connect to a peer that doesn't exist + let conn1Ok = await nodes[0].peerManager.connectRelay(nonExistentPeer) check: # Cannot connect to node2 - nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == CannotConnect - nodes[0].peerManager.peerStore[ConnectionBook][nodes[1].peerInfo.peerId] == CannotConnect - nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] == 1 + nodes[0].peerManager.peerStore.connectedness(nonExistentPeer.peerId) == CannotConnect + nodes[0].peerManager.peerStore[ConnectionBook][nonExistentPeer.peerId] == CannotConnect + 
nodes[0].peerManager.peerStore[NumberFailedConnBook][nonExistentPeer.peerId] == 1 - # And the connection failed - conn1.isNone() == true + # Connection attempt failed + conn1Ok == false # Right after failing there is a backoff period nodes[0].peerManager.peerStore.canBeConnected( - nodes[1].peerInfo.peerId, + nonExistentPeer.peerId, nodes[0].peerManager.initialBackoffInSec, nodes[0].peerManager.backoffFactor) == false @@ -192,13 +208,11 @@ procSuite "Peer Manager": nodes[0].peerManager.initialBackoffInSec, nodes[0].peerManager.backoffFactor) == true - await nodes[1].start() - await nodes[1].mountRelay() - - # Now we can connect and failed count is reset - let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 1.seconds) + # After a successful connection, the number of failed connections is reset + nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] = 4 + let conn2Ok = await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo()) check: - conn2.isNone() == false + conn2Ok == true nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] == 0 await allFutures(nodes.mapIt(it.stop())) @@ -217,7 +231,8 @@ procSuite "Peer Manager": await node1.mountRelay() await node2.mountRelay() - discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds) + require: + (await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true check: # Currently connected to node2 node1.peerManager.peerStore.peers().len == 1 @@ -265,7 +280,8 @@ procSuite "Peer Manager": await node2.mountRelay() node2.wakuRelay.codec = betaCodec - discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), node2.wakuRelay.codec, 2.seconds) + require: + (await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true check: # Currently connected to node2 node1.peerManager.peerStore.peers().len == 1 @@ -353,9 +369,10 @@ procSuite "Peer 
Manager": let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo()) # all nodes connect to peer 0 - discard await nodes[1].peerManager.dialPeer(peerInfos[0], WakuRelayCodec, 2.seconds) - discard await nodes[2].peerManager.dialPeer(peerInfos[0], WakuRelayCodec, 2.seconds) - discard await nodes[3].peerManager.dialPeer(peerInfos[0], WakuRelayCodec, 2.seconds) + require: + (await nodes[1].peerManager.connectRelay(peerInfos[0])) == true + (await nodes[2].peerManager.connectRelay(peerInfos[0])) == true + (await nodes[3].peerManager.connectRelay(peerInfos[0])) == true check: # Peerstore track all three peers diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 05802643e5..1edc59f6c5 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -138,9 +138,9 @@ suite "WakuNode": await node3.start() await node3.mountRelay() - discard await node1.peerManager.dialPeer(node2.switch.peerInfo.toRemotePeerInfo(), WakuRelayCodec) + discard await node1.peerManager.connectRelay(node2.switch.peerInfo.toRemotePeerInfo()) await sleepAsync(3.seconds) - discard await node1.peerManager.dialPeer(node3.switch.peerInfo.toRemotePeerInfo(), WakuRelayCodec) + discard await node1.peerManager.connectRelay(node3.switch.peerInfo.toRemotePeerInfo()) check: # Verify that only the first connection succeeded diff --git a/tests/v2/waku_relay/test_waku_relay.nim b/tests/v2/waku_relay/test_waku_relay.nim index 92c15bfab5..a3e532b21f 100644 --- a/tests/v2/waku_relay/test_waku_relay.nim +++ b/tests/v2/waku_relay/test_waku_relay.nim @@ -116,9 +116,9 @@ suite "Waku Relay": await allFutures(srcSwitch.start(), dstSwitch.start()) let dstPeerInfo = dstPeerManager.switch.peerInfo.toRemotePeerInfo() - let conn = await srcPeerManager.dialPeer(dstPeerInfo, WakuRelayCodec) + let connOk = await srcPeerManager.connectRelay(dstPeerInfo) require: - conn.isSome() + connOk == true ## Given let networkTopic = "test-network1" @@ -174,9 +174,9 @@ suite "Waku Relay": await 
allFutures(srcSwitch.start(), dstSwitch.start()) let dstPeerInfo = dstPeerManager.switch.peerInfo.toRemotePeerInfo() - let conn = await srcPeerManager.dialPeer(dstPeerInfo, WakuRelayCodec) + let connOk = await srcPeerManager.connectRelay(dstPeerInfo) require: - conn.isSome() + connOk == true ## Given let networkTopic = "test-network1" diff --git a/tests/v2/waku_relay/test_wakunode_relay.nim b/tests/v2/waku_relay/test_wakunode_relay.nim index 6a2dfebe85..99aac81882 100644 --- a/tests/v2/waku_relay/test_wakunode_relay.nim +++ b/tests/v2/waku_relay/test_wakunode_relay.nim @@ -221,8 +221,9 @@ suite "WakuNode - Relay": await allFutures(nodes.mapIt(it.mountRelay())) # Connect nodes - let conn = await nodes[0].peerManager.dialPeer(nodes[1].switch.peerInfo.toRemotePeerInfo(), WakuRelayCodec) - require conn.isSome + let connOk = await nodes[0].peerManager.connectRelay(nodes[1].switch.peerInfo.toRemotePeerInfo()) + require: + connOk == true # Node 1 subscribes to topic nodes[1].subscribe(DefaultPubsubTopic) diff --git a/waku/v2/node/jsonrpc/admin/handlers.nim b/waku/v2/node/jsonrpc/admin/handlers.nim index aaa2c910c9..4ca5970e52 100644 --- a/waku/v2/node/jsonrpc/admin/handlers.nim +++ b/waku/v2/node/jsonrpc/admin/handlers.nim @@ -45,8 +45,8 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = debug "post_waku_v2_admin_v1_peers" for i, peer in peers: - let conn = await node.peerManager.dialPeer(parseRemotePeerInfo(peer), WakuRelayCodec, source="rpc") - if conn.isNone(): + let connOk = await node.peerManager.connectRelay(parseRemotePeerInfo(peer), source="rpc") + if not connOk: raise newException(ValueError, "Failed to connect to peer at index: " & $i & " " & $peer) return true diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index c354ca12a7..61d673e713 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -87,50 +87,116 @@ proc insertOrReplace(ps: 
PeerStorage, warn "failed to store peers", err = res.error waku_peers_errors.inc(labelValues = ["storage_failure"]) -proc dialPeer(pm: PeerManager, peerId: PeerID, - addrs: seq[MultiAddress], proto: string, - dialTimeout = DefaultDialTimeout, - source = "api", - ): Future[Option[Connection]] {.async.} = +proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo) = + # Adds peer to manager for the specified protocol + + if remotePeerInfo.peerId == pm.switch.peerInfo.peerId: + # Do not attempt to manage our unmanageable self + return + + # ...public key + var publicKey: PublicKey + discard remotePeerInfo.peerId.extractPublicKey(publicKey) + + if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and + pm.peerStore[KeyBook][remotePeerInfo.peerId] == publicKey: + # Peer already managed + return + + trace "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs + + pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs + pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey + + # Add peer to storage. Entry will subsequently be updated with connectedness information + if not pm.storage.isNil: + pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected) + +# Connects to a given node. Note that this function uses `connect` and +# does not provide a protocol. Streams for relay (gossipsub) are created +# automatically without needing to dial. 
+proc connectRelay*(pm: PeerManager, + peer: RemotePeerInfo, + dialTimeout = DefaultDialTimeout, + source = "api"): Future[bool] {.async.} = + + let peerId = peer.peerId # Do not attempt to dial self if peerId == pm.switch.peerInfo.peerId: - return none(Connection) + return false - let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId] - debug "Dialing peer", wireAddr = addrs, peerId = peerId, failedAttempts=failedAttempts + if not pm.peerStore.hasPeer(peerId, WakuRelayCodec): + pm.addPeer(peer) - # Dial Peer - let dialFut = pm.switch.dial(peerId, addrs, proto) + let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId] + debug "Connecting to relay peer", wireAddr=peer.addrs, peerId=peerId, failedAttempts=failedAttempts + var deadline = sleepAsync(dialTimeout) + var workfut = pm.switch.connect(peerId, peer.addrs) var reasonFailed = "" + try: - if (await dialFut.withTimeout(dialTimeout)): + await workfut or deadline + if workfut.finished(): + if not deadline.finished(): + deadline.cancel() waku_peers_dials.inc(labelValues = ["successful"]) - # TODO: This will be populated from the peerstore info when ready waku_node_conns_initiated.inc(labelValues = [source]) pm.peerStore[NumberFailedConnBook][peerId] = 0 - return some(dialFut.read()) + return true else: - reasonFailed = "timeout" - await cancelAndWait(dialFut) + reasonFailed = "timed out" + await cancelAndWait(workfut) except CatchableError as exc: - reasonFailed = "failed" + reasonFailed = "remote peer failed" # Dial failed pm.peerStore[NumberFailedConnBook][peerId] = pm.peerStore[NumberFailedConnBook][peerId] + 1 pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second) pm.peerStore[ConnectionBook][peerId] = CannotConnect - debug "Dialing peer failed", + debug "Connecting relay peer failed", peerId = peerId, reason = reasonFailed, failedAttempts = pm.peerStore[NumberFailedConnBook][peerId] waku_peers_dials.inc(labelValues = [reasonFailed]) - # Update storage - if not 
pm.storage.isNil: - pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect) + return false + +# Dialing should be used only for protocols that require a stream to write and read. +# This shall not be used to dial Relay protocols, since that would create +# unnecessary unused streams. +proc dialPeer(pm: PeerManager, + peerId: PeerID, + addrs: seq[MultiAddress], + proto: string, + dialTimeout = DefaultDialTimeout, + source = "api"): Future[Option[Connection]] {.async.} = + + if peerId == pm.switch.peerInfo.peerId: + error "could not dial self" + return none(Connection) + + if proto == WakuRelayCodec: + error "dial shall not be used to connect to relays" + return none(Connection) + + debug "Dialing peer", wireAddr=addrs, peerId=peerId, proto=proto + + # Dial Peer + let dialFut = pm.switch.dial(peerId, addrs, proto) + var reasonFailed = "" + try: + if (await dialFut.withTimeout(dialTimeout)): + return some(dialFut.read()) + else: + reasonFailed = "timeout" + await cancelAndWait(dialFut) + except CatchableError as exc: + reasonFailed = "failed" + + debug "Dialing peer failed", peerId=peerId, reason=reasonFailed, proto=proto return none(Connection) @@ -255,31 +321,6 @@ proc new*(T: type PeerManager, # Manager interface # ##################### -proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo) = - # Adds peer to manager for the specified protocol - - if remotePeerInfo.peerId == pm.switch.peerInfo.peerId: - # Do not attempt to manage our unmanageable self - return - - # ...public key - var publicKey: PublicKey - discard remotePeerInfo.peerId.extractPublicKey(publicKey) - - if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and - pm.peerStore[KeyBook][remotePeerInfo.peerId] == publicKey: - # Peer already managed - return - - trace "Adding peer to manager", peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs - - pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs - 
pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey - - # Add peer to storage. Entry will subsequently be updated with connectedness information - if not pm.storage.isNil: - pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected) - proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) = # Do not add relay peers if proto == WakuRelayCodec: @@ -303,16 +344,16 @@ proc reconnectPeers*(pm: PeerManager, debug "Reconnecting peers", proto=proto # Proto is not persisted, we need to iterate over all peers. - for storedInfo in pm.peerStore.peers(protocolMatcher(proto)): + for peerInfo in pm.peerStore.peers(protocolMatcher(proto)): # Check that the peer can be connected - if storedInfo.connectedness == CannotConnect: - debug "Not reconnecting to unreachable or non-existing peer", peerId=storedInfo.peerId + if peerInfo.connectedness == CannotConnect: + debug "Not reconnecting to unreachable or non-existing peer", peerId=peerInfo.peerId continue # Respect optional backoff period where applicable. let # TODO: Add method to peerStore (eg isBackoffExpired()) - disconnectTime = Moment.init(storedInfo.disconnectTime, Second) # Convert + disconnectTime = Moment.init(peerInfo.disconnectTime, Second) # Convert currentTime = Moment.init(getTime().toUnix, Second) # Current time comparable to persisted value backoffTime = disconnectTime + backoff - currentTime # Consider time elapsed since last disconnect @@ -320,12 +361,11 @@ proc reconnectPeers*(pm: PeerManager, # TODO: This blocks the whole function. Try to connect to another peer in the meantime. 
if backoffTime > ZeroDuration: - debug "Backing off before reconnect...", peerId=storedInfo.peerId, backoffTime=backoffTime + debug "Backing off before reconnect...", peerId=peerInfo.peerId, backoffTime=backoffTime # We disconnected recently and still need to wait for a backoff period before connecting await sleepAsync(backoffTime) - trace "Reconnecting to peer", peerId= $storedInfo.peerId - discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto) + discard await pm.connectRelay(peerInfo) #################### # Dialer interface # @@ -362,7 +402,6 @@ proc dialPeer*(pm: PeerManager, proc connectToNodes*(pm: PeerManager, nodes: seq[string]|seq[RemotePeerInfo], - proto: string, dialTimeout = DefaultDialTimeout, source = "api") {.async.} = if nodes.len == 0: @@ -370,14 +409,14 @@ proc connectToNodes*(pm: PeerManager, info "Dialing multiple peers", numOfPeers = nodes.len - var futConns: seq[Future[Option[Connection]]] + var futConns: seq[Future[bool]] for node in nodes: let node = when node is string: parseRemotePeerInfo(node) else: node - futConns.add(pm.dialPeer(RemotePeerInfo(node), proto, dialTimeout, source)) + futConns.add(pm.connectRelay(node, dialTimeout, source)) await allFutures(futConns) - let successfulConns = futConns.mapIt(it.read()).countIt(it.isSome) + let successfulConns = futConns.mapIt(it.read()).countIt(it) info "Finished dialing multiple peers", successfulConns=successfulConns, attempted=nodes.len @@ -414,7 +453,7 @@ proc connectToRelayPeers*(pm: PeerManager) {.async.} = notConnectedPeers = notConnectedPeers.len, outsideBackoffPeers = outsideBackoffPeers.len - await pm.connectToNodes(outsideBackoffPeers[0..