Skip to content

Commit

Permalink
chore(networkmonitor): refactor setConnectedPeersMetrics, make it par…
Browse files Browse the repository at this point in the history
…tially concurrent, add version
  • Loading branch information
vpavlin committed Sep 26, 2023
1 parent 3264a4f commit 7fd66c0
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 70 deletions.
173 changes: 106 additions & 67 deletions apps/networkmonitor/networkmonitor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,71 @@ logScope:
topics = "networkmonitor"

# Minimum number of seconds since `lastTimeConnected` before a peer is
# dialed again (compared against unix time in `shouldReconnect`).
const ReconnectTime = 60
# Number of failed attempts after which a peer is no longer dialed.
# Declared exactly once: a second `const` with the same name is a
# redefinition error.
const MaxConnectionRetries = 5
# Seconds since `lastTimeConnected` after which an exhausted retry counter
# is reset to zero (see `shouldReconnect`).
const ResetRetriesAfter = 1200
# Window size of the rolling ping average: each new sample contributes
# 1/AvgPingWindow of the updated value.
const AvgPingWindow = 10.0

# Build-time version string; `strdefine` lets it be overridden with
# `-d:git_version=...`, otherwise it stays "n/a".
const git_version* {.strdefine.} = "n/a"

proc setDiscoveredPeersCapabilities(
    routingTableNodes: seq[Node]) =
  ## For each of the Relay, Store, Filter and Lightpush capabilities, counts
  ## the nodes in `routingTableNodes` whose record reports support for it,
  ## logs the count and publishes it on the `peer_type_as_per_enr` gauge
  ## labelled with the capability name.
  for capability in @[Relay, Store, Filter, Lightpush]:
    var supporting = 0
    for tableNode in routingTableNodes:
      if tableNode.record.supportsCapability(capability):
        inc supporting

    info "capabilities as per ENR waku flag", capability=capability, amount=supporting
    peer_type_as_per_enr.set(int64(supporting), labelValues = [$capability])

proc analyzePeer(
    customPeerInfo: CustomPeerInfoRef,
    peerInfo: RemotePeerInfo,
    node: WakuNode,
    timeout: chronos.Duration
    ): Future[Result[string, string]] {.async.} =
  ## Dials `peerInfo` over the ping protocol and records the outcome on
  ## `customPeerInfo`.
  ##
  ## * On success: observes the ping on `peer_ping`, updates
  ##   `lastPingDuration` and the rolling `avgPingDuration`, and returns
  ##   `ok(customPeerInfo.peerId)`.
  ## * On failure or timeout: stores the reason in `connError`, increments
  ##   `retries` and returns `err(connError)`.
  var pingDelay: chronos.Duration

  proc ping(): Future[Result[void, string]] {.async, gcsafe.} =
    try:
      let conn = await node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
      pingDelay = await node.libp2pPing.ping(conn)
      return ok()

    except CancelledError:
      # `withTimeout` cancels this future once the deadline expires. Match on
      # the exception type instead of comparing the message text, which is
      # an implementation detail of the async library.
      let msg = "timedout"
      warn "failed to ping the peer", peer=peerInfo, err=msg

      customPeerInfo.connError = msg
      return err("could not ping peer: " & msg)

    except CatchableError as exc:
      let msg = exc.msg
      warn "failed to ping the peer", peer=peerInfo, err=msg

      customPeerInfo.connError = msg
      return err("could not ping peer: " & msg)

  let timedOut = not await ping().withTimeout(timeout)
  if timedOut:
    # The cancelled ping's handler may not have run yet when `withTimeout`
    # resolves, so set the reason here rather than returning a stale value.
    customPeerInfo.connError = "timedout"

  # need this check for pingDelay == 0 because there may be a conn error before timeout
  if timedOut or pingDelay == 0.millis:
    customPeerInfo.retries += 1
    return err(customPeerInfo.connError)

  info "successfully pinged peer", peer=peerInfo, duration=pingDelay.millis
  peer_ping.observe(pingDelay.millis)

  # The first successful sample seeds the average.
  if customPeerInfo.avgPingDuration == 0.millis:
    customPeerInfo.avgPingDuration = pingDelay

  # Rolling average over AvgPingWindow samples. It is computed in nanoseconds:
  # going through `.millis` truncates every term to whole milliseconds, which
  # is where the average used to lose precision.
  let
    previousAvgNs = float64(customPeerInfo.avgPingDuration.nanoseconds)
    sampleNs = float64(pingDelay.nanoseconds)
  customPeerInfo.avgPingDuration =
    int64((previousAvgNs * (AvgPingWindow - 1.0) + sampleNs) / AvgPingWindow).nanoseconds
  customPeerInfo.lastPingDuration = pingDelay

  return ok(customPeerInfo.peerId)

proc shouldReconnect(customPeerInfo: CustomPeerInfoRef): bool =
  ## Decides whether the peer should be dialed in this round.
  ##
  ## Returns true when at least `ReconnectTime` seconds have passed since
  ## `lastTimeConnected` and the peer still has retries left. Not a pure
  ## predicate: an exhausted retry counter is reset to zero (and logged)
  ## once `ResetRetriesAfter` seconds have passed since `lastTimeConnected`.
  ##
  ## NOTE(review): both windows are measured from `lastTimeConnected`. For a
  ## peer that never connected this is presumably still 0, so the reset
  ## window looks always elapsed and the retry cap would not hold - confirm.
  let intervalElapsed =
    getTime().toUnix() >= customPeerInfo.lastTimeConnected + ReconnectTime

  if customPeerInfo.retries >= MaxConnectionRetries:
    let resetDue =
      getTime().toUnix() >= customPeerInfo.lastTimeConnected + ResetRetriesAfter
    if not resetDue:
      # Retries exhausted and the reset window has not elapsed yet.
      return false

    customPeerInfo.retries = 0
    info "resetting retries counter", peerId=customPeerInfo.peerId

  intervalElapsed

# TODO: Split in discover, connect
proc setConnectedPeersMetrics(discoveredNodes: seq[Node],
node: WakuNode,
Expand All @@ -60,6 +115,8 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node],

var newPeers = 0

var analyzeFuts: seq[Future[Result[string, string]]]

# iterate all newly discovered nodes
for discNode in discoveredNodes:
let typedRecord = discNode.record.toTypedRecord()
Expand All @@ -80,85 +137,67 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node],

# create new entry if new peerId found
let peerId = $peerInfo.peerId
let customPeerInfo = CustomPeerInfo(peerId: peerId)

if not allPeers.hasKey(peerId):
allPeers[peerId] = customPeerInfo
allPeers[peerId] = CustomPeerInfoRef(peerId: peerId)
newPeers += 1
else:
info "already seen", peerId=peerId

allPeers[peerId].lastTimeDiscovered = currentTime
allPeers[peerId].enr = discNode.record.toURI()
allPeers[peerId].enrCapabilities = discNode.record.getCapabilities().mapIt($it)
allPeers[peerId].discovered += 1
let customPeerInfo = allPeers[peerId]

customPeerInfo.lastTimeDiscovered = currentTime
customPeerInfo.enr = discNode.record.toURI()
customPeerInfo.enrCapabilities = discNode.record.getCapabilities().mapIt($it)
customPeerInfo.discovered += 1

if not typedRecord.get().ip.isSome():
warn "ip field is not set", record=typedRecord.get()
continue

let ip = $typedRecord.get().ip.get().join(".")
allPeers[peerId].ip = ip
customPeerInfo.ip = ip

# try to ping the peer
if getTime().toUnix() >= allPeers[peerId].lastTimeConnected + ReconnectTime and allPeers[peerId].retries < MaxConnectionRetries:
if allPeers[peerId].retries > 0:
warn "trying to dial failed peer again", peerId=peerId, retry=allPeers[peerId].retries

var pingDelay:chronos.Duration

proc ping(): Future[Result[void, string]] {.async, gcsafe.} =
try:
let conn = await node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
pingDelay = await node.libp2pPing.ping(conn)
return ok()

except CatchableError:
var msg = getCurrentExceptionMsg()
if msg == "Future operation cancelled!":
msg = "timedout"
warn "failed to ping the peer", peer=peerInfo, err=msg

allPeers[peerId].connError = msg
return err("could not ping peer: " & msg)

let timedOut = not await ping().withTimeout(timeout)
# need this check for pingDelat == 0 because there may be a conn error before timeout
if timedOut or pingDelay == 0.millis:
allPeers[peerId].retries += 1
continue

info "successfully pinged peer", peer=peerInfo, duration=pingDelay.millis
peer_ping.observe(pingDelay.millis)

if allPeers[peerId].avgPingDuration == 0.millis:
allPeers[peerId].avgPingDuration = pingDelay

# TODO: check why the calculation ends up losing precision
allPeers[peerId].avgPingDuration = int64((float64(allPeers[peerId].avgPingDuration.millis) * (AvgPingWindow - 1.0) + float64(pingDelay.millis)) / AvgPingWindow).millis
allPeers[peerId].lastPingDuration = pingDelay

# after connection, get supported protocols
let lp2pPeerStore = node.switch.peerStore
let nodeProtocols = lp2pPeerStore[ProtoBook][peerInfo.peerId]
allPeers[peerId].supportedProtocols = nodeProtocols
allPeers[peerId].lastTimeConnected = currentTime

# after connection, get user-agent
let nodeUserAgent = lp2pPeerStore[AgentBook][peerInfo.peerId]
allPeers[peerId].userAgent = nodeUserAgent

# store avaiable protocols in the network
for protocol in nodeProtocols:
if not allProtocols.hasKey(protocol):
allProtocols[protocol] = 0
allProtocols[protocol] += 1

# store available user-agents in the network
if not allAgentStrings.hasKey(nodeUserAgent):
allAgentStrings[nodeUserAgent] = 0
allAgentStrings[nodeUserAgent] += 1

debug "connected to peer", peer=allPeers[customPeerInfo.peerId]
if shouldReconnect(customPeerInfo):
if customPeerInfo.retries > 0:
warn "trying to dial failed peer again", peerId=peerId, retry=customPeerInfo.retries
analyzeFuts.add(analyzePeer(customPeerInfo, peerInfo, node, timeout))

# Wait for all connection attempts to finish
let analyzedPeers = await allFinished(analyzeFuts)

for peerIdFut in analyzedPeers:
let peerIdRes = await peerIdFut
let peerIdStr = peerIdRes.valueOr():
continue
let peerId = PeerId.init(peerIdStr).valueOr():
warn "failed to parse peerId", peerId=peerIdStr
continue
var customPeerInfo = allPeers[peerIdStr]

debug "connected to peer", peer=customPeerInfo[]

# after connection, get supported protocols
let lp2pPeerStore = node.switch.peerStore
let nodeProtocols = lp2pPeerStore[ProtoBook][peerId]
customPeerInfo.supportedProtocols = nodeProtocols
customPeerInfo.lastTimeConnected = currentTime

# after connection, get user-agent
let nodeUserAgent = lp2pPeerStore[AgentBook][peerId]
customPeerInfo.userAgent = nodeUserAgent

# store available protocols in the network
for protocol in nodeProtocols:
if not allProtocols.hasKey(protocol):
allProtocols[protocol] = 0
allProtocols[protocol] += 1

# store available user-agents in the network
if not allAgentStrings.hasKey(nodeUserAgent):
allAgentStrings[nodeUserAgent] = 0
allAgentStrings[nodeUserAgent] += 1

info "number of newly discovered peers", amount=newPeers
# inform the total connections that we did in this round
Expand Down
3 changes: 1 addition & 2 deletions apps/networkmonitor/networkmonitor_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ type
defaultValue: 8009,
name: "metrics-rest-port" }: uint16


proc parseCmdArg*(T: type ValidIpAddress, p: string): T =
try:
result = ValidIpAddress.init(p)
Expand All @@ -85,7 +84,7 @@ proc completeCmdArg*(T: type chronos.Duration, val: string): seq[string] =

proc loadConfig*(T: type NetworkMonitorConf): Result[T, string] =
  ## Loads a `NetworkMonitorConf` via its `load` routine, passing
  ## `git_version` as the version string.
  ##
  ## Returns `ok(conf)` on success; any `CatchableError` raised while loading
  ## is converted into `err` carrying the exception message.
  try:
    # `conf` is bound exactly once: a second `let conf` in the same scope is
    # a redefinition error.
    let conf = NetworkMonitorConf.load(version=git_version)
    ok(conf)
  except CatchableError:
    err(getCurrentExceptionMsg())
4 changes: 3 additions & 1 deletion apps/networkmonitor/networkmonitor_metrics.nim
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ type
# only after a ok/nok connection
connError*: string

CustomPeerInfoRef* = ref CustomPeerInfo

# Stores information about all discovered/connected peers
CustomPeersTableRef* = TableRef[string, CustomPeerInfo]
CustomPeersTableRef* = TableRef[string, CustomPeerInfoRef]

# stores the content topic and the count of rx messages
ContentTopicMessageTableRef* = TableRef[string, int]
Expand Down

0 comments on commit 7fd66c0

Please sign in to comment.