Skip to content

Commit

Permalink
feat: add metrics for peer count by shard and by origin
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Sep 11, 2024
1 parent e14f337 commit 4e937fa
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 42 deletions.
2 changes: 2 additions & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func main() {
server.RegisterMetric(types.ReceivedEnvelopeMetric, &metrics.ReceivedEnvelope{})
server.RegisterMetric(types.ReceivedMessagesMetric, &metrics.ReceivedMessage{})
server.RegisterMetric(types.SentEnvelopeMetric, &metrics.SentEnvelope{})
server.RegisterMetric(types.PeerCountByShardMetric, &metrics.PeerCountByShard{})
server.RegisterMetric(types.PeerCountByOriginMetric, &metrics.PeerCountByOrigin{})

server.Start(*port)
}
12 changes: 12 additions & 0 deletions lib/common/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ func DropTables(db *sql.DB) {
"peercount",
"peerconnfailure",
"errorsendingenvelope",
"peerCountByShard",
"peerCountByOrigin",
"schema_migrations",
}

Expand Down Expand Up @@ -84,6 +86,16 @@ func DropTables(db *sql.DB) {
log.Fatalf("an error '%s' was not expected when dropping the index", err)
}

_, err = tx.Exec("DROP INDEX IF EXISTS peerCountByShard_unique")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the index", err)
}

_, err = tx.Exec("DROP INDEX IF EXISTS peerCountByOrigin_unique")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the index", err)
}

_, err = tx.Exec("DROP TABLE IF EXISTS schema_migrations")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the table", err)
Expand Down
93 changes: 58 additions & 35 deletions lib/database/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions lib/database/sql/000017_peer_count_shard_origin.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
CREATE TABLE IF NOT EXISTS peerCountByShard (
id SERIAL PRIMARY KEY,
recordId INTEGER NOT NULL,
count INTEGER NOT NULL,
shard INTEGER NOT NULL,
timestamp INTEGER NOT NULL,
CONSTRAINT peerCountByShard_unique UNIQUE (recordId, count, shard, timestamp)
);

ALTER TABLE peerCountByShard ADD CONSTRAINT fk_peerCountByShard_telemetryRecord
FOREIGN KEY (recordId) REFERENCES telemetryRecord(id);

CREATE TABLE IF NOT EXISTS peerCountByOrigin (
id SERIAL PRIMARY KEY,
recordId INTEGER NOT NULL,
count INTEGER NOT NULL,
origin INTEGER NOT NULL,
timestamp INTEGER NOT NULL,
CONSTRAINT peerCountByOrigin_unique UNIQUE (recordId, count, origin, timestamp)
);

ALTER TABLE peerCountByOrigin ADD CONSTRAINT fk_peerCountByOrigin_telemetryRecord
FOREIGN KEY (recordId) REFERENCES telemetryRecord(id);

CREATE TABLE IF NOT EXISTS origin_types (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL
);

INSERT INTO origin_types (id, name)
SELECT v.id, v.name
FROM (VALUES
(0, 'Unknown'),
(1, 'Discv5'),
(2, 'Static'),
(3, 'PeerExchange'),
(4, 'DNSDiscovery'),
(5, 'Rendezvous'),
(6, 'PeerManager')
) AS v(id, name)
WHERE NOT EXISTS (
SELECT 1 FROM origin_types WHERE id = v.id
);
100 changes: 100 additions & 0 deletions lib/metrics/peer_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,103 @@ func (r *PeerConnFailure) Process(ctx context.Context, db *sql.DB, errs *common.
func (r *PeerConnFailure) Clean(db *sql.DB, before int64) (int64, error) {
return common.Cleanup(db, "peerConnFailure", before)
}

type PeerCountByShard struct {
types.PeerCountByShard
}

type PeerCountByOrigin struct {
types.PeerCountByOrigin
}

func (r *PeerCountByShard) Process(ctx context.Context, db *sql.DB, errs *common.MetricErrors, data *types.TelemetryRequest) error {
if err := json.Unmarshal(*data.TelemetryData, &r); err != nil {
errs.Append(data.ID, fmt.Sprintf("Error decoding peer count by shard: %v", err))
return err
}

tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()

recordId, err := InsertTelemetryRecord(tx, &r.TelemetryRecord)
if err != nil {
return err
}

result := tx.QueryRow(`
INSERT INTO peerCountByShard (recordId, count, shard, timestamp)
VALUES ($1, $2, $3, $4)
RETURNING id;
`, recordId, r.Count, r.Shard, r.Timestamp)
if result.Err() != nil {
errs.Append(data.ID, fmt.Sprintf("Error saving peer count by shard: %v", result.Err()))
return result.Err()
}

var lastInsertId int
err = result.Scan(&lastInsertId)
if err != nil {
return err
}
r.ID = int(lastInsertId)

if err := tx.Commit(); err != nil {
errs.Append(data.ID, fmt.Sprintf("Error committing transaction: %v", err))
return err
}

return nil
}

func (r *PeerCountByShard) Clean(db *sql.DB, before int64) (int64, error) {
return common.Cleanup(db, "peerCountByShard", before)
}

func (r *PeerCountByOrigin) Process(ctx context.Context, db *sql.DB, errs *common.MetricErrors, data *types.TelemetryRequest) error {
if err := json.Unmarshal(*data.TelemetryData, &r); err != nil {
errs.Append(data.ID, fmt.Sprintf("Error decoding peer count by origin: %v", err))
return err
}

tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()

recordId, err := InsertTelemetryRecord(tx, &r.TelemetryRecord)
if err != nil {
return err
}

result := tx.QueryRow(`
INSERT INTO peerCountByOrigin (recordId, count, origin, timestamp)
VALUES ($1, $2, $3, $4)
RETURNING id;
`, recordId, r.Count, r.Origin, r.Timestamp)
if result.Err() != nil {
errs.Append(data.ID, fmt.Sprintf("Error saving peer count by origin: %v", result.Err()))
return result.Err()
}

var lastInsertId int
err = result.Scan(&lastInsertId)
if err != nil {
return err
}
r.ID = int(lastInsertId)

if err := tx.Commit(); err != nil {
errs.Append(data.ID, fmt.Sprintf("Error committing transaction: %v", err))
return err
}

return nil
}

func (r *PeerCountByOrigin) Clean(db *sql.DB, before int64) (int64, error) {
return common.Cleanup(db, "peerCountByOrigin", before)
}
4 changes: 2 additions & 2 deletions lib/metrics/receivedenvelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ func (e *ErrorSendingEnvelope) Process(ctx context.Context, db *sql.DB, errs *co
if errors.Is(result.Err(), sql.ErrNoRows) {
return nil
} else {
errs.Append(data.ID, fmt.Sprintf("Error saving error sending envelope: %v", err))
return err
errs.Append(data.ID, fmt.Sprintf("Error saving error sending envelope: %v", result.Err()))
return result.Err()
}
}

Expand Down
13 changes: 8 additions & 5 deletions lib/metrics/receivedmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@ type ReceivedMessage struct {
}

func (r *ReceivedMessage) Process(ctx context.Context, db *sql.DB, errs *common.MetricErrors, data *types.TelemetryRequest) error {
if err := json.Unmarshal(*data.TelemetryData, &r); err != nil {
errs.Append(data.ID, fmt.Sprintf("Error decoding received message failure: %v", err))
var messages []ReceivedMessage
if err := json.Unmarshal(*data.TelemetryData, &messages); err != nil {
errs.Append(data.ID, fmt.Sprintf("Error decoding received messages: %v", err))
return err
}

if err := r.Put(ctx, db); err != nil {
errs.Append(data.ID, fmt.Sprintf("Error saving received messages: %v", err))
return err
for _, message := range messages {
if err := message.Put(ctx, db); err != nil {
errs.Append(data.ID, fmt.Sprintf("Error saving received message: %v", err))
return err
}
}
return nil
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@ const (
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
PeerCountMetric TelemetryType = "PeerCount"
PeerConnFailureMetric TelemetryType = "PeerConnFailure"
PeerCountByShardMetric TelemetryType = "PeerCountByShard"
PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin"
)

type Origin int64

const (
Unknown Origin = iota
Discv5
Static
PeerExchange
DNSDiscovery
Rendezvous
PeerManager
)

type TelemetryRequest struct {
Expand Down Expand Up @@ -103,3 +117,19 @@ type ReceivedMessage struct {
Topic string `json:"topic"`
PubsubTopic string `json:"pubsubTopic"`
}

type PeerCountByShard struct {
TelemetryRecord
ID int `json:"id"`
Count int `json:"count"`
Shard int `json:"shard"`
Timestamp int64 `json:"timestamp"`
}

type PeerCountByOrigin struct {
TelemetryRecord
ID int `json:"id"`
Count int `json:"count"`
Origin Origin `json:"origin"`
Timestamp int64 `json:"timestamp"`
}

0 comments on commit 4e937fa

Please sign in to comment.