Skip to content

Commit

Permalink
refactor opj active deadline seconds
Browse files Browse the repository at this point in the history
  • Loading branch information
ColdsteelRail committed Oct 8, 2024
1 parent 4fa8b0e commit 18d0e7b
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 114 deletions.
6 changes: 5 additions & 1 deletion charts/templates/crd/apps.kusionstack.io_operationjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ spec:
activeDeadlineSeconds:
description: |-
Specify the duration in seconds relative to the startTime
that the job may be active before the system tries to terminate it
that the target may be active before the system tries to terminate it
format: int32
type: integer
operationDelaySeconds:
Expand Down Expand Up @@ -143,6 +143,10 @@ spec:
progress:
description: operation progress of target pod
type: string
startTimestamp:
description: target operation start time
format: date-time
type: string
type: object
type: array
totalPodCount:
Expand Down
6 changes: 5 additions & 1 deletion config/crd/bases/apps.kusionstack.io_operationjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ spec:
activeDeadlineSeconds:
description: |-
Specify the duration in seconds relative to the startTime
that the job may be active before the system tries to terminate it
that the target may be active before the system tries to terminate it
format: int32
type: integer
operationDelaySeconds:
Expand Down Expand Up @@ -143,6 +143,10 @@ spec:
progress:
description: operation progress of target pod
type: string
startTimestamp:
description: target operation start time
format: date-time
type: string
type: object
type: array
totalPodCount:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ require (
k8s.io/apiserver v0.22.6 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/kubectl v0.29.0
kusionstack.io/kube-api v0.6.0
kusionstack.io/kube-api v0.6.1-0.20240923062820-d1e1f0ffca8e
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1139,8 +1139,8 @@ k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI=
k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
kusionstack.io/kube-api v0.6.0 h1:FSvvZvhpAul4mnCQXYI61dIQ8QT5txdPWNwWXdjJHME=
kusionstack.io/kube-api v0.6.0/go.mod h1:J0+EHiroG/88X904Y9TV9iMRcoEuD5tXMTLMBDSwM+Y=
kusionstack.io/kube-api v0.6.1-0.20240923062820-d1e1f0ffca8e h1:kgLQN99sgGl5vjCzDYqAhAxSh/1K2/W5FW9ba7oYr/U=
kusionstack.io/kube-api v0.6.1-0.20240923062820-d1e1f0ffca8e/go.mod h1:J0+EHiroG/88X904Y9TV9iMRcoEuD5tXMTLMBDSwM+Y=
kusionstack.io/resourceconsist v0.0.1 h1:+k/jriq5Ld7fQUYfWSMGynz/FesHtl3Rk2fmQPjBe0g=
kusionstack.io/resourceconsist v0.0.1/go.mod h1:816xS/fY6EOUbPFjXIWW/TGs8/YE46qP4ElKeIiwFdU=
modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw=
Expand Down
36 changes: 19 additions & 17 deletions pkg/controllers/operationjob/operationjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,14 @@ func (r *ReconcileOperationJob) Reconcile(ctx context.Context, req reconcile.Req
return reconcile.Result{Requeue: true}, nil
}

//actionHandler, enablePodOpsLifecycle := r.getActionHandler(instance)
candidates, err := r.listTargets(ctx, instance)
if err != nil {
return reconcile.Result{}, err
}

Check warning on line 124 in pkg/controllers/operationjob/operationjob_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/operationjob/operationjob_controller.go#L123-L124

Added lines #L123 - L124 were not covered by tests

if instance.DeletionTimestamp != nil {
if err := r.releaseTargets(ctx, instance); err != nil {
if err := r.releaseTargets(ctx, instance, candidates, true); err != nil {
r.Recorder.Eventf(instance, corev1.EventTypeWarning, "ReleaseTargetFailed", fmt.Sprintf("failed to release targets when job deleting: %s", err.Error()))
return reconcile.Result{}, err
}
Expand All @@ -128,27 +134,17 @@ func (r *ReconcileOperationJob) Reconcile(ctx context.Context, req reconcile.Req
return reconcile.Result{}, err
}

jobDeleted, requeueAfter, err := r.ensureActiveDeadlineAndTTL(ctx, instance, logger)
jobDeleted, requeueAfter, err := r.ensureActiveDeadlineAndTTL(ctx, instance, candidates, logger)
if jobDeleted || err != nil {
return reconcile.Result{}, err
}

reconcileErr := r.doReconcile(ctx, instance)
updateErr := r.updateStatus(ctx, instance)
return requeueResult(requeueAfter), ctrlutils.AggregateErrors([]error{reconcileErr, updateErr})
}

func (r *ReconcileOperationJob) getActionHandlerAndTargets(ctx context.Context, instance *appsv1alpha1.OperationJob) (
actionHandler ActionHandler, enablePodOpsLifecycle bool, candidates []*OpsCandidate, err error) {
if actionHandler, enablePodOpsLifecycle, err = r.getActionHandler(instance); err != nil {
return
}
candidates, err = r.listTargets(ctx, instance)
return
err = r.doReconcile(ctx, instance, candidates)
return requeueResult(requeueAfter), err
}

func (r *ReconcileOperationJob) doReconcile(ctx context.Context, instance *appsv1alpha1.OperationJob) error {
actionHandler, enablePodOpsLifecycle, candidates, err := r.getActionHandlerAndTargets(ctx, instance)
func (r *ReconcileOperationJob) doReconcile(ctx context.Context, instance *appsv1alpha1.OperationJob, candidates []*OpsCandidate) error {
actionHandler, enablePodOpsLifecycle, err := r.getActionHandler(instance)
if err != nil {
return err
}
Expand All @@ -160,7 +156,9 @@ func (r *ReconcileOperationJob) doReconcile(ctx context.Context, instance *appsv
getErr := r.getTargetsOpsStatus(ctx, actionHandler, selectedCandidates, enablePodOpsLifecycle, instance)
// calculate opsStatus of all candidates
instance.Status = r.calculateStatus(instance, candidates)
return controllerutils.AggregateErrors([]error{opsErr, getErr})
// update operationjob status
updateErr := r.updateStatus(ctx, instance)
return controllerutils.AggregateErrors([]error{opsErr, getErr, updateErr})
}

func (r *ReconcileOperationJob) calculateStatus(instance *appsv1alpha1.OperationJob, candidates []*OpsCandidate) (jobStatus appsv1alpha1.OperationJobStatus) {
Expand Down Expand Up @@ -197,6 +195,10 @@ func (r *ReconcileOperationJob) calculateStatus(instance *appsv1alpha1.Operation
if !ojutils.IsJobFinished(&appsv1alpha1.OperationJob{Status: jobStatus}) {
jobStatus.Progress = appsv1alpha1.OperationProgressProcessing

if jobStatus.StartTimestamp == nil {
jobStatus.StartTimestamp = &now
}

if pendingPodCount == totalPodCount {
jobStatus.Progress = appsv1alpha1.OperationProgressPending
}
Expand Down
134 changes: 57 additions & 77 deletions pkg/controllers/operationjob/operationjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,63 @@ var (

var _ = Describe("operationjob controller", func() {

It("deadline and ttl", func() {
testcase := "test-deadline-ttl"
Expect(createNamespace(c, testcase)).Should(BeNil())
cs := createCollaSetWithReplicas("foo", testcase, 3)
podNames := getPodNamesFromCollaSet(cs)

oj := &appsv1alpha1.OperationJob{
ObjectMeta: metav1.ObjectMeta{
Namespace: testcase,
Name: "foo",
},
Spec: appsv1alpha1.OperationJobSpec{
Partition: int32Pointer(0),
Action: appsv1alpha1.OpsActionReplace,
Targets: []appsv1alpha1.PodOpsTarget{
{
Name: podNames[0],
},
{
Name: podNames[1],
},
{
Name: podNames[2],
},
},
ActiveDeadlineSeconds: int32Pointer(3),
TTLSecondsAfterFinished: int32Pointer(5),
},
}

Expect(c.Create(ctx, oj)).Should(BeNil())

for _, partition := range []int32{0, 1, 2, 3} {
// update partition
Eventually(func() error {
return c.Get(context.TODO(), types.NamespacedName{Namespace: oj.Namespace, Name: oj.Name}, oj)
}, time.Second*5, time.Second).Should(BeNil())
Expect(updateOperationJobWithRetry(oj.Namespace, oj.Name, func(job *appsv1alpha1.OperationJob) bool {
job.Spec.Partition = &partition
return true
})).Should(BeNil())
// wait for replace failed after ActiveDeadlineSeconds
assertFailedReplicas(oj, partition, time.Second*1000)
}

// wait for operationJob deleted after TTL
Eventually(func() bool {
err := c.Get(ctx, types.NamespacedName{Namespace: oj.Namespace, Name: oj.Name}, oj)
if errors.IsNotFound(err) {
return true
} else {
Expect(err).Should(BeNil())
}
return false
}, time.Second*20, time.Second).Should(BeTrue())
})

It("[replace] reconcile", func() {
testcase := "test-replace"
Expect(createNamespace(c, testcase)).Should(BeNil())
Expand Down Expand Up @@ -432,83 +489,6 @@ var _ = Describe("operationjob controller", func() {
}, time.Second*10, time.Second).Should(BeTrue())
})

It("deadline and ttl", func() {
testcase := "test-deadline-ttl"
Expect(createNamespace(c, testcase)).Should(BeNil())
cs := createCollaSetWithReplicas("foo", testcase, 2)
podNames := getPodNamesFromCollaSet(cs)

oj := &appsv1alpha1.OperationJob{
ObjectMeta: metav1.ObjectMeta{
Namespace: testcase,
Name: "foo",
},
Spec: appsv1alpha1.OperationJobSpec{
Action: appsv1alpha1.OpsActionReplace,
Targets: []appsv1alpha1.PodOpsTarget{
{
Name: podNames[0],
},
{
Name: podNames[1],
},
},
ActiveDeadlineSeconds: int32Pointer(10),
TTLSecondsAfterFinished: int32Pointer(5),
},
}

Expect(c.Create(ctx, oj)).Should(BeNil())

// wait for new pod created
podList := &corev1.PodList{}
Eventually(func() bool {
Expect(c.List(ctx, podList, client.InNamespace(cs.Namespace))).Should(BeNil())
return len(podList.Items) == 4
}, time.Second*10, time.Second).Should(BeTrue())

// mock only 1 new pod serviceAvailable
Expect(c.List(ctx, podList, client.InNamespace(cs.Namespace))).Should(BeNil())
for i := range podList.Items {
if _, exist := podList.Items[i].Labels[appsv1alpha1.PodReplacePairOriginName]; exist {
Expect(updatePodWithRetry(podList.Items[i].Namespace, podList.Items[i].Name, func(pod *corev1.Pod) bool {
pod.Labels[appsv1alpha1.PodServiceAvailableLabel] = "true"
return true
})).Should(BeNil())
break
}
}

// allow origin pod to be deleted
for i := range podList.Items {
pod := &podList.Items[i]
if _, exist := pod.Labels[appsv1alpha1.PodReplaceIndicationLabelKey]; exist {
Expect(updatePodWithRetry(pod.Namespace, pod.Name, func(pod *corev1.Pod) bool {
labelOperate := fmt.Sprintf("%s/%s", appsv1alpha1.PodOperateLabelPrefix, poddeletion.OpsLifecycleAdapter.GetID())
pod.Labels[labelOperate] = fmt.Sprintf("%d", time.Now().UnixNano())
pod.Labels[appsv1alpha1.PodDeletionIndicationLabelKey] = fmt.Sprintf("%d", time.Now().UnixNano())
return true
})).Should(BeNil())
}
}
assertSucceededReplicas(oj, 1, time.Second*10)

// wait for replace failed after ActiveDeadlineSeconds
assertJobProgressFailed(oj, time.Second*10)
assertFailedReplicas(oj, 1, time.Second*10)

// wait for operationJob deleted after TTL
Eventually(func() bool {
err := c.Get(ctx, types.NamespacedName{Namespace: oj.Namespace, Name: oj.Name}, oj)
if errors.IsNotFound(err) {
return true
} else {
Expect(err).Should(BeNil())
}
return false
}, time.Second*20, time.Second).Should(BeTrue())
})

})

func assertFailedReplicas(oj *appsv1alpha1.OperationJob, failedPodCount int32, timeout time.Duration) {
Expand Down
45 changes: 30 additions & 15 deletions pkg/controllers/operationjob/operationjob_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"kusionstack.io/kuperator/pkg/controllers/operationjob/replace"
ojutils "kusionstack.io/kuperator/pkg/controllers/operationjob/utils"
controllerutils "kusionstack.io/kuperator/pkg/controllers/utils"
ctrlutils "kusionstack.io/kuperator/pkg/controllers/utils"
"kusionstack.io/kuperator/pkg/controllers/utils/podopslifecycle"
)

Expand Down Expand Up @@ -149,6 +150,10 @@ func (r *ReconcileOperationJob) filterAndOperateAllowOpsTargets(
} else {
candidate.OpsStatus.Progress = appsv1alpha1.OperationProgressProcessing
}
if candidate.OpsStatus.StartTimestamp == nil {
now := ctrlutils.FormatTimeNow()
candidate.OpsStatus.StartTimestamp = &now
}
}

if isAllowedOps {
Expand Down Expand Up @@ -237,27 +242,36 @@ func (r *ReconcileOperationJob) getTargetsOpsStatus(
}

// ensureActiveDeadlineAndTTL calculates the time left before ActiveDeadlineSeconds and TTLSecondsAfterFinished expire, and releases targets whose active deadline has passed
func (r *ReconcileOperationJob) ensureActiveDeadlineAndTTL(ctx context.Context, operationJob *appsv1alpha1.OperationJob, logger logr.Logger) (bool, *time.Duration, error) {
isFailed := operationJob.Status.Progress == appsv1alpha1.OperationProgressFailed
isSucceeded := operationJob.Status.Progress == appsv1alpha1.OperationProgressSucceeded

func (r *ReconcileOperationJob) ensureActiveDeadlineAndTTL(ctx context.Context, operationJob *appsv1alpha1.OperationJob, candidates []*OpsCandidate, logger logr.Logger) (bool, *time.Duration, error) {
if operationJob.Spec.ActiveDeadlineSeconds != nil {
if !isFailed && !isSucceeded {
leftTime := time.Duration(*operationJob.Spec.ActiveDeadlineSeconds)*time.Second - time.Since(operationJob.CreationTimestamp.Time)
var allowReleaseCandidates []*OpsCandidate
for i := range candidates {
candidate := candidates[i]
// just skip if target operation already finished, or not started
if IsCandidateOpsFinished(candidate) || candidate.OpsStatus.StartTimestamp == nil {
continue
}
leftTime := time.Duration(*operationJob.Spec.ActiveDeadlineSeconds)*time.Second - time.Since(candidate.OpsStatus.StartTimestamp.Time)
if leftTime > 0 {
return false, &leftTime, nil
} else {
logger.Info("should end but still processing")
r.Recorder.Eventf(operationJob, corev1.EventTypeNormal, "Timeout", "Try to fail OperationJob for timeout...")
// mark operationjob and targets failed and release targets
ojutils.MarkOperationJobFailed(operationJob)
return false, nil, r.releaseTargets(ctx, operationJob)
MarkCandidateFailed(candidate)
allowReleaseCandidates = append(allowReleaseCandidates, candidate)
}
}
if len(allowReleaseCandidates) > 0 {
releaseErr := r.releaseTargets(ctx, operationJob, allowReleaseCandidates, false)
operationJob.Status = r.calculateStatus(operationJob, candidates)
updateErr := r.updateStatus(ctx, operationJob)
return false, nil, controllerutils.AggregateErrors([]error{releaseErr, updateErr})
}
}

if operationJob.Spec.TTLSecondsAfterFinished != nil {
if isFailed || isSucceeded {
if ojutils.IsJobFinished(operationJob) {
leftTime := time.Duration(*operationJob.Spec.TTLSecondsAfterFinished)*time.Second - time.Since(operationJob.Status.EndTimestamp.Time)
if leftTime > 0 {
return false, &leftTime, nil
Expand All @@ -269,13 +283,12 @@ func (r *ReconcileOperationJob) ensureActiveDeadlineAndTTL(ctx context.Context,
}
}
}

return false, nil, nil
}

// releaseTargets tries to release the given targets from operation, e.g. when the operationJob is being deleted or a target has exceeded its active deadline
func (r *ReconcileOperationJob) releaseTargets(ctx context.Context, operationJob *appsv1alpha1.OperationJob) error {
actionHandler, enablePodOpsLifecycle, candidates, err := r.getActionHandlerAndTargets(ctx, operationJob)
func (r *ReconcileOperationJob) releaseTargets(ctx context.Context, operationJob *appsv1alpha1.OperationJob, candidates []*OpsCandidate, needUpdateStatus bool) error {
actionHandler, enablePodOpsLifecycle, err := r.getActionHandler(operationJob)
if err != nil {
return err
}
Expand All @@ -288,12 +301,14 @@ func (r *ReconcileOperationJob) releaseTargets(ctx context.Context, operationJob
releaseErr = controllerutils.AggregateErrors([]error{releaseErr, err})
}
// mark candidate as failed if not finished
if IsCandidateOpsFinished(candidate) {
return nil
if !IsCandidateOpsFinished(candidate) {
candidate.OpsStatus.Progress = appsv1alpha1.OperationProgressFailed
}
candidate.OpsStatus.Progress = appsv1alpha1.OperationProgressFailed
return nil
})
if !needUpdateStatus {
return releaseErr
}
operationJob.Status = r.calculateStatus(operationJob, candidates)
updateErr := r.updateStatus(ctx, operationJob)
return controllerutils.AggregateErrors([]error{releaseErr, updateErr})
Expand Down
6 changes: 6 additions & 0 deletions pkg/controllers/operationjob/opscore/candidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,9 @@ func IsCandidateServiceAvailable(candidate *OpsCandidate) bool {
_, serviceAvailable := candidate.Pod.Labels[appsv1alpha1.PodServiceAvailableLabel]
return serviceAvailable
}

// MarkCandidateFailed sets the candidate's operation progress to
// OperationProgressFailed. A candidate whose progress is already
// OperationProgressSucceeded is left untouched.
func MarkCandidateFailed(candidate *OpsCandidate) {
	// A succeeded operation must never be downgraded to failed.
	if candidate.OpsStatus.Progress == appsv1alpha1.OperationProgressSucceeded {
		return
	}
	candidate.OpsStatus.Progress = appsv1alpha1.OperationProgressFailed
}

0 comments on commit 18d0e7b

Please sign in to comment.