Skip to content

Commit

Permalink
fix: network monitor improvements (#2939)
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Jul 30, 2024
1 parent d4e8a0d commit 8058323
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 26 deletions.
18 changes: 16 additions & 2 deletions apps/networkmonitor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,27 @@ networkmonitor [OPTIONS]...

The following options are available:

-l, --log-level Sets the log level [=LogLevel.DEBUG].
-l, --log-level Sets the log level [=LogLevel.INFO].
-t, --timeout Timeout to consider that the connection failed [=chronos.seconds(10)].
-b, --bootstrap-node Bootstrap ENR node. Argument may be repeated. [=@[""]].
--dns-discovery-url URL for DNS node list in format 'enrtree://<key>@<fqdn>'.
--pubsub-topic Default pubsub topic to subscribe to. Argument may be repeated..
-r, --refresh-interval How often new peers are discovered and connected to (in seconds) [=5].
--cluster-id Cluster id that the node is running in. Node in a different cluster id is
disconnected. [=1].
--rln-relay Enable spam protection through rln-relay: true|false [=true].
--rln-relay-dynamic Enable waku-rln-relay with on-chain dynamic group management: true|false
[=true].
--rln-relay-tree-path Path to the RLN merkle tree sled db (https://github.com/spacejam/sled).
--rln-relay-eth-client-address HTTP address of an Ethereum testnet client e.g., http://localhost:8540/
[=http://localhost:8540/].
--rln-relay-eth-contract-address Address of membership contract on an Ethereum testnet.
--rln-relay-epoch-sec Epoch size in seconds used to rate limit RLN memberships. Default is 1 second.
[=1].
--rln-relay-user-message-limit Set a user message limit for the rln membership registration. Must be a positive
integer. Default is 1. [=1].
--metrics-server Enable the metrics server: true|false [=true].
--metrics-server-address Listening address of the metrics server. [=ValidIpAddress.init("127.0.0.1")].
--metrics-server-address Listening address of the metrics server. [=parseIpAddress("127.0.0.1")].
--metrics-server-port Listening HTTP port of the metrics server. [=8008].
--metrics-rest-address Listening address of the metrics rest server. [=127.0.0.1].
--metrics-rest-port Listening HTTP port of the metrics rest server. [=8009].
Expand Down
34 changes: 34 additions & 0 deletions apps/networkmonitor/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
version: '3.8'
networks:
monitoring:
driver: bridge

volumes:
prometheus-data:
driver: local
grafana-data:
driver: local

# Services definitions
services:

prometheus:
image: docker.io/prom/prometheus:latest
container_name: prometheus
ports:
- 9090:9090
command:
- '--config.file=/etc/prometheus/prometheus.yaml'
volumes:
- ./prometheus.yaml:/etc/prometheus/prometheus.yaml:ro
- ./data:/prometheus
restart: unless-stopped

grafana:
image: grafana/grafana-oss:latest
container_name: grafana
ports:
- '3000:3000'
volumes:
- grafana-data:/var/lib/grafana
restart: unless-stopped
43 changes: 20 additions & 23 deletions apps/networkmonitor/networkmonitor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ const MaxConnectedPeers = 150

const git_version* {.strdefine.} = "n/a"

proc setDiscoveredPeersCapabilities(routingTableNodes: seq[Node]) =
proc setDiscoveredPeersCapabilities(routingTableNodes: seq[waku_enr.Record]) =
for capability in @[Relay, Store, Filter, Lightpush]:
let nOfNodesWithCapability =
routingTableNodes.countIt(it.record.supportsCapability(capability))
routingTableNodes.countIt(it.supportsCapability(capability))
info "capabilities as per ENR waku flag",
capability = capability, amount = nOfNodesWithCapability
networkmonitor_peer_type_as_per_enr.set(
Expand Down Expand Up @@ -116,7 +116,7 @@ proc shouldReconnect(customPeerInfo: CustomPeerInfoRef): bool =

# TODO: Split in discover, connect
proc setConnectedPeersMetrics(
discoveredNodes: seq[Node],
discoveredNodes: seq[waku_enr.Record],
node: WakuNode,
timeout: chronos.Duration,
restClient: RestClientRef,
Expand All @@ -141,20 +141,10 @@ proc setConnectedPeersMetrics(

# iterate all newly discovered nodes
for discNode in discoveredNodes:
let typedRecord = discNode.record.toTypedRecord()
if not typedRecord.isOk():
warn "could not convert record to typed record", record = discNode.record
continue

let secp256k1 = typedRecord.get().secp256k1
if not secp256k1.isSome():
warn "could not get secp256k1 key", typedRecord = typedRecord.get()
continue

let peerRes = toRemotePeerInfo(discNode.record)
let peerRes = toRemotePeerInfo(discNode)

let peerInfo = peerRes.valueOr:
warn "error converting record to remote peer info", record = discNode.record
warn "error converting record to remote peer info", record = discNode
continue

# create new entry if new peerId found
Expand All @@ -169,10 +159,17 @@ proc setConnectedPeersMetrics(
let customPeerInfo = allPeers[peerId]

customPeerInfo.lastTimeDiscovered = currentTime
customPeerInfo.enr = discNode.record.toURI()
customPeerInfo.enrCapabilities = discNode.record.getCapabilities().mapIt($it)
customPeerInfo.enr = discNode.toURI()
customPeerInfo.enrCapabilities = discNode.getCapabilities().mapIt($it)
customPeerInfo.discovered += 1

for maddr in peerInfo.addrs:
if $maddr notin customPeerInfo.maddrs:
customPeerInfo.maddrs.add $maddr
let typedRecord = discNode.toTypedRecord()
if not typedRecord.isOk():
warn "could not convert record to typed record", record = discNode
continue
if not typedRecord.get().ip.isSome():
warn "ip field is not set", record = typedRecord.get()
continue
Expand Down Expand Up @@ -301,13 +298,13 @@ proc crawlNetwork(
while true:
let startTime = Moment.now()
# discover new random nodes
let discoveredNodes = await wakuDiscv5.protocol.queryRandom()
let discoveredNodes = await wakuDiscv5.findRandomPeers()

# nodes are nested into bucket, flat it
let flatNodes = wakuDiscv5.protocol.routingTable.buckets.mapIt(it.nodes).flatten()
#let flatNodes = wakuDiscv5.protocol.routingTable.buckets.mapIt(it.nodes).flatten()

# populate metrics related to capabilities as advertised by the ENR (see waku field)
setDiscoveredPeersCapabilities(flatNodes)
setDiscoveredPeersCapabilities(discoveredNodes)

# tries to connect to all newly discovered nodes
# and populates metrics related to peers we could connect
Expand All @@ -321,10 +318,10 @@ proc crawlNetwork(
# populate info from ip addresses
await populateInfoFromIp(allPeersRef, restClient)

let totalNodes = flatNodes.len
let seenNodes = flatNodes.countIt(it.seen)
let totalNodes = discoveredNodes.len
#let seenNodes = totalNodes

info "discovered nodes: ", total = totalNodes, seen = seenNodes
info "discovered nodes: ", total = totalNodes #, seen = seenNodes

# Notes:
# we dont run ipMajorityLoop
Expand Down
1 change: 1 addition & 0 deletions apps/networkmonitor/networkmonitor_metrics.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type
enrCapabilities*: seq[string]
country*: string
city*: string
maddrs*: seq[string]

# only after ok connection
lastTimeConnected*: int64
Expand Down
9 changes: 9 additions & 0 deletions apps/networkmonitor/prometheus.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
global:
scrape_interval: 15s

scrape_configs:
- job_name: 'prometheus'
scrape_interval: 5s
static_configs:
- targets: ['host.docker.internal:8008']
metrics_path: '/metrics'
3 changes: 2 additions & 1 deletion waku/waku_metadata/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ proc initProtocolHandler(m: WakuMetadata) =
remoteClusterId = response.clusterId,
remoteShards = response.shards,
localClusterId = m.clusterId,
localShards = m.shards
localShards = m.shards,
peer = conn.peerId

discard await m.respond(conn)

Expand Down

0 comments on commit 8058323

Please sign in to comment.