Skip to content

Commit

Permalink
refactor activeDeadlineSeconds to target-level (#284)
Browse files Browse the repository at this point in the history
* refactor opj active deadline seconds

* remove unused code

* refactor timestamp to time

* fix checkdiff

* revert ut
  • Loading branch information
ColdsteelRail authored Oct 11, 2024
1 parent 268f2a6 commit b24b8b9
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 93 deletions.
14 changes: 11 additions & 3 deletions charts/templates/crd/apps.kusionstack.io_operationjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ spec:
activeDeadlineSeconds:
description: |-
Specify the duration in seconds relative to the startTime
that the job may be active before the system tries to terminate it
that the target may be active before the system tries to terminate it
format: int32
type: integer
operationDelaySeconds:
Expand Down Expand Up @@ -92,7 +92,7 @@ spec:
status:
description: OperationJobStatus defines the observed state of OperationJob
properties:
endTimestamp:
endTime:
description: Operation end time
format: date-time
type: string
Expand All @@ -109,7 +109,7 @@ spec:
progress:
description: Progress indicates the progress of the OperationJob
type: string
startTimestamp:
startTime:
description: Operation start time
format: date-time
type: string
Expand All @@ -121,6 +121,10 @@ spec:
description: Operation details of the target pods
items:
properties:
endTime:
description: target operation end time
format: date-time
type: string
error:
description: error indicates the error info of progressing
properties:
Expand All @@ -143,6 +147,10 @@ spec:
progress:
description: operation progress of target pod
type: string
startTime:
description: target operation start time
format: date-time
type: string
type: object
type: array
totalPodCount:
Expand Down
14 changes: 11 additions & 3 deletions config/crd/bases/apps.kusionstack.io_operationjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ spec:
activeDeadlineSeconds:
description: |-
Specify the duration in seconds relative to the startTime
that the job may be active before the system tries to terminate it
that the target may be active before the system tries to terminate it
format: int32
type: integer
operationDelaySeconds:
Expand Down Expand Up @@ -92,7 +92,7 @@ spec:
status:
description: OperationJobStatus defines the observed state of OperationJob
properties:
endTimestamp:
endTime:
description: Operation end time
format: date-time
type: string
Expand All @@ -109,7 +109,7 @@ spec:
progress:
description: Progress indicates the progress of the OperationJob
type: string
startTimestamp:
startTime:
description: Operation start time
format: date-time
type: string
Expand All @@ -121,6 +121,10 @@ spec:
description: Operation details of the target pods
items:
properties:
endTime:
description: target operation end time
format: date-time
type: string
error:
description: error indicates the error info of progressing
properties:
Expand All @@ -143,6 +147,10 @@ spec:
progress:
description: operation progress of target pod
type: string
startTime:
description: target operation start time
format: date-time
type: string
type: object
type: array
totalPodCount:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ require (
k8s.io/apiserver v0.22.6 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/kubectl v0.29.0
kusionstack.io/kube-api v0.6.0
kusionstack.io/kube-api v0.6.1-0.20241010064700-c805b4e9064d
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1139,8 +1139,8 @@ k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI=
k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
kusionstack.io/kube-api v0.6.0 h1:FSvvZvhpAul4mnCQXYI61dIQ8QT5txdPWNwWXdjJHME=
kusionstack.io/kube-api v0.6.0/go.mod h1:J0+EHiroG/88X904Y9TV9iMRcoEuD5tXMTLMBDSwM+Y=
kusionstack.io/kube-api v0.6.1-0.20241010064700-c805b4e9064d h1:vs0NGK0ZxHBkWn+47LxvvMMPkuv/wjNRq9gJRy8dOTI=
kusionstack.io/kube-api v0.6.1-0.20241010064700-c805b4e9064d/go.mod h1:J0+EHiroG/88X904Y9TV9iMRcoEuD5tXMTLMBDSwM+Y=
kusionstack.io/resourceconsist v0.0.1 h1:+k/jriq5Ld7fQUYfWSMGynz/FesHtl3Rk2fmQPjBe0g=
kusionstack.io/resourceconsist v0.0.1/go.mod h1:816xS/fY6EOUbPFjXIWW/TGs8/YE46qP4ElKeIiwFdU=
modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw=
Expand Down
46 changes: 25 additions & 21 deletions pkg/controllers/operationjob/operationjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,13 @@ func (r *ReconcileOperationJob) Reconcile(ctx context.Context, req reconcile.Req
return reconcile.Result{Requeue: true}, nil
}

candidates, err := r.listTargets(ctx, instance)
if err != nil {
return reconcile.Result{}, err
}

if instance.DeletionTimestamp != nil {
if err := r.releaseTargets(ctx, instance); err != nil {
if err := r.releaseTargets(ctx, instance, candidates, true); err != nil {
r.Recorder.Eventf(instance, corev1.EventTypeWarning, "ReleaseTargetFailed", fmt.Sprintf("failed to release targets when job deleting: %s", err.Error()))
return reconcile.Result{}, err
}
Expand All @@ -128,27 +133,17 @@ func (r *ReconcileOperationJob) Reconcile(ctx context.Context, req reconcile.Req
return reconcile.Result{}, err
}

jobDeleted, requeueAfter, err := r.ensureActiveDeadlineAndTTL(ctx, instance, logger)
jobDeleted, requeueAfter, err := r.ensureActiveDeadlineAndTTL(ctx, instance, candidates, logger)
if jobDeleted || err != nil {
return reconcile.Result{}, err
}

reconcileErr := r.doReconcile(ctx, instance)
updateErr := r.updateStatus(ctx, instance)
return requeueResult(requeueAfter), ctrlutils.AggregateErrors([]error{reconcileErr, updateErr})
err = r.doReconcile(ctx, instance, candidates)
return requeueResult(requeueAfter), err
}

// getActionHandlerAndTargets resolves the action handler for the given
// OperationJob and then lists the job's target candidates.
//
// It returns the handler and the enablePodOpsLifecycle flag obtained from
// r.getActionHandler, plus the candidates obtained from r.listTargets. If
// resolving the handler fails, the error is returned immediately and
// listTargets is not called, so candidates is left nil.
func (r *ReconcileOperationJob) getActionHandlerAndTargets(ctx context.Context, instance *appsv1alpha1.OperationJob) (
actionHandler ActionHandler, enablePodOpsLifecycle bool, candidates []*OpsCandidate, err error) {
// Assign into the named results so the naked returns below carry them out.
if actionHandler, enablePodOpsLifecycle, err = r.getActionHandler(instance); err != nil {
return
}
// Any error from listing targets is returned alongside the already-resolved handler.
candidates, err = r.listTargets(ctx, instance)
return
}

func (r *ReconcileOperationJob) doReconcile(ctx context.Context, instance *appsv1alpha1.OperationJob) error {
actionHandler, enablePodOpsLifecycle, candidates, err := r.getActionHandlerAndTargets(ctx, instance)
func (r *ReconcileOperationJob) doReconcile(ctx context.Context, instance *appsv1alpha1.OperationJob, candidates []*OpsCandidate) error {
actionHandler, enablePodOpsLifecycle, err := r.getActionHandler(instance)
if err != nil {
return err
}
Expand All @@ -160,19 +155,24 @@ func (r *ReconcileOperationJob) doReconcile(ctx context.Context, instance *appsv
getErr := r.getTargetsOpsStatus(ctx, actionHandler, selectedCandidates, enablePodOpsLifecycle, instance)
// calculate opsStatus of all candidates
instance.Status = r.calculateStatus(instance, candidates)
return controllerutils.AggregateErrors([]error{opsErr, getErr})
// update operationjob status
updateErr := r.updateStatus(ctx, instance)
return controllerutils.AggregateErrors([]error{opsErr, getErr, updateErr})
}

func (r *ReconcileOperationJob) calculateStatus(instance *appsv1alpha1.OperationJob, candidates []*OpsCandidate) (jobStatus appsv1alpha1.OperationJobStatus) {
now := ctrlutils.FormatTimeNow()
jobStatus = appsv1alpha1.OperationJobStatus{
StartTimestamp: instance.Status.StartTimestamp,
EndTimestamp: instance.Status.EndTimestamp,
StartTime: instance.Status.StartTime,
EndTime: instance.Status.EndTime,
Progress: instance.Status.Progress,
ObservedGeneration: instance.Generation,
}

for _, candidate := range candidates {
if candidate.OpsStatus.EndTime == nil && IsCandidateOpsFinished(candidate) {
candidate.OpsStatus.EndTime = &now
}
jobStatus.TargetDetails = append(jobStatus.TargetDetails, *candidate.OpsStatus)
}

Expand All @@ -197,6 +197,10 @@ func (r *ReconcileOperationJob) calculateStatus(instance *appsv1alpha1.Operation
if !ojutils.IsJobFinished(&appsv1alpha1.OperationJob{Status: jobStatus}) {
jobStatus.Progress = appsv1alpha1.OperationProgressProcessing

if jobStatus.StartTime == nil {
jobStatus.StartTime = &now
}

if pendingPodCount == totalPodCount {
jobStatus.Progress = appsv1alpha1.OperationProgressPending
}
Expand All @@ -208,8 +212,8 @@ func (r *ReconcileOperationJob) calculateStatus(instance *appsv1alpha1.Operation
jobStatus.Progress = appsv1alpha1.OperationProgressSucceeded
}

if jobStatus.EndTimestamp == nil {
jobStatus.EndTimestamp = &now
if jobStatus.EndTime == nil {
jobStatus.EndTime = &now
}
}
}
Expand Down
56 changes: 18 additions & 38 deletions pkg/controllers/operationjob/operationjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ var _ = Describe("operationjob controller", func() {
It("deadline and ttl", func() {
testcase := "test-deadline-ttl"
Expect(createNamespace(c, testcase)).Should(BeNil())
cs := createCollaSetWithReplicas("foo", testcase, 2)
cs := createCollaSetWithReplicas("foo", testcase, 3)
podNames := getPodNamesFromCollaSet(cs)

oj := &appsv1alpha1.OperationJob{
Expand All @@ -507,58 +507,38 @@ var _ = Describe("operationjob controller", func() {
Name: "foo",
},
Spec: appsv1alpha1.OperationJobSpec{
Action: appsv1alpha1.OpsActionReplace,
Partition: int32Pointer(0),
Action: appsv1alpha1.OpsActionReplace,
Targets: []appsv1alpha1.PodOpsTarget{
{
Name: podNames[0],
},
{
Name: podNames[1],
},
{
Name: podNames[2],
},
},
ActiveDeadlineSeconds: int32Pointer(10),
ActiveDeadlineSeconds: int32Pointer(3),
TTLSecondsAfterFinished: int32Pointer(5),
},
}

Expect(c.Create(ctx, oj)).Should(BeNil())

// wait for new pod created
podList := &corev1.PodList{}
Eventually(func() bool {
Expect(c.List(ctx, podList, client.InNamespace(cs.Namespace))).Should(BeNil())
return len(podList.Items) == 4
}, time.Second*10, time.Second).Should(BeTrue())

// mock only 1 new pod serviceAvailable
Expect(c.List(ctx, podList, client.InNamespace(cs.Namespace))).Should(BeNil())
for i := range podList.Items {
if _, exist := podList.Items[i].Labels[appsv1alpha1.PodReplacePairOriginName]; exist {
Expect(updatePodWithRetry(podList.Items[i].Namespace, podList.Items[i].Name, func(pod *corev1.Pod) bool {
pod.Labels[appsv1alpha1.PodServiceAvailableLabel] = "true"
return true
})).Should(BeNil())
break
}
}

// allow origin pod to be deleted
for i := range podList.Items {
pod := &podList.Items[i]
if _, exist := pod.Labels[appsv1alpha1.PodReplaceIndicationLabelKey]; exist {
Expect(updatePodWithRetry(pod.Namespace, pod.Name, func(pod *corev1.Pod) bool {
labelOperate := fmt.Sprintf("%s/%s", appsv1alpha1.PodOperateLabelPrefix, poddeletion.OpsLifecycleAdapter.GetID())
pod.Labels[labelOperate] = fmt.Sprintf("%d", time.Now().UnixNano())
pod.Labels[appsv1alpha1.PodDeletionIndicationLabelKey] = fmt.Sprintf("%d", time.Now().UnixNano())
return true
})).Should(BeNil())
}
for _, partition := range []int32{0, 1, 2, 3} {
// update partition
Eventually(func() error {
return c.Get(context.TODO(), types.NamespacedName{Namespace: oj.Namespace, Name: oj.Name}, oj)
}, time.Second*5, time.Second).Should(BeNil())
Expect(updateOperationJobWithRetry(oj.Namespace, oj.Name, func(job *appsv1alpha1.OperationJob) bool {
job.Spec.Partition = &partition
return true
})).Should(BeNil())
// wait for replace failed after ActiveDeadlineSeconds
assertFailedReplicas(oj, partition, time.Second*1000)
}
assertSucceededReplicas(oj, 1, time.Second*10)

// wait for replace failed after ActiveDeadlineSeconds
assertJobProgressFailed(oj, time.Second*10)
assertFailedReplicas(oj, 1, time.Second*10)

// wait for operationJob deleted after TTL
Eventually(func() bool {
Expand Down
Loading

0 comments on commit b24b8b9

Please sign in to comment.