Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: close px connections after resp #1746

Merged
merged 1 commit into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion tests/v2/test_waku_peer_exchange.nim
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
{.used.}

import
std/[options, sequtils],
std/[options, sequtils, tables],
testutils/unittests,
chronos,
chronicles,
stew/shims/net,
libp2p/switch,
libp2p/peerId,
libp2p/crypto/crypto,
libp2p/multistream,
libp2p/muxers/muxer,
eth/keys,
eth/p2p/discoveryv5/enr
import
Expand All @@ -18,6 +20,7 @@ import
../../waku/v2/waku_peer_exchange,
../../waku/v2/waku_peer_exchange/rpc,
../../waku/v2/waku_peer_exchange/rpc_codec,
../../waku/v2/waku_peer_exchange/protocol,
./testlib/wakucore,
./testlib/wakunode

Expand Down Expand Up @@ -259,3 +262,25 @@ procSuite "Waku Peer Exchange":

# Check that it failed gracefully
check: response.isErr


asyncTest "connections are closed after response is sent":
# Create 3 nodes
let nodes = toSeq(0..<3).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountPeerExchange()))

# Multiple nodes request to node 0
for i in 1..<3:
let resp = await nodes[i].wakuPeerExchange.request(2, nodes[0].switch.peerInfo.toRemotePeerInfo())
require resp.isOk

# Wait for streams to be closed
await sleepAsync(1.seconds)

# Check that all streams are closed for px
check:
nodes[0].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0)
nodes[1].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0)
nodes[2].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0)
12 changes: 11 additions & 1 deletion waku/v2/waku_peer_exchange/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,19 @@ proc request*(wpx: WakuPeerExchange, numPeers: uint64, conn: Connection): Future
request: PeerExchangeRequest(numPeers: numPeers))

var buffer: seq[byte]
var error: string
try:
await conn.writeLP(rpc.encode().buffer)
buffer = await conn.readLp(MaxRpcSize.int)
except CatchableError as exc:
waku_px_errors.inc(labelValues = [exc.msg])
return err("write/read failed: " & $exc.msg)
error = $exc.msg
finally:
# close, no more data is expected
await conn.closeWithEof()

if error.len > 0:
return err("write/read failed: " & error)

let decodedBuff = PeerExchangeRpc.decode(buffer)
if decodedBuff.isErr():
Expand Down Expand Up @@ -155,6 +162,9 @@ proc initProtocolHandler(wpx: WakuPeerExchange) =
else:
waku_px_peers_sent.inc(enrs.len().int64())

# close, no data is expected
await conn.closeWithEof()

wpx.handler = handler
wpx.codec = WakuPeerExchangeCodec

Expand Down