Skip to content

Commit

Permalink
Merge pull request #37 from vpavlin/chore/extract-types
Browse files Browse the repository at this point in the history
chore: refactor types, error handling, clean up endpoints
  • Loading branch information
vpavlin authored Aug 13, 2024
2 parents dc7d3d2 + dce9202 commit 63f5395
Show file tree
Hide file tree
Showing 11 changed files with 564 additions and 449 deletions.
113 changes: 113 additions & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package types

import "encoding/json"

type TelemetryType string

const (
ProtocolStatsMetric TelemetryType = "ProtocolStats"
ReceivedEnvelopeMetric TelemetryType = "ReceivedEnvelope"
SentEnvelopeMetric TelemetryType = "SentEnvelope"
UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope"
ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
PeerCountMetric TelemetryType = "PeerCount"
PeerConnFailureMetric TelemetryType = "PeerConnFailure"
)

type TelemetryRequest struct {
Id int `json:"id"`
TelemetryType TelemetryType `json:"telemetry_type"`
TelemetryData *json.RawMessage `json:"telemetry_data"`
}

type PeerCount struct {
ID int `json:"id"`
CreatedAt int64 `json:"createdAt"`
Timestamp int64 `json:"timestamp"`
NodeName string `json:"nodeName"`
NodeKeyUid string `json:"nodeKeyUid"`
PeerID string `json:"peerId"`
PeerCount int `json:"peerCount"`
StatusVersion string `json:"statusVersion"`
}

type PeerConnFailure struct {
ID int `json:"id"`
CreatedAt int64 `json:"createdAt"`
Timestamp int64 `json:"timestamp"`
NodeName string `json:"nodeName"`
NodeKeyUid string `json:"nodeKeyUid"`
PeerId string `json:"peerId"`
StatusVersion string `json:"statusVersion"`
FailedPeerId string `json:"failedPeerId"`
FailureCount int `json:"failureCount"`
}

type SentEnvelope struct {
ID int `json:"id"`
MessageHash string `json:"messageHash"`
SentAt int64 `json:"sentAt"`
CreatedAt int64 `json:"createdAt"`
PubsubTopic string `json:"pubsubTopic"`
Topic string `json:"topic"`
SenderKeyUID string `json:"senderKeyUID"`
PeerID string `json:"peerId"`
NodeName string `json:"nodeName"`
ProcessingError string `json:"processingError"`
PublishMethod string `json:"publishMethod"`
StatusVersion string `json:"statusVersion"`
}

type ErrorSendingEnvelope struct {
CreatedAt int64 `json:"createdAt"`
Error string `json:"error"`
SentEnvelope SentEnvelope `json:"sentEnvelope"`
}

type ReceivedEnvelope struct {
ID int `json:"id"`
MessageHash string `json:"messageHash"`
SentAt int64 `json:"sentAt"`
CreatedAt int64 `json:"createdAt"`
PubsubTopic string `json:"pubsubTopic"`
Topic string `json:"topic"`
ReceiverKeyUID string `json:"receiverKeyUID"`
PeerID string `json:"peerId"`
NodeName string `json:"nodeName"`
ProcessingError string `json:"processingError"`
StatusVersion string `json:"statusVersion"`
}

type Metric struct {
TotalIn int64 `json:"totalIn"`
TotalOut int64 `json:"totalOut"`
RateIn float64 `json:"rateIn"`
RateOut float64 `json:"rateOut"`
}

type ProtocolStats struct {
PeerID string `json:"hostID"`
Relay Metric `json:"relay"`
Store Metric `json:"store"`
FilterPush Metric `json:"filter-push"`
FilterSubscribe Metric `json:"filter-subscribe"`
Lightpush Metric `json:"lightpush"`
}

type ReceivedMessage struct {
ID int `json:"id"`
ChatID string `json:"chatId"`
MessageHash string `json:"messageHash"`
MessageID string `json:"messageId"`
MessageType string `json:"messageType"`
MessageSize int `json:"messageSize"`
ReceiverKeyUID string `json:"receiverKeyUID"`
PeerID string `json:"peerId"`
NodeName string `json:"nodeName"`
SentAt int64 `json:"sentAt"`
Topic string `json:"topic"`
PubsubTopic string `json:"pubsubTopic"`
CreatedAt int64 `json:"createdAt"`
StatusVersion string `json:"statusVersion"`
}
45 changes: 31 additions & 14 deletions telemetry/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/status-im/dev-telemetry/pkg/types"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -97,11 +98,16 @@ func dropTables(db *sql.DB) {
log.Fatalf("an error '%s' was not expected when dropping the index", err)
}

_, err = db.Exec("DROP INDEX IF EXISTS receivedMessages_unique")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the index", err)
}

db.Close()
}

func updateCreatedAt(db *sql.DB, m *ReceivedMessage) error {
_, err := db.Exec("UPDATE receivedMessages SET createdAt = $1 WHERE id = $2", m.CreatedAt, m.ID)
_, err := db.Exec("UPDATE receivedMessages SET createdAt = $1 WHERE id = $2", m.data.CreatedAt, m.data.ID)
return err
}

Expand Down Expand Up @@ -134,38 +140,42 @@ func TestRunAggregatorSimple(t *testing.T) {
db := NewMock()
defer dropTables(db)

m := &ReceivedMessage{
mData := types.ReceivedMessage{
ChatID: "1",
MessageHash: "1",
ReceiverKeyUID: "1",
SentAt: time.Now().Unix(),
Topic: "1",
}

m := &ReceivedMessage{data: mData}
err := m.put(db)
require.NoError(t, err)

oneHourAndHalf := time.Hour + time.Minute*30
m = &ReceivedMessage{
mData = types.ReceivedMessage{
ChatID: "3",
MessageHash: "2",
ReceiverKeyUID: "1",
SentAt: time.Now().Add(-oneHourAndHalf).Unix(),
Topic: "1",
}
m = &ReceivedMessage{data: mData}
err = m.put(db)
require.NoError(t, err)

twoHourAndHalf := 5*time.Hour + time.Minute*30
m = &ReceivedMessage{
mData = types.ReceivedMessage{
ChatID: "3",
MessageHash: "3",
ReceiverKeyUID: "1",
SentAt: time.Now().Add(-twoHourAndHalf).Unix(),
Topic: "1",
}
m = &ReceivedMessage{data: mData}
err = m.put(db)
require.NoError(t, err)
m.CreatedAt = m.SentAt
m.data.CreatedAt = m.data.SentAt
err = updateCreatedAt(db, m)
require.NoError(t, err)

Expand All @@ -189,81 +199,88 @@ func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) {
db := NewMock()
defer dropTables(db)

m := &ReceivedMessage{
mData := types.ReceivedMessage{
ChatID: "1",
MessageHash: "1",
ReceiverKeyUID: "1",
SentAt: time.Now().Unix(),
Topic: "1",
}
m := &ReceivedMessage{data: mData}
err := m.put(db)
require.NoError(t, err)

oneHourAndHalf := time.Hour + time.Minute*30
m = &ReceivedMessage{
mData = types.ReceivedMessage{
ChatID: "3",
MessageHash: "2",
ReceiverKeyUID: "1",
SentAt: time.Now().Add(-oneHourAndHalf).Unix(),
Topic: "1",
}
m = &ReceivedMessage{data: mData}
err = m.put(db)
require.NoError(t, err)

m = &ReceivedMessage{
mData = types.ReceivedMessage{
ChatID: "3",
MessageHash: "3",
ReceiverKeyUID: "1",
SentAt: time.Now().Add(-oneHourAndHalf).Unix(),
Topic: "1",
}
m = &ReceivedMessage{data: mData}
err = m.put(db)
require.NoError(t, err)

twoHourAndHalf := 5*time.Hour + time.Minute*30
m = &ReceivedMessage{
mData = types.ReceivedMessage{
ChatID: "3",
MessageHash: "4",
ReceiverKeyUID: "1",
SentAt: time.Now().Add(-twoHourAndHalf).Unix(),
Topic: "1",
}
m = &ReceivedMessage{data: mData}
err = m.put(db)
require.NoError(t, err)
m.CreatedAt = m.SentAt
m.data.CreatedAt = m.data.SentAt
err = updateCreatedAt(db, m)
require.NoError(t, err)

m = &ReceivedMessage{
mData = types.ReceivedMessage{
ChatID: "1",
MessageHash: "1",
ReceiverKeyUID: "2",
SentAt: time.Now().Unix(),
Topic: "1",
}
m = &ReceivedMessage{data: mData}
err = m.put(db)
require.NoError(t, err)

m = &ReceivedMessage{
mData = types.ReceivedMessage{
ChatID: "3",
MessageHash: "2",
ReceiverKeyUID: "2",
SentAt: time.Now().Add(-oneHourAndHalf).Unix(),
Topic: "1",
}
m = &ReceivedMessage{data: mData}
err = m.put(db)
require.NoError(t, err)

m = &ReceivedMessage{
mData = types.ReceivedMessage{
ChatID: "3",
MessageHash: "4",
ReceiverKeyUID: "2",
SentAt: time.Now().Add(-twoHourAndHalf).Unix(),
Topic: "1",
}
m = &ReceivedMessage{data: mData}
err = m.put(db)
require.NoError(t, err)
m.CreatedAt = m.SentAt
m.data.CreatedAt = m.data.SentAt
err = updateCreatedAt(db, m)
require.NoError(t, err)

Expand Down
45 changes: 45 additions & 0 deletions telemetry/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package telemetry

import (
"sync"

"go.uber.org/zap"
)

type ErrorDetail struct {
Id int `json:"id"`
Error string `json:"error"`
}

type MetricErrors struct {
logger *zap.Logger
mutex sync.Mutex
errors []ErrorDetail
}

func NewMetricErrors(logger *zap.Logger) *MetricErrors {
return &MetricErrors{
logger: logger,
}
}

func (me *MetricErrors) Append(id int, err string) {
if me.logger != nil {
me.logger.Error(err)
}
me.mutex.Lock()
defer me.mutex.Unlock()
me.errors = append(me.errors, ErrorDetail{Id: id, Error: err})
}

func (me *MetricErrors) Get() []ErrorDetail {
me.mutex.Lock()
defer me.mutex.Unlock()
return me.errors
}

func (me *MetricErrors) Len() int {
me.mutex.Lock()
defer me.mutex.Unlock()
return len(me.errors)
}
Loading

0 comments on commit 63f5395

Please sign in to comment.