From 18d0e7b64e07b9ba361f93cc050d8f2d54a4da06 Mon Sep 17 00:00:00 2001 From: ColdsteelRail <574252631@qq.com> Date: Mon, 23 Sep 2024 15:37:51 +0800 Subject: [PATCH] refactor opj active deadline seconds --- .../apps.kusionstack.io_operationjobs.yaml | 6 +- .../apps.kusionstack.io_operationjobs.yaml | 6 +- go.mod | 2 +- go.sum | 4 +- .../operationjob/operationjob_controller.go | 36 ++--- .../operationjob_controller_test.go | 134 ++++++++---------- .../operationjob/operationjob_manager.go | 45 ++++-- .../operationjob/opscore/candidate.go | 6 + 8 files changed, 125 insertions(+), 114 deletions(-) diff --git a/charts/templates/crd/apps.kusionstack.io_operationjobs.yaml b/charts/templates/crd/apps.kusionstack.io_operationjobs.yaml index 2750ef50..1d249663 100644 --- a/charts/templates/crd/apps.kusionstack.io_operationjobs.yaml +++ b/charts/templates/crd/apps.kusionstack.io_operationjobs.yaml @@ -59,7 +59,7 @@ spec: activeDeadlineSeconds: description: |- Specify the duration in seconds relative to the startTime - that the job may be active before the system tries to terminate it + that the target may be active before the system tries to terminate it format: int32 type: integer operationDelaySeconds: @@ -143,6 +143,10 @@ spec: progress: description: operation progress of target pod type: string + startTimestamp: + description: target operation start time + format: date-time + type: string type: object type: array totalPodCount: diff --git a/config/crd/bases/apps.kusionstack.io_operationjobs.yaml b/config/crd/bases/apps.kusionstack.io_operationjobs.yaml index 2750ef50..1d249663 100644 --- a/config/crd/bases/apps.kusionstack.io_operationjobs.yaml +++ b/config/crd/bases/apps.kusionstack.io_operationjobs.yaml @@ -59,7 +59,7 @@ spec: activeDeadlineSeconds: description: |- Specify the duration in seconds relative to the startTime - that the job may be active before the system tries to terminate it + that the target may be active before the system tries to terminate it format: int32 type: integer operationDelaySeconds: @@ -143,6 +143,10 @@ spec: progress: description: operation progress of target pod type: string + startTimestamp: + description: target operation start time + format: date-time + type: string type: object type: array totalPodCount: diff --git a/go.mod b/go.mod index 3f7a7b9d..e3fe56e0 100644 --- a/go.mod +++ b/go.mod @@ -94,7 +94,7 @@ require ( k8s.io/apiserver v0.22.6 // indirect k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect k8s.io/kubectl v0.29.0 - kusionstack.io/kube-api v0.6.0 + kusionstack.io/kube-api v0.6.1-0.20240923062820-d1e1f0ffca8e sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect sigs.k8s.io/yaml v1.3.0 // indirect ) diff --git a/go.sum b/go.sum index d608ab8e..c2e28fea 100644 --- a/go.sum +++ b/go.sum @@ -1139,8 +1139,8 @@ k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/ k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI= k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -kusionstack.io/kube-api v0.6.0 h1:FSvvZvhpAul4mnCQXYI61dIQ8QT5txdPWNwWXdjJHME= -kusionstack.io/kube-api v0.6.0/go.mod h1:J0+EHiroG/88X904Y9TV9iMRcoEuD5tXMTLMBDSwM+Y= +kusionstack.io/kube-api v0.6.1-0.20240923062820-d1e1f0ffca8e h1:kgLQN99sgGl5vjCzDYqAhAxSh/1K2/W5FW9ba7oYr/U= +kusionstack.io/kube-api v0.6.1-0.20240923062820-d1e1f0ffca8e/go.mod h1:J0+EHiroG/88X904Y9TV9iMRcoEuD5tXMTLMBDSwM+Y= kusionstack.io/resourceconsist v0.0.1 h1:+k/jriq5Ld7fQUYfWSMGynz/FesHtl3Rk2fmQPjBe0g= kusionstack.io/resourceconsist v0.0.1/go.mod h1:816xS/fY6EOUbPFjXIWW/TGs8/YE46qP4ElKeIiwFdU= modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw= diff --git a/pkg/controllers/operationjob/operationjob_controller.go b/pkg/controllers/operationjob/operationjob_controller.go index 4ab94254..4538fc32 100644 --- a/pkg/controllers/operationjob/operationjob_controller.go +++ b/pkg/controllers/operationjob/operationjob_controller.go @@ -117,8 +117,14 @@ func (r *ReconcileOperationJob) Reconcile(ctx context.Context, req reconcile.Req return reconcile.Result{Requeue: true}, nil } + //actionHandler, enablePodOpsLifecycle := r.getActionHandler(instance) + candidates, err := r.listTargets(ctx, instance) + if err != nil { + return reconcile.Result{}, err + } + if instance.DeletionTimestamp != nil { - if err := r.releaseTargets(ctx, instance); err != nil { + if err := r.releaseTargets(ctx, instance, candidates, true); err != nil { r.Recorder.Eventf(instance, corev1.EventTypeWarning, "ReleaseTargetFailed", fmt.Sprintf("failed to release targets when job deleting: %s", err.Error())) return reconcile.Result{}, err } @@ -128,27 +134,17 @@ func (r *ReconcileOperationJob) Reconcile(ctx context.Context, req reconcile.Req return reconcile.Result{}, err } - jobDeleted, requeueAfter, err := r.ensureActiveDeadlineAndTTL(ctx, instance, logger) + jobDeleted, requeueAfter, err := r.ensureActiveDeadlineAndTTL(ctx, instance, candidates, logger) if jobDeleted || err != nil { return reconcile.Result{}, err } - reconcileErr := r.doReconcile(ctx, instance) - updateErr := r.updateStatus(ctx, instance) - return requeueResult(requeueAfter), ctrlutils.AggregateErrors([]error{reconcileErr, updateErr}) -} - -func (r *ReconcileOperationJob) getActionHandlerAndTargets(ctx context.Context, instance *appsv1alpha1.OperationJob) ( - actionHandler ActionHandler, enablePodOpsLifecycle bool, candidates []*OpsCandidate, err error) { - if actionHandler, enablePodOpsLifecycle, err = r.getActionHandler(instance); err != nil { - return - } - candidates, err = r.listTargets(ctx, instance) - return + err = r.doReconcile(ctx, instance, candidates) + return requeueResult(requeueAfter), err } -func (r *ReconcileOperationJob) doReconcile(ctx context.Context, instance *appsv1alpha1.OperationJob) error { - actionHandler, enablePodOpsLifecycle, candidates, err := r.getActionHandlerAndTargets(ctx, instance) +func (r *ReconcileOperationJob) doReconcile(ctx context.Context, instance *appsv1alpha1.OperationJob, candidates []*OpsCandidate) error { + actionHandler, enablePodOpsLifecycle, err := r.getActionHandler(instance) if err != nil { return err } @@ -160,7 +156,9 @@ func (r *ReconcileOperationJob) doReconcile(ctx context.Context, instance *appsv getErr := r.getTargetsOpsStatus(ctx, actionHandler, selectedCandidates, enablePodOpsLifecycle, instance) // calculate opsStatus of all candidates instance.Status = r.calculateStatus(instance, candidates) - return controllerutils.AggregateErrors([]error{opsErr, getErr}) + // update operationjob status + updateErr := r.updateStatus(ctx, instance) + return controllerutils.AggregateErrors([]error{opsErr, getErr, updateErr}) } func (r *ReconcileOperationJob) calculateStatus(instance *appsv1alpha1.OperationJob, candidates []*OpsCandidate) (jobStatus appsv1alpha1.OperationJobStatus) { @@ -197,6 +195,10 @@ func (r *ReconcileOperationJob) calculateStatus(instance *appsv1alpha1.Operation if !ojutils.IsJobFinished(&appsv1alpha1.OperationJob{Status: jobStatus}) { jobStatus.Progress = appsv1alpha1.OperationProgressProcessing + if jobStatus.StartTimestamp == nil { + jobStatus.StartTimestamp = &now + } + if pendingPodCount == totalPodCount { jobStatus.Progress = appsv1alpha1.OperationProgressPending } diff --git a/pkg/controllers/operationjob/operationjob_controller_test.go b/pkg/controllers/operationjob/operationjob_controller_test.go index 556141b0..5203c4d7 100644 --- a/pkg/controllers/operationjob/operationjob_controller_test.go +++ b/pkg/controllers/operationjob/operationjob_controller_test.go @@ -61,6 +61,63 @@ var ( var _ = Describe("operationjob controller", func() { + It("deadline and ttl", func() { + testcase := "test-deadline-ttl" + Expect(createNamespace(c, testcase)).Should(BeNil()) + cs := createCollaSetWithReplicas("foo", testcase, 3) + podNames := getPodNamesFromCollaSet(cs) + + oj := &appsv1alpha1.OperationJob{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testcase, + Name: "foo", + }, + Spec: appsv1alpha1.OperationJobSpec{ + Partition: int32Pointer(0), + Action: appsv1alpha1.OpsActionReplace, + Targets: []appsv1alpha1.PodOpsTarget{ + { + Name: podNames[0], + }, + { + Name: podNames[1], + }, + { + Name: podNames[2], + }, + }, + ActiveDeadlineSeconds: int32Pointer(3), + TTLSecondsAfterFinished: int32Pointer(5), + }, + } + + Expect(c.Create(ctx, oj)).Should(BeNil()) + + for _, partition := range []int32{0, 1, 2, 3} { + // update partition + Eventually(func() error { + return c.Get(context.TODO(), types.NamespacedName{Namespace: oj.Namespace, Name: oj.Name}, oj) + }, time.Second*5, time.Second).Should(BeNil()) + Expect(updateOperationJobWithRetry(oj.Namespace, oj.Name, func(job *appsv1alpha1.OperationJob) bool { + job.Spec.Partition = &partition + return true + })).Should(BeNil()) + // wait for replace failed after ActiveDeadlineSeconds + assertFailedReplicas(oj, partition, time.Second*1000) + } + + // wait for operationJob deleted after TTL + Eventually(func() bool { + err := c.Get(ctx, types.NamespacedName{Namespace: oj.Namespace, Name: oj.Name}, oj) + if errors.IsNotFound(err) { + return true + } else { + Expect(err).Should(BeNil()) + } + return false + }, time.Second*20, time.Second).Should(BeTrue()) + }) + It("[replace] reconcile", func() { testcase := "test-replace" Expect(createNamespace(c, testcase)).Should(BeNil()) @@ -432,83 +489,6 @@ var _ = Describe("operationjob controller", func() { }, time.Second*10, time.Second).Should(BeTrue()) }) - It("deadline and ttl", func() { - testcase := "test-deadline-ttl" - Expect(createNamespace(c, testcase)).Should(BeNil()) - cs := createCollaSetWithReplicas("foo", testcase, 2) - podNames := getPodNamesFromCollaSet(cs) - - oj := &appsv1alpha1.OperationJob{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: testcase, - Name: "foo", - }, - Spec: appsv1alpha1.OperationJobSpec{ - Action: appsv1alpha1.OpsActionReplace, - Targets: []appsv1alpha1.PodOpsTarget{ - { - Name: podNames[0], - }, - { - Name: podNames[1], - }, - }, - ActiveDeadlineSeconds: int32Pointer(10), - TTLSecondsAfterFinished: int32Pointer(5), - }, - } - - Expect(c.Create(ctx, oj)).Should(BeNil()) - - // wait for new pod created - podList := &corev1.PodList{} - Eventually(func() bool { - Expect(c.List(ctx, podList, client.InNamespace(cs.Namespace))).Should(BeNil()) - return len(podList.Items) == 4 - }, time.Second*10, time.Second).Should(BeTrue()) - - // mock only 1 new pod serviceAvailable - Expect(c.List(ctx, podList, client.InNamespace(cs.Namespace))).Should(BeNil()) - for i := range podList.Items { - if _, exist := podList.Items[i].Labels[appsv1alpha1.PodReplacePairOriginName]; exist { - Expect(updatePodWithRetry(podList.Items[i].Namespace, podList.Items[i].Name, func(pod *corev1.Pod) bool { - pod.Labels[appsv1alpha1.PodServiceAvailableLabel] = "true" - return true - })).Should(BeNil()) - break - } - } - - // allow origin pod to be deleted - for i := range podList.Items { - pod := &podList.Items[i] - if _, exist := pod.Labels[appsv1alpha1.PodReplaceIndicationLabelKey]; exist { - Expect(updatePodWithRetry(pod.Namespace, pod.Name, func(pod *corev1.Pod) bool { - labelOperate := fmt.Sprintf("%s/%s", appsv1alpha1.PodOperateLabelPrefix, poddeletion.OpsLifecycleAdapter.GetID()) - pod.Labels[labelOperate] = fmt.Sprintf("%d", time.Now().UnixNano()) - pod.Labels[appsv1alpha1.PodDeletionIndicationLabelKey] = fmt.Sprintf("%d", time.Now().UnixNano()) - return true - })).Should(BeNil()) - } - } - assertSucceededReplicas(oj, 1, time.Second*10) - - // wait for replace failed after ActiveDeadlineSeconds - assertJobProgressFailed(oj, time.Second*10) - assertFailedReplicas(oj, 1, time.Second*10) - - // wait for operationJob deleted after TTL - Eventually(func() bool { - err := c.Get(ctx, types.NamespacedName{Namespace: oj.Namespace, Name: oj.Name}, oj) - if errors.IsNotFound(err) { - return true - } else { - Expect(err).Should(BeNil()) - } - return false - }, time.Second*20, time.Second).Should(BeTrue()) - }) - }) func assertFailedReplicas(oj *appsv1alpha1.OperationJob, failedPodCount int32, timeout time.Duration) { diff --git a/pkg/controllers/operationjob/operationjob_manager.go b/pkg/controllers/operationjob/operationjob_manager.go index c77a01f5..863391c0 100644 --- a/pkg/controllers/operationjob/operationjob_manager.go +++ b/pkg/controllers/operationjob/operationjob_manager.go @@ -32,6 +32,7 @@ import ( "kusionstack.io/kuperator/pkg/controllers/operationjob/replace" ojutils "kusionstack.io/kuperator/pkg/controllers/operationjob/utils" controllerutils "kusionstack.io/kuperator/pkg/controllers/utils" + ctrlutils "kusionstack.io/kuperator/pkg/controllers/utils" "kusionstack.io/kuperator/pkg/controllers/utils/podopslifecycle" ) @@ -149,6 +150,10 @@ func (r *ReconcileOperationJob) filterAndOperateAllowOpsTargets( } else { candidate.OpsStatus.Progress = appsv1alpha1.OperationProgressProcessing } + if candidate.OpsStatus.StartTimestamp == nil { + now := ctrlutils.FormatTimeNow() + candidate.OpsStatus.StartTimestamp = &now + } } if isAllowedOps { @@ -237,27 +242,36 @@ func (r *ReconcileOperationJob) getTargetsOpsStatus( } // ensureActiveDeadlineAndTTL calculate time to ActiveDeadlineSeconds and TTLSecondsAfterFinished and release targets -func (r *ReconcileOperationJob) ensureActiveDeadlineAndTTL(ctx context.Context, operationJob *appsv1alpha1.OperationJob, logger logr.Logger) (bool, *time.Duration, error) { - isFailed := operationJob.Status.Progress == appsv1alpha1.OperationProgressFailed - isSucceeded := operationJob.Status.Progress == appsv1alpha1.OperationProgressSucceeded - +func (r *ReconcileOperationJob) ensureActiveDeadlineAndTTL(ctx context.Context, operationJob *appsv1alpha1.OperationJob, candidates []*OpsCandidate, logger logr.Logger) (bool, *time.Duration, error) { if operationJob.Spec.ActiveDeadlineSeconds != nil { - if !isFailed && !isSucceeded { - leftTime := time.Duration(*operationJob.Spec.ActiveDeadlineSeconds)*time.Second - time.Since(operationJob.CreationTimestamp.Time) + var allowReleaseCandidates []*OpsCandidate + for i := range candidates { + candidate := candidates[i] + // just skip if target operation already finished, or not started + if IsCandidateOpsFinished(candidate) || candidate.OpsStatus.StartTimestamp == nil { + continue + } + leftTime := time.Duration(*operationJob.Spec.ActiveDeadlineSeconds)*time.Second - time.Since(candidate.OpsStatus.StartTimestamp.Time) if leftTime > 0 { return false, &leftTime, nil } else { logger.Info("should end but still processing") r.Recorder.Eventf(operationJob, corev1.EventTypeNormal, "Timeout", "Try to fail OperationJob for timeout...") // mark operationjob and targets failed and release targets - ojutils.MarkOperationJobFailed(operationJob) - return false, nil, r.releaseTargets(ctx, operationJob) + MarkCandidateFailed(candidate) + allowReleaseCandidates = append(allowReleaseCandidates, candidate) } } + if len(allowReleaseCandidates) > 0 { + releaseErr := r.releaseTargets(ctx, operationJob, allowReleaseCandidates, false) + operationJob.Status = r.calculateStatus(operationJob, candidates) + updateErr := r.updateStatus(ctx, operationJob) + return false, nil, controllerutils.AggregateErrors([]error{releaseErr, updateErr}) + } } if operationJob.Spec.TTLSecondsAfterFinished != nil { - if isFailed || isSucceeded { + if ojutils.IsJobFinished(operationJob) { leftTime := time.Duration(*operationJob.Spec.TTLSecondsAfterFinished)*time.Second - time.Since(operationJob.Status.EndTimestamp.Time) if leftTime > 0 { return false, &leftTime, nil @@ -269,13 +283,12 @@ func (r *ReconcileOperationJob) ensureActiveDeadlineAndTTL(ctx context.Context, } } } - return false, nil, nil } // releaseTargets try to release the targets from operation when the operationJob is deleted -func (r *ReconcileOperationJob) releaseTargets(ctx context.Context, operationJob *appsv1alpha1.OperationJob) error { - actionHandler, enablePodOpsLifecycle, candidates, err := r.getActionHandlerAndTargets(ctx, operationJob) +func (r *ReconcileOperationJob) releaseTargets(ctx context.Context, operationJob *appsv1alpha1.OperationJob, candidates []*OpsCandidate, needUpdateStatus bool) error { + actionHandler, enablePodOpsLifecycle, err := r.getActionHandler(operationJob) if err != nil { return err } @@ -288,12 +301,14 @@ func (r *ReconcileOperationJob) releaseTargets(ctx context.Context, operationJob releaseErr = controllerutils.AggregateErrors([]error{releaseErr, err}) } // mark candidate as failed if not finished - if IsCandidateOpsFinished(candidate) { - return nil + if !IsCandidateOpsFinished(candidate) { + candidate.OpsStatus.Progress = appsv1alpha1.OperationProgressFailed } - candidate.OpsStatus.Progress = appsv1alpha1.OperationProgressFailed return nil }) + if !needUpdateStatus { + return releaseErr + } operationJob.Status = r.calculateStatus(operationJob, candidates) updateErr := r.updateStatus(ctx, operationJob) return controllerutils.AggregateErrors([]error{releaseErr, updateErr}) diff --git a/pkg/controllers/operationjob/opscore/candidate.go b/pkg/controllers/operationjob/opscore/candidate.go index 8e8a6882..af7ecc85 100644 --- a/pkg/controllers/operationjob/opscore/candidate.go +++ b/pkg/controllers/operationjob/opscore/candidate.go @@ -88,3 +88,9 @@ func IsCandidateServiceAvailable(candidate *OpsCandidate) bool { _, serviceAvailable := candidate.Pod.Labels[appsv1alpha1.PodServiceAvailableLabel] return serviceAvailable } + +func MarkCandidateFailed(candidate *OpsCandidate) { + if candidate.OpsStatus.Progress != appsv1alpha1.OperationProgressSucceeded { + candidate.OpsStatus.Progress = appsv1alpha1.OperationProgressFailed + } +}