Skip to content

Commit

Permalink
feat(telemetry)_: add metric for confirmed delivery
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Oct 1, 2024
1 parent d1887cd commit 2749da2
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 35 deletions.
71 changes: 37 additions & 34 deletions telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"go.uber.org/zap"

"github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/wakuv2"
Expand All @@ -27,21 +26,21 @@ import (
type TelemetryType string

const (
ProtocolStatsMetric TelemetryType = "ProtocolStats"
SentEnvelopeMetric TelemetryType = "SentEnvelope"
UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope"
ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
PeerCountMetric TelemetryType = "PeerCount"
PeerConnFailuresMetric TelemetryType = "PeerConnFailure"
MessageCheckSuccessMetric TelemetryType = "MessageCheckSuccess"
MessageCheckFailureMetric TelemetryType = "MessageCheckFailure"
PeerCountByShardMetric TelemetryType = "PeerCountByShard"
PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin"
StoreConfirmationFailedMetric TelemetryType = "StoreConfirmationFailed"
MissedMessageMetric TelemetryType = "MissedMessage"
MissedRelevantMessageMetric TelemetryType = "MissedRelevantMessage"
MaxRetryCache = 5000
ProtocolStatsMetric TelemetryType = "ProtocolStats"
SentEnvelopeMetric TelemetryType = "SentEnvelope"
UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope"
ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
PeerCountMetric TelemetryType = "PeerCount"
PeerConnFailuresMetric TelemetryType = "PeerConnFailure"
MessageCheckSuccessMetric TelemetryType = "MessageCheckSuccess"
MessageCheckFailureMetric TelemetryType = "MessageCheckFailure"
PeerCountByShardMetric TelemetryType = "PeerCountByShard"
PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin"
MissedMessageMetric TelemetryType = "MissedMessage"
MissedRelevantMessageMetric TelemetryType = "MissedRelevantMessage"
MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed"
MaxRetryCache = 5000
)

type TelemetryRequest struct {
Expand Down Expand Up @@ -107,10 +106,6 @@ func (c *Client) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin ma
}
}

func (c *Client) PushStoreConfirmationFailed(ctx context.Context, msgHash common.Hash) {
c.processAndPushTelemetry(ctx, StoreConfirmationFailed{MessageHash: msgHash.String()})
}

func (c *Client) PushMissedMessage(ctx context.Context, envelope *v2protocol.Envelope) {
c.processAndPushTelemetry(ctx, MissedMessage{Envelope: envelope})
}
Expand All @@ -119,6 +114,10 @@ func (c *Client) PushMissedRelevantMessage(ctx context.Context, receivedMessage
c.processAndPushTelemetry(ctx, MissedRelevantMessage{ReceivedMessage: receivedMessage})
}

func (c *Client) PushMessageDeliveryConfirmed(ctx context.Context, numMessages int) {
c.processAndPushTelemetry(ctx, MessageDeliveryConfirmed{NumMessages: numMessages})
}

type ReceivedMessages struct {
Filter transport.Filter
SSHMessage *types.Message
Expand Down Expand Up @@ -164,6 +163,10 @@ type MissedRelevantMessage struct {
ReceivedMessage *v2common.ReceivedMessage
}

type MessageDeliveryConfirmed struct {
NumMessages int
}

type Client struct {
serverURL string
httpClient *http.Client
Expand Down Expand Up @@ -334,12 +337,6 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
TelemetryType: PeerCountByOriginMetric,
TelemetryData: c.ProcessPeerCountByOrigin(v),
}
case StoreConfirmationFailed:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: StoreConfirmationFailedMetric,
TelemetryData: c.ProcessStoreConfirmationFailed(v),
}
case MissedMessage:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
Expand All @@ -352,6 +349,12 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
TelemetryType: MissedRelevantMessageMetric,
TelemetryData: c.ProcessMissedRelevantMessage(v),
}
case MessageDeliveryConfirmed:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: MessageDeliveryConfirmedMetric,
TelemetryData: c.ProcessMessageDeliveryConfirmed(v),
}
default:
c.logger.Error("Unknown telemetry data type")
return
Expand Down Expand Up @@ -511,14 +514,6 @@ func (c *Client) ProcessPeerCountByOrigin(peerCountByOrigin PeerCountByOrigin) *
return &jsonRawMessage
}

func (c *Client) ProcessStoreConfirmationFailed(storeConfirmationFailed StoreConfirmationFailed) *json.RawMessage {
postBody := c.commonPostBody()
postBody["messageHash"] = storeConfirmationFailed.MessageHash
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) ProcessMissedMessage(missedMessage MissedMessage) *json.RawMessage {
postBody := c.commonPostBody()
postBody["messageHash"] = missedMessage.Envelope.Hash().String()
Expand All @@ -541,6 +536,14 @@ func (c *Client) ProcessMissedRelevantMessage(missedMessage MissedRelevantMessag
return &jsonRawMessage
}

func (c *Client) ProcessMessageDeliveryConfirmed(messageDeliveryConfirmed MessageDeliveryConfirmed) *json.RawMessage {
postBody := c.commonPostBody()
postBody["numMessages"] = messageDeliveryConfirmed.NumMessages
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
url := fmt.Sprintf("%s/update-envelope", c.serverURL)
Expand Down
5 changes: 4 additions & 1 deletion wakuv2/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ type ITelemetryClient interface {
PushMessageCheckFailure(ctx context.Context, messageHash string)
PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint)
PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint)
PushStoreConfirmationFailed(ctx context.Context, msgHash gethcommon.Hash)
PushMissedMessage(ctx context.Context, envelope *protocol.Envelope)
PushMissedRelevantMessage(ctx context.Context, message *common.ReceivedMessage)
PushMessageDeliveryConfirmed(ctx context.Context, numMessages int)
}

// Waku represents a dark communication interface through the Ethereum
Expand Down Expand Up @@ -986,6 +986,9 @@ func (w *Waku) SkipPublishToTopic(value bool) {

func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) {
w.messageSender.MessagesDelivered(hashes)
if w.statusTelemetryClient != nil {
w.statusTelemetryClient.PushMessageDeliveryConfirmed(w.ctx, len(hashes))
}
}

func (w *Waku) SetStorePeerID(peerID peer.ID) {
Expand Down

0 comments on commit 2749da2

Please sign in to comment.