diff --git a/consensus/hotstuff_leader.go b/consensus/hotstuff_leader.go index 5607ad1bb..6d8cfe4e8 100644 --- a/consensus/hotstuff_leader.go +++ b/consensus/hotstuff_leader.go @@ -24,16 +24,7 @@ type HotstuffLeaderMessageHandler struct{} /*** Prepare Step ***/ func (handler *HotstuffLeaderMessageHandler) HandleNewRoundMessage(m *consensusModule, msg *typesCons.HotstuffMessage) { - m.GetBus(). - GetTelemetryModule(). - GetEventMetricsAgent(). - EmitEvent( - consensusTelemetry.CONSENSUS_EVENT_METRICS_NAMESPACE, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_NAME, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_HEIGHT, m.CurrentHeight(), - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_STEP_NEW_ROUND, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_VALIDATOR_TYPE_LEADER, - ) + handler.emitTelemetryEvent(m, msg) if err := handler.anteHandle(m, msg); err != nil { m.nodeLogError(typesCons.ErrHotstuffValidation.Error(), err) @@ -89,16 +80,7 @@ func (handler *HotstuffLeaderMessageHandler) HandleNewRoundMessage(m *consensusM /*** PreCommit Step ***/ func (handler *HotstuffLeaderMessageHandler) HandlePrepareMessage(m *consensusModule, msg *typesCons.HotstuffMessage) { - m.GetBus(). - GetTelemetryModule(). - GetEventMetricsAgent(). - EmitEvent( - consensusTelemetry.CONSENSUS_EVENT_METRICS_NAMESPACE, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_NAME, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_HEIGHT, m.CurrentHeight(), - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_STEP_PREPARE, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_VALIDATOR_TYPE_LEADER, - ) + handler.emitTelemetryEvent(m, msg) if err := handler.anteHandle(m, msg); err != nil { m.nodeLogError(typesCons.ErrHotstuffValidation.Error(), err) @@ -142,16 +124,7 @@ func (handler *HotstuffLeaderMessageHandler) HandlePrepareMessage(m *consensusMo /*** Commit Step ***/ func (handler *HotstuffLeaderMessageHandler) HandlePrecommitMessage(m *consensusModule, msg *typesCons.HotstuffMessage) { - m.GetBus(). - GetTelemetryModule(). - GetEventMetricsAgent(). - EmitEvent( - consensusTelemetry.CONSENSUS_EVENT_METRICS_NAMESPACE, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_NAME, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_HEIGHT, m.CurrentHeight(), - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_STEP_PRECOMMIT, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_VALIDATOR_TYPE_LEADER, - ) + handler.emitTelemetryEvent(m, msg) if err := handler.anteHandle(m, msg); err != nil { m.nodeLogError(typesCons.ErrHotstuffValidation.Error(), err) @@ -195,16 +168,7 @@ func (handler *HotstuffLeaderMessageHandler) HandlePrecommitMessage(m *consensus /*** Decide Step ***/ func (handler *HotstuffLeaderMessageHandler) HandleCommitMessage(m *consensusModule, msg *typesCons.HotstuffMessage) { - m.GetBus(). - GetTelemetryModule(). - GetEventMetricsAgent(). - EmitEvent( - consensusTelemetry.CONSENSUS_EVENT_METRICS_NAMESPACE, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_NAME, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_HEIGHT, m.CurrentHeight(), - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_STEP_COMMIT, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_VALIDATOR_TYPE_LEADER, - ) + handler.emitTelemetryEvent(m, msg) if err := handler.anteHandle(m, msg); err != nil { m.nodeLogError(typesCons.ErrHotstuffValidation.Error(), err) @@ -253,16 +217,8 @@ func (handler *HotstuffLeaderMessageHandler) HandleCommitMessage(m *consensusMod } func (handler *HotstuffLeaderMessageHandler) HandleDecideMessage(m *consensusModule, msg *typesCons.HotstuffMessage) { - m.GetBus(). - GetTelemetryModule(). - GetEventMetricsAgent(). - EmitEvent( - consensusTelemetry.CONSENSUS_EVENT_METRICS_NAMESPACE, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_NAME, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_HEIGHT, m.CurrentHeight(), - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_STEP_DECIDE, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_VALIDATOR_TYPE_LEADER, - ) + handler.emitTelemetryEvent(m, msg) + if err := handler.anteHandle(m, msg); err != nil { m.nodeLogError(typesCons.ErrHotstuffValidation.Error(), err) return @@ -278,6 +234,19 @@ func (handler *HotstuffLeaderMessageHandler) anteHandle(m *consensusModule, msg return nil } +func (handler *HotstuffLeaderMessageHandler) emitTelemetryEvent(m *consensusModule, msg *typesCons.HotstuffMessage) { + m.GetBus(). + GetTelemetryModule(). + GetEventMetricsAgent(). + EmitEvent( + consensusTelemetry.CONSENSUS_EVENT_METRICS_NAMESPACE, + consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_NAME, + consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_HEIGHT, m.CurrentHeight(), + typesCons.StepToString[msg.GetStep()], + consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_VALIDATOR_TYPE_LEADER, + ) +} + // ValidateBasic general validation checks that apply to every HotstuffLeaderMessage func (handler *HotstuffLeaderMessageHandler) validateBasic(m *consensusModule, msg *typesCons.HotstuffMessage) error { // Discard messages with invalid partial signatures before storing it in the leader's consensus mempool diff --git a/consensus/hotstuff_replica.go b/consensus/hotstuff_replica.go index 8ab507da8..52ae9fb93 100644 --- a/consensus/hotstuff_replica.go +++ b/consensus/hotstuff_replica.go @@ -2,6 +2,7 @@ package consensus import ( "fmt" + "log" consensusTelemetry "github.com/pokt-network/pocket/consensus/telemetry" typesCons "github.com/pokt-network/pocket/consensus/types" @@ -23,16 +24,7 @@ var ( /*** NewRound Step ***/ func (handler *HotstuffReplicaMessageHandler) HandleNewRoundMessage(m *consensusModule, msg *typesCons.HotstuffMessage) { - m.GetBus(). - GetTelemetryModule(). - GetEventMetricsAgent(). - EmitEvent( - consensusTelemetry.CONSENSUS_EVENT_METRICS_NAMESPACE, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_NAME, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_HEIGHT, m.CurrentHeight(), - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_STEP_NEW_ROUND, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_VALIDATOR_TYPE_REPLICA, - ) + handler.emitTelemetryEvent(m, msg) if err := handler.anteHandle(m, msg); err != nil { m.nodeLogError(typesCons.ErrHotstuffValidation.Error(), err) @@ -46,16 +38,7 @@ func (handler *HotstuffReplicaMessageHandler) HandleNewRoundMessage(m *consensus /*** Prepare Step ***/ func (handler *HotstuffReplicaMessageHandler) HandlePrepareMessage(m *consensusModule, msg *typesCons.HotstuffMessage) { - m.GetBus(). - GetTelemetryModule(). - GetEventMetricsAgent(). - EmitEvent( - consensusTelemetry.CONSENSUS_EVENT_METRICS_NAMESPACE, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_NAME, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_HEIGHT, m.CurrentHeight(), - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_STEP_PREPARE, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_VALIDATOR_TYPE_REPLICA, - ) + handler.emitTelemetryEvent(m, msg) if err := handler.anteHandle(m, msg); err != nil { m.nodeLogError(typesCons.ErrHotstuffValidation.Error(), err) @@ -88,16 +71,7 @@ func (handler *HotstuffReplicaMessageHandler) HandlePrepareMessage(m *consensusM /*** PreCommit Step ***/ func (handler *HotstuffReplicaMessageHandler) HandlePrecommitMessage(m *consensusModule, msg *typesCons.HotstuffMessage) { - m.GetBus(). - GetTelemetryModule(). - GetEventMetricsAgent(). - EmitEvent( - consensusTelemetry.CONSENSUS_EVENT_METRICS_NAMESPACE, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_NAME, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_HEIGHT, m.CurrentHeight(), - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_STEP_PRECOMMIT, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_VALIDATOR_TYPE_REPLICA, - ) + handler.emitTelemetryEvent(m, msg) if err := handler.anteHandle(m, msg); err != nil { m.nodeLogError(typesCons.ErrHotstuffValidation.Error(), err) @@ -125,16 +99,7 @@ func (handler *HotstuffReplicaMessageHandler) HandlePrecommitMessage(m *consensu /*** Commit Step ***/ func (handler *HotstuffReplicaMessageHandler) HandleCommitMessage(m *consensusModule, msg *typesCons.HotstuffMessage) { - m.GetBus(). - GetTelemetryModule(). - GetEventMetricsAgent(). - EmitEvent( - consensusTelemetry.CONSENSUS_EVENT_METRICS_NAMESPACE, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_NAME, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_HEIGHT, m.CurrentHeight(), - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_STEP_COMMIT, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_VALIDATOR_TYPE_REPLICA, - ) + handler.emitTelemetryEvent(m, msg) if err := handler.anteHandle(m, msg); err != nil { m.nodeLogError(typesCons.ErrHotstuffValidation.Error(), err) @@ -162,16 +127,7 @@ func (handler *HotstuffReplicaMessageHandler) HandleCommitMessage(m *consensusMo /*** Decide Step ***/ func (handler *HotstuffReplicaMessageHandler) HandleDecideMessage(m *consensusModule, msg *typesCons.HotstuffMessage) { - m.GetBus(). - GetTelemetryModule(). - GetEventMetricsAgent(). - EmitEvent( - consensusTelemetry.CONSENSUS_EVENT_METRICS_NAMESPACE, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_NAME, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_HEIGHT, m.CurrentHeight(), - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_STEP_DECIDE, - consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_VALIDATOR_TYPE_REPLICA, - ) + handler.emitTelemetryEvent(m, msg) if err := handler.anteHandle(m, msg); err != nil { m.nodeLogError(typesCons.ErrHotstuffValidation.Error(), err) @@ -195,9 +151,23 @@ func (handler *HotstuffReplicaMessageHandler) HandleDecideMessage(m *consensusMo // anteHandle is the handler called on every replica message before specific handler func (handler *HotstuffReplicaMessageHandler) anteHandle(m *consensusModule, msg *typesCons.HotstuffMessage) error { + log.Println("TODO: Hotstuff replica ante handle not implemented yet") return nil } +func (handler *HotstuffReplicaMessageHandler) emitTelemetryEvent(m *consensusModule, msg *typesCons.HotstuffMessage) { + m.GetBus(). + GetTelemetryModule(). + GetEventMetricsAgent(). + EmitEvent( + consensusTelemetry.CONSENSUS_EVENT_METRICS_NAMESPACE, + consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_NAME, + consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_HEIGHT, m.CurrentHeight(), + typesCons.StepToString[msg.GetStep()], + consensusTelemetry.HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_VALIDATOR_TYPE_REPLICA, + ) +} + func (m *consensusModule) validateProposal(msg *typesCons.HotstuffMessage) error { if !(msg.Type == Propose && msg.Step == Prepare) { return typesCons.ErrProposalNotValidInPrepare diff --git a/consensus/telemetry/metrics.go b/consensus/telemetry/metrics.go index 967b97f27..54e0ecd7d 100644 --- a/consensus/telemetry/metrics.go +++ b/consensus/telemetry/metrics.go @@ -14,11 +14,6 @@ const ( HOTPOKT_MESSAGE_EVENT_METRIC_NAME = "hotpokt_message_event_metric" HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_HEIGHT = "HEIGHT" - HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_STEP_NEW_ROUND = "STEP_NEW_ROUND" - HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_STEP_PREPARE = "STEP_PREPARE" - HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_STEP_PRECOMMIT = "STEP_PRECOMMIT" - HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_STEP_COMMIT = "STEP_COMMIT" - HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_STEP_DECIDE = "STEP_DECIDE" HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_VALIDATOR_TYPE_LEADER = "VALIDATOR_TYPE_LEADER" HOTPOKT_MESSAGE_EVENT_METRIC_LABEL_VALIDATOR_TYPE_REPLICA = "VALIDATOR_TYPE_REPLICA" ) diff --git a/p2p/module.go b/p2p/module.go index f0e3d0303..7f2c94f60 100644 --- a/p2p/module.go +++ b/p2p/module.go @@ -53,6 +53,8 @@ func Create(cfg *config.Config) (m modules.P2PModule, err error) { } func (m *p2pModule) SetBus(bus modules.Bus) { + // INVESTIGATE: Can the code flow be modified to set the bus here? + // m.network.SetBus(m.GetBus()) m.bus = bus } @@ -85,6 +87,7 @@ func (m *p2pModule) Start() error { } else { m.network = stdnetwork.NewNetwork(addrBook) } + m.network.SetBus(m.GetBus()) m.network.SetBus(m.GetBus()) @@ -98,11 +101,12 @@ func (m *p2pModule) Start() error { go m.handleNetworkMessage(data) } }() - m. - GetBus(). + + m.GetBus(). GetTelemetryModule(). GetTimeSeriesAgent(). CounterIncrement(p2pTelemetry.P2P_NODE_STARTED_TIMESERIES_METRIC_NAME) + return nil } diff --git a/p2p/raintree/network.go b/p2p/raintree/network.go index 6dc65aff3..f18e5d73d 100644 --- a/p2p/raintree/network.go +++ b/p2p/raintree/network.go @@ -151,7 +151,7 @@ func (n *rainTreeNetwork) HandleNetworkData(data []byte) ([]byte, error) { EmitEvent( p2pTelemetry.P2P_EVENT_METRICS_NAMESPACE, p2pTelemetry.RAINTREE_MESSAGE_EVENT_METRIC_NAME, - "height", blockHeight, + p2pTelemetry.RAINTREE_MESSAGE_EVENT_METRIC_HEIGHT_LABEL, blockHeight, ) var rainTreeMsg typesP2P.RainTreeMessage @@ -188,8 +188,8 @@ func (n *rainTreeNetwork) HandleNetworkData(data []byte) ([]byte, error) { EmitEvent( p2pTelemetry.P2P_EVENT_METRICS_NAMESPACE, p2pTelemetry.BROADCAST_MESSAGE_REDUNDANCY_PER_BLOCK_EVENT_METRIC_NAME, - "hash", hashString, - "height", blockHeight, + p2pTelemetry.RAINTREE_MESSAGE_EVENT_METRIC_HASH_LABEL, hashString, + p2pTelemetry.RAINTREE_MESSAGE_EVENT_METRIC_HEIGHT_LABEL, blockHeight, ) return nil, nil @@ -225,6 +225,11 @@ func (n *rainTreeNetwork) SetBus(bus modules.Bus) { } func (n *rainTreeNetwork) GetBus() modules.Bus { + // TODO: Do we need this if? + // if n.bus == nil { + // log.Printf("[WARN] PocketBus is not initialized in rainTreeNetwork") + // return nil + // } return n.bus } @@ -234,6 +239,7 @@ func getNonce() uint64 { } // INVESTIGATE(olshansky): This did not generate a random nonce on every call + // func getNonce() uint64 { // seed, err := cryptRand.Int(cryptRand.Reader, big.NewInt(math.MaxInt64)) // if err != nil { diff --git a/p2p/raintree/utils.go b/p2p/raintree/utils.go index 859fad738..0f4351c8f 100644 --- a/p2p/raintree/utils.go +++ b/p2p/raintree/utils.go @@ -6,6 +6,7 @@ import ( cryptoPocket "github.com/pokt-network/pocket/shared/crypto" ) +// CLEANUP: Consolidate this with other similar functions in the codebase. func GetHashStringFromBytes(b []byte) string { hash := cryptoPocket.SHA3Hash(b) return hex.EncodeToString(hash) diff --git a/p2p/telemetry/metrics.go b/p2p/telemetry/metrics.go index 190e5680c..773190763 100644 --- a/p2p/telemetry/metrics.go +++ b/p2p/telemetry/metrics.go @@ -10,4 +10,8 @@ const ( BROADCAST_MESSAGE_REDUNDANCY_PER_BLOCK_EVENT_METRIC_NAME = "broadcast_message_redundancy_per_block_event_metric" RAINTREE_MESSAGE_EVENT_METRIC_NAME = "raintree_message_event_metric" + + // Attributes + RAINTREE_MESSAGE_EVENT_METRIC_HEIGHT_LABEL = "height" + RAINTREE_MESSAGE_EVENT_METRIC_HASH_LABEL = "hash" ) diff --git a/p2p/types/network.go b/p2p/types/network.go index 69ebd2cf5..ed1918a9a 100644 --- a/p2p/types/network.go +++ b/p2p/types/network.go @@ -17,6 +17,7 @@ type AddrBookMap map[string]*NetworkPeer // can be simplified greatly. type Network interface { modules.IntegratableModule + NetworkBroadcast(data []byte) error NetworkSend(data []byte, address cryptoPocket.Address) error diff --git a/shared/modules/telemetry_module.go b/shared/modules/telemetry_module.go index 624df6d55..9c998a681 100644 --- a/shared/modules/telemetry_module.go +++ b/shared/modules/telemetry_module.go @@ -26,7 +26,7 @@ type TimeSeriesAgent interface { // Register a gauge by name GaugeRegister(name string, description string) - // Sets the Gauge to an arbitrary value. + // Sets the Gauge to an arbitrary value GaugeSet(name string, value float64) (prometheus.Gauge, error) // Increments the Gauge by 1. Use Add to increment it by arbitrary values. diff --git a/shared/node.go b/shared/node.go index 28c6a2eab..262751eac 100644 --- a/shared/node.go +++ b/shared/node.go @@ -58,15 +58,7 @@ func Create(cfg *config.Config) (n *Node, err error) { return nil, err } - bus, err := CreateBus( - persistenceMod, - p2pMod, - utilityMod, - consensusMod, - telemetryMod, - cfg, - ) - + bus, err := CreateBus(persistenceMod, p2pMod, utilityMod, consensusMod, telemetryMod, cfg) if err != nil { return nil, err } diff --git a/shared/telemetry/README.md b/shared/telemetry/README.md index df0946b99..2555ccdcb 100644 --- a/shared/telemetry/README.md +++ b/shared/telemetry/README.md @@ -170,21 +170,21 @@ _NOTE: Make sure you use `http` and not `https` when developing locally._ ![](./docs/browsing-existing-dashboards.gif) - # Defining your own metrics We follow a specific pattern to define our metrics to guarantee consistency and ease of use. -Metric definitions for each module/domain/service are stored under a new folder called `telemetry` in the module's folder, in a file named `metrics.go`, and they respect the following rules: +Metric definitions for each module/domain/service are stored under a new folder called `telemetry` in the module's folder, in a file named `metrics.go`, and they respect the following rules: -* Every metric's name and description and any additional information about the metric should be defined as a constant. -* Constants relative to a metric's definition follow a naming pattern: `__` -* We keep the actual metric name value open for definition however the developer sees fit. +- Every metric's name and description and any additional information about the metric should be defined as a constant. +- Constants relative to a metric's definition follow a naming pattern: `__` +- We keep the actual metric name value open for definition however the developer sees fit. For example: We want to define a metric of type: timeseries, with a name: `nodes_alive_counter`, The constants definition will be as follows: + ```go // metric_name=NODES_ALIVE_COUNTER // metric_type=TIME_SERIES