Use degraded mode results in NEG DualStack migration heuristics #2107

Merged
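This change wires the transactionSyncer's degraded-mode (error-state) signal into the NEG DualStack `Migrator`: `NewMigrator` now takes an `errorStateChecker`, and `calculateMigrationEndpointsToDetach` holds off on starting new migration detachments while the syncer is in error state, even when the previous detach happened long enough ago that detachments would otherwise be forced.

A minimal sketch of the updated gate is below. It is illustrative only: `shouldHoldDetachments`, the pending-attachment formula, and the threshold values are assumptions for the example; the real logic lives in `pkg/neg/syncers/dualstack/migrator.go`.

```go
package main

import (
	"fmt"
	"time"
)

// shouldHoldDetachments reports whether new migration detachments should be
// deferred: many endpoints are still waiting to be attached AND either the
// syncer is in error (degraded) state OR the previous successful detach was
// recent.
func shouldHoldDetachments(addCount, committedCount, migrationCount int, inErrorState bool, previousDetach time.Time, previousDetachThreshold time.Duration, pendingAttachmentFraction float64) bool {
	total := addCount + committedCount + migrationCount
	manyPendingAttachments := total > 0 && float64(addCount) >= pendingAttachmentFraction*float64(total)
	tooLongSincePreviousDetach := time.Since(previousDetach) >= previousDetachThreshold
	return manyPendingAttachments && (inErrorState || !tooLongSincePreviousDetach)
}

func main() {
	longAgo := time.Now().Add(-time.Hour)

	// In error state: hold detachments even though the previous detach was long ago.
	fmt.Println(shouldHoldDetachments(5, 1, 1, true, longAgo, 10*time.Minute, 0.5)) // true

	// Not in error state and the previous detach was long ago: proceed.
	fmt.Println(shouldHoldDetachments(5, 1, 1, false, longAgo, 10*time.Minute, 0.5)) // false
}
```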
29 changes: 22 additions & 7 deletions pkg/neg/syncers/dualstack/migrator.go
@@ -64,6 +64,9 @@ type Migrator struct {
syncerKey types.NegSyncerKey
// metricsCollector is used for exporting metrics.
metricsCollector MetricsCollector
// errorStateChecker is used to check if the transactionSyncer is in error
// state.
errorStateChecker errorStateChecker

// mu protects paused, continueInProgress and previousDetach.
mu sync.Mutex
@@ -94,16 +97,21 @@ type syncable interface {
Sync() bool
}

type errorStateChecker interface {
InErrorState() bool
}

type MetricsCollector interface {
CollectDualStackMigrationMetrics(key types.NegSyncerKey, committedEndpoints map[string]types.NetworkEndpointSet, migrationCount int)
}

func NewMigrator(enableDualStackNEG bool, syncer syncable, syncerKey types.NegSyncerKey, metricsCollector MetricsCollector, logger klog.Logger) *Migrator {
func NewMigrator(enableDualStackNEG bool, syncer syncable, syncerKey types.NegSyncerKey, metricsCollector MetricsCollector, errorStateChecker errorStateChecker, logger klog.Logger) *Migrator {
return &Migrator{
enableDualStack: enableDualStackNEG,
syncer: syncer,
syncerKey: syncerKey,
metricsCollector: metricsCollector,
errorStateChecker: errorStateChecker,
migrationWaitDuration: defaultMigrationWaitDuration,
previousDetachThreshold: defaultPreviousDetachThreshold,
fractionOfMigratingEndpoints: defaultFractionOfMigratingEndpoints,
@@ -227,19 +235,26 @@ func (d *Migrator) isPaused() bool {
// 3. If all zones have less than the desired number of endpoints, then all
// endpoints from the largest zone will be moved.
//
// 4. No endpoints will be moved if there are many endpoints waiting to be
// 4. No endpoints will be moved if (1) there are many endpoints waiting to be
// attached (as determined by the manyEndpointsWaitingToBeAttached()
// function) AND the previous successful detach was quite recent (as
// determined by the tooLongSincePreviousDetach() function)
// function) AND (2) we are in degraded mode OR the previous successful
// detach was quite recent (as determined by the
// tooLongSincePreviousDetach() function)
func (d *Migrator) calculateMigrationEndpointsToDetach(addEndpoints, removeEndpoints, committedEndpoints, migrationEndpoints map[string]types.NetworkEndpointSet) string {
addCount := endpointsCount(addEndpoints)
committedCount := endpointsCount(committedEndpoints)
migrationCount := endpointsCount(migrationEndpoints)

if d.manyEndpointsWaitingToBeAttached(addCount, committedCount, migrationCount) && !d.tooLongSincePreviousDetach() {
if d.manyEndpointsWaitingToBeAttached(addCount, committedCount, migrationCount) && (d.errorStateChecker.InErrorState() || !d.tooLongSincePreviousDetach()) {
d.logger.V(1).Info("Not starting migration detachments; Too many attachments are pending and the threshold for forceful detach hasn't been reached.",
"addCount", addCount, "committedCount", committedCount, "migrationCount", migrationCount, "fractionForPendingAttachmentThreshold", d.fractionForPendingAttachmentThreshold,
"previousDetach", d.previousDetach, "previousDetachThreshold", d.previousDetachThreshold)
"addCount", addCount,
"committedCount", committedCount,
"migrationCount", migrationCount,
"fractionForPendingAttachmentThreshold", d.fractionForPendingAttachmentThreshold,
"inErrorState", d.errorStateChecker.InErrorState(),
"previousDetach", d.previousDetach,
"previousDetachThreshold", d.previousDetachThreshold,
)
return ""
}

51 changes: 44 additions & 7 deletions pkg/neg/syncers/dualstack/migrator_test.go
Expand Up @@ -298,6 +298,14 @@ func (f *fakeSyncable) Sync() bool {
return true
}

type fakeErrorStateChecker struct {
errorState bool
}

func (f *fakeErrorStateChecker) InErrorState() bool {
return f.errorState
}

func TestContinue_NoInputError_ShouldChangeTimeSincePreviousDetach(t *testing.T) {
t.Parallel()

@@ -378,8 +386,8 @@ func TestCalculateMigrationEndpointsToDetach(t *testing.T) {
wantCurrentlyMigratingCount: 2,
},
{
// If there are many endpoints waiting to be attached and the most recent
// migration was not too long ago, then we will not start any new
// If there are many endpoints waiting to be attached AND the most recent
// migration was NOT too long ago, then we will not start any new
// detachments since we wait for the pending attaches to complete
desc: "many endpoints are waiting to be attached AND previous migration was quite recent",
addEndpoints: map[string]types.NetworkEndpointSet{
@@ -406,10 +414,39 @@
wantCurrentlyMigratingCount: 0,
},
{
// If there are many endpoints waiting to be attached but the most recent
// migration was too long ago, then we don't want to keep waiting
// indefinitely for the next detach and we proceed with the detachments.
desc: "many endpoints are waiting to be attached BUT previous migration was too long ago",
// If there are many endpoints waiting to be attached AND the most recent
// migration was too long ago BUT we are in error state, then we will not
// start any new detachments since we wait to get out of error state.
desc: "many endpoints are waiting to be attached AND previous migration was too long ago BUT in error state",
addEndpoints: map[string]types.NetworkEndpointSet{
"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
{IP: "1"}, {IP: "2"}, {IP: "3"}, {IP: "4"}, {IP: "5"},
}...),
},
removeEndpoints: map[string]types.NetworkEndpointSet{},
committedEndpoints: map[string]types.NetworkEndpointSet{
"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
{IP: "6"},
}...),
},
migrationEndpoints: map[string]types.NetworkEndpointSet{
"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
{IP: "7"},
}...),
},
migrator: func() *Migrator {
m := newMigratorForTest(true)
m.errorStateChecker.(*fakeErrorStateChecker).errorState = true
return m
}(),
wantCurrentlyMigratingCount: 0,
},
{
// If there are many endpoints waiting to be attached BUT the most recent
// migration was too long ago AND we are not in error state, then we don't
// want to keep waiting indefinitely for the next detach and we proceed
// with the detachments.
desc: "many endpoints are waiting to be attached BUT previous migration was too long ago AND not in error state",
addEndpoints: map[string]types.NetworkEndpointSet{
"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
{IP: "1"}, {IP: "2"}, {IP: "3"}, {IP: "4"}, {IP: "5"},
@@ -774,5 +811,5 @@ func cloneZoneNetworkEndpointsMap(m map[string]types.NetworkEndpointSet) map[str
}

func newMigratorForTest(enableDualStackNEG bool) *Migrator {
return NewMigrator(enableDualStackNEG, &fakeSyncable{}, types.NegSyncerKey{}, metrics.FakeSyncerMetrics(), klog.Background())
return NewMigrator(enableDualStackNEG, &fakeSyncable{}, types.NegSyncerKey{}, metrics.FakeSyncerMetrics(), &fakeErrorStateChecker{}, klog.Background())
}
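The new degraded-mode test case enables the behavior by type-asserting the injected fake and flipping its flag (see the `migrator:` closure in the case above). A tiny helper following the same pattern might look like this; `newDegradedMigratorForTest` is a hypothetical name and relies on the test fixtures in this file:

```go
// newDegradedMigratorForTest builds a test Migrator whose injected
// fakeErrorStateChecker reports that the syncer is in error state.
func newDegradedMigratorForTest() *Migrator {
	m := newMigratorForTest(true)
	m.errorStateChecker.(*fakeErrorStateChecker).errorState = true
	return m
}
```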
7 changes: 6 additions & 1 deletion pkg/neg/syncers/transaction.go
@@ -179,7 +179,7 @@ func NewTransactionSyncer(
// transactionSyncer needs syncer interface for internals
ts.syncer = syncer
ts.retry = NewDelayRetryHandler(func() { syncer.Sync() }, NewExponentialBackendOffHandler(maxRetries, minRetryDelay, maxRetryDelay))
ts.dsMigrator = dualstack.NewMigrator(enableDualStackNEG, syncer, negSyncerKey, syncerMetrics, logger)
ts.dsMigrator = dualstack.NewMigrator(enableDualStackNEG, syncer, negSyncerKey, syncerMetrics, ts, logger)
return syncer
}

@@ -362,6 +362,11 @@ func (s *transactionSyncer) inErrorState() bool {
return s.errorState
}

// InErrorState is a wrapper for exporting inErrorState().
func (s *transactionSyncer) InErrorState() bool {
return s.inErrorState()
}

// syncLock must already be acquired before execution
func (s *transactionSyncer) setErrorState() {
s.errorState = true
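The exported `InErrorState` wrapper is what allows `ts` to be passed to `dualstack.NewMigrator` as the error-state dependency: the `dualstack` package's unexported `errorStateChecker` interface requires an exported `InErrorState()` method, while the syncer's existing accessor is the unexported `inErrorState()`. A self-contained, single-file analogue of that pattern (illustrative names, not the repository's code):

```go
package main

import "fmt"

// errorStateChecker mirrors the interface defined in the dualstack package.
type errorStateChecker interface {
	InErrorState() bool
}

// syncer stands in for the transactionSyncer.
type syncer struct {
	errorState bool
}

// inErrorState is the pre-existing unexported accessor.
func (s *syncer) inErrorState() bool { return s.errorState }

// InErrorState is the exported wrapper; in the real code the interface lives
// in a different package, so only an exported method can satisfy it.
func (s *syncer) InErrorState() bool { return s.inErrorState() }

func main() {
	var checker errorStateChecker = &syncer{errorState: true}
	fmt.Println(checker.InErrorState()) // true
}
```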