Skip to content

Commit

Permalink
Merge pull request #2095 from gauravkghildiyal/migrator-metrics
Browse files Browse the repository at this point in the history
Add new metrics for NEG DualStack Migrator
  • Loading branch information
k8s-ci-robot authored May 10, 2023
2 parents 6cc961f + 231e662 commit 94a9c31
Show file tree
Hide file tree
Showing 7 changed files with 483 additions and 20 deletions.
46 changes: 46 additions & 0 deletions pkg/neg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ const (

NotInDegradedEndpoints = "not_in_degraded_endpoints"
OnlyInDegradedEndpoints = "only_in_degraded_endpoints"

// Classification of endpoints within a NEG.
ipv4EndpointType = "IPv4"
ipv6EndpointType = "IPv6"
dualStackEndpointType = "DualStack"
migrationEndpointType = "Migration"
)

type syncType string
Expand Down Expand Up @@ -169,6 +175,42 @@ var (
},
degradedModeCorrectnessLabels,
)

DualStackMigrationFinishedDurations = prometheus.NewHistogram(
prometheus.HistogramOpts{
Subsystem: negControllerSubsystem,
Name: "dual_stack_migration_finished_durations_seconds",
Help: "Time taken to migrate all endpoints within all NEGs for a service port",
// Buckets ~= [1s, 1.85s, 3.42s, 6s, 11s, 21s, 40s, 1m14s, 2m17s, 4m13s, 7m49s, 14m28s, 26m47s, 49m33s, 1h31m40s, 2h49m35s, 5h13m45s, 9h40m27s, +Inf]
Buckets: prometheus.ExponentialBuckets(1, 1.85, 18),
},
)

// A zero value for this metric means that there are no ongoing migrations.
DualStackMigrationLongestUnfinishedDuration = prometheus.NewGauge(
prometheus.GaugeOpts{
Subsystem: negControllerSubsystem,
Name: "dual_stack_migration_longest_unfinished_duration_seconds",
Help: "Longest time elapsed since a migration was started which hasn't yet completed",
},
)

DualStackMigrationServiceCount = prometheus.NewGauge(
prometheus.GaugeOpts{
Subsystem: negControllerSubsystem,
Name: "dual_stack_migration_service_count",
Help: "Number of Services which have migration endpoints",
},
)

SyncerCountByEndpointType = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: negControllerSubsystem,
Name: "syncer_count_by_endpoint_type",
Help: "Number of Syncers managing NEGs containing endpoint of a particular kind",
},
[]string{"endpoint_type"},
)
)

var register sync.Once
Expand All @@ -188,6 +230,10 @@ func RegisterMetrics() {
prometheus.MustRegister(LabelNumber)
prometheus.MustRegister(AnnotationSize)
prometheus.MustRegister(DegradeModeCorrectness)
prometheus.MustRegister(DualStackMigrationFinishedDurations)
prometheus.MustRegister(DualStackMigrationLongestUnfinishedDuration)
prometheus.MustRegister(DualStackMigrationServiceCount)
prometheus.MustRegister(SyncerCountByEndpointType)

RegisterSyncerMetrics()
})
Expand Down
169 changes: 165 additions & 4 deletions pkg/neg/metrics/neg_metrics_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ limitations under the License.
package metrics

import (
"fmt"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)

type SyncerMetricsCollector interface {
Expand All @@ -35,6 +38,11 @@ type SyncerMetricsCollector interface {
}

type SyncerMetrics struct {
clock clock.Clock
// duration between metrics exports
metricsInterval time.Duration

mu sync.Mutex
// syncerStateMap tracks the status of each syncer
syncerStateMap map[negtypes.NegSyncerKey]negtypes.Reason
// syncerEndpointStateMap is a map between syncer and endpoint state counts.
Expand All @@ -43,10 +51,14 @@ type SyncerMetrics struct {
syncerEndpointSliceStateMap map[negtypes.NegSyncerKey]negtypes.StateCountMap
// syncerLabelProagationStats is a map between syncer and label propagation stats.
syncerLabelProagationStats map[negtypes.NegSyncerKey]LabelPropagationStats
// mu avoid race conditions and ensure correctness of metrics
mu sync.Mutex
// duration between metrics exports
metricsInterval time.Duration
// Stores the time when the migration started for each Syncer.
dualStackMigrationStartTime map[negtypes.NegSyncerKey]time.Time
// Stores the time when the migration finished for each Syncer.
dualStackMigrationEndTime map[negtypes.NegSyncerKey]time.Time
// Stores the count of various kinds of endpoints which each syncer manages.
// Refer neg/metrics.go for the kinds of endpoints.
endpointsCountPerType map[negtypes.NegSyncerKey]map[string]int

// logger logs message related to NegMetricsCollector
logger klog.Logger
}
Expand All @@ -58,6 +70,10 @@ func NewNegMetricsCollector(exportInterval time.Duration, logger klog.Logger) *S
syncerEndpointStateMap: make(map[negtypes.NegSyncerKey]negtypes.StateCountMap),
syncerEndpointSliceStateMap: make(map[negtypes.NegSyncerKey]negtypes.StateCountMap),
syncerLabelProagationStats: make(map[negtypes.NegSyncerKey]LabelPropagationStats),
dualStackMigrationStartTime: make(map[negtypes.NegSyncerKey]time.Time),
dualStackMigrationEndTime: make(map[negtypes.NegSyncerKey]time.Time),
endpointsCountPerType: make(map[negtypes.NegSyncerKey]map[string]int),
clock: clock.RealClock{},
metricsInterval: exportInterval,
logger: logger.WithName("NegMetricsCollector"),
}
Expand Down Expand Up @@ -104,6 +120,21 @@ func (sm *SyncerMetrics) export() {
}

sm.logger.V(3).Info("Exporting syncer related metrics", "Syncer count", syncerCount, "Number of Endpoints", lpMetrics.NumberOfEndpoints)

finishedDurations, longestUnfinishedDurations := sm.computeDualStackMigrationDurations()
for _, duration := range finishedDurations {
DualStackMigrationFinishedDurations.Observe(float64(duration))
}
DualStackMigrationLongestUnfinishedDuration.Set(float64(longestUnfinishedDurations))

syncerCountByEndpointType, migrationEndpointCount, migrationServicesCount := sm.computeDualStackMigrationCounts()
for endpointType, count := range syncerCountByEndpointType {
SyncerCountByEndpointType.WithLabelValues(endpointType).Set(float64(count))
}
syncerEndpointState.WithLabelValues(string(negtypes.DualStackMigration)).Set(float64(migrationEndpointCount))
DualStackMigrationServiceCount.Set(float64(migrationServicesCount))

sm.logger.V(3).Info("Exported DualStack Migration metrics")
}

// UpdateSyncerStatusInMetrics update the status of syncer based on the error
Expand Down Expand Up @@ -150,13 +181,18 @@ func (sm *SyncerMetrics) SetLabelPropagationStats(key negtypes.NegSyncerKey, lab
sm.syncerLabelProagationStats[key] = labelstatLabelPropagationStats
}

// DeleteSyncer will reset any metrics for the syncer corresponding to `key`. It
// should be invoked when a Syncer has been stopped.
func (sm *SyncerMetrics) DeleteSyncer(key negtypes.NegSyncerKey) {
sm.mu.Lock()
defer sm.mu.Unlock()
delete(sm.syncerStateMap, key)
delete(sm.syncerEndpointStateMap, key)
delete(sm.syncerEndpointSliceStateMap, key)
delete(sm.syncerLabelProagationStats, key)
delete(sm.dualStackMigrationStartTime, key)
delete(sm.dualStackMigrationEndTime, key)
delete(sm.endpointsCountPerType, key)
}

// computeLabelMetrics aggregates label propagation metrics.
Expand Down Expand Up @@ -205,3 +241,128 @@ func (sm *SyncerMetrics) computeEndpointStateMetrics(forDegradedMode bool) (negt
}
return epCounts, epsCounts
}

// CollectDualStackMigrationMetrics will be used by dualstack.Migrator to export
// metrics.
func (sm *SyncerMetrics) CollectDualStackMigrationMetrics(key negtypes.NegSyncerKey, committedEndpoints map[string]negtypes.NetworkEndpointSet, migrationCount int) {
sm.updateMigrationStartAndEndTime(key, migrationCount)
sm.updateEndpointsCountPerType(key, committedEndpoints, migrationCount)
}

func (sm *SyncerMetrics) updateMigrationStartAndEndTime(key negtypes.NegSyncerKey, migrationCount int) {
sm.mu.Lock()
defer sm.mu.Unlock()

_, hasStartTime := sm.dualStackMigrationStartTime[key]
_, hasEndTime := sm.dualStackMigrationEndTime[key]

if migrationCount == 0 {
//
// Migration has finished or it never started.
//
if !hasStartTime {
// Migration was never started.
return
}
if hasEndTime {
// Migration was already finished in some previous invocation.
return
}
sm.dualStackMigrationEndTime[key] = sm.clock.Now()
return
}

//
// Migration has started or it was already in progress.
//
if hasEndTime {
// A previous migration was completed but there are still migrating
// endpoints so extend the previous migration time.
delete(sm.dualStackMigrationEndTime, key)
}
if hasStartTime {
// Migration was already started in some previous invocation.
return
}
sm.dualStackMigrationStartTime[key] = sm.clock.Now()
}

func (sm *SyncerMetrics) updateEndpointsCountPerType(key negtypes.NegSyncerKey, committedEndpoints map[string]negtypes.NetworkEndpointSet, migrationCount int) {
sm.mu.Lock()
defer sm.mu.Unlock()

ipv4OnlyCount, ipv6OnlyCount, dualStackCount := 0, 0, 0
for _, endpointSet := range committedEndpoints {
for endpoint := range endpointSet {
if endpoint.IP != "" && endpoint.IPv6 != "" {
dualStackCount++
continue
}
if endpoint.IP != "" {
ipv4OnlyCount++
}
if endpoint.IPv6 != "" {
ipv6OnlyCount++
}
}
}
sm.endpointsCountPerType[key] = map[string]int{
ipv4EndpointType: ipv4OnlyCount,
ipv6EndpointType: ipv6OnlyCount,
dualStackEndpointType: dualStackCount,
migrationEndpointType: migrationCount,
}
}

func (sm *SyncerMetrics) computeDualStackMigrationDurations() ([]int, int) {
sm.mu.Lock()
defer sm.mu.Unlock()

finishedDurations, longestUnfinishedDuration := make([]int, 0), 0
for key, startTime := range sm.dualStackMigrationStartTime {
endTime, ok := sm.dualStackMigrationEndTime[key]
if !ok {
if curUnfinishedDuration := int(sm.clock.Since(startTime).Seconds()); curUnfinishedDuration > longestUnfinishedDuration {
longestUnfinishedDuration = curUnfinishedDuration
}
continue
}
finishedDurations = append(finishedDurations, int(endTime.Sub(startTime).Seconds()))
// Prevent metrics from being re-emitted by deleting the syncer key whose
// migrations have finished.
delete(sm.dualStackMigrationStartTime, key)
delete(sm.dualStackMigrationEndTime, key)
}

return finishedDurations, longestUnfinishedDuration
}

func (sm *SyncerMetrics) computeDualStackMigrationCounts() (map[string]int, int, int) {
sm.mu.Lock()
defer sm.mu.Unlock()

// It's important to explicitly initialize all types to zero so that their
// counts get reset when the metrics are published.
syncerCountByEndpointType := map[string]int{
ipv4EndpointType: 0,
ipv6EndpointType: 0,
dualStackEndpointType: 0,
migrationEndpointType: 0,
}
migrationEndpointCount := 0
migrationServices := sets.NewString()

for syncerKey, syncerEndpointsCountPerType := range sm.endpointsCountPerType {
for endpointType, count := range syncerEndpointsCountPerType {
if count != 0 {
syncerCountByEndpointType[endpointType]++
}
}

if count := syncerEndpointsCountPerType[migrationEndpointType]; count != 0 {
migrationServices.Insert(fmt.Sprintf("%s/%s", syncerKey.Namespace, syncerKey.Name))
migrationEndpointCount += count
}
}
return syncerCountByEndpointType, migrationEndpointCount, migrationServices.Len()
}
Loading

0 comments on commit 94a9c31

Please sign in to comment.