Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(networkmonitor): refactor setConnectedPeersMetrics, make it partially concurrent, add version #2080

Merged
merged 5 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 143 additions & 80 deletions apps/networkmonitor/networkmonitor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,71 @@ logScope:
topics = "networkmonitor"

const ReconnectTime = 60
const MaxConnectionRetries = 10
const MaxConnectionRetries = 5
const ResetRetriesAfter = 1200
const AvgPingWindow = 10.0

const git_version* {.strdefine.} = "n/a"

proc setDiscoveredPeersCapabilities(
routingTableNodes: seq[Node]) =
for capability in @[Relay, Store, Filter, Lightpush]:
let nOfNodesWithCapability = routingTableNodes.countIt(it.record.supportsCapability(capability))
info "capabilities as per ENR waku flag", capability=capability, amount=nOfNodesWithCapability
peer_type_as_per_enr.set(int64(nOfNodesWithCapability), labelValues = [$capability])
networkmonitor_peer_type_as_per_enr.set(int64(nOfNodesWithCapability), labelValues = [$capability])

proc analyzePeer(
customPeerInfo: CustomPeerInfoRef,
peerInfo: RemotePeerInfo,
node: WakuNode,
timeout: chronos.Duration
): Future[Result[string, string]] {.async.} =
var pingDelay: chronos.Duration

proc ping(): Future[Result[void, string]] {.async, gcsafe.} =
try:
let conn = await node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
pingDelay = await node.libp2pPing.ping(conn)
return ok()

except CatchableError:
var msg = getCurrentExceptionMsg()
if msg == "Future operation cancelled!":
msg = "timedout"
warn "failed to ping the peer", peer=peerInfo, err=msg

customPeerInfo.connError = msg
return err("could not ping peer: " & msg)

let timedOut = not await ping().withTimeout(timeout)
# need this check for pingDelat == 0 because there may be a conn error before timeout
if timedOut or pingDelay == 0.millis:
customPeerInfo.retries += 1
return err(customPeerInfo.connError)

customPeerInfo.connError = ""
info "successfully pinged peer", peer=peerInfo, duration=pingDelay.millis
networkmonitor_peer_ping.observe(pingDelay.millis)

if customPeerInfo.avgPingDuration == 0.millis:
customPeerInfo.avgPingDuration = pingDelay

# TODO: check why the calculation ends up losing precision
customPeerInfo.avgPingDuration = int64((float64(customPeerInfo.avgPingDuration.millis) * (AvgPingWindow - 1.0) + float64(pingDelay.millis)) / AvgPingWindow).millis
customPeerInfo.lastPingDuration = pingDelay

return ok(customPeerInfo.peerId)

proc shouldReconnect(customPeerInfo: CustomPeerInfoRef): bool =
let reconnetIntervalCheck = getTime().toUnix() >= customPeerInfo.lastTimeConnected + ReconnectTime
var retriesCheck = customPeerInfo.retries < MaxConnectionRetries

if not retriesCheck and getTime().toUnix() >= customPeerInfo.lastTimeConnected + ResetRetriesAfter:
customPeerInfo.retries = 0
retriesCheck = true
info "resetting retries counter", peerId=customPeerInfo.peerId

return reconnetIntervalCheck and retriesCheck

# TODO: Split in discover, connect
proc setConnectedPeersMetrics(discoveredNodes: seq[Node],
Expand All @@ -54,11 +110,10 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node],

let currentTime = getTime().toUnix()

# Protocols and agent string and its count
var allProtocols: Table[string, int]
var allAgentStrings: Table[string, int]

var newPeers = 0
var successfulConnections = 0

var analyzeFuts: seq[Future[Result[string, string]]]

# iterate all newly discovered nodes
for discNode in discoveredNodes:
Expand All @@ -80,103 +135,104 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node],

# create new entry if new peerId found
let peerId = $peerInfo.peerId
let customPeerInfo = CustomPeerInfo(peerId: peerId)

if not allPeers.hasKey(peerId):
allPeers[peerId] = customPeerInfo
allPeers[peerId] = CustomPeerInfoRef(peerId: peerId)
newPeers += 1
else:
info "already seen", peerId=peerId

allPeers[peerId].lastTimeDiscovered = currentTime
allPeers[peerId].enr = discNode.record.toURI()
allPeers[peerId].enrCapabilities = discNode.record.getCapabilities().mapIt($it)
allPeers[peerId].discovered += 1
let customPeerInfo = allPeers[peerId]

customPeerInfo.lastTimeDiscovered = currentTime
customPeerInfo.enr = discNode.record.toURI()
customPeerInfo.enrCapabilities = discNode.record.getCapabilities().mapIt($it)
customPeerInfo.discovered += 1

if not typedRecord.get().ip.isSome():
warn "ip field is not set", record=typedRecord.get()
continue

let ip = $typedRecord.get().ip.get().join(".")
allPeers[peerId].ip = ip
customPeerInfo.ip = ip

# try to ping the peer
if getTime().toUnix() >= allPeers[peerId].lastTimeConnected + ReconnectTime and allPeers[peerId].retries < MaxConnectionRetries:
if allPeers[peerId].retries > 0:
warn "trying to dial failed peer again", peerId=peerId, retry=allPeers[peerId].retries

var pingDelay:chronos.Duration

proc ping(): Future[Result[void, string]] {.async, gcsafe.} =
try:
let conn = await node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
pingDelay = await node.libp2pPing.ping(conn)
return ok()

except CatchableError:
var msg = getCurrentExceptionMsg()
if msg == "Future operation cancelled!":
msg = "timedout"
warn "failed to ping the peer", peer=peerInfo, err=msg

allPeers[peerId].connError = msg
return err("could not ping peer: " & msg)

let timedOut = not await ping().withTimeout(timeout)
# need this check for pingDelat == 0 because there may be a conn error before timeout
if timedOut or pingDelay == 0.millis:
allPeers[peerId].retries += 1
continue

info "successfully pinged peer", peer=peerInfo, duration=pingDelay.millis
peer_ping.observe(pingDelay.millis)

if allPeers[peerId].avgPingDuration == 0.millis:
allPeers[peerId].avgPingDuration = pingDelay

# TODO: check why the calculation ends up losing precision
allPeers[peerId].avgPingDuration = int64((float64(allPeers[peerId].avgPingDuration.millis) * (AvgPingWindow - 1.0) + float64(pingDelay.millis)) / AvgPingWindow).millis
allPeers[peerId].lastPingDuration = pingDelay

# after connection, get supported protocols
let lp2pPeerStore = node.switch.peerStore
let nodeProtocols = lp2pPeerStore[ProtoBook][peerInfo.peerId]
allPeers[peerId].supportedProtocols = nodeProtocols
allPeers[peerId].lastTimeConnected = currentTime

# after connection, get user-agent
let nodeUserAgent = lp2pPeerStore[AgentBook][peerInfo.peerId]
allPeers[peerId].userAgent = nodeUserAgent

# store avaiable protocols in the network
for protocol in nodeProtocols:
if not allProtocols.hasKey(protocol):
allProtocols[protocol] = 0
allProtocols[protocol] += 1

# store available user-agents in the network
if not allAgentStrings.hasKey(nodeUserAgent):
allAgentStrings[nodeUserAgent] = 0
allAgentStrings[nodeUserAgent] += 1

debug "connected to peer", peer=allPeers[customPeerInfo.peerId]
if shouldReconnect(customPeerInfo):
if customPeerInfo.retries > 0:
warn "trying to dial failed peer again", peerId=peerId, retry=customPeerInfo.retries
analyzeFuts.add(analyzePeer(customPeerInfo, peerInfo, node, timeout))

# Wait for all connection attempts to finish
let analyzedPeers = await allFinished(analyzeFuts)

for peerIdFut in analyzedPeers:
let peerIdRes = await peerIdFut
let peerIdStr = peerIdRes.valueOr():
continue

successfulConnections += 1
let peerId = PeerId.init(peerIdStr).valueOr():
warn "failed to parse peerId", peerId=peerIdStr
continue
var customPeerInfo = allPeers[peerIdStr]

debug "connected to peer", peer=customPeerInfo[]

# after connection, get supported protocols
let lp2pPeerStore = node.switch.peerStore
let nodeProtocols = lp2pPeerStore[ProtoBook][peerId]
customPeerInfo.supportedProtocols = nodeProtocols
customPeerInfo.lastTimeConnected = currentTime

# after connection, get user-agent
let nodeUserAgent = lp2pPeerStore[AgentBook][peerId]
customPeerInfo.userAgent = nodeUserAgent

info "number of newly discovered peers", amount=newPeers
# inform the total connections that we did in this round
let nOfOkConnections = allProtocols.len()
info "number of successful connections", amount=nOfOkConnections
info "number of successful connections", amount=successfulConnections

proc updateMetrics(allPeersRef: CustomPeersTableRef) {.gcsafe.} =
var allProtocols: Table[string, int]
var allAgentStrings: Table[string, int]
var countries: Table[string, int]
var connectedPeers = 0
var failedPeers = 0

for peerInfo in allPeersRef.values:
if peerInfo.connError == "":
for protocol in peerInfo.supportedProtocols:
allProtocols[protocol] = allProtocols.mgetOrPut(protocol, 0) + 1

# store available user-agents in the network
allAgentStrings[peerInfo.userAgent] = allAgentStrings.mgetOrPut(peerInfo.userAgent, 0) + 1

if peerInfo.country != "":
countries[peerInfo.country] = countries.mgetOrPut(peerInfo.country, 0) + 1

# update count on each protocol
connectedPeers += 1
else:
failedPeers += 1

networkmonitor_peer_count.set(int64(connectedPeers), labelValues = ["true"])
networkmonitor_peer_count.set(int64(failedPeers), labelValues = ["false"])
# update count on each protocol
for protocol in allProtocols.keys():
let countOfProtocols = allProtocols[protocol]
peer_type_as_per_protocol.set(int64(countOfProtocols), labelValues = [protocol])
let countOfProtocols = allProtocols.mgetOrPut(protocol, 0)
networkmonitor_peer_type_as_per_protocol.set(int64(countOfProtocols), labelValues = [protocol])
info "supported protocols in the network", protocol=protocol, count=countOfProtocols

# update count on each user-agent
for userAgent in allAgentStrings.keys():
let countOfUserAgent = allAgentStrings[userAgent]
peer_user_agents.set(int64(countOfUserAgent), labelValues = [userAgent])
let countOfUserAgent = allAgentStrings.mgetOrPut(userAgent, 0)
networkmonitor_peer_user_agents.set(int64(countOfUserAgent), labelValues = [userAgent])
info "user agents participating in the network", userAgent=userAgent, count=countOfUserAgent

for country in countries.keys():
let peerCount = countries.mgetOrPut(country, 0)
networkmonitor_peer_country_count.set(int64(peerCount), labelValues = [country])
info "number of peers per country", country=country, count=peerCount

proc populateInfoFromIp(allPeersRef: CustomPeersTableRef,
restClient: RestClientRef) {.async.} =
for peer in allPeersRef.keys():
Expand Down Expand Up @@ -209,6 +265,7 @@ proc crawlNetwork(node: WakuNode,

let crawlInterval = conf.refreshInterval * 1000
while true:
let startTime = Moment.now()
# discover new random nodes
let discoveredNodes = await wakuDiscv5.protocol.queryRandom()

Expand All @@ -223,6 +280,8 @@ proc crawlNetwork(node: WakuNode,
# note random discovered nodes can be already known
await setConnectedPeersMetrics(discoveredNodes, node, conf.timeout, restClient, allPeersRef)

updateMetrics(allPeersRef)

# populate info from ip addresses
await populateInfoFromIp(allPeersRef, restClient)

Expand All @@ -234,8 +293,12 @@ proc crawlNetwork(node: WakuNode,
# Notes:
# we dont run ipMajorityLoop
# we dont run revalidateLoop
let endTime = Moment.now()
let elapsed = (endTime - startTime).nanos

info "crawl duration", time=elapsed.millis

await sleepAsync(crawlInterval.millis)
await sleepAsync(crawlInterval.millis - elapsed.millis)

proc retrieveDynamicBootstrapNodes(dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[ValidIpAddress]): Result[seq[RemotePeerInfo], string] =
if dnsDiscovery and dnsDiscoveryUrl != "":
Expand Down
3 changes: 1 addition & 2 deletions apps/networkmonitor/networkmonitor_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ type
defaultValue: 8009,
name: "metrics-rest-port" }: uint16


proc parseCmdArg*(T: type ValidIpAddress, p: string): T =
try:
result = ValidIpAddress.init(p)
Expand All @@ -85,7 +84,7 @@ proc completeCmdArg*(T: type chronos.Duration, val: string): seq[string] =

proc loadConfig*(T: type NetworkMonitorConf): Result[T, string] =
try:
let conf = NetworkMonitorConf.load()
let conf = NetworkMonitorConf.load(version=git_version)
ok(conf)
except CatchableError:
err(getCurrentExceptionMsg())
20 changes: 15 additions & 5 deletions apps/networkmonitor/networkmonitor_metrics.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,30 @@ logScope:
#discovery_message_requests_outgoing_total{response=""}
#discovery_message_requests_outgoing_total{response="no_response"}

declarePublicGauge peer_type_as_per_enr,
declarePublicGauge networkmonitor_peer_type_as_per_enr,
"Number of peers supporting each capability according the the ENR",
labels = ["capability"]

declarePublicGauge peer_type_as_per_protocol,
declarePublicGauge networkmonitor_peer_type_as_per_protocol,
"Number of peers supporting each protocol, after a successful connection) ",
labels = ["protocols"]

declarePublicGauge peer_user_agents,
declarePublicGauge networkmonitor_peer_user_agents,
"Number of peers with each user agent",
labels = ["user_agent"]

declarePublicHistogram peer_ping,
declarePublicHistogram networkmonitor_peer_ping,
"Histogram tracking ping durations for discovered peers",
buckets = [100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0, Inf]

declarePublicGauge networkmonitor_peer_count,
"Number of discovered peers",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this "discovered"? based on the code i think its something like "attempted to connect"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it track number of all discovered peers, but distinguishes between successful and unsuccessful connections based on connected label

labels = ["connected"]

declarePublicGauge networkmonitor_peer_country_count,
"Number of peers per country",
labels = ["country"]

type
CustomPeerInfo* = object
# populated after discovery
Expand All @@ -64,8 +72,10 @@ type
# only after a ok/nok connection
connError*: string

CustomPeerInfoRef* = ref CustomPeerInfo

# Stores information about all discovered/connected peers
CustomPeersTableRef* = TableRef[string, CustomPeerInfo]
CustomPeersTableRef* = TableRef[string, CustomPeerInfoRef]

# stores the content topic and the count of rx messages
ContentTopicMessageTableRef* = TableRef[string, int]
Expand Down
Loading