Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactor types, error handling, clean up endpoints #37

Merged
merged 4 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package types

import "encoding/json"

type TelemetryType string

const (
ProtocolStatsMetric TelemetryType = "ProtocolStats"
ReceivedEnvelopeMetric TelemetryType = "ReceivedEnvelope"
SentEnvelopeMetric TelemetryType = "SentEnvelope"
UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope"
ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
PeerCountMetric TelemetryType = "PeerCount"
PeerConnFailureMetric TelemetryType = "PeerConnFailure"
)

type TelemetryRequest struct {
Id int `json:"id"`
TelemetryType TelemetryType `json:"telemetry_type"`
TelemetryData *json.RawMessage `json:"telemetry_data"`
}

type PeerCount struct {
ID int `json:"id"`
CreatedAt int64 `json:"createdAt"`
Timestamp int64 `json:"timestamp"`
NodeName string `json:"nodeName"`
NodeKeyUid string `json:"nodeKeyUid"`
PeerID string `json:"peerId"`
PeerCount int `json:"peerCount"`
StatusVersion string `json:"statusVersion"`
}

type PeerConnFailure struct {
ID int `json:"id"`
CreatedAt int64 `json:"createdAt"`
Timestamp int64 `json:"timestamp"`
NodeName string `json:"nodeName"`
NodeKeyUid string `json:"nodeKeyUid"`
PeerId string `json:"peerId"`
StatusVersion string `json:"statusVersion"`
FailedPeerId string `json:"failedPeerId"`
FailureCount int `json:"failureCount"`
}

type SentEnvelope struct {
ID int `json:"id"`
MessageHash string `json:"messageHash"`
SentAt int64 `json:"sentAt"`
CreatedAt int64 `json:"createdAt"`
PubsubTopic string `json:"pubsubTopic"`
Topic string `json:"topic"`
SenderKeyUID string `json:"senderKeyUID"`
PeerID string `json:"peerId"`
NodeName string `json:"nodeName"`
ProcessingError string `json:"processingError"`
PublishMethod string `json:"publishMethod"`
StatusVersion string `json:"statusVersion"`
}

type ErrorSendingEnvelope struct {
CreatedAt int64 `json:"createdAt"`
Error string `json:"error"`
SentEnvelope SentEnvelope `json:"sentEnvelope"`
}

type ReceivedEnvelope struct {
ID int `json:"id"`
MessageHash string `json:"messageHash"`
SentAt int64 `json:"sentAt"`
CreatedAt int64 `json:"createdAt"`
PubsubTopic string `json:"pubsubTopic"`
Topic string `json:"topic"`
ReceiverKeyUID string `json:"receiverKeyUID"`
PeerID string `json:"peerId"`
NodeName string `json:"nodeName"`
ProcessingError string `json:"processingError"`
StatusVersion string `json:"statusVersion"`
}

type Metric struct {
TotalIn int64 `json:"totalIn"`
TotalOut int64 `json:"totalOut"`
RateIn float64 `json:"rateIn"`
RateOut float64 `json:"rateOut"`
}

type ProtocolStats struct {
PeerID string `json:"hostID"`
Relay Metric `json:"relay"`
Store Metric `json:"store"`
FilterPush Metric `json:"filter-push"`
FilterSubscribe Metric `json:"filter-subscribe"`
Lightpush Metric `json:"lightpush"`
}

type ReceivedMessage struct {
ID int `json:"id"`
ChatID string `json:"chatId"`
MessageHash string `json:"messageHash"`
MessageID string `json:"messageId"`
MessageType string `json:"messageType"`
MessageSize int `json:"messageSize"`
ReceiverKeyUID string `json:"receiverKeyUID"`
PeerID string `json:"peerId"`
NodeName string `json:"nodeName"`
SentAt int64 `json:"sentAt"`
Topic string `json:"topic"`
PubsubTopic string `json:"pubsubTopic"`
CreatedAt int64 `json:"createdAt"`
StatusVersion string `json:"statusVersion"`
}
45 changes: 31 additions & 14 deletions telemetry/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/status-im/dev-telemetry/pkg/types"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -97,11 +98,16 @@ func dropTables(db *sql.DB) {
log.Fatalf("an error '%s' was not expected when dropping the index", err)
}

_, err = db.Exec("DROP INDEX IF EXISTS receivedMessages_unique")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the index", err)
}

db.Close()
}

func updateCreatedAt(db *sql.DB, m *ReceivedMessage) error {
_, err := db.Exec("UPDATE receivedMessages SET createdAt = $1 WHERE id = $2", m.CreatedAt, m.ID)
_, err := db.Exec("UPDATE receivedMessages SET createdAt = $1 WHERE id = $2", m.data.CreatedAt, m.data.ID)
return err
}

Expand Down Expand Up @@ -134,38 +140,42 @@ func TestRunAggregatorSimple(t *testing.T) {
db := NewMock()
defer dropTables(db)

m := &ReceivedMessage{
mData := types.ReceivedMessage{
ChatID: "1",
MessageHash: "1",
ReceiverKeyUID: "1",
SentAt: time.Now().Unix(),
Topic: "1",
}

m := &ReceivedMessage{data: mData}
err := m.put(db)
require.NoError(t, err)

oneHourAndHalf := time.Hour + time.Minute*30
m = &ReceivedMessage{
mData = types.ReceivedMessage{
ChatID: "3",
MessageHash: "2",
ReceiverKeyUID: "1",
SentAt: time.Now().Add(-oneHourAndHalf).Unix(),
Topic: "1",
}
m = &ReceivedMessage{data: mData}
err = m.put(db)
require.NoError(t, err)

twoHourAndHalf := 5*time.Hour + time.Minute*30
m = &ReceivedMessage{
mData = types.ReceivedMessage{
ChatID: "3",
MessageHash: "3",
ReceiverKeyUID: "1",
SentAt: time.Now().Add(-twoHourAndHalf).Unix(),
Topic: "1",
}
m = &ReceivedMessage{data: mData}
err = m.put(db)
require.NoError(t, err)
m.CreatedAt = m.SentAt
m.data.CreatedAt = m.data.SentAt
err = updateCreatedAt(db, m)
require.NoError(t, err)

Expand All @@ -189,81 +199,88 @@ func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) {
db := NewMock()
defer dropTables(db)

m := &ReceivedMessage{
mData := types.ReceivedMessage{
ChatID: "1",
MessageHash: "1",
ReceiverKeyUID: "1",
SentAt: time.Now().Unix(),
Topic: "1",
}
m := &ReceivedMessage{data: mData}
err := m.put(db)
require.NoError(t, err)

oneHourAndHalf := time.Hour + time.Minute*30
m = &ReceivedMessage{
mData = types.ReceivedMessage{
ChatID: "3",
MessageHash: "2",
ReceiverKeyUID: "1",
SentAt: time.Now().Add(-oneHourAndHalf).Unix(),
Topic: "1",
}
m = &ReceivedMessage{data: mData}
err = m.put(db)
require.NoError(t, err)

m = &ReceivedMessage{
mData = types.ReceivedMessage{
ChatID: "3",
MessageHash: "3",
ReceiverKeyUID: "1",
SentAt: time.Now().Add(-oneHourAndHalf).Unix(),
Topic: "1",
}
m = &ReceivedMessage{data: mData}
err = m.put(db)
require.NoError(t, err)

twoHourAndHalf := 5*time.Hour + time.Minute*30
m = &ReceivedMessage{
mData = types.ReceivedMessage{
ChatID: "3",
MessageHash: "4",
ReceiverKeyUID: "1",
SentAt: time.Now().Add(-twoHourAndHalf).Unix(),
Topic: "1",
}
m = &ReceivedMessage{data: mData}
err = m.put(db)
require.NoError(t, err)
m.CreatedAt = m.SentAt
m.data.CreatedAt = m.data.SentAt
err = updateCreatedAt(db, m)
require.NoError(t, err)

m = &ReceivedMessage{
mData = types.ReceivedMessage{
ChatID: "1",
MessageHash: "1",
ReceiverKeyUID: "2",
SentAt: time.Now().Unix(),
Topic: "1",
}
m = &ReceivedMessage{data: mData}
err = m.put(db)
require.NoError(t, err)

m = &ReceivedMessage{
mData = types.ReceivedMessage{
ChatID: "3",
MessageHash: "2",
ReceiverKeyUID: "2",
SentAt: time.Now().Add(-oneHourAndHalf).Unix(),
Topic: "1",
}
m = &ReceivedMessage{data: mData}
err = m.put(db)
require.NoError(t, err)

m = &ReceivedMessage{
mData = types.ReceivedMessage{
ChatID: "3",
MessageHash: "4",
ReceiverKeyUID: "2",
SentAt: time.Now().Add(-twoHourAndHalf).Unix(),
Topic: "1",
}
m = &ReceivedMessage{data: mData}
err = m.put(db)
require.NoError(t, err)
m.CreatedAt = m.SentAt
m.data.CreatedAt = m.data.SentAt
err = updateCreatedAt(db, m)
require.NoError(t, err)

Expand Down
45 changes: 45 additions & 0 deletions telemetry/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package telemetry

import (
"sync"

"go.uber.org/zap"
)

type ErrorDetail struct {
Id int `json:"id"`
Error string `json:"error"`
}

type MetricErrors struct {
logger *zap.Logger
mutex sync.Mutex
errors []ErrorDetail
}

func NewMetricErrors(logger *zap.Logger) *MetricErrors {
return &MetricErrors{
logger: logger,
}
}

func (me *MetricErrors) Append(id int, err string) {
if me.logger != nil {
me.logger.Error(err)
}
me.mutex.Lock()
defer me.mutex.Unlock()
me.errors = append(me.errors, ErrorDetail{Id: id, Error: err})
}

func (me *MetricErrors) Get() []ErrorDetail {
me.mutex.Lock()
defer me.mutex.Unlock()
return me.errors
}

func (me *MetricErrors) Len() int {
me.mutex.Lock()
defer me.mutex.Unlock()
return len(me.errors)
}
Loading
Loading