Commit: second approach
Yuvaraj Kakaraparthi committed May 31, 2023
1 parent 82798f0 commit e1a1105
Showing 4 changed files with 288 additions and 78 deletions.
127 changes: 85 additions & 42 deletions internal/controllers/machineset/machineset_controller.go
@@ -296,51 +296,13 @@ func (r *Reconciler) reconcile(ctx context.Context, cluster *clusterv1.Cluster,
filteredMachines = append(filteredMachines, machine)
}

// Remediate failed Machines by deleting them.
var errs []error
machinesToRemediate := make([]*clusterv1.Machine, 0, len(filteredMachines))
for _, machine := range filteredMachines {
// filteredMachines contains machines in deleting status to calculate correct status.
// skip remediation for those in deleting status.
if !machine.DeletionTimestamp.IsZero() {
continue
}
if conditions.IsFalse(machine, clusterv1.MachineOwnerRemediatedCondition) {
machinesToRemediate = append(machinesToRemediate, machine)
}
}

result := ctrl.Result{}
if len(machinesToRemediate) > 0 {
preflightChecksResult, err := r.runPreflightChecks(ctx, cluster, machineSet, "Machine Remediation")
if err != nil {
return ctrl.Result{}, err
}
// Delete the machines only if the preflight checks have passed. Do not delete machines if we cannot
// guarantee creating new machines.
if preflightChecksResult.IsZero() {
for _, machine := range machinesToRemediate {
log.Info("Deleting machine because marked as unhealthy by the MachineHealthCheck controller")
patch := client.MergeFrom(machine.DeepCopy())
if err := r.Client.Delete(ctx, machine); err != nil {
errs = append(errs, errors.Wrap(err, "failed to delete"))
continue
}
conditions.MarkTrue(machine, clusterv1.MachineOwnerRemediatedCondition)
if err := r.Client.Status().Patch(ctx, machine, patch); err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, errors.Wrap(err, "failed to update status"))
}
}
} else {
result = util.LowestNonZeroResult(result, preflightChecksResult)
}
}

err = kerrors.NewAggregate(errs)
reconcileUnhealthyMachinesResult, err := r.reconcileUnhealthyMachines(ctx, cluster, machineSet, filteredMachines)
if err != nil {
log.Info("Failed while deleting unhealthy machines", "err", err)
return ctrl.Result{}, errors.Wrap(err, "failed to remediate machines")
return ctrl.Result{}, errors.Wrap(err, "failed to reconcile unhealthy machines")
}
result = util.LowestNonZeroResult(result, reconcileUnhealthyMachinesResult)

if err := r.syncMachines(ctx, machineSet, filteredMachines); err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to update Machines")
@@ -482,8 +444,13 @@ func (r *Reconciler) syncReplicas(ctx context.Context, cluster *clusterv1.Cluste
}
}

result, err := r.runPreflightChecks(ctx, cluster, ms, "Scale up")
result, preflightCheckErrMessage, err := r.runPreflightChecks(ctx, cluster, ms, "Scale up")
if err != nil || !result.IsZero() {
if err != nil {
// If the error is not nil, use it as the message for the condition.
preflightCheckErrMessage = err.Error()
}
conditions.MarkFalse(ms, clusterv1.MachinesCreatedCondition, clusterv1.PreflightCheckFailedReason, clusterv1.ConditionSeverityWarning, preflightCheckErrMessage)
return result, err
}
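
The corresponding change to the runPreflightChecks helper itself is in one of the commit's files that is not shown on this page. Judging from the call sites in this diff, its signature now presumably has roughly the following shape (the parameter name "action" is a guess):

// Assumed shape, inferred from the call sites above; not part of this diff.
// The second return value is a human-readable message describing the preflight
// check that did not pass, which callers surface as a condition message.
func (r *Reconciler) runPreflightChecks(ctx context.Context, cluster *clusterv1.Cluster, ms *clusterv1.MachineSet, action string) (ctrl.Result, string, error) {
	// ...
}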

@@ -991,6 +958,82 @@ func (r *Reconciler) getMachineNode(ctx context.Context, cluster *clusterv1.Clus
return node, nil
}

func (r *Reconciler) reconcileUnhealthyMachines(ctx context.Context, cluster *clusterv1.Cluster, ms *clusterv1.MachineSet, filteredMachines []*clusterv1.Machine) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
// List all unhealthy machines.
machinesToRemediate := make([]*clusterv1.Machine, 0, len(filteredMachines))
for _, m := range filteredMachines {
// filteredMachines also contains Machines that are being deleted, so that the correct
// MachineSet status can be calculated; skip remediation for those Machines.
if !m.DeletionTimestamp.IsZero() {
continue
}
if conditions.IsFalse(m, clusterv1.MachineOwnerRemediatedCondition) {
machinesToRemediate = append(machinesToRemediate, m)
}
}

// If there are no machines to remediate return early.
if len(machinesToRemediate) == 0 {
return ctrl.Result{}, nil
}

preflightChecksResult, preflightCheckErrMessage, err := r.runPreflightChecks(ctx, cluster, ms, "Machine Remediation")
preflightChecksPassed := preflightChecksResult.IsZero() && err == nil
if err != nil {
// If err is not nil, use it as the preflightCheckErrMessage.
preflightCheckErrMessage = err.Error()
}

var aggregateErr error
if preflightChecksPassed {
// PreflightChecks passed, so it is safe to remediate unhealthy machines.
// Remediate unhealthy machines by deleting them.
var errs []error
for _, m := range machinesToRemediate {
log.Info("Deleting Machine because it is marked as unhealthy by the MachineHealthCheck controller", "Machine", klog.KObj(m))
patch := client.MergeFrom(m.DeepCopy())
if err := r.Client.Delete(ctx, m); err != nil {
errs = append(errs, errors.Wrapf(err, "failed to delete Machine %s", klog.KObj(m)))
continue
}
conditions.MarkTrue(m, clusterv1.MachineOwnerRemediatedCondition)
if err := r.Client.Status().Patch(ctx, m, patch); err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, errors.Wrapf(err, "failed to update status of Machine %s", klog.KObj(m)))
}
}
aggregateErr = kerrors.NewAggregate(errs)
if aggregateErr != nil {
log.Info("Failed while deleting unhealthy machines", "err", aggregateErr)
}
} else {
// PreflightChecks did not pass. Update the MachineOwnerRemediated condition on the unhealthy Machines with
// WaitingForRemediationReason reason.
var errs []error
for _, m := range machinesToRemediate {
patchHelper, err := patch.NewHelper(m, r.Client)
if err != nil {
errs = append(errs, errors.Wrapf(err, "failed to create patch helper for Machine %s", klog.KObj(m)))
continue
}
conditions.MarkFalse(m, clusterv1.MachineOwnerRemediatedCondition, clusterv1.WaitingForRemediationReason, clusterv1.ConditionSeverityWarning, preflightCheckErrMessage)
if err := patchHelper.Patch(ctx, m); err != nil {
errs = append(errs, errors.Wrapf(err, "failed to patch Machine %s", klog.KObj(m)))
}
}
aggregateErr = kerrors.NewAggregate(errs)
if aggregateErr != nil {
log.Info("Failed while patching unhealthy machines", "err", aggregateErr)
}
}

if aggregateErr != nil {
return ctrl.Result{}, aggregateErr
}
return preflightChecksResult, nil
}

func reconcileExternalTemplateReference(ctx context.Context, c client.Client, cluster *clusterv1.Cluster, ref *corev1.ObjectReference) error {
if !strings.HasSuffix(ref.Kind, clusterv1.TemplateSuffix) {
return nil
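One note on how the requeue from reconcileUnhealthyMachines propagates: util.LowestNonZeroResult returns the result with the soonest non-zero requeue, so a requeue requested by the preflight checks is preserved even if the rest of reconcile() has nothing to requeue. A minimal illustrative sketch (the 15-second value is only an example, not taken from this commit):

a := ctrl.Result{}                               // nothing else asked for a requeue
b := ctrl.Result{RequeueAfter: 15 * time.Second} // e.g. preflight checks asked to retry later
res := util.LowestNonZeroResult(a, b)            // res.RequeueAfter == 15 * time.Second
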
189 changes: 189 additions & 0 deletions internal/controllers/machineset/machineset_controller_test.go
@@ -26,13 +26,15 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/tools/record"
utilfeature "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/external"
"sigs.k8s.io/cluster-api/feature"
"sigs.k8s.io/cluster-api/internal/contract"
"sigs.k8s.io/cluster-api/internal/test/builder"
"sigs.k8s.io/cluster-api/internal/util/ssa"
@@ -1326,6 +1328,193 @@ func TestMachineSetReconciler_syncMachines(t *testing.T) {
}, 5*time.Second).Should(Succeed())
}

func TestMachineSetReconciler_reconcileUnhealthyMachines(t *testing.T) {
t.Run("should delete unhealthy machines if preflight checks pass", func(t *testing.T) {
defer utilfeature.SetFeatureGateDuringTest(t, feature.Gates, feature.MachineSetPreflightChecks, true)()

g := NewWithT(t)

controlPlaneStable := builder.ControlPlane("default", "cp1").
WithVersion("v1.26.2").
WithStatusFields(map[string]interface{}{
"status.version": "v1.26.2",
}).
Build()
cluster := &clusterv1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test-cluster",
Namespace: "default",
},
Spec: clusterv1.ClusterSpec{
ControlPlaneRef: contract.ObjToRef(controlPlaneStable),
},
}
machineSet := &clusterv1.MachineSet{}

unhealthyMachine := &clusterv1.Machine{
ObjectMeta: metav1.ObjectMeta{
Name: "unhealthy-machine",
Namespace: "default",
},
Status: clusterv1.MachineStatus{
Conditions: []clusterv1.Condition{
{
Type: clusterv1.MachineOwnerRemediatedCondition,
Status: corev1.ConditionFalse,
},
},
},
}
healthyMachine := &clusterv1.Machine{
ObjectMeta: metav1.ObjectMeta{
Name: "healthy-machine",
Namespace: "default",
},
}

machines := []*clusterv1.Machine{unhealthyMachine, healthyMachine}

fakeClient := fake.NewClientBuilder().WithObjects(controlPlaneStable, unhealthyMachine, healthyMachine).Build()
r := &Reconciler{Client: fakeClient}
_, err := r.reconcileUnhealthyMachines(ctx, cluster, machineSet, machines)
g.Expect(err).To(BeNil())
// Verify the unhealthy machine is deleted.
g.Eventually(func(g Gomega) {
m := &clusterv1.Machine{}
err := r.Client.Get(ctx, client.ObjectKeyFromObject(unhealthyMachine), m)
g.Expect(apierrors.IsNotFound(err)).To(BeTrue())
}).Should(Succeed())
// Verify the healthy machine is not deleted.
g.Consistently(func(g Gomega) {
m := &clusterv1.Machine{}
g.Expect(r.Client.Get(ctx, client.ObjectKeyFromObject(healthyMachine), m)).Should(Succeed())
}).Should(Succeed())
})

t.Run("should update the unhealthy machine MachineOwnerRemediated condition if preflight checks did not pass", func(t *testing.T) {
defer utilfeature.SetFeatureGateDuringTest(t, feature.Gates, feature.MachineSetPreflightChecks, true)()

g := NewWithT(t)

// An upgrading control plane should cause the preflight checks to not pass.
controlPlaneUpgrading := builder.ControlPlane("default", "cp1").
WithVersion("v1.26.2").
WithStatusFields(map[string]interface{}{
"status.version": "v1.25.2",
}).
Build()
cluster := &clusterv1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test-cluster",
Namespace: "default",
},
Spec: clusterv1.ClusterSpec{
ControlPlaneRef: contract.ObjToRef(controlPlaneUpgrading),
},
}
machineSet := &clusterv1.MachineSet{}

unhealthyMachine := &clusterv1.Machine{
ObjectMeta: metav1.ObjectMeta{
Name: "unhealthy-machine",
Namespace: "default",
},
Status: clusterv1.MachineStatus{
Conditions: []clusterv1.Condition{
{
Type: clusterv1.MachineOwnerRemediatedCondition,
Status: corev1.ConditionFalse,
},
},
},
}
healthyMachine := &clusterv1.Machine{
ObjectMeta: metav1.ObjectMeta{
Name: "healthy-machine",
Namespace: "default",
},
}

machines := []*clusterv1.Machine{unhealthyMachine, healthyMachine}
fakeClient := fake.NewClientBuilder().WithObjects(controlPlaneUpgrading, unhealthyMachine, healthyMachine).WithStatusSubresource(&clusterv1.Machine{}).Build()
r := &Reconciler{Client: fakeClient}
_, err := r.reconcileUnhealthyMachines(ctx, cluster, machineSet, machines)
g.Expect(err).To(BeNil())
// Verify the unhealthy machine has the updated condition.
g.Eventually(func(g Gomega) {
m := &clusterv1.Machine{}
g.Expect(r.Client.Get(ctx, client.ObjectKeyFromObject(unhealthyMachine), m)).To(Succeed())
g.Expect(conditions.Has(m, clusterv1.MachineOwnerRemediatedCondition)).
To(BeTrue(), "Machine should have the %s condition set",
clusterv1.MachineOwnerRemediatedCondition)
machineOwnerRemediatedCondition := conditions.Get(m, clusterv1.MachineOwnerRemediatedCondition)
g.Expect(machineOwnerRemediatedCondition.Status).
To(Equal(corev1.ConditionFalse), "%s condition status should be false",
clusterv1.MachineOwnerRemediatedCondition)
g.Expect(machineOwnerRemediatedCondition.Reason).
To(Equal(clusterv1.WaitingForRemediationReason), "%s condition should have reason %s",
clusterv1.MachineOwnerRemediatedCondition, clusterv1.WaitingForRemediationReason)
}).Should(Succeed())
})
}

func TestMachineSetReconciler_syncReplicas(t *testing.T) {
t.Run("should hold off on creating new machines when preflight checks do not pass", func(t *testing.T) {
defer utilfeature.SetFeatureGateDuringTest(t, feature.Gates, feature.MachineSetPreflightChecks, true)()

g := NewWithT(t)

// An upgrading control plane should cause the preflight checks to not pass.
controlPlaneUpgrading := builder.ControlPlane("default", "test-cp").
WithVersion("v1.26.2").
WithStatusFields(map[string]interface{}{
"status.version": "v1.25.2",
}).
Build()
cluster := &clusterv1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test-cluster",
Namespace: "default",
},
Spec: clusterv1.ClusterSpec{
ControlPlaneRef: contract.ObjToRef(controlPlaneUpgrading),
},
}
machineSet := &clusterv1.MachineSet{
ObjectMeta: metav1.ObjectMeta{
Name: "test-machineset",
Namespace: "default",
},
Spec: clusterv1.MachineSetSpec{
Replicas: pointer.Int32(1),
},
}

fakeClient := fake.NewClientBuilder().WithObjects(controlPlaneUpgrading, machineSet).WithStatusSubresource(&clusterv1.MachineSet{}).Build()
r := &Reconciler{Client: fakeClient}
result, err := r.syncReplicas(ctx, cluster, machineSet, nil)
g.Expect(err).To(BeNil())
g.Expect(result.IsZero()).To(BeFalse(), "syncReplicas should return a non-zero result when preflight checks do not pass")
// Verify the proper condition is set on the MachineSet.
g.Expect(conditions.Has(machineSet, clusterv1.MachinesCreatedCondition)).
To(BeTrue(), "MachineSet should have the %s condition set",
clusterv1.MachinesCreatedCondition)
machinesCreatedCondition := conditions.Get(machineSet, clusterv1.MachinesCreatedCondition)
g.Expect(machinesCreatedCondition.Status).
To(Equal(corev1.ConditionFalse), "%s condition status should be %s",
clusterv1.MachinesCreatedCondition, corev1.ConditionFalse)
g.Expect(machinesCreatedCondition.Reason).
To(Equal(clusterv1.PreflightCheckFailedReason), "%s condition reason should be %s",
clusterv1.MachinesCreatedCondition, clusterv1.PreflightCheckFailedReason)
// Verify no new Machines are created.
g.Consistently(func(g Gomega) {
machineList := &clusterv1.MachineList{}
g.Expect(r.Client.List(ctx, machineList)).To(Succeed())
g.Expect(machineList.Items).To(BeEmpty(), "There should not be any machines")
}).Should(Succeed())
})
}

func TestComputeDesiredMachine(t *testing.T) {
duration5s := &metav1.Duration{Duration: 5 * time.Second}
duration10s := &metav1.Duration{Duration: 10 * time.Second}
(The remaining two changed files in this commit are not shown here.)
