Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dos protected topic relay msgs based on meta field #1614

Merged
merged 2 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions apps/wakunode2/wakunode2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ import
../../waku/v2/protocol/waku_enr,
../../waku/v2/protocol/waku_dnsdisc,
../../waku/v2/protocol/waku_discv5,
../../waku/v2/protocol/waku_message/topics/pubsub_topic,
../../waku/v2/protocol/waku_peer_exchange,
../../waku/v2/protocol/waku_relay/validators,
../../waku/v2/utils/peers,
./wakunode2_setup_rest,
./wakunode2_setup_rpc,
Expand Down Expand Up @@ -385,6 +387,13 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
except CatchableError:
return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg())

# TODO: Get this from cli
var topicsPublicKeys = initTable[string, SkPublicKey]()
# Add validation keys to protected topics
for topic, publicKey in topicsPublicKeys.pairs:
info "routing only signed traffic", topic=topic, publicKey=publicKey
node.wakuRelay.addSignedTopicValidator(Pubsubtopic(topic), publicKey)


# Keepalive mounted on all nodes
try:
Expand Down
244 changes: 242 additions & 2 deletions tests/v2/waku_relay/test_wakunode_relay.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.used.}

import
std/os,
std/[os, sequtils, sysrand, math],
stew/byteutils,
stew/shims/net as stewNet,
testutils/unittests,
Expand All @@ -14,14 +14,18 @@ import
libp2p/switch,
libp2p/protocols/pubsub/rpc/messages,
libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/gossipsub
libp2p/protocols/pubsub/gossipsub,
libp2p/multihash,
secp256k1
import
../../waku/v2/protocol/waku_message,
../../waku/v2/node/peer_manager,
../../waku/v2/utils/peers,
../../waku/v2/node/waku_node,
../../waku/v2/protocol/waku_relay,
../../waku/v2/protocol/waku_relay/validators,
../testlib/testutils,
../testlib/common,
../testlib/waku2

template sourceDir: string = currentSourcePath.parentDir()
Expand Down Expand Up @@ -244,6 +248,242 @@ suite "WakuNode - Relay":

await allFutures(nodes.mapIt(it.stop()))

# TODO: Test multiple protected topics

asyncTest "Spam protected topic accepts signed messages":
# Create 5 nodes
let nodes = toSeq(0..<5).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

# Protected topic and key to sign
let spamProtectedTopic = PubSubTopic("some-spam-protected-topic")
let secretKey = SkSecretKey.fromHex("5526a8990317c9b7b58d07843d270f9cd1d9aaee129294c1c478abf7261dd9e6").expect("valid key")
let publicKey = secretKey.toPublicKey()
let topicsPrivateKeys = {spamProtectedTopic: secretKey}.toTable
let topicsPublicKeys = {spamProtectedTopic: publicKey}.toTable

# Start all the nodes and mount relay with protected topic
await allFutures(nodes.mapIt(it.start()))

# Mount relay for all nodes
await allFutures(nodes.mapIt(it.mountRelay()))

# Add signed message validator to all nodes. They will only route signed messages
for node in nodes:
for topic, publicKey in topicsPublicKeys:
node.wakuRelay.addSignedTopicValidator(PubsubTopic(topic), publicKey)

# Connect the nodes in a full mesh
for i in 0..<5:
for j in 0..<5:
if i == j:
continue
let connOk = await nodes[i].peerManager.connectRelay(nodes[j].switch.peerInfo.toRemotePeerInfo())
require connOk

var msgReceived = 0
proc handler(pubsubTopic: PubsubTopic, data: WakuMessage) {.async, gcsafe.} =
msgReceived += 1

# Subscribe all nodes to the same topic/handler
for node in nodes: node.wakuRelay.subscribe(spamProtectedTopic, handler)
await sleepAsync(500.millis)

# Each node publishes 10 signed messages
for i in 0..<5:
for j in 0..<10:
var msg = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: now(), ephemeral: true)

# Include signature
msg.meta = secretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]

await nodes[i].publish(spamProtectedTopic, msg)

# Wait for gossip
await sleepAsync(500.millis)
alrevuelta marked this conversation as resolved.
Show resolved Hide resolved

# 50 messages were sent to 5 peers = 250 messages
check:
msgReceived == 250

# No invalid messages were received by any peer
for i in 0..<5:
for k, v in nodes[i].wakuRelay.peerStats.mpairs:
check:
v.topicInfos[spamProtectedTopic].invalidMessageDeliveries == 0.0

# Stop all nodes
await allFutures(nodes.mapIt(it.stop()))

asyncTest "Spam protected topic rejects non-signed and wrongly-signed messages":
# Create 5 nodes
let nodes = toSeq(0..<5).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

# Protected topic and key to sign
let spamProtectedTopic = PubSubTopic("some-spam-protected-topic")
let secretKey = SkSecretKey.fromHex("5526a8990317c9b7b58d07843d270f9cd1d9aaee129294c1c478abf7261dd9e6").expect("valid key")
let publicKey = secretKey.toPublicKey()
let topicsPrivateKeys = {spamProtectedTopic: secretKey}.toTable
let topicsPublicKeys = {spamProtectedTopic: publicKey}.toTable

# Non whitelisted secret key
let wrongSecretKey = SkSecretKey.fromHex("32ad0cc8edeb9f8a3e8635c5fe5bd200b9247a33da5e7171bd012691805151f3").expect("valid key")

# Start all the nodes and mount relay with protected topic
await allFutures(nodes.mapIt(it.start()))

# Mount relay with spam protected topics
await allFutures(nodes.mapIt(it.mountRelay()))

# Add signed message validator to all nodes. They will only route signed messages
for node in nodes:
for topic, publicKey in topicsPublicKeys:
node.wakuRelay.addSignedTopicValidator(PubsubTopic(topic), publicKey)

# Connect the nodes in a full mesh
for i in 0..<5:
for j in 0..<5:
if i == j:
continue
let connOk = await nodes[i].peerManager.connectRelay(nodes[j].switch.peerInfo.toRemotePeerInfo())
require connOk

var msgReceived = 0
proc handler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
msgReceived += 1

# Subscribe all nodes to the same topic/handler
for node in nodes: node.wakuRelay.subscribe(spamProtectedTopic, handler)
await sleepAsync(500.millis)

# Each node sends 10 messages, signed but with a non-whitelisted key (total = 50)
for i in 0..<5:
for j in 0..<10:
var msg = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: now(), ephemeral: true)

# Sign the message with a wrong key
msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]

await nodes[i].publish(spamProtectedTopic, msg)

# Each node sends 10 messages that are not signed (total = 50)
for i in 0..<5:
for j in 0..<10:
let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: now(), ephemeral: true)
await nodes[i].publish(spamProtectedTopic, unsignedMessage)

# Wait for gossip
await sleepAsync(500.millis)

# Since we have a full mesh with 5 nodes and each one publishes 50+50 msgs
# there are 500 messages being sent.
# 100 are received ok in the handler (first hop)
# 400 are are wrong so rejected (rejected not relayed)
check:
msgReceived == 100

var msgRejected = 0
for i in 0..<5:
for k, v in nodes[i].wakuRelay.peerStats.mpairs:
msgRejected += v.topicInfos[spamProtectedTopic].invalidMessageDeliveries.int

check:
msgRejected == 400

await allFutures(nodes.mapIt(it.stop()))

asyncTest "Spam protected topic rejects a spammer node":
# Create 5 nodes
let nodes = toSeq(0..<5).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

# Protected topic and key to sign
let spamProtectedTopic = PubSubTopic("some-spam-protected-topic")
let secretKey = SkSecretKey.fromHex("5526a8990317c9b7b58d07843d270f9cd1d9aaee129294c1c478abf7261dd9e6").expect("valid key")
let publicKey = secretKey.toPublicKey()
let topicsPrivateKeys = {spamProtectedTopic: secretKey}.toTable
let topicsPublicKeys = {spamProtectedTopic: publicKey}.toTable

# Non whitelisted secret key
let wrongSecretKey = SkSecretKey.fromHex("32ad0cc8edeb9f8a3e8635c5fe5bd200b9247a33da5e7171bd012691805151f3").expect("valid key")

# Start all the nodes and mount relay with protected topic
await allFutures(nodes.mapIt(it.start()))

# Mount relay for all nodes
await allFutures(nodes.mapIt(it.mountRelay()))

var msgReceived = 0
proc handler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
msgReceived += 1

# Subscribe all nodes to the same topic/handler
for node in nodes: node.wakuRelay.subscribe(spamProtectedTopic, handler)
await sleepAsync(500.millis)

# Add signed message validator to all nodes. They will only route signed messages
for node in nodes:
for topic, publicKey in topicsPublicKeys:
node.wakuRelay.addSignedTopicValidator(PubsubTopic(topic), publicKey)

# nodes[0] is connected only to nodes[1]
let connOk1 = await nodes[0].peerManager.connectRelay(nodes[1].switch.peerInfo.toRemotePeerInfo())
require connOk1

# rest of nodes[1..4] are connected in a full mesh
for i in 1..<5:
for j in 1..<5:
if i == j:
continue
let connOk2 = await nodes[i].peerManager.connectRelay(nodes[j].switch.peerInfo.toRemotePeerInfo())
require connOk2

await sleepAsync(500.millis)

# nodes[0] spams 50 non signed messages (nodes[0] just knows of nodes[1])
for j in 0..<50:
let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: now(), ephemeral: true)
await nodes[0].publish(spamProtectedTopic, unsignedMessage)

# nodes[0] spams 50 wrongly signed messages (nodes[0] just knows of nodes[1])
for j in 0..<50:
var msg = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: now(), ephemeral: true)
# Sign the message with a wrong key
msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]
await nodes[0].publish(spamProtectedTopic, msg)

# Wait for gossip
await sleepAsync(500.millis)

# only 100 messages are received (50 + 50) which demonstrate
# nodes[1] doest gossip invalid messages.
check:
msgReceived == 100

# peer1 got invalid messages from peer0
let p0Id = nodes[0].peerInfo.peerId
check:
nodes[1].wakuRelay.peerStats[p0Id].topicInfos[spamProtectedTopic].invalidMessageDeliveries == 100.0

# peer1 did not gossip further, so no other node rx invalid messages
for i in 0..<5:
for k, v in nodes[i].wakuRelay.peerStats.mpairs:
if k == p0Id and i == 1:
continue
check:
v.topicInfos[spamProtectedTopic].invalidMessageDeliveries == 0.0

# Stop all nodes
await allFutures(nodes.mapIt(it.stop()))

asyncTest "Messages are relayed between two websocket nodes":
let
nodeKey1 = generateSecp256k1Key()
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ proc mountRelay*(node: WakuNode,
triggerSelf = triggerSelf
)
if initRes.isErr():
error "failed mountin relay protocol", error=initRes.error
error "failed mounting relay protocol", error=initRes.error
return

node.wakuRelay = initRes.value
Expand Down
51 changes: 51 additions & 0 deletions waku/v2/protocol/waku_relay/validators.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import
chronicles,
chronos,
stew/byteutils,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/pubsub/rpc/messages,
libp2p/protocols/pubsub/errors,
nimcrypto/sha2,
secp256k1

import
./protocol,
../waku_message

# Application level message hash
proc msgHash*(pubSubTopic: string, msg: WakuMessage): array[32, byte] =
var ctx: sha256
ctx.init()
defer: ctx.clear()

ctx.update(pubsubTopic.toBytes())
ctx.update(msg.payload)
ctx.update(msg.contentTopic.toBytes())

# TODO: Other fields?

return ctx.finish()

proc addSignedTopicValidator*(w: WakuRelay, topic: PubsubTopic, publicTopicKey: SkPublicKey) =
debug "adding validator to signed topic", topic=topic, publicTopicKey=publicTopicKey

proc validator(topic: string, message: messages.Message): Future[errors.ValidationResult] {.async.} =
let msg = WakuMessage.decode(message.data)
if msg.isOk():
let msgHash = SkMessage(topic.msgHash(msg.get))
let recoveredSignature = SkSignature.fromRaw(msg.get.meta)
if recoveredSignature.isErr():
# TODO: add metrics for accept/reject
return errors.ValidationResult.Reject
if recoveredSignature.get.verify(msgHash, publicTopicKey):
return errors.ValidationResult.Accept
else:
return errors.ValidationResult.Reject
return errors.ValidationResult.Reject

w.addValidator(topic, validator)