diff --git a/pkg/controller/job/BUILD b/pkg/controller/job/BUILD index fb07cf0ee3dc0..fbcbe01f5c9e4 100644 --- a/pkg/controller/job/BUILD +++ b/pkg/controller/job/BUILD @@ -33,6 +33,7 @@ go_library( "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", + "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index d1e24fc5ac8e0..510b41525de55 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -39,6 +39,7 @@ import ( corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/flowcontrol" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/util/metrics" @@ -49,6 +50,13 @@ import ( // controllerKind contains the schema.GroupVersionKind for this controller type. var controllerKind = batch.SchemeGroupVersion.WithKind("Job") +const ( + // DefaultJobBackOff is the default backoff period, exported for the e2e test + DefaultJobBackOff = 10 * time.Second + // MaxJobBackOff is the max backoff period, exported for the e2e test + MaxJobBackOff = 3600 * time.Second +) + type JobController struct { kubeClient clientset.Interface podControl controller.PodControlInterface @@ -75,6 +83,9 @@ type JobController struct { // Jobs that need to be updated queue workqueue.RateLimitingInterface + // backoff tracks the per-Job requeue backoff + backoff *flowcontrol.Backoff + recorder record.EventRecorder } @@ -100,9 +111,17 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin } jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: jm.enqueueController, + AddFunc: func(obj interface{}) { + if job := obj.(*batch.Job); job != nil { + jm.enqueueJob(job) + } + }, UpdateFunc: jm.updateJob, - DeleteFunc: jm.enqueueController, + DeleteFunc: func(obj interface{}) { + if job := obj.(*batch.Job); job != nil { + jm.enqueueJob(job) + } + }, }) jm.jobLister = jobInformer.Lister() jm.jobStoreSynced = jobInformer.Informer().HasSynced @@ -117,6 +136,9 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin jm.updateHandler = jm.updateJobStatus jm.syncHandler = jm.syncJob + + jm.backoff = flowcontrol.NewBackOff(DefaultJobBackOff, MaxJobBackOff) + return jm } @@ -136,6 +158,8 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) { go wait.Until(jm.worker, time.Second, stopCh) } + flowcontrol.StartBackoffGC(jm.backoff, stopCh) + <-stopCh } @@ -199,7 +223,7 @@ func (jm *JobController) addPod(obj interface{}) { return } jm.expectations.CreationObserved(jobKey) - jm.enqueueController(job) + jm.enqueueJob(job) return } @@ -208,7 +232,7 @@ func (jm *JobController) addPod(obj interface{}) { // DO NOT observe creation because no controller should be waiting for an // orphan. for _, job := range jm.getPodJobs(pod) { - jm.enqueueController(job) + jm.enqueueJob(job) } } @@ -240,7 +264,7 @@ func (jm *JobController) updatePod(old, cur interface{}) { if controllerRefChanged && oldControllerRef != nil { // The ControllerRef was changed. Sync the old controller, if any.
if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil { - jm.enqueueController(job) + jm.enqueueJob(job) } } @@ -250,7 +274,7 @@ func (jm *JobController) updatePod(old, cur interface{}) { if job == nil { return } - jm.enqueueController(job) + jm.enqueueJob(job) return } @@ -258,7 +282,7 @@ func (jm *JobController) updatePod(old, cur interface{}) { // to see if anyone wants to adopt it now. if labelChanged || controllerRefChanged { for _, job := range jm.getPodJobs(curPod) { - jm.enqueueController(job) + jm.enqueueJob(job) } } } @@ -299,7 +323,7 @@ func (jm *JobController) deletePod(obj interface{}) { return } jm.expectations.DeletionObserved(jobKey) - jm.enqueueController(job) + jm.enqueueJob(job) } func (jm *JobController) updateJob(old, cur interface{}) { @@ -311,7 +335,7 @@ func (jm *JobController) updateJob(old, cur interface{}) { if err != nil { return } - jm.queue.Add(key) + jm.enqueueJob(curJob) // check if need to add a new rsync for ActiveDeadlineSeconds if curJob.Status.StartTime != nil { curADS := curJob.Spec.ActiveDeadlineSeconds @@ -332,20 +356,23 @@ func (jm *JobController) updateJob(old, cur interface{}) { } // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item. -func (jm *JobController) enqueueController(obj interface{}) { - key, err := controller.KeyFunc(obj) +func (jm *JobController) enqueueJob(job *batch.Job) { + key, err := controller.KeyFunc(job) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) + utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", job, err)) return } + // Retrieves the backoff duration for this Job + backoff, _ := jm.backoff.GetWithRetryNumber(key) + // TODO: Handle overlapping controllers better. Either disallow them at admission time or // deterministically avoid syncing controllers that fight over pods. Currently, we only // ensure that the same controller is synced for a given pod. When we periodically relist // all controllers there will still be some replica instability. One way to handle this is // by querying the store for all controllers that this rc overlaps, as well as all // controllers that overlap this rc, and sorting them. - jm.queue.Add(key) + jm.queue.AddAfter(key, backoff) } // worker runs a worker thread that just dequeues items, processes them, and marks them done. @@ -431,6 +458,15 @@ func (jm *JobController) syncJob(key string) error { } job := *sharedJob + // if job was finished previously, we don't want to redo the termination + if IsJobFinished(&job) { + jm.backoff.Reset(key) + return nil + } + + // retrieve the previous number of retry + _, previousRetry := jm.backoff.GetWithRetryNumber(key) + // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in // and update the expectations after we've retrieved active pods from the store. If a new pod enters // the store after we've checked the expectation, the job sync is just deferred till the next relist. @@ -456,34 +492,28 @@ func (jm *JobController) syncJob(key string) error { jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second) } } - // if job was finished previously, we don't want to redo the termination - if IsJobFinished(&job) { - return nil - } var manageJobErr error - if pastActiveDeadline(&job) { - // TODO: below code should be replaced with pod termination resulting in - // pod failures, rather than killing pods. Unfortunately none such solution - // exists ATM. 
There's an open discussion in the topic in - https://github.com/kubernetes/kubernetes/issues/14602 which might give - some sort of solution to above problem. - kill remaining active pods - wait := sync.WaitGroup{} - errCh := make(chan error, int(active)) - wait.Add(int(active)) - for i := int32(0); i < active; i++ { - go func(ix int32) { - defer wait.Done() - if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, &job); err != nil { - defer utilruntime.HandleError(err) - glog.V(2).Infof("Failed to delete %v, job %q/%q deadline exceeded", activePods[ix].Name, job.Namespace, job.Name) - errCh <- err - } - }(i) - } - wait.Wait() - + jobFailed := false + var failureReason string + var failureMessage string + + jobHaveNewFailure := failed > job.Status.Failed + + // check if the number of failed pods increased since the last syncJob + if jobHaveNewFailure && (previousRetry+1 > *job.Spec.BackoffLimit) { + jobFailed = true + failureReason = "BackoffLimitExceeded" + failureMessage = "Job has reached the specified backoff limit" + } else if pastActiveDeadline(&job) { + jobFailed = true + failureReason = "DeadlineExceeded" + failureMessage = "Job was active longer than specified deadline" + } + + if jobFailed { + errCh := make(chan error, active) + jm.deleteJobPods(&job, activePods, errCh) select { case manageJobErr = <-errCh: if manageJobErr != nil { @@ -495,8 +525,8 @@ func (jm *JobController) syncJob(key string) error { // update status values accordingly failed += active active = 0 - job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline")) - jm.recorder.Event(&job, v1.EventTypeNormal, "DeadlineExceeded", "Job was active longer than specified deadline") + job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage)) + jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage) } else { if jobNeedsSync && job.DeletionTimestamp == nil { active, manageJobErr = jm.manageJob(activePods, succeeded, &job) @@ -545,9 +575,42 @@ func (jm *JobController) syncJob(key string) error { return err } } + + if jobHaveNewFailure { + jm.backoff.NextWithInitDuration(key, time.Duration(*job.Spec.BackoffSeconds), time.Now()) + // re-enqueue Job after the backoff period + jm.enqueueJob(&job) + } else { + // if there are no new failures, the job backoff period can be reset + jm.backoff.Reset(key) + } + return manageJobErr } +func (jm *JobController) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh chan<- error) { + // TODO: below code should be replaced with pod termination resulting in + // pod failures, rather than killing pods. Unfortunately no such solution + // exists ATM. There's an open discussion on the topic in + // https://github.com/kubernetes/kubernetes/issues/14602 which might give + // some sort of solution to the above problem. + // kill remaining active pods + wait := sync.WaitGroup{} + nbPods := len(pods) + wait.Add(nbPods) + for i := int32(0); i < int32(nbPods); i++ { + go func(ix int32) { + defer wait.Done() + if err := jm.podControl.DeletePod(job.Namespace, pods[ix].Name, job); err != nil { + defer utilruntime.HandleError(err) + glog.V(2).Infof("Failed to delete %v, job %q/%q deadline exceeded", pods[ix].Name, job.Namespace, job.Name) + errCh <- err + } + }(i) + } + wait.Wait() +} + // pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
func pastActiveDeadline(job *batch.Job) bool { if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil { diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 51e909a93642c..b64f349b7bc11 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -43,7 +43,7 @@ import ( var alwaysReady = func() bool { return true } -func newJob(parallelism, completions int32) *batch.Job { +func newJob(parallelism, completions, backoffLimit int32, backoffSeconds int64) *batch.Job { j := &batch.Job{ ObjectMeta: metav1.ObjectMeta{ Name: "foobar", @@ -80,6 +80,9 @@ func newJob(parallelism, completions int32) *batch.Job { } else { j.Spec.Parallelism = nil } + j.Spec.BackoffLimit = &backoffLimit + j.Spec.BackoffSeconds = &backoffSeconds + return j } @@ -119,11 +122,16 @@ func newPodList(count int32, status v1.PodPhase, job *batch.Job) []v1.Pod { } func TestControllerSyncJob(t *testing.T) { + jobConditionComplete := batch.JobComplete + jobConditionFailed := batch.JobFailed + testCases := map[string]struct { // job setup - parallelism int32 - completions int32 - deleting bool + parallelism int32 + completions int32 + backoffLimit int32 + backoffSeconds int64 + deleting bool // pod setup podControllerError error @@ -133,102 +141,109 @@ func TestControllerSyncJob(t *testing.T) { failedPods int32 // expectations - expectedCreations int32 - expectedDeletions int32 - expectedActive int32 - expectedSucceeded int32 - expectedFailed int32 - expectedComplete bool + expectedCreations int32 + expectedDeletions int32 + expectedActive int32 + expectedSucceeded int32 + expectedFailed int32 + expectedCondition *batch.JobConditionType + expectedConditionReason string }{ "job start": { - 2, 5, false, + 2, 5, 6, 10, false, nil, 0, 0, 0, 0, - 2, 0, 2, 0, 0, false, + 2, 0, 2, 0, 0, nil, "", }, "WQ job start": { - 2, -1, false, + 2, -1, 6, 10, false, nil, 0, 0, 0, 0, - 2, 0, 2, 0, 0, false, + 2, 0, 2, 0, 0, nil, "", }, "pending pods": { - 2, 5, false, + 2, 5, 6, 10, false, nil, 2, 0, 0, 0, - 0, 0, 2, 0, 0, false, + 0, 0, 2, 0, 0, nil, "", }, "correct # of pods": { - 2, 5, false, + 2, 5, 6, 10, false, nil, 0, 2, 0, 0, - 0, 0, 2, 0, 0, false, + 0, 0, 2, 0, 0, nil, "", }, "WQ job: correct # of pods": { - 2, -1, false, + 2, -1, 6, 10, false, nil, 0, 2, 0, 0, - 0, 0, 2, 0, 0, false, + 0, 0, 2, 0, 0, nil, "", }, "too few active pods": { - 2, 5, false, + 2, 5, 6, 10, false, nil, 0, 1, 1, 0, - 1, 0, 2, 1, 0, false, + 1, 0, 2, 1, 0, nil, "", }, "too few active pods with a dynamic job": { - 2, -1, false, + 2, -1, 6, 10, false, nil, 0, 1, 0, 0, - 1, 0, 2, 0, 0, false, + 1, 0, 2, 0, 0, nil, "", }, "too few active pods, with controller error": { - 2, 5, false, + 2, 5, 6, 10, false, fmt.Errorf("Fake error"), 0, 1, 1, 0, - 1, 0, 1, 1, 0, false, + 1, 0, 1, 1, 0, nil, "", }, "too many active pods": { - 2, 5, false, + 2, 5, 6, 10, false, nil, 0, 3, 0, 0, - 0, 1, 2, 0, 0, false, + 0, 1, 2, 0, 0, nil, "", }, "too many active pods, with controller error": { - 2, 5, false, + 2, 5, 6, 10, false, fmt.Errorf("Fake error"), 0, 3, 0, 0, - 0, 1, 3, 0, 0, false, + 0, 1, 3, 0, 0, nil, "", }, "failed pod": { - 2, 5, false, + 2, 5, 6, 10, false, nil, 0, 1, 1, 1, - 1, 0, 2, 1, 1, false, + 1, 0, 2, 1, 1, nil, "", }, "job finish": { - 2, 5, false, + 2, 5, 6, 10, false, nil, 0, 0, 5, 0, - 0, 0, 0, 5, 0, true, + 0, 0, 0, 5, 0, nil, "", }, "WQ job finishing": { - 2, -1, false, + 2, -1, 6, 10, false, nil, 0, 1, 1, 0, - 0, 0, 1, 1, 0, false, + 0, 0, 1, 1, 0, 
nil, "", }, "WQ job all finished": { - 2, -1, false, + 2, -1, 6, 10, false, nil, 0, 0, 2, 0, - 0, 0, 0, 2, 0, true, + 0, 0, 0, 2, 0, &jobConditionComplete, "", }, "WQ job all finished despite one failure": { - 2, -1, false, + 2, -1, 6, 10, false, nil, 0, 0, 1, 1, - 0, 0, 0, 1, 1, true, + 0, 0, 0, 1, 1, &jobConditionComplete, "", }, "more active pods than completions": { - 2, 5, false, + 2, 5, 6, 10, false, nil, 0, 10, 0, 0, - 0, 8, 2, 0, 0, false, + 0, 8, 2, 0, 0, nil, "", }, "status change": { - 2, 5, false, + 2, 5, 6, 10, false, nil, 0, 2, 2, 0, - 0, 0, 2, 2, 0, false, + 0, 0, 2, 2, 0, nil, "", }, "deleting job": { - 2, 5, true, + 2, 5, 6, 10, true, nil, 1, 1, 1, 0, - 0, 0, 2, 1, 0, false, + 0, 0, 2, 1, 0, nil, "", + }, + + "to many job sync failure": { + 2, 5, 0, 10, true, + nil, 0, 0, 0, 1, + 0, 0, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded", }, } @@ -247,7 +262,7 @@ func TestControllerSyncJob(t *testing.T) { } // job & pods setup - job := newJob(tc.parallelism, tc.completions) + job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, tc.backoffSeconds) if tc.deleting { now := metav1.Now() job.DeletionTimestamp = &now @@ -324,7 +339,7 @@ func TestControllerSyncJob(t *testing.T) { t.Errorf("%s: .status.startTime was not set", name) } // validate conditions - if tc.expectedComplete && !getCondition(actual, batch.JobComplete) { + if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, tc.expectedConditionReason) { t.Errorf("%s: expected completion condition. Got %#v", name, actual.Status.Conditions) } } @@ -337,6 +352,8 @@ func TestSyncJobPastDeadline(t *testing.T) { completions int32 activeDeadlineSeconds int64 startTime int64 + backoffLimit int32 + backoffSeconds int64 // pod setup activePods int32 @@ -344,25 +361,31 @@ func TestSyncJobPastDeadline(t *testing.T) { failedPods int32 // expectations - expectedDeletions int32 - expectedActive int32 - expectedSucceeded int32 - expectedFailed int32 + expectedDeletions int32 + expectedActive int32 + expectedSucceeded int32 + expectedFailed int32 + expectedConditionReason string }{ "activeDeadlineSeconds less than single pod execution": { - 1, 1, 10, 15, + 1, 1, 10, 15, 6, 10, 1, 0, 0, - 1, 0, 0, 1, + 1, 0, 0, 1, "DeadlineExceeded", }, "activeDeadlineSeconds bigger than single pod execution": { - 1, 2, 10, 15, + 1, 2, 10, 15, 6, 10, 1, 1, 0, - 1, 0, 1, 1, + 1, 0, 1, 1, "DeadlineExceeded", }, "activeDeadlineSeconds times-out before any pod starts": { - 1, 1, 10, 10, + 1, 1, 10, 10, 6, 10, 0, 0, 0, - 0, 0, 0, 0, + 0, 0, 0, 0, "DeadlineExceeded", + }, + "activeDeadlineSeconds with backofflimit reach": { + 1, 1, 1, 10, 0, 1, + 1, 0, 2, + 1, 0, 0, 3, "BackoffLimitExceeded", }, } @@ -381,7 +404,7 @@ func TestSyncJobPastDeadline(t *testing.T) { } // job & pods setup - job := newJob(tc.parallelism, tc.completions) + job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, tc.backoffSeconds) job.Spec.ActiveDeadlineSeconds = &tc.activeDeadlineSeconds start := metav1.Unix(metav1.Now().Time.Unix()-tc.startTime, 0) job.Status.StartTime = &start @@ -424,15 +447,15 @@ func TestSyncJobPastDeadline(t *testing.T) { t.Errorf("%s: .status.startTime was not set", name) } // validate conditions - if !getCondition(actual, batch.JobFailed) { + if !getCondition(actual, batch.JobFailed, tc.expectedConditionReason) { t.Errorf("%s: expected fail condition. 
Got %#v", name, actual.Status.Conditions) } } } -func getCondition(job *batch.Job, condition batch.JobConditionType) bool { +func getCondition(job *batch.Job, condition batch.JobConditionType, reason string) bool { for _, v := range job.Status.Conditions { - if v.Type == condition && v.Status == v1.ConditionTrue { + if v.Type == condition && v.Status == v1.ConditionTrue && v.Reason == reason { return true } } @@ -452,7 +475,7 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) { return nil } - job := newJob(1, 1) + job := newJob(1, 1, 6, 10) activeDeadlineSeconds := int64(10) job.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds start := metav1.Unix(metav1.Now().Time.Unix()-15, 0) @@ -482,7 +505,7 @@ func TestSyncJobComplete(t *testing.T) { manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady - job := newJob(1, 1) + job := newJob(1, 1, 6, 10) job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", "")) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) err := manager.syncJob(getKey(job, t)) @@ -507,7 +530,7 @@ func TestSyncJobDeleted(t *testing.T) { manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady manager.updateHandler = func(job *batch.Job) error { return nil } - job := newJob(2, 2) + job := newJob(2, 2, 6, 10) err := manager.syncJob(getKey(job, t)) if err != nil { t.Errorf("Unexpected error when syncing jobs %v", err) @@ -532,7 +555,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) { manager.queue.AddRateLimited(getKey(job, t)) return updateError } - job := newJob(2, 2) + job := newJob(2, 2, 6, 10) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) err := manager.syncJob(getKey(job, t)) if err == nil || err != updateError { @@ -645,9 +668,9 @@ func TestGetPodsForJob(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1) + job1 := newJob(1, 1, 6, 10) job1.Name = "job1" - job2 := newJob(1, 1) + job2 := newJob(1, 1, 6, 10) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) @@ -686,7 +709,7 @@ func TestGetPodsForJob(t *testing.T) { } func TestGetPodsForJobAdopt(t *testing.T) { - job1 := newJob(1, 1) + job1 := newJob(1, 1, 6, 10) job1.Name = "job1" clientset := fake.NewSimpleClientset(job1) jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) @@ -712,7 +735,7 @@ func TestGetPodsForJobAdopt(t *testing.T) { } func TestGetPodsForJobNoAdoptIfBeingDeleted(t *testing.T) { - job1 := newJob(1, 1) + job1 := newJob(1, 1, 6, 10) job1.Name = "job1" job1.DeletionTimestamp = &metav1.Time{} clientset := fake.NewSimpleClientset(job1) @@ -742,7 +765,7 @@ func TestGetPodsForJobNoAdoptIfBeingDeleted(t *testing.T) { } func TestGetPodsForJobNoAdoptIfBeingDeletedRace(t *testing.T) { - job1 := newJob(1, 1) + job1 := newJob(1, 1, 6, 10) job1.Name = "job1" // The up-to-date object says it's being deleted. 
job1.DeletionTimestamp = &metav1.Time{} @@ -781,7 +804,7 @@ func TestGetPodsForJobRelease(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1) + job1 := newJob(1, 1, 6, 10) job1.Name = "job1" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) @@ -810,9 +833,9 @@ func TestAddPod(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1) + job1 := newJob(1, 1, 6, 10) job1.Name = "job1" - job2 := newJob(1, 1) + job2 := newJob(1, 1, 6, 10) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) @@ -855,11 +878,11 @@ func TestAddPodOrphan(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1) + job1 := newJob(1, 1, 6, 10) job1.Name = "job1" - job2 := newJob(1, 1) + job2 := newJob(1, 1, 6, 10) job2.Name = "job2" - job3 := newJob(1, 1) + job3 := newJob(1, 1, 6, 10) job3.Name = "job3" job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"} informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) @@ -883,9 +906,9 @@ func TestUpdatePod(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1) + job1 := newJob(1, 1, 6, 10) job1.Name = "job1" - job2 := newJob(1, 1) + job2 := newJob(1, 1, 6, 10) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) @@ -932,9 +955,9 @@ func TestUpdatePodOrphanWithNewLabels(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1) + job1 := newJob(1, 1, 6, 10) job1.Name = "job1" - job2 := newJob(1, 1) + job2 := newJob(1, 1, 6, 10) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) @@ -959,9 +982,9 @@ func TestUpdatePodChangeControllerRef(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1) + job1 := newJob(1, 1, 6, 10) job1.Name = "job1" - job2 := newJob(1, 1) + job2 := newJob(1, 1, 6, 10) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) @@ -985,9 +1008,9 @@ func TestUpdatePodRelease(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1) + job1 := newJob(1, 1, 6, 10) job1.Name = "job1" - job2 := newJob(1, 1) + job2 := newJob(1, 1, 6, 10) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) @@ -1011,9 +1034,9 @@ func TestDeletePod(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1) + job1 := newJob(1, 1, 6, 10) job1.Name = "job1" - job2 := newJob(1, 1) + job2 := newJob(1, 1, 6, 10) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) @@ -1056,11 +1079,11 @@ func TestDeletePodOrphan(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1) + job1 := newJob(1, 1, 6, 10) job1.Name = "job1" - job2 := newJob(1, 1) + job2 := newJob(1, 1, 6, 10) job2.Name = "job2" - job3 := newJob(1, 1) + job3 := newJob(1, 1, 6, 10) job3.Name = "job3" job3.Spec.Selector.MatchLabels = 
map[string]string{"other": "labels"} informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) @@ -1099,7 +1122,7 @@ func TestSyncJobExpectations(t *testing.T) { manager.jobStoreSynced = alwaysReady manager.updateHandler = func(job *batch.Job) error { return nil } - job := newJob(2, 2) + job := newJob(2, 2, 6, 10) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) pods := newPodList(2, v1.PodPending, job) podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() @@ -1167,7 +1190,7 @@ func TestWatchJobs(t *testing.T) { } func TestWatchPods(t *testing.T) { - testJob := newJob(2, 2) + testJob := newJob(2, 2, 6, 10) clientset := fake.NewSimpleClientset(testJob) fakeWatch := watch.NewFake() clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil)) @@ -1217,3 +1240,9 @@ func bumpResourceVersion(obj metav1.Object) { ver, _ := strconv.ParseInt(obj.GetResourceVersion(), 10, 32) obj.SetResourceVersion(strconv.FormatInt(ver+1, 10)) } + +func newInt(val int32) *int32 { + p := new(int32) + *p = val + return p +} diff --git a/staging/src/k8s.io/client-go/util/flowcontrol/backoff.go b/staging/src/k8s.io/client-go/util/flowcontrol/backoff.go index 71d442a62b4b2..540073849af32 100644 --- a/staging/src/k8s.io/client-go/util/flowcontrol/backoff.go +++ b/staging/src/k8s.io/client-go/util/flowcontrol/backoff.go @@ -27,6 +27,7 @@ import ( type backoffEntry struct { backoff time.Duration lastUpdate time.Time + retry int32 } type Backoff struct { @@ -57,25 +58,39 @@ func NewBackOff(initial, max time.Duration) *Backoff { // Get the current backoff Duration func (p *Backoff) Get(id string) time.Duration { + delay, _ := p.GetWithRetryNumber(id) + return delay +} + +// GetWithRetryNumber returns the current backoff Duration and the number of previous retries +func (p *Backoff) GetWithRetryNumber(id string) (time.Duration, int32) { p.Lock() defer p.Unlock() var delay time.Duration + var nbRetry int32 entry, ok := p.perItemBackoff[id] if ok { delay = entry.backoff + nbRetry = entry.retry } - return delay + return delay, nbRetry } // move backoff to the next mark, capping at maxDuration func (p *Backoff) Next(id string, eventTime time.Time) { + p.NextWithInitDuration(id, p.defaultDuration, eventTime) +} + +// NextWithInitDuration moves the backoff to the next mark and increments the retry count, capping at maxDuration +func (p *Backoff) NextWithInitDuration(id string, initial time.Duration, eventTime time.Time) { p.Lock() defer p.Unlock() entry, ok := p.perItemBackoff[id] if !ok || hasExpired(eventTime, entry.lastUpdate, p.maxDuration) { - entry = p.initEntryUnsafe(id) + entry = p.initEntryUnsafe(id, initial) } else { delay := entry.backoff * 2 // exponential + entry.retry++ entry.backoff = time.Duration(integer.Int64Min(int64(delay), int64(p.maxDuration))) } entry.lastUpdate = p.Clock.Now() @@ -136,9 +151,26 @@ func (p *Backoff) DeleteEntry(id string) { delete(p.perItemBackoff, id) } +// StartBackoffGC starts the Backoff garbage collection mechanism. +// Backoff.GC() is executed every minute until stopCh is closed +func StartBackoffGC(backoff *Backoff, stopCh <-chan struct{}) { + go func() { + for { + select { + case <-time.After(time.Minute): + backoff.GC() + case <-stopCh: + return + } + } + }() } + // Take a lock on *Backoff, before calling initEntryUnsafe -func (p *Backoff) initEntryUnsafe(id string) *backoffEntry { - entry := &backoffEntry{backoff: p.defaultDuration} +func (p *Backoff) initEntryUnsafe(id string, backoff time.Duration) *backoffEntry { + entry :=
&backoffEntry{ + backoff: backoff, + } p.perItemBackoff[id] = entry return entry } diff --git a/staging/src/k8s.io/client-go/util/flowcontrol/backoff_test.go b/staging/src/k8s.io/client-go/util/flowcontrol/backoff_test.go index 23a6cbfa309cc..415f0f8ee3a26 100644 --- a/staging/src/k8s.io/client-go/util/flowcontrol/backoff_test.go +++ b/staging/src/k8s.io/client-go/util/flowcontrol/backoff_test.go @@ -193,3 +193,78 @@ func TestIsInBackOffSinceUpdate(t *testing.T) { } } } + +func TestSlowBackoffWithNbRetry(t *testing.T) { + id := "_idSlow" + tc := clock.NewFakeClock(time.Now()) + step := time.Second + maxDuration := 50 * step + + b := NewFakeBackOff(step, maxDuration, tc) + cases := []struct { + //expected + expectedRetry int32 + expectedDelay time.Duration + }{ + { + 0, + time.Duration(0) * time.Second, + }, + { + 0, + time.Duration(1) * time.Second, + }, + { + 1, + time.Duration(2) * time.Second, + }, + { + 2, + time.Duration(4) * time.Second, + }, + { + 3, + time.Duration(8) * time.Second, + }, + { + 4, + time.Duration(16) * time.Second, + }, + { + 5, + time.Duration(32) * time.Second, + }, + { + 6, + time.Duration(50) * time.Second, + }, + { + 7, + time.Duration(50) * time.Second, + }, + { + 8, + time.Duration(50) * time.Second, + }, + } + for ix, c := range cases { + tc.Step(step) + w, retry := b.GetWithRetryNumber(id) + if retry != c.expectedRetry { + t.Errorf("input: '%d': retry expected %d, got %d", ix, ix, retry) + } + if w != c.expectedDelay { + t.Errorf("input: '%d': expected %s, got %s", ix, c.expectedDelay, w) + } + b.NextWithInitDuration(id, step, tc.Now()) + } + + //Now confirm that the Reset cancels backoff. + b.NextWithInitDuration(id, step, tc.Now()) + b.Reset(id) + backoff, counter := b.GetWithRetryNumber(id) + if backoff != 0 || counter != 0 { + t.Errorf("Reset didn't clear the backoff.") + } + +}
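
For reference, below is a minimal sketch of how the Job controller is expected to drive the new Backoff API introduced in this patch (GetWithRetryNumber, NextWithInitDuration, Reset, StartBackoffGC), assuming the patched k8s.io/client-go/util/flowcontrol package is on the import path. The key, backoffLimit, initialBackoff and jobHaveNewFailure values stand in for controller.KeyFunc(job), *job.Spec.BackoffLimit, job.Spec.BackoffSeconds and the failed-pod comparison in syncJob; they are illustrative, not part of the patch.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	// Mirrors NewJobController: one shared Backoff (10s initial, 1h max),
	// keyed by the Job's namespace/name, with GC running in the background.
	backoff := flowcontrol.NewBackOff(10*time.Second, 3600*time.Second)
	stopCh := make(chan struct{})
	flowcontrol.StartBackoffGC(backoff, stopCh) // expire stale entries once a minute

	key := "default/example-job"       // stands in for controller.KeyFunc(job)
	backoffLimit := int32(6)           // stands in for *job.Spec.BackoffLimit
	initialBackoff := 10 * time.Second // stands in for job.Spec.BackoffSeconds
	jobHaveNewFailure := true          // stands in for failed > job.Status.Failed

	// enqueueJob: requeue after whatever delay has accumulated for this key
	// (queue.AddAfter in the controller); zero for a key with no entry yet.
	delay, _ := backoff.GetWithRetryNumber(key)
	fmt.Printf("requeue %s after %v\n", key, delay)

	// syncJob: read the retry count recorded so far, then decide.
	_, previousRetry := backoff.GetWithRetryNumber(key)
	switch {
	case jobHaveNewFailure && previousRetry+1 > backoffLimit:
		fmt.Println("mark the Job failed: BackoffLimitExceeded")
	case jobHaveNewFailure:
		// bump the per-key backoff (doubling, capped at the max) and re-enqueue
		backoff.NextWithInitDuration(key, initialBackoff, time.Now())
		next, retries := backoff.GetWithRetryNumber(key)
		fmt.Printf("retry %d, next delay %v\n", retries, next)
	default:
		// no new failures (or the Job finished): start over on the next failure
		backoff.Reset(key)
	}
	close(stopCh)
}
```

The design choice here is that the requeue delay comes from the flowcontrol Backoff (doubled on each new failure, capped at MaxJobBackOff) rather than from the workqueue's rate limiter, and StartBackoffGC keeps the per-key map from growing without bound once Jobs stop failing.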