From 7fd66c0c0c2a850772b6eec6675c0ffff17d4b55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=A1clav=20Pavl=C3=ADn?= Date: Tue, 26 Sep 2023 14:53:22 +0200 Subject: [PATCH 1/4] chore(networkmonitor): refactor setConnectedPeersMetrics, make it partially concurrent, add version --- apps/networkmonitor/networkmonitor.nim | 173 +++++++++++------- apps/networkmonitor/networkmonitor_config.nim | 3 +- .../networkmonitor/networkmonitor_metrics.nim | 4 +- 3 files changed, 110 insertions(+), 70 deletions(-) diff --git a/apps/networkmonitor/networkmonitor.nim b/apps/networkmonitor/networkmonitor.nim index 3af9c5cee1..777be990d6 100644 --- a/apps/networkmonitor/networkmonitor.nim +++ b/apps/networkmonitor/networkmonitor.nim @@ -35,9 +35,12 @@ logScope: topics = "networkmonitor" const ReconnectTime = 60 -const MaxConnectionRetries = 10 +const MaxConnectionRetries = 5 +const ResetRetriesAfter = 1200 const AvgPingWindow = 10.0 +const git_version* {.strdefine.} = "n/a" + proc setDiscoveredPeersCapabilities( routingTableNodes: seq[Node]) = for capability in @[Relay, Store, Filter, Lightpush]: @@ -45,6 +48,58 @@ proc setDiscoveredPeersCapabilities( info "capabilities as per ENR waku flag", capability=capability, amount=nOfNodesWithCapability peer_type_as_per_enr.set(int64(nOfNodesWithCapability), labelValues = [$capability]) +proc analyzePeer( + customPeerInfo: CustomPeerInfoRef, + peerInfo: RemotePeerInfo, + node: WakuNode, + timeout: chronos.Duration + ): Future[Result[string, string]] {.async.} = + var pingDelay: chronos.Duration + + proc ping(): Future[Result[void, string]] {.async, gcsafe.} = + try: + let conn = await node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec) + pingDelay = await node.libp2pPing.ping(conn) + return ok() + + except CatchableError: + var msg = getCurrentExceptionMsg() + if msg == "Future operation cancelled!": + msg = "timedout" + warn "failed to ping the peer", peer=peerInfo, err=msg + + customPeerInfo.connError = msg + return err("could not ping peer: " & msg) + + let timedOut = not await ping().withTimeout(timeout) + # need this check for pingDelat == 0 because there may be a conn error before timeout + if timedOut or pingDelay == 0.millis: + customPeerInfo.retries += 1 + return err(customPeerInfo.connError) + + info "successfully pinged peer", peer=peerInfo, duration=pingDelay.millis + peer_ping.observe(pingDelay.millis) + + if customPeerInfo.avgPingDuration == 0.millis: + customPeerInfo.avgPingDuration = pingDelay + + # TODO: check why the calculation ends up losing precision + customPeerInfo.avgPingDuration = int64((float64(customPeerInfo.avgPingDuration.millis) * (AvgPingWindow - 1.0) + float64(pingDelay.millis)) / AvgPingWindow).millis + customPeerInfo.lastPingDuration = pingDelay + + return ok(customPeerInfo.peerId) + +proc shouldReconnect(customPeerInfo: CustomPeerInfoRef): bool = + let reconnetIntervalCheck = getTime().toUnix() >= customPeerInfo.lastTimeConnected + ReconnectTime + var retriesCheck = customPeerInfo.retries < MaxConnectionRetries + + if not retriesCheck and getTime().toUnix() >= customPeerInfo.lastTimeConnected + ResetRetriesAfter: + customPeerInfo.retries = 0 + retriesCheck = true + info "resetting retries counter", peerId=customPeerInfo.peerId + + return reconnetIntervalCheck and retriesCheck + # TODO: Split in discover, connect proc setConnectedPeersMetrics(discoveredNodes: seq[Node], node: WakuNode, @@ -60,6 +115,8 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node], var newPeers = 0 + var analyzeFuts: seq[Future[Result[string, string]]] + # iterate all newly discovered nodes for discNode in discoveredNodes: let typedRecord = discNode.record.toTypedRecord() @@ -80,85 +137,67 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node], # create new entry if new peerId found let peerId = $peerInfo.peerId - let customPeerInfo = CustomPeerInfo(peerId: peerId) + if not allPeers.hasKey(peerId): - allPeers[peerId] = customPeerInfo + allPeers[peerId] = CustomPeerInfoRef(peerId: peerId) newPeers += 1 else: info "already seen", peerId=peerId - allPeers[peerId].lastTimeDiscovered = currentTime - allPeers[peerId].enr = discNode.record.toURI() - allPeers[peerId].enrCapabilities = discNode.record.getCapabilities().mapIt($it) - allPeers[peerId].discovered += 1 + let customPeerInfo = allPeers[peerId] + + customPeerInfo.lastTimeDiscovered = currentTime + customPeerInfo.enr = discNode.record.toURI() + customPeerInfo.enrCapabilities = discNode.record.getCapabilities().mapIt($it) + customPeerInfo.discovered += 1 if not typedRecord.get().ip.isSome(): warn "ip field is not set", record=typedRecord.get() continue let ip = $typedRecord.get().ip.get().join(".") - allPeers[peerId].ip = ip + customPeerInfo.ip = ip # try to ping the peer - if getTime().toUnix() >= allPeers[peerId].lastTimeConnected + ReconnectTime and allPeers[peerId].retries < MaxConnectionRetries: - if allPeers[peerId].retries > 0: - warn "trying to dial failed peer again", peerId=peerId, retry=allPeers[peerId].retries - - var pingDelay:chronos.Duration - - proc ping(): Future[Result[void, string]] {.async, gcsafe.} = - try: - let conn = await node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec) - pingDelay = await node.libp2pPing.ping(conn) - return ok() - - except CatchableError: - var msg = getCurrentExceptionMsg() - if msg == "Future operation cancelled!": - msg = "timedout" - warn "failed to ping the peer", peer=peerInfo, err=msg - - allPeers[peerId].connError = msg - return err("could not ping peer: " & msg) - - let timedOut = not await ping().withTimeout(timeout) - # need this check for pingDelat == 0 because there may be a conn error before timeout - if timedOut or pingDelay == 0.millis: - allPeers[peerId].retries += 1 - continue - - info "successfully pinged peer", peer=peerInfo, duration=pingDelay.millis - peer_ping.observe(pingDelay.millis) - - if allPeers[peerId].avgPingDuration == 0.millis: - allPeers[peerId].avgPingDuration = pingDelay - - # TODO: check why the calculation ends up losing precision - allPeers[peerId].avgPingDuration = int64((float64(allPeers[peerId].avgPingDuration.millis) * (AvgPingWindow - 1.0) + float64(pingDelay.millis)) / AvgPingWindow).millis - allPeers[peerId].lastPingDuration = pingDelay - - # after connection, get supported protocols - let lp2pPeerStore = node.switch.peerStore - let nodeProtocols = lp2pPeerStore[ProtoBook][peerInfo.peerId] - allPeers[peerId].supportedProtocols = nodeProtocols - allPeers[peerId].lastTimeConnected = currentTime - - # after connection, get user-agent - let nodeUserAgent = lp2pPeerStore[AgentBook][peerInfo.peerId] - allPeers[peerId].userAgent = nodeUserAgent - - # store avaiable protocols in the network - for protocol in nodeProtocols: - if not allProtocols.hasKey(protocol): - allProtocols[protocol] = 0 - allProtocols[protocol] += 1 - - # store available user-agents in the network - if not allAgentStrings.hasKey(nodeUserAgent): - allAgentStrings[nodeUserAgent] = 0 - allAgentStrings[nodeUserAgent] += 1 - - debug "connected to peer", peer=allPeers[customPeerInfo.peerId] + if shouldReconnect(customPeerInfo): + if customPeerInfo.retries > 0: + warn "trying to dial failed peer again", peerId=peerId, retry=customPeerInfo.retries + analyzeFuts.add(analyzePeer(customPeerInfo, peerInfo, node, timeout)) + + # Wait for all connection attempts to finish + let analyzedPeers = await allFinished(analyzeFuts) + + for peerIdFut in analyzedPeers: + let peerIdRes = await peerIdFut + let peerIdStr = peerIdRes.valueOr(): + continue + let peerId = PeerId.init(peerIdStr).valueOr(): + warn "failed to parse peerId", peerId=peerIdStr + continue + var customPeerInfo = allPeers[peerIdStr] + + debug "connected to peer", peer=customPeerInfo[] + + # after connection, get supported protocols + let lp2pPeerStore = node.switch.peerStore + let nodeProtocols = lp2pPeerStore[ProtoBook][peerId] + customPeerInfo.supportedProtocols = nodeProtocols + customPeerInfo.lastTimeConnected = currentTime + + # after connection, get user-agent + let nodeUserAgent = lp2pPeerStore[AgentBook][peerId] + customPeerInfo.userAgent = nodeUserAgent + + # store avaiable protocols in the network + for protocol in nodeProtocols: + if not allProtocols.hasKey(protocol): + allProtocols[protocol] = 0 + allProtocols[protocol] += 1 + + # store available user-agents in the network + if not allAgentStrings.hasKey(nodeUserAgent): + allAgentStrings[nodeUserAgent] = 0 + allAgentStrings[nodeUserAgent] += 1 info "number of newly discovered peers", amount=newPeers # inform the total connections that we did in this round diff --git a/apps/networkmonitor/networkmonitor_config.nim b/apps/networkmonitor/networkmonitor_config.nim index 769c2264fb..ac5832568c 100644 --- a/apps/networkmonitor/networkmonitor_config.nim +++ b/apps/networkmonitor/networkmonitor_config.nim @@ -64,7 +64,6 @@ type defaultValue: 8009, name: "metrics-rest-port" }: uint16 - proc parseCmdArg*(T: type ValidIpAddress, p: string): T = try: result = ValidIpAddress.init(p) @@ -85,7 +84,7 @@ proc completeCmdArg*(T: type chronos.Duration, val: string): seq[string] = proc loadConfig*(T: type NetworkMonitorConf): Result[T, string] = try: - let conf = NetworkMonitorConf.load() + let conf = NetworkMonitorConf.load(version=git_version) ok(conf) except CatchableError: err(getCurrentExceptionMsg()) diff --git a/apps/networkmonitor/networkmonitor_metrics.nim b/apps/networkmonitor/networkmonitor_metrics.nim index e5905ba464..b030b69d28 100644 --- a/apps/networkmonitor/networkmonitor_metrics.nim +++ b/apps/networkmonitor/networkmonitor_metrics.nim @@ -64,8 +64,10 @@ type # only after a ok/nok connection connError*: string + CustomPeerInfoRef* = ref CustomPeerInfo + # Stores information about all discovered/connected peers - CustomPeersTableRef* = TableRef[string, CustomPeerInfo] + CustomPeersTableRef* = TableRef[string, CustomPeerInfoRef] # stores the content topic and the count of rx messages ContentTopicMessageTableRef* = TableRef[string, int] From 8d52210081ef908294c77ddce7353e3adc37583e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=A1clav=20Pavl=C3=ADn?= Date: Wed, 27 Sep 2023 14:23:34 +0200 Subject: [PATCH 2/4] add more metrics, refactor how most metrics are calculated --- apps/networkmonitor/networkmonitor.nim | 74 +++++++++++++------ .../networkmonitor/networkmonitor_metrics.nim | 16 +++- 2 files changed, 63 insertions(+), 27 deletions(-) diff --git a/apps/networkmonitor/networkmonitor.nim b/apps/networkmonitor/networkmonitor.nim index 777be990d6..1668336c8e 100644 --- a/apps/networkmonitor/networkmonitor.nim +++ b/apps/networkmonitor/networkmonitor.nim @@ -46,7 +46,7 @@ proc setDiscoveredPeersCapabilities( for capability in @[Relay, Store, Filter, Lightpush]: let nOfNodesWithCapability = routingTableNodes.countIt(it.record.supportsCapability(capability)) info "capabilities as per ENR waku flag", capability=capability, amount=nOfNodesWithCapability - peer_type_as_per_enr.set(int64(nOfNodesWithCapability), labelValues = [$capability]) + networkmonitor_peer_type_as_per_enr.set(int64(nOfNodesWithCapability), labelValues = [$capability]) proc analyzePeer( customPeerInfo: CustomPeerInfoRef, @@ -78,7 +78,7 @@ proc analyzePeer( return err(customPeerInfo.connError) info "successfully pinged peer", peer=peerInfo, duration=pingDelay.millis - peer_ping.observe(pingDelay.millis) + networkmonitor_peer_ping.observe(pingDelay.millis) if customPeerInfo.avgPingDuration == 0.millis: customPeerInfo.avgPingDuration = pingDelay @@ -109,11 +109,8 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node], let currentTime = getTime().toUnix() - # Protocols and agent string and its count - var allProtocols: Table[string, int] - var allAgentStrings: Table[string, int] - var newPeers = 0 + var successfulConnections = 0 var analyzeFuts: seq[Future[Result[string, string]]] @@ -171,6 +168,8 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node], let peerIdRes = await peerIdFut let peerIdStr = peerIdRes.valueOr(): continue + + successfulConnections += 1 let peerId = PeerId.init(peerIdStr).valueOr(): warn "failed to parse peerId", peerId=peerIdStr continue @@ -188,34 +187,56 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node], let nodeUserAgent = lp2pPeerStore[AgentBook][peerId] customPeerInfo.userAgent = nodeUserAgent - # store avaiable protocols in the network - for protocol in nodeProtocols: - if not allProtocols.hasKey(protocol): - allProtocols[protocol] = 0 - allProtocols[protocol] += 1 - - # store available user-agents in the network - if not allAgentStrings.hasKey(nodeUserAgent): - allAgentStrings[nodeUserAgent] = 0 - allAgentStrings[nodeUserAgent] += 1 - info "number of newly discovered peers", amount=newPeers # inform the total connections that we did in this round - let nOfOkConnections = allProtocols.len() - info "number of successful connections", amount=nOfOkConnections + info "number of successful connections", amount=successfulConnections - # update count on each protocol +proc updateMetrics(allPeersRef: CustomPeersTableRef) {.gcsafe, raises: [KeyError].} = + var allProtocols: Table[string, int] + var allAgentStrings: Table[string, int] + var countries: Table[string, int] + var connectedPeers = 0 + var failedPeers = 0 + + for peerInfo in allPeersRef.values: + if peerInfo.connError == "": + for protocol in peerInfo.supportedProtocols: + if not allProtocols.hasKey(protocol): + allProtocols[protocol] = 0 + allProtocols[protocol] += 1 + + # store available user-agents in the network + if not allAgentStrings.hasKey(peerInfo.userAgent): + allAgentStrings[peerInfo.userAgent] = 0 + allAgentStrings[peerInfo.userAgent] += 1 + if peerInfo.country != "": + if not countries.hasKey(peerInfo.country): + countries[peerInfo.country] = 0 + countries[peerInfo.country] += 1 + + connectedPeers += 1 + else: + failedPeers += 1 + + networkmonitor_peer_count.set(int64(connectedPeers), labelValues = ["true"]) + networkmonitor_peer_count.set(int64(failedPeers), labelValues = ["false"]) + # update count on each protocol for protocol in allProtocols.keys(): let countOfProtocols = allProtocols[protocol] - peer_type_as_per_protocol.set(int64(countOfProtocols), labelValues = [protocol]) + networkmonitor_peer_type_as_per_protocol.set(int64(countOfProtocols), labelValues = [protocol]) info "supported protocols in the network", protocol=protocol, count=countOfProtocols # update count on each user-agent for userAgent in allAgentStrings.keys(): let countOfUserAgent = allAgentStrings[userAgent] - peer_user_agents.set(int64(countOfUserAgent), labelValues = [userAgent]) + networkmonitor_peer_user_agents.set(int64(countOfUserAgent), labelValues = [userAgent]) info "user agents participating in the network", userAgent=userAgent, count=countOfUserAgent + for country in countries.keys(): + let peerCount = countries[country] + networkmonitor_peer_country_count.set(int64(peerCount), labelValues = [country]) + info "number of peers per country", country=country, count=peerCount + proc populateInfoFromIp(allPeersRef: CustomPeersTableRef, restClient: RestClientRef) {.async.} = for peer in allPeersRef.keys(): @@ -248,6 +269,7 @@ proc crawlNetwork(node: WakuNode, let crawlInterval = conf.refreshInterval * 1000 while true: + let startTime = Moment.now() # discover new random nodes let discoveredNodes = await wakuDiscv5.protocol.queryRandom() @@ -262,6 +284,8 @@ proc crawlNetwork(node: WakuNode, # note random discovered nodes can be already known await setConnectedPeersMetrics(discoveredNodes, node, conf.timeout, restClient, allPeersRef) + updateMetrics(allPeersRef) + # populate info from ip addresses await populateInfoFromIp(allPeersRef, restClient) @@ -273,8 +297,12 @@ proc crawlNetwork(node: WakuNode, # Notes: # we dont run ipMajorityLoop # we dont run revalidateLoop + let endTime = Moment.now() + let elapsed = (endTime - startTime).nanos + + info "crawl duration", time=elapsed.millis - await sleepAsync(crawlInterval.millis) + await sleepAsync(crawlInterval.millis - elapsed.millis) proc retrieveDynamicBootstrapNodes(dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[ValidIpAddress]): Result[seq[RemotePeerInfo], string] = if dnsDiscovery and dnsDiscoveryUrl != "": diff --git a/apps/networkmonitor/networkmonitor_metrics.nim b/apps/networkmonitor/networkmonitor_metrics.nim index b030b69d28..61d2d69122 100644 --- a/apps/networkmonitor/networkmonitor_metrics.nim +++ b/apps/networkmonitor/networkmonitor_metrics.nim @@ -25,22 +25,30 @@ logScope: #discovery_message_requests_outgoing_total{response=""} #discovery_message_requests_outgoing_total{response="no_response"} -declarePublicGauge peer_type_as_per_enr, +declarePublicGauge networkmonitor_peer_type_as_per_enr, "Number of peers supporting each capability according the the ENR", labels = ["capability"] -declarePublicGauge peer_type_as_per_protocol, +declarePublicGauge networkmonitor_peer_type_as_per_protocol, "Number of peers supporting each protocol, after a successful connection) ", labels = ["protocols"] -declarePublicGauge peer_user_agents, +declarePublicGauge networkmonitor_peer_user_agents, "Number of peers with each user agent", labels = ["user_agent"] -declarePublicHistogram peer_ping, +declarePublicHistogram networkmonitor_peer_ping, "Histogram tracking ping durations for discovered peers", buckets = [100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0, Inf] +declarePublicGauge networkmonitor_peer_count, + "Number of discovered peers", + labels = ["connected"] + +declarePublicGauge networkmonitor_peer_country_count, + "Number of peers per country", + labels = ["country"] + type CustomPeerInfo* = object # populated after discovery From 31c9af7517c7e11f5d859fe92441117858c7b26d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=A1clav=20Pavl=C3=ADn?= Date: Wed, 27 Sep 2023 17:16:29 +0200 Subject: [PATCH 3/4] rework metrics table fillup --- apps/networkmonitor/networkmonitor.nim | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/apps/networkmonitor/networkmonitor.nim b/apps/networkmonitor/networkmonitor.nim index 1668336c8e..d04661e054 100644 --- a/apps/networkmonitor/networkmonitor.nim +++ b/apps/networkmonitor/networkmonitor.nim @@ -191,7 +191,7 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node], # inform the total connections that we did in this round info "number of successful connections", amount=successfulConnections -proc updateMetrics(allPeersRef: CustomPeersTableRef) {.gcsafe, raises: [KeyError].} = +proc updateMetrics(allPeersRef: CustomPeersTableRef) {.gcsafe.} = var allProtocols: Table[string, int] var allAgentStrings: Table[string, int] var countries: Table[string, int] @@ -201,18 +201,13 @@ proc updateMetrics(allPeersRef: CustomPeersTableRef) {.gcsafe, raises: [KeyError for peerInfo in allPeersRef.values: if peerInfo.connError == "": for protocol in peerInfo.supportedProtocols: - if not allProtocols.hasKey(protocol): - allProtocols[protocol] = 0 - allProtocols[protocol] += 1 + allProtocols[protocol] = allProtocols.mgetOrPut(protocol, 0) + 1 # store available user-agents in the network - if not allAgentStrings.hasKey(peerInfo.userAgent): - allAgentStrings[peerInfo.userAgent] = 0 - allAgentStrings[peerInfo.userAgent] += 1 + allAgentStrings[peerInfo.userAgent] = allAgentStrings.mgetOrPut(peerInfo.userAgent, 0) + 1 + if peerInfo.country != "": - if not countries.hasKey(peerInfo.country): - countries[peerInfo.country] = 0 - countries[peerInfo.country] += 1 + countries[peerInfo.country] = countries.mgetOrPut(peerInfo.country, 0) + 1 connectedPeers += 1 else: @@ -222,18 +217,18 @@ proc updateMetrics(allPeersRef: CustomPeersTableRef) {.gcsafe, raises: [KeyError networkmonitor_peer_count.set(int64(failedPeers), labelValues = ["false"]) # update count on each protocol for protocol in allProtocols.keys(): - let countOfProtocols = allProtocols[protocol] + let countOfProtocols = allProtocols.mgetOrPut(protocol, 0) networkmonitor_peer_type_as_per_protocol.set(int64(countOfProtocols), labelValues = [protocol]) info "supported protocols in the network", protocol=protocol, count=countOfProtocols # update count on each user-agent for userAgent in allAgentStrings.keys(): - let countOfUserAgent = allAgentStrings[userAgent] + let countOfUserAgent = allAgentStrings.mgetOrPut(userAgent, 0) networkmonitor_peer_user_agents.set(int64(countOfUserAgent), labelValues = [userAgent]) info "user agents participating in the network", userAgent=userAgent, count=countOfUserAgent for country in countries.keys(): - let peerCount = countries[country] + let peerCount = countries.mgetOrPut(country, 0) networkmonitor_peer_country_count.set(int64(peerCount), labelValues = [country]) info "number of peers per country", country=country, count=peerCount From 8b0188514b55ac9677eef06f8cb8c0613cf3505d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=A1clav=20Pavl=C3=ADn?= Date: Thu, 28 Sep 2023 09:13:26 +0200 Subject: [PATCH 4/4] reset connErr to make sure we honour successful reconnection --- apps/networkmonitor/networkmonitor.nim | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/networkmonitor/networkmonitor.nim b/apps/networkmonitor/networkmonitor.nim index d04661e054..2d676c0d40 100644 --- a/apps/networkmonitor/networkmonitor.nim +++ b/apps/networkmonitor/networkmonitor.nim @@ -77,6 +77,7 @@ proc analyzePeer( customPeerInfo.retries += 1 return err(customPeerInfo.connError) + customPeerInfo.connError = "" info "successfully pinged peer", peer=peerInfo, duration=pingDelay.millis networkmonitor_peer_ping.observe(pingDelay.millis)