Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: logging received message info via onValidated observer #2973

Merged
merged 5 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =

proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
let msg_hash = topic.computeMessageHash(msg).to0xHex()

notice "waku.relay received",
my_peer_id = node.peerId,
pubsubTopic = topic,
msg_hash = msg_hash,
receivedTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len

let msgSizeKB = msg.payload.len / 1000

waku_node_messages.inc(labelValues = ["relay"])
Expand Down
6 changes: 6 additions & 0 deletions waku/waku_api/rest/relay/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ proc installRelayApiHandlers*(
(await node.wakuRelay.validateMessage(pubsubTopic, message)).isOkOr:
return RestApiResponse.badRequest("Failed to publish: " & error)

# Log for message tracking purposes
logMessageInfo(node.wakuRelay, "rest", pubsubTopic, "none", message, onRecv = true)

# if we reach here its either a non-RLN message or a RLN message with a valid proof
debug "Publishing message",
pubSubTopic = pubSubTopic, rln = not node.wakuRlnRelay.isNil()
Expand Down Expand Up @@ -272,6 +275,9 @@ proc installRelayApiHandlers*(
(await node.wakuRelay.validateMessage(pubsubTopic, message)).isOkOr:
return RestApiResponse.badRequest("Failed to publish: " & error)

# Log for message tracking purposes
logMessageInfo(node.wakuRelay, "rest", pubsubTopic, "none", message, onRecv = true)

# if we reach here its either a non-RLN message or a RLN message with a valid proof
debug "Publishing message",
contentTopic = message.contentTopic, rln = not node.wakuRlnRelay.isNil()
Expand Down
73 changes: 54 additions & 19 deletions waku/waku_relay/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,36 @@ proc initProtocolHandler(w: WakuRelay) =
w.handler = handler
w.codec = WakuRelayCodec

proc initRelayMetricObserver(w: WakuRelay) =
proc logMessageInfo*(
w: WakuRelay,
peerId: string,
gabrielmer marked this conversation as resolved.
Show resolved Hide resolved
topic: string,
msg_id_short: string,
msg: WakuMessage,
onRecv: bool,
) =
let msg_hash = computeMessageHash(topic, msg).to0xHex()

if onRecv:
notice "received relay message",
my_peer_id = w.switch.peerInfo.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
from_peer_id = peerId,
topic = topic,
receivedTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len
else:
notice "sent relay message",
my_peer_id = w.switch.peerInfo.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
to_peer_id = peerId,
topic = topic,
sentTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len

proc initRelayObservers(w: WakuRelay) =
proc decodeRpcMessageInfo(
peer: PubSubPeer, msg: Message
): Result[
Expand Down Expand Up @@ -179,20 +208,6 @@ proc initRelayMetricObserver(w: WakuRelay) =
let msgSize = msg.data.len + msg.topic.len
return ok((msg_id_short, msg.topic, wakuMessage, msgSize))

proc logMessageInfo(
peer: PubSubPeer, topic: string, msg_id_short: string, msg: WakuMessage
) =
let msg_hash = computeMessageHash(topic, msg).to0xHex()

notice "sent relay message",
my_peer_id = w.switch.peerInfo.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
to_peer_id = peer.peerId,
topic = topic,
sentTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len

proc updateMetrics(
peer: PubSubPeer,
pubsub_topic: string,
Expand All @@ -208,18 +223,38 @@ proc initRelayMetricObserver(w: WakuRelay) =
for msg in msgs.messages:
let (msg_id_short, topic, wakuMessage, msgSize) = decodeRpcMessageInfo(peer, msg).valueOr:
continue
# message receive log happens in treaceHandler as this one is called before checks
# message receive log happens in onValidated observer as onRecv is called before checks
updateMetrics(peer, topic, wakuMessage, msgSize, onRecv = true)
discard

proc onValidated(peer: PubSubPeer, msg: Message, msgId: MessageId) =
let msg_id_short = shortLog(msgId)
let wakuMessage = WakuMessage.decode(msg.data).valueOr:
warn "onValidated: failed decoding to Waku Message",
my_peer_id = w.switch.peerInfo.peerId,
msg_id = msg_id_short,
from_peer_id = peer.peerId,
pubsub_topic = msg.topic,
error = $error
return

logMessageInfo(
w, shortLog(peer.peerId), msg.topic, msg_id_short, wakuMessage, onRecv = true
)

proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
for msg in msgs.messages:
let (msg_id_short, topic, wakuMessage, msgSize) = decodeRpcMessageInfo(peer, msg).valueOr:
warn "onSend: failed decoding RPC info",
my_peer_id = w.switch.peerInfo.peerId, to_peer_id = peer.peerId
gabrielmer marked this conversation as resolved.
Show resolved Hide resolved
continue
logMessageInfo(peer, topic, msg_id_short, wakuMessage)
logMessageInfo(
w, shortLog(peer.peerId), topic, msg_id_short, wakuMessage, onRecv = false
)
updateMetrics(peer, topic, wakuMessage, msgSize, onRecv = false)

let administrativeObserver = PubSubObserver(onRecv: onRecv, onSend: onSend)
let administrativeObserver =
PubSubObserver(onRecv: onRecv, onSend: onSend, onValidated: onValidated)

w.addObserver(administrativeObserver)

Expand All @@ -243,7 +278,7 @@ proc new*(

procCall GossipSub(w).initPubSub()
w.initProtocolHandler()
w.initRelayMetricObserver()
w.initRelayObservers()
except InitializationError:
return err("initialization error: " & getCurrentExceptionMsg())

Expand Down
Loading