Merge pull request #486 from jackfrancis/retry-cordon-drain
retry cordon + drain if fail, keep lock
jackfrancis authored May 6, 2022
2 parents 4ab3bf9 + 93d6a78 commit d965e7f
Showing 1 changed file with 71 additions and 41 deletions.
112 changes: 71 additions & 41 deletions cmd/kured/main.go
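In short: this change converts kured's `drain`, `uncordon`, `addNodeAnnotations`, and `deleteNodeAnnotation` helpers from `log.Fatalf` (which kills the kured pod) to returning errors, and wraps the lock-holding recovery path in a retry loop that keeps the lock and tries again instead of exiting. A minimal, self-contained sketch of that control flow follows; the types and helpers are toy stand-ins, not kured's actual code:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Toy stand-ins for kured's lock check and uncordon call (hypothetical).
var attempts int

func holding() bool { return true }

func uncordon() error {
	attempts++
	if attempts < 3 {
		return errors.New("apiserver unavailable") // simulated transient failure
	}
	return nil
}

func main() {
	// The pattern this commit introduces: on failure, keep the lock and
	// retry on the next tick instead of calling log.Fatalf and exiting.
	tick := time.NewTicker(10 * time.Millisecond)
	defer tick.Stop()
	for range tick.C {
		if !holding() {
			break // not holding the lock: nothing to recover
		}
		if err := uncordon(); err != nil {
			fmt.Println("uncordon failed, keeping lock and retrying:", err)
			continue
		}
		fmt.Println("uncordon succeeded, releasing lock")
		break
	}
}
```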
@@ -408,7 +408,7 @@ func release(lock *daemonsetlock.DaemonSetLock) {
}
}

func drain(client *kubernetes.Clientset, node *v1.Node) {
func drain(client *kubernetes.Clientset, node *v1.Node) error {
nodename := node.GetName()

log.Infof("Draining node %s", nodename)
@@ -433,23 +433,18 @@ func drain(client *kubernetes.Clientset, node *v1.Node) {
}

if err := kubectldrain.RunCordonOrUncordon(drainer, node, true); err != nil {
if !forceReboot {
log.Fatalf("Error cordonning %s: %v", nodename, err)
}
log.Errorf("Error cordonning %s: %v, continuing with reboot anyway", nodename, err)
return
log.Errorf("Error cordonning %s: %v", nodename, err)
return err
}

if err := kubectldrain.RunNodeDrain(drainer, nodename); err != nil {
if !forceReboot {
log.Fatalf("Error draining %s: %v", nodename, err)
}
log.Errorf("Error draining %s: %v, continuing with reboot anyway", nodename, err)
return
log.Errorf("Error draining %s: %v", nodename, err)
return err
}
return nil
}

func uncordon(client *kubernetes.Clientset, node *v1.Node) {
func uncordon(client *kubernetes.Clientset, node *v1.Node) error {
nodename := node.GetName()
log.Infof("Uncordoning node %s", nodename)
drainer := &kubectldrain.Helper{
@@ -460,7 +455,9 @@ func uncordon(client *kubernetes.Clientset, node *v1.Node) {
}
if err := kubectldrain.RunCordonOrUncordon(drainer, node, false); err != nil {
log.Fatalf("Error uncordonning %s: %v", nodename, err)
return err
}
return nil
}

func invokeReboot(nodeID string, rebootCommand []string) {
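One detail in the `uncordon` hunk above: `log.Fatalf` is kept ahead of the new `return err`. logrus's `Fatalf` exits the process after logging, so that return is unreachable as written, and callers only ever observe a nil result. A small standalone illustration, with a toy `fatalf` standing in for logrus:

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// fatalf mimics logrus.Fatalf: log the message, then exit the process.
func fatalf(format string, args ...interface{}) {
	fmt.Fprintf(os.Stderr, format+"\n", args...)
	os.Exit(1)
}

func uncordon(fail bool) error {
	if fail {
		fatalf("error uncordoning node") // the process exits here...
		return errors.New("unreachable") // ...so this line never executes
	}
	return nil
}

func main() {
	fmt.Println(uncordon(false)) // <nil>
	fmt.Println(uncordon(true))  // never prints; fatalf exits first
}
```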
@@ -493,10 +490,11 @@ type nodeMeta struct {
Unschedulable bool `json:"unschedulable"`
}

func addNodeAnnotations(client *kubernetes.Clientset, nodeID string, annotations map[string]string) {
func addNodeAnnotations(client *kubernetes.Clientset, nodeID string, annotations map[string]string) error {
node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
if err != nil {
log.Fatalf("Error retrieving node object via k8s API: %s", err)
log.Errorf("Error retrieving node object via k8s API: %s", err)
return err
}
for k, v := range annotations {
node.Annotations[k] = v
@@ -505,7 +503,8 @@ func addNodeAnnotations(client *kubernetes.Clientset, nodeID string, annotations

bytes, err := json.Marshal(node)
if err != nil {
log.Fatalf("Error marshalling node object into JSON: %v", err)
log.Errorf("Error marshalling node object into JSON: %v", err)
return err
}

_, err = client.CoreV1().Nodes().Patch(context.TODO(), node.GetName(), types.StrategicMergePatchType, bytes, metav1.PatchOptions{})
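For context on the `Patch` call above: a strategic merge patch only needs to carry the fields being changed, so an alternative to marshalling the whole `Node` object is to send just the annotations. A sketch of that payload shape; the annotation key and timestamp value are illustrative, not taken from this diff:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// annotationPatch builds a minimal strategic-merge patch body that adds
// (or overwrites) the given node annotations, rather than sending the
// entire marshalled Node object. This is a sketch, not kured's code.
func annotationPatch(annotations map[string]string) ([]byte, error) {
	patch := map[string]interface{}{
		"metadata": map[string]interface{}{
			"annotations": annotations,
		},
	}
	return json.Marshal(patch)
}

func main() {
	b, err := annotationPatch(map[string]string{
		"weave.works/kured-reboot-in-progress": "2022-05-06T00:00:00Z",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
	// {"metadata":{"annotations":{"weave.works/kured-reboot-in-progress":"2022-05-06T00:00:00Z"}}}
}
```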
@@ -514,11 +513,13 @@ func addNodeAnnotations(client *kubernetes.Clientset, nodeID string, annotations
for k, v := range annotations {
annotationsErr += fmt.Sprintf("%s=%s ", k, v)
}
log.Fatalf("Error adding node annotations %s via k8s API: %v", annotationsErr, err)
log.Errorf("Error adding node annotations %s via k8s API: %v", annotationsErr, err)
return err
}
return nil
}

func deleteNodeAnnotation(client *kubernetes.Clientset, nodeID, key string) {
func deleteNodeAnnotation(client *kubernetes.Clientset, nodeID, key string) error {
log.Infof("Deleting node %s annotation %s", nodeID, key)

// JSON Patch takes as path input a JSON Pointer, defined in RFC6901
@@ -527,8 +528,10 @@ func deleteNodeAnnotation(client *kubernetes.Clientset, nodeID, key string) {
patch := []byte(fmt.Sprintf("[{\"op\":\"remove\",\"path\":\"/metadata/annotations/%s\"}]", strings.ReplaceAll(key, "/", "~1")))
_, err := client.CoreV1().Nodes().Patch(context.TODO(), nodeID, types.JSONPatchType, patch, metav1.PatchOptions{})
if err != nil {
log.Fatalf("Error deleting node annotation %s via k8s API: %v", key, err)
log.Errorf("Error deleting node annotation %s via k8s API: %v", key, err)
return err
}
return nil
}

func rebootAsRequired(nodeID string, rebootCommand []string, sentinelCommand []string, window *timewindow.TimeWindow, TTL time.Duration, releaseDelay time.Duration) {
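The RFC 6901 escaping used in `deleteNodeAnnotation` above, in miniature: inside a JSON Pointer, `/` becomes `~1` and `~` becomes `~0` (the diff only needs the former, since Kubernetes annotation keys cannot contain `~`; escaping `~` first keeps a literal `~1` in the input from being corrupted). A standalone sketch:

```go
package main

import (
	"fmt"
	"strings"
)

// escapeJSONPointer escapes a map key for use in a JSON Pointer path
// (RFC 6901): "~" becomes "~0", then "/" becomes "~1". Order matters:
// escaping "~" first prevents a literal "~1" from being mangled.
func escapeJSONPointer(key string) string {
	key = strings.ReplaceAll(key, "~", "~0")
	return strings.ReplaceAll(key, "/", "~1")
}

func main() {
	key := "weave.works/kured-reboot-in-progress" // annotation key with a "/"
	patch := fmt.Sprintf(`[{"op":"remove","path":"/metadata/annotations/%s"}]`,
		escapeJSONPointer(key))
	fmt.Println(patch)
	// [{"op":"remove","path":"/metadata/annotations/weave.works~1kured-reboot-in-progress"}]
}
```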
@@ -545,26 +548,41 @@ func rebootAsRequired(nodeID string, rebootCommand []string, sentinelCommand []s
lock := daemonsetlock.New(client, nodeID, dsNamespace, dsName, lockAnnotation)

nodeMeta := nodeMeta{}
if holding(lock, &nodeMeta) {
node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
if err != nil {
log.Fatalf("Error retrieving node object via k8s API: %v", err)
}
if !nodeMeta.Unschedulable {
uncordon(client, node)
}
// If we're holding the lock we know we've tried, in a prior run, to reboot
// So (1) we want to confirm that the reboot succeeded practically ( !rebootRequired() )
// And (2) check if we previously annotated the node that it was in the process of being rebooted,
// And finally (3) if it has that annotation, to delete it.
// This indicates to other node tools running on the cluster that this node may be a candidate for maintenance
if annotateNodes && !rebootRequired(sentinelCommand) {
if _, ok := node.Annotations[KuredRebootInProgressAnnotation]; ok {
deleteNodeAnnotation(client, nodeID, KuredRebootInProgressAnnotation)
source := rand.NewSource(time.Now().UnixNano())
tick := delaytick.New(source, 1*time.Minute)
for range tick {
if holding(lock, &nodeMeta) {
node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
if err != nil {
log.Errorf("Error retrieving node object via k8s API: %v", err)
continue
}
if !nodeMeta.Unschedulable {
err = uncordon(client, node)
if err != nil {
log.Errorf("Unable to uncordon %s: %v, will continue to hold lock and retry uncordon", node.GetName(), err)
continue
}
}
// If we're holding the lock we know we've tried, in a prior run, to reboot
// So (1) we want to confirm that the reboot succeeded practically ( !rebootRequired() )
// And (2) check if we previously annotated the node that it was in the process of being rebooted,
// And finally (3) if it has that annotation, to delete it.
// This indicates to other node tools running on the cluster that this node may be a candidate for maintenance
if annotateNodes && !rebootRequired(sentinelCommand) {
if _, ok := node.Annotations[KuredRebootInProgressAnnotation]; ok {
err := deleteNodeAnnotation(client, nodeID, KuredRebootInProgressAnnotation)
if err != nil {
continue
}
}
}
throttle(releaseDelay)
release(lock)
break
} else {
break
}
throttle(releaseDelay)
release(lock)
}

preferNoScheduleTaint := taints.New(client, nodeID, preferNoScheduleTaintName, v1.TaintEffectPreferNoSchedule)
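The recovery loop above ticks via `delaytick.New(source, 1*time.Minute)`. The sketch below is a guess at that package's behavior, inferred only from its call sites in this diff (a seeded source plus a period, consumed with `for range`): wake at a random point within each period so the many kured pods, one per node, don't all act in lockstep. Treat the internals as an assumption, not delaytick's actual source.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomTick is a guessed stand-in for kured's delaytick package: sleep a
// random fraction of each period before ticking, to splay the per-node
// daemonset pods. ASSUMPTION: inferred from usage, not from its source.
func randomTick(src rand.Source, period time.Duration) <-chan time.Time {
	out := make(chan time.Time)
	go func() {
		r := rand.New(src)
		for {
			time.Sleep(time.Duration(r.Int63n(int64(period))))
			out <- time.Now()
		}
	}()
	return out
}

func main() {
	tick := randomTick(rand.NewSource(time.Now().UnixNano()), 100*time.Millisecond)
	for i := 0; i < 3; i++ {
		fmt.Println("tick at", (<-tick).Format(time.StampMilli))
	}
}
```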
@@ -580,8 +598,8 @@ func rebootAsRequired(nodeID string, rebootCommand []string, sentinelCommand []s
log.Fatal("Unable to create prometheus client: ", err)
}

source := rand.NewSource(time.Now().UnixNano())
tick := delaytick.New(source, period)
source = rand.NewSource(time.Now().UnixNano())
tick = delaytick.New(source, period)
for range tick {
if !window.Contains(time.Now()) {
// Remove taint outside the reboot time window to allow for normal operation.
@@ -623,17 +641,29 @@ func rebootAsRequired(nodeID string, rebootCommand []string, sentinelCommand []s
annotations := map[string]string{KuredRebootInProgressAnnotation: timeNowString}
// & annotate this node with a timestamp so that other node maintenance tools know how long it's been since this node has been marked for reboot
annotations[KuredMostRecentRebootNeededAnnotation] = timeNowString
addNodeAnnotations(client, nodeID, annotations)
err := addNodeAnnotations(client, nodeID, annotations)
if err != nil {
continue
}
}
}

if !acquire(lock, &nodeMeta, TTL) {
if !holding(lock, &nodeMeta) && !acquire(lock, &nodeMeta, TTL) {
// Prefer to not schedule pods onto this node to avoid draining the same pod multiple times.
preferNoScheduleTaint.Enable()
continue
}

drain(client, node)
err = drain(client, node)
if err != nil {
if !forceReboot {
log.Errorf("Unable to cordon or drain %s: %v, will release lock and retry cordon and drain before rebooting when lock is next acquired", node.GetName(), err)
release(lock)
log.Infof("Performing a best-effort uncordon after failed cordon and drain")
uncordon(client, node)
continue
}
}

if rebootDelay > 0 {
log.Infof("Delaying reboot for %v", rebootDelay)
