Skip to content

Commit

Permalink
feat(telemetry)_: track missed messages
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Sep 7, 2024
1 parent 80f88ab commit dc233fc
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 4 deletions.
56 changes: 54 additions & 2 deletions telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import (
"github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/wakuv2"

v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"

v1protocol "github.com/status-im/status-go/protocol/v1"
v2common "github.com/status-im/status-go/wakuv2/common"
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
)

type TelemetryType string
Expand All @@ -34,6 +34,8 @@ const (
PeerCountMetric TelemetryType = "PeerCount"
PeerConnFailuresMetric TelemetryType = "PeerConnFailure"
StoreConfirmationFailedMetric TelemetryType = "StoreConfirmationFailed"
MissedMessageMetric TelemetryType = "MissedMessage"
MissedRelevantMessageMetric TelemetryType = "MissedRelevantMessage"
MaxRetryCache = 5000
)

Expand Down Expand Up @@ -84,6 +86,14 @@ func (c *Client) PushStoreConfirmationFailed(ctx context.Context, msgHash common
c.processAndPushTelemetry(ctx, StoreConfirmationFailed{MessageHash: msgHash.String()})
}

func (c *Client) PushMissedMessage(ctx context.Context, envelope *v2protocol.Envelope) {
c.processAndPushTelemetry(ctx, MissedMessage{Envelope: envelope})
}

func (c *Client) PushMissedRelevantMessage(ctx context.Context, receivedMessage *v2common.ReceivedMessage) {
c.processAndPushTelemetry(ctx, MissedRelevantMessage{ReceivedMessage: receivedMessage})
}

type ReceivedMessages struct {
Filter transport.Filter
SSHMessage *types.Message
Expand All @@ -103,6 +113,14 @@ type StoreConfirmationFailed struct {
MessageHash string
}

type MissedMessage struct {
Envelope *v2protocol.Envelope
}

type MissedRelevantMessage struct {
ReceivedMessage *v2common.ReceivedMessage
}

type Client struct {
serverURL string
httpClient *http.Client
Expand Down Expand Up @@ -261,6 +279,18 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
TelemetryType: StoreConfirmationFailedMetric,
TelemetryData: c.ProcessStoreConfirmationFailed(v),
}
case MissedMessage:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: MissedMessageMetric,
TelemetryData: c.ProcessMissedMessage(v),
}
case MissedRelevantMessage:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: MissedRelevantMessageMetric,
TelemetryData: c.ProcessMissedRelevantMessage(v),
}
default:
c.logger.Error("Unknown telemetry data type")
return
Expand Down Expand Up @@ -406,6 +436,28 @@ func (c *Client) ProcessStoreConfirmationFailed(storeConfirmationFailed StoreCon
return &jsonRawMessage
}

func (c *Client) ProcessMissedMessage(missedMessage MissedMessage) *json.RawMessage {
postBody := c.commonPostBody()
postBody["messageHash"] = missedMessage.Envelope.Hash().String()
postBody["sentAt"] = uint32(missedMessage.Envelope.Message().GetTimestamp() / int64(time.Second))
postBody["pubsubTopic"] = missedMessage.Envelope.PubsubTopic()
postBody["contentTopic"] = missedMessage.Envelope.Message().ContentTopic
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) ProcessMissedRelevantMessage(missedMessage MissedRelevantMessage) *json.RawMessage {
postBody := c.commonPostBody()
postBody["messageHash"] = missedMessage.ReceivedMessage.Envelope.Hash().String()
postBody["sentAt"] = missedMessage.ReceivedMessage.Sent
postBody["pubsubTopic"] = missedMessage.ReceivedMessage.PubsubTopic
postBody["contentTopic"] = missedMessage.ReceivedMessage.ContentTopic
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
url := fmt.Sprintf("%s/update-envelope", c.serverURL)
Expand Down
12 changes: 10 additions & 2 deletions wakuv2/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ type ITelemetryClient interface {
PushPeerCount(ctx context.Context, peerCount int)
PushPeerConnFailures(ctx context.Context, peerConnFailures map[string]int)
PushStoreConfirmationFailed(ctx context.Context, msgHash gethcommon.Hash)
PushMissedMessage(ctx context.Context, envelope *protocol.Envelope)
PushMissedRelevantMessage(ctx context.Context, message *common.ReceivedMessage)
}

// Waku represents a dark communication interface through the Ethereum
Expand Down Expand Up @@ -1099,7 +1101,6 @@ func (w *Waku) Start() error {
go w.runPeerExchangeLoop()

if w.cfg.EnableMissingMessageVerification {

w.missingMsgVerifier = missing.NewMissingMessageVerifier(
w.node.Store(),
w,
Expand Down Expand Up @@ -1332,7 +1333,11 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag
}

if w.statusTelemetryClient != nil {
w.statusTelemetryClient.PushReceivedEnvelope(w.ctx, envelope)
if msgType == common.MissingMessageType {
w.statusTelemetryClient.PushMissedMessage(w.ctx, envelope)
} else {
w.statusTelemetryClient.PushReceivedEnvelope(w.ctx, envelope)
}
}

logger := w.logger.With(
Expand Down Expand Up @@ -1451,6 +1456,9 @@ func (w *Waku) processMessage(e *common.ReceivedMessage) {
w.storeMsgIDsMu.Unlock()
} else {
logger.Debug("filters did match")
if w.statusTelemetryClient != nil && e.MsgType == common.MissingMessageType {
w.statusTelemetryClient.PushMissedRelevantMessage(w.ctx, e)
}
e.Processed.Store(true)
}

Expand Down

0 comments on commit dc233fc

Please sign in to comment.