Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(controller, vmop): wait for the desired state of the vm #84

Merged
merged 4 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion images/virtualization-artifact/hack/mirrord.sh
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ if ! kubectl -n "${NAMESPACE}" get "deployment/${NEW_NAME}" &>/dev/null; then
(.spec.template.spec.containers[] | select(.name == $CONTAINER_NAME) ) |= (.command= [ "/bin/bash", "-c", "--" ] | .args = [ "while true; do sleep 60; done;" ] ) |
.spec.replicas = 1 |
.spec.template.metadata.labels.mirror = "true" |
.spec.template.metadata.labels.ownerName = $NEW_NAME' \
.spec.template.metadata.labels.ownerName = $NEW_NAME' | \
kubectl create -f -
fi

Expand Down
121 changes: 89 additions & 32 deletions images/virtualization-artifact/pkg/controller/vmop/vmop_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

Expand All @@ -27,62 +30,104 @@ func NewReconciler() *Reconciler {
}

func (r *Reconciler) SetupController(_ context.Context, mgr manager.Manager, ctr controller.Controller) error {
return ctr.Watch(source.Kind(mgr.GetCache(), &virtv2.VirtualMachineOperation{}), &handler.EnqueueRequestForObject{})
err := ctr.Watch(source.Kind(mgr.GetCache(), &virtv2.VirtualMachineOperation{}), &handler.EnqueueRequestForObject{})
if err != nil {
return fmt.Errorf("error setting watch on VMOP: %w", err)
}
// Subscribe on VirtualMachines.
if err = ctr.Watch(
source.Kind(mgr.GetCache(), &virtv2.VirtualMachine{}),
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, vm client.Object) []reconcile.Request {
c := mgr.GetClient()
vmops := &virtv2.VirtualMachineOperationList{}
if err := c.List(ctx, vmops, client.InNamespace(vm.GetNamespace())); err != nil {
return nil
}
var requests []reconcile.Request
for _, vmop := range vmops.Items {
if vmop.Spec.VirtualMachine == vm.GetName() && vmop.Status.Phase == virtv2.VMOPPhaseInProgress {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: vmop.GetNamespace(),
Name: vmop.GetName(),
},
})
break
}
}
return requests
}),
predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool { return true },
DeleteFunc: func(e event.DeleteEvent) bool { return true },
UpdateFunc: func(e event.UpdateEvent) bool {
oldVM := e.ObjectOld.(*virtv2.VirtualMachine)
newVM := e.ObjectNew.(*virtv2.VirtualMachine)
return oldVM.Status.Phase != newVM.Status.Phase
},
},
); err != nil {
return fmt.Errorf("error setting watch on VirtualMachine: %w", err)
}
return nil
}

func (r *Reconciler) Sync(ctx context.Context, req reconcile.Request, state *ReconcilerState, opts two_phase_reconciler.ReconcilerOptions) error {
log := opts.Log.WithValues("vmop.name", state.VMOP.Current().GetName())

switch {
case state.IsDeletion():
log.V(1).Info("Delete VMOP, remove protective finalizers")
return r.cleanupOnDeletion(ctx, state, opts)
return r.removeFinalizers(ctx, state, opts)
case state.IsCompleted():
log.V(2).Info("VMOP completed", "namespacedName", req.String())
return r.removeFinalizers(ctx, state, opts)

case state.IsFailed():
log.V(2).Info("VMOP failed", "namespacedName", req.String())
return r.removeFinalizers(ctx, state, opts)
case !state.IsProtected():
// Set protective finalizer atomically.
if controllerutil.AddFinalizer(state.VMOP.Changed(), virtv2.FinalizerVMOPCleanup) {
state.SetReconcilerResult(&reconcile.Result{Requeue: true})
return nil
}
case state.IsCompleted():
log.V(2).Info("VMOP completed", "namespacedName", req.String())
return r.removeVMFinalizers(ctx, state, opts)

case state.IsFailed():
log.V(2).Info("VMOP failed", "namespacedName", req.String())
return r.removeVMFinalizers(ctx, state, opts)
case state.VmIsEmpty():
state.SetReconcilerResult(&reconcile.Result{RequeueAfter: 2 * time.Second})
return nil
}
found, err := state.OtherVMOPInProgress(ctx)
if err != nil {
return err
}
if found {
state.SetReconcilerResult(&reconcile.Result{Requeue: true})
state.SetReconcilerResult(&reconcile.Result{RequeueAfter: 15 * time.Second})
return nil
}
if !state.IsInProgress() {
state.SetInProgress()
state.SetReconcilerResult(&reconcile.Result{Requeue: true})
return r.ensureVMFinalizers(ctx, state, opts)
}

if !r.isOperationAllowed(state.VMOP.Current().Spec.Type, state) {
return nil
}
err = r.doOperation(ctx, state.VMOP.Current().Spec, state)
if err != nil {
msg := "The operation completed with an error."
state.SetOperationResult(false, fmt.Sprintf("%s %s", msg, err.Error()))
opts.Recorder.Event(state.VMOP.Current(), corev1.EventTypeWarning, virtv2.ReasonErrVMOPFailed, msg)
log.V(1).Error(err, msg, "vmop.name", state.VMOP.Current().GetName(), "vmop.namespace", state.VMOP.Current().GetNamespace())
} else {
err = r.ensureVMFinalizers(ctx, state, opts)
if err != nil {
return err
}
if !r.isOperationAllowed(state.VMOP.Current().Spec.Type, state) {
return nil
}
err = r.doOperation(ctx, state.VMOP.Current().Spec, state)
if err != nil {
msg := "The operation completed with an error."
state.SetOperationResult(false, fmt.Sprintf("%s %s", msg, err.Error()))
opts.Recorder.Event(state.VMOP.Current(), corev1.EventTypeWarning, virtv2.ReasonErrVMOPFailed, msg)
log.V(1).Error(err, msg, "vmop.name", state.VMOP.Current().GetName(), "vmop.namespace", state.VMOP.Current().GetNamespace())
return nil
}
state.SetOperationResult(true, "")
msg := "The operation completed without errors."
opts.Recorder.Event(state.VMOP.Current(), corev1.EventTypeNormal, virtv2.ReasonVMOPSucceeded, msg)
log.V(2).Info(msg, "vmop.name", state.VMOP.Current().GetName(), "vmop.namespace", state.VMOP.Current().GetNamespace())
return nil
}
if r.IsCompleted(state.VMOP.Current().Spec.Type, state.VM.Status.Phase) {
return nil
}
state.SetReconcilerResult(&reconcile.Result{RequeueAfter: 60 * time.Second})
return nil
}

Expand All @@ -97,7 +142,7 @@ func (r *Reconciler) UpdateStatus(_ context.Context, _ reconcile.Request, state
vmopStatus := state.VMOP.Current().Status.DeepCopy()

switch {
case state.IsFailed(), state.IsCompleted():
case state.IsFailed(), state.IsCompleted(), state.IsInProgress():
// No need to update status.
break
case vmopStatus.Phase == "":
Expand All @@ -113,19 +158,20 @@ func (r *Reconciler) UpdateStatus(_ context.Context, _ reconcile.Request, state
vmopStatus.Phase = virtv2.VMOPPhaseFailed
vmopStatus.FailureReason = virtv2.ReasonErrVMOPNotPermitted
vmopStatus.FailureMessage = fmt.Sprintf("operation %q not permitted for vm.status.phase=%q", state.VMOP.Current().Spec.Type, state.VM.Status.Phase)
case state.GetInProgress():
vmopStatus.Phase = virtv2.VMOPPhaseInProgress
}

if result := state.GetOperationResult(); result != nil {
if result.WasSuccessful() {
vmopStatus.Phase = virtv2.VMOPPhaseCompleted
vmopStatus.Phase = virtv2.VMOPPhaseInProgress
} else {
vmopStatus.Phase = virtv2.VMOPPhaseFailed
vmopStatus.FailureReason = virtv2.ReasonErrVMOPFailed
vmopStatus.FailureMessage = result.Message()
}
}
if state.IsInProgress() && r.IsCompleted(state.VMOP.Current().Spec.Type, state.VM.Status.Phase) {
vmopStatus.Phase = virtv2.VMOPPhaseCompleted
}
state.VMOP.Changed().Status = *vmopStatus
return nil
}
Expand All @@ -152,7 +198,7 @@ func (r *Reconciler) removeVMFinalizers(ctx context.Context, state *ReconcilerSt
return nil
}

func (r *Reconciler) cleanupOnDeletion(ctx context.Context, state *ReconcilerState, opts two_phase_reconciler.ReconcilerOptions) error {
func (r *Reconciler) removeFinalizers(ctx context.Context, state *ReconcilerState, opts two_phase_reconciler.ReconcilerOptions) error {
if err := r.removeVMFinalizers(ctx, state, opts); err != nil {
return err
}
Expand Down Expand Up @@ -240,3 +286,14 @@ func (r *Reconciler) isOperationAllowedForVmPhase(op virtv2.VMOPOperation, phase
return false
}
}

func (r *Reconciler) IsCompleted(op virtv2.VMOPOperation, phase virtv2.MachinePhase) bool {
switch op {
case virtv2.VMOPOperationTypeRestart, virtv2.VMOPOperationTypeStart:
return phase == virtv2.MachineRunning
case virtv2.VMOPOperationTypeStop:
return phase == virtv2.MachineStopped
default:
return false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type ReconcilerState struct {
VM *virtv2.VirtualMachine

operationResult *OperationResult
inProgress bool
}

type OperationResult struct {
Expand Down Expand Up @@ -102,7 +101,10 @@ func (state *ReconcilerState) IsDeletion() bool {
if state.VMOP.IsEmpty() {
return false
}
return state.VMOP.Current().DeletionTimestamp != nil
if !state.VmIsEmpty() && state.VM.DeletionTimestamp != nil {
return true
}
return state.VMOP.Current().DeletionTimestamp != nil && !state.IsInProgress()
}

func (state *ReconcilerState) IsProtected() bool {
Expand Down Expand Up @@ -130,13 +132,17 @@ func (state *ReconcilerState) IsInProgress() bool {
return state.VMOP.Current().Status.Phase == virtv2.VMOPPhaseInProgress
}

func (state *ReconcilerState) IsFinish() bool {
return state.IsCompleted() || state.IsFailed()
}

func (state *ReconcilerState) VmIsEmpty() bool {
return state.VM == nil
}

func (state *ReconcilerState) OtherVMOPInProgress(ctx context.Context) (bool, error) {
vmops := virtv2.VirtualMachineOperationList{}
err := state.Client.List(ctx, &vmops, &client.ListOptions{Namespace: state.VMOP.Current().Namespace})
var vmops virtv2.VirtualMachineOperationList
err := state.Client.List(ctx, &vmops, client.InNamespace(state.VMOP.Current().GetNamespace()))
if err != nil {
return false, err
}
Expand All @@ -161,14 +167,6 @@ func (state *ReconcilerState) GetOperationResult() *OperationResult {
return state.operationResult
}

func (state *ReconcilerState) SetInProgress() {
state.inProgress = true
}

func (state *ReconcilerState) GetInProgress() bool {
return state.inProgress
}

func (state *ReconcilerState) GetKVVM(ctx context.Context) (*virtv1.VirtualMachine, error) {
if state.VmIsEmpty() {
return nil, fmt.Errorf("VM %s not found", state.VMOP.Current().Spec.VirtualMachine)
Expand Down