From 1eac111c3cde97cab494a1edcf3220cf25bd0c16 Mon Sep 17 00:00:00 2001 From: Poorna Date: Mon, 21 Aug 2023 15:28:28 -0700 Subject: [PATCH] replication: additional fields for metrics (#1874) --- pkg/replication/replication.go | 186 +++++++++++++++++++++------------ 1 file changed, 120 insertions(+), 66 deletions(-) diff --git a/pkg/replication/replication.go b/pkg/replication/replication.go index 6776a0ba8..eb0c72c1f 100644 --- a/pkg/replication/replication.go +++ b/pkg/replication/replication.go @@ -689,49 +689,63 @@ func (e ExistingObjectReplication) Validate() error { // TargetMetrics represents inline replication metrics // such as pending, failed and completed bytes in total for a bucket remote target type TargetMetrics struct { - // Pending size in bytes - PendingSize uint64 `json:"pendingReplicationSize,omitempty"` + // Completed count + ReplicatedCount uint64 `json:"replicationCount,omitempty"` // Completed size in bytes ReplicatedSize uint64 `json:"completedReplicationSize,omitempty"` - // Total Replica size in bytes - ReplicaSize uint64 `json:"replicaSize,omitempty"` - // Failed size in bytes - FailedSize uint64 `json:"failedReplicationSize,omitempty"` - // Total number of pending operations including metadata updates - PendingCount uint64 `json:"pendingReplicationCount,omitempty"` - // Total number of failed operations including metadata updates - FailedCount uint64 `json:"failedReplicationCount,omitempty"` // Bandwidth limit in bytes/sec for this target BandWidthLimitInBytesPerSecond int64 `json:"limitInBits,omitempty"` // Current bandwidth used in bytes/sec for this target CurrentBandwidthInBytesPerSecond float64 `json:"currentBandwidth,omitempty"` - // Completed count - ReplicatedCount uint64 `json:"replicationCount,omitempty"` - // transfer rate for large uploads - XferRateLrg XferStats `json:"largeTransferRate"` - // transfer rate for small uploads - XferRateSml XferStats `json:"smallTransferRate"` + // errors seen in replication in last minute, hour and total + Failed TimedErrStats `json:"failed,omitempty"` } // Metrics represents inline replication metrics for a bucket. type Metrics struct { Stats map[string]TargetMetrics - // Total Pending size in bytes across targets - PendingSize uint64 `json:"pendingReplicationSize,omitempty"` // Completed size in bytes across targets ReplicatedSize uint64 `json:"completedReplicationSize,omitempty"` // Total Replica size in bytes across targets ReplicaSize uint64 `json:"replicaSize,omitempty"` - // Failed size in bytes across targets - FailedSize uint64 `json:"failedReplicationSize,omitempty"` - // Total number of pending operations including metadata updates across targets - PendingCount uint64 `json:"pendingReplicationCount,omitempty"` - // Total number of failed operations including metadata updates across targets - FailedCount uint64 `json:"failedReplicationCount,omitempty"` // Total Replica counts ReplicaCount int64 `json:"replicaCount,omitempty"` // Total Replicated count ReplicatedCount int64 `json:"replicationCount,omitempty"` + // errors seen in replication in last minute, hour and total + Errors TimedErrStats `json:"failed,omitempty"` + // Total number of entries that are queued for replication + QStats InQueueMetric `json:"queued"` +} + +// RStat - has count and bytes for replication metrics +type RStat struct { + Count float64 `json:"count"` + Bytes int64 `json:"bytes"` +} + +// Add two RStat +func (r RStat) Add(r1 RStat) RStat { + return RStat{ + Count: r.Count + r1.Count, + Bytes: r.Bytes + r1.Bytes, + } +} + +// TimedErrStats holds error stats for a time period +type TimedErrStats struct { + LastMinute RStat `json:"lastMinute"` + LastHour RStat `json:"lastHour"` + Totals RStat `json:"totals"` +} + +// Add two TimedErrStats +func (te TimedErrStats) Add(o TimedErrStats) TimedErrStats { + return TimedErrStats{ + LastMinute: te.LastMinute.Add(o.LastMinute), + LastHour: te.LastHour.Add(o.LastHour), + Totals: te.Totals.Add(o.Totals), + } } // ResyncTargetsInfo provides replication target information to resync replicated data. @@ -767,10 +781,30 @@ type XferStats struct { CurrRate float64 `json:"currRate"` } -// InQueueStats holds stats for objects in replication queue -type InQueueStats struct { - Count int32 `json:"count"` - Bytes int64 `json:"bytes"` +// Merge two XferStats +func (x *XferStats) Merge(x1 XferStats) { + x.AvgRate += x1.AvgRate + x.PeakRate += x1.PeakRate + x.CurrRate += x1.CurrRate +} + +// QStat holds count and bytes for objects in replication queue +type QStat struct { + Count float64 `json:"count"` + Bytes float64 `json:"bytes"` +} + +// Add 2 QStat entries +func (q *QStat) Add(q1 QStat) { + q.Count += q1.Count + q.Bytes += q1.Bytes +} + +// InQueueMetric holds stats for objects in replication queue +type InQueueMetric struct { + Curr QStat `json:"curr" msg:"cq"` + Avg QStat `json:"avg" msg:"aq"` + Max QStat `json:"peak" msg:"pq"` } // MetricName name of replication metric @@ -785,16 +819,34 @@ const ( Total MetricName = "Total" ) +// WorkerStat has stats on number of replication workers +type WorkerStat struct { + Curr int32 `json:"curr"` + Avg float32 `json:"avg"` + Max int32 `json:"max"` +} + +// ReplMRFStats holds stats of MRF backlog saved to disk in the last 5 minutes +// and number of entries that failed replication after 3 retries +type ReplMRFStats struct { + LastFailedCount uint64 `json:"failedCount_last5min"` + // Count of unreplicated entries that were dropped after MRF retry limit reached since cluster start. + TotalDroppedCount uint64 `json:"droppedCount_since_uptime"` + // Bytes of unreplicated entries that were dropped after MRF retry limit reached since cluster start. + TotalDroppedBytes uint64 `json:"droppedBytes_since_uptime"` +} + // ReplQNodeStats holds stats for a node in replication queue type ReplQNodeStats struct { - NodeName string `json:"nodeName"` - Uptime int64 `json:"uptime"` - ActiveWorkers int32 `json:"activeWorkers"` + NodeName string `json:"nodeName"` + Uptime int64 `json:"uptime"` + Workers WorkerStat `json:"activeWorkers"` - XferStats map[MetricName]XferStats `json:"xferStats"` - TgtXferStats map[string]map[MetricName]XferStats `json:"tgtXferStats"` + XferStats map[MetricName]XferStats `json:"transferSummary"` + TgtXferStats map[string]map[MetricName]XferStats `json:"tgtTransferStats"` - QStats map[MetricName]InQueueStats `json:"qStats"` + QStats InQueueMetric `json:"queueStats"` + MRFStats ReplMRFStats `json:"mrfStats"` } // ReplQueueStats holds stats for replication queue across nodes @@ -803,33 +855,54 @@ type ReplQueueStats struct { } // Workers returns number of workers across all nodes -func (q ReplQueueStats) Workers() int64 { - var workers int64 +func (q ReplQueueStats) Workers() (tot WorkerStat) { for _, node := range q.Nodes { - workers += int64(node.ActiveWorkers) + tot.Avg += node.Workers.Avg + tot.Curr += node.Workers.Curr + if tot.Max < node.Workers.Max { + tot.Max = node.Workers.Max + } + } + if len(q.Nodes) > 0 { + tot.Avg /= float32(len(q.Nodes)) + tot.Curr /= int32(len(q.Nodes)) } - return workers + return tot +} + +// qStatSummary returns cluster level stats for objects in replication queue +func (q ReplQueueStats) qStatSummary() InQueueMetric { + m := InQueueMetric{} + for _, v := range q.Nodes { + m.Avg.Add(v.QStats.Avg) + m.Curr.Add(v.QStats.Curr) + if m.Max.Count < v.QStats.Max.Count { + m.Max.Add(v.QStats.Max) + } + } + return m } // ReplQStats holds stats for objects in replication queue type ReplQStats struct { - Uptime int64 `json:"uptime"` - Workers int64 `json:"workers"` + Uptime int64 `json:"uptime"` + Workers WorkerStat `json:"workers"` XferStats map[MetricName]XferStats `json:"xferStats"` TgtXferStats map[string]map[MetricName]XferStats `json:"tgtXferStats"` - QStats map[MetricName]InQueueStats `json:"qStats"` + QStats InQueueMetric `json:"qStats"` + MRFStats ReplMRFStats `json:"mrfStats"` } // QStats returns cluster level stats for objects in replication queue func (q ReplQueueStats) QStats() (r ReplQStats) { - r.QStats = make(map[MetricName]InQueueStats) + r.QStats = q.qStatSummary() r.XferStats = make(map[MetricName]XferStats) r.TgtXferStats = make(map[string]map[MetricName]XferStats) + r.Workers = q.Workers() for _, node := range q.Nodes { - r.Workers += int64(node.ActiveWorkers) for arn := range node.TgtXferStats { xmap, ok := node.TgtXferStats[arn] if !ok { @@ -859,39 +932,20 @@ func (q ReplQueueStats) QStats() (r ReplQStats) { st.PeakRate = math.Max(st.PeakRate, v.PeakRate) r.XferStats[k] = st } - for k, v := range node.QStats { - st, ok := r.QStats[k] - if !ok { - st = InQueueStats{} - } - st.Count += v.Count - st.Bytes += v.Bytes - r.QStats[k] = st - } + r.MRFStats.LastFailedCount += node.MRFStats.LastFailedCount + r.MRFStats.TotalDroppedCount += node.MRFStats.TotalDroppedCount + r.MRFStats.TotalDroppedBytes += node.MRFStats.TotalDroppedBytes r.Uptime += node.Uptime } if len(q.Nodes) > 0 { - for k := range r.XferStats { - st := r.XferStats[k] - st.AvgRate /= float64(len(q.Nodes)) - st.CurrRate /= float64(len(q.Nodes)) - r.XferStats[k] = st - } - for arn := range r.TgtXferStats { - for m, v := range r.TgtXferStats[arn] { - v.AvgRate /= float64(len(q.Nodes)) - v.CurrRate /= float64(len(q.Nodes)) - r.TgtXferStats[arn][m] = v - } - } r.Uptime /= int64(len(q.Nodes)) // average uptime } - return } // MetricsV2 represents replication metrics for a bucket. type MetricsV2 struct { + Uptime int64 `json:"uptime"` CurrentStats Metrics `json:"currStats"` QueueStats ReplQueueStats `json:"queueStats"` }