From b4fecd55d7511ed7df46573df017d0a5f8ad873e Mon Sep 17 00:00:00 2001 From: cce <51567+cce@users.noreply.github.com> Date: Fri, 30 Sep 2022 13:04:45 -0400 Subject: [PATCH] telemetry: Count and report the number of duplicate proposals and MsgDigestSkipTag messages received (#4605) --- agreement/proposalStore.go | 10 ++++++++++ logging/telemetryspec/event.go | 2 ++ network/wsNetwork.go | 7 ++++--- network/wsPeer.go | 13 ++++++++++++- util/metrics/metrics.go | 2 ++ 5 files changed, 30 insertions(+), 4 deletions(-) diff --git a/agreement/proposalStore.go b/agreement/proposalStore.go index 973f909d03..e375ef92f8 100644 --- a/agreement/proposalStore.go +++ b/agreement/proposalStore.go @@ -18,8 +18,16 @@ package agreement import ( "fmt" + + "github.com/algorand/go-algorand/util/metrics" ) +var proposalAlreadyFilledCounter = metrics.MakeCounter( + metrics.MetricName{Name: "algod_agreement_proposal_already_filled", Description: "Number of times a duplicate proposal payload was received before validation"}) + +var proposalAlreadyAssembledCounter = metrics.MakeCounter( + metrics.MetricName{Name: "algod_agreement_proposal_already_assembled", Description: "Number of times a duplicate proposal payload was received after validation"}) + // An blockAssembler contains the proposal data associated with some // proposal-value. // @@ -52,10 +60,12 @@ type blockAssembler struct { // an error if the pipelining operation is redundant. func (a blockAssembler) pipeline(p unauthenticatedProposal) (blockAssembler, error) { if a.Assembled { + proposalAlreadyAssembledCounter.Inc(nil) return a, fmt.Errorf("blockAssembler.pipeline: already assembled") } if a.Filled { + proposalAlreadyFilledCounter.Inc(nil) return a, fmt.Errorf("blockAssembler.pipeline: already filled") } diff --git a/logging/telemetryspec/event.go b/logging/telemetryspec/event.go index 7dd7968ce7..15a0461644 100644 --- a/logging/telemetryspec/event.go +++ b/logging/telemetryspec/event.go @@ -300,6 +300,8 @@ type PeerConnectionDetails struct { Endpoint string `json:",omitempty"` // MessageDelay is the avarage relative message delay. Not being used for incoming connection. MessageDelay int64 `json:",omitempty"` + // DuplicateFilterCount is the number of times this peer has sent us a message hash to filter that it had already sent before. + DuplicateFilterCount int64 } // CatchpointGenerationEvent event diff --git a/network/wsNetwork.go b/network/wsNetwork.go index b7b2a5e893..dff621fea9 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -1749,9 +1749,10 @@ func (wn *WebsocketNetwork) sendPeerConnectionsTelemetryStatus() { var connectionDetails telemetryspec.PeersConnectionDetails for _, peer := range peers { connDetail := telemetryspec.PeerConnectionDetails{ - ConnectionDuration: uint(now.Sub(peer.createTime).Seconds()), - TelemetryGUID: peer.TelemetryGUID, - InstanceName: peer.InstanceName, + ConnectionDuration: uint(now.Sub(peer.createTime).Seconds()), + TelemetryGUID: peer.TelemetryGUID, + InstanceName: peer.InstanceName, + DuplicateFilterCount: peer.duplicateFilterCount, } if peer.outgoing { connDetail.Address = justHost(peer.conn.RemoteAddr().String()) diff --git a/network/wsPeer.go b/network/wsPeer.go index 313df8ad58..870eefddbd 100644 --- a/network/wsPeer.go +++ b/network/wsPeer.go @@ -75,6 +75,7 @@ var networkMessageQueueMicrosTotal = metrics.MakeCounter(metrics.MetricName{Name var duplicateNetworkMessageReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedTotal) var duplicateNetworkMessageReceivedBytesTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedBytesTotal) +var duplicateNetworkFilterReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkFilterReceivedTotal) var outgoingNetworkMessageFilteredOutTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutTotal) var outgoingNetworkMessageFilteredOutBytesTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutBytesTotal) @@ -184,6 +185,9 @@ type wsPeer struct { incomingMsgFilter *messageFilter outgoingMsgFilter *messageFilter + // duplicateFilterCount counts how many times the remote peer has sent us a message hash + // to filter that it had already sent before. + duplicateFilterCount int64 processed chan struct{} @@ -576,7 +580,14 @@ func (wp *wsPeer) handleFilterMessage(msg IncomingMessage) { var digest crypto.Digest copy(digest[:], msg.Data) //wp.net.log.Debugf("add filter %v", digest) - wp.outgoingMsgFilter.CheckDigest(digest, true, true) + has := wp.outgoingMsgFilter.CheckDigest(digest, true, true) + if has { + // Count that this peer has sent us duplicate filter messages: this means it received the same + // large message concurrently from several peers, and then sent the filter message to us after + // each large message finished transferring. + duplicateNetworkFilterReceivedTotal.Inc(nil) + atomic.AddInt64(&wp.duplicateFilterCount, 1) + } } func (wp *wsPeer) writeLoopSend(msgs sendMessages) disconnectReason { diff --git a/util/metrics/metrics.go b/util/metrics/metrics.go index 01f161888f..f9965d0013 100644 --- a/util/metrics/metrics.go +++ b/util/metrics/metrics.go @@ -49,6 +49,8 @@ var ( DuplicateNetworkMessageReceivedTotal = MetricName{Name: "algod_network_duplicate_message_received_total", Description: "Total number of duplicate messages that were received from the network"} // DuplicateNetworkMessageReceivedBytesTotal The total number ,in bytes, of the duplicate messages that were received from the network DuplicateNetworkMessageReceivedBytesTotal = MetricName{Name: "algod_network_duplicate_message_received_bytes_total", Description: "The total number ,in bytes, of the duplicate messages that were received from the network"} + // DuplicateNetworkFilterReceivedTotal Total number of duplicate filter messages (tag MsgDigestSkipTag) that were received from the network + DuplicateNetworkFilterReceivedTotal = MetricName{Name: "algod_network_duplicate_filter_received_total", Description: "Total number of duplicate filter messages that were received from the network"} // OutgoingNetworkMessageFilteredOutTotal Total number of messages that were not sent per peer request OutgoingNetworkMessageFilteredOutTotal = MetricName{Name: "algod_outgoing_network_message_filtered_out_total", Description: "Total number of messages that were not sent per peer request"} // OutgoingNetworkMessageFilteredOutBytesTotal Total number of bytes saved by not sending messages that were asked not to be sent by peer