From 93d6a783a17d09962f96c9d7a8794e08547b9b01 Mon Sep 17 00:00:00 2001
From: Jack
Date: Thu, 16 Dec 2021 11:59:05 -0800
Subject: [PATCH] retry cordon + drain if fail, keep lock

---
 cmd/kured/main.go | 112 +++++++++++++++++++++++++++++-----------------
 1 file changed, 71 insertions(+), 41 deletions(-)

diff --git a/cmd/kured/main.go b/cmd/kured/main.go
index c96136045..059773122 100644
--- a/cmd/kured/main.go
+++ b/cmd/kured/main.go
@@ -408,7 +408,7 @@ func release(lock *daemonsetlock.DaemonSetLock) {
 	}
 }
 
-func drain(client *kubernetes.Clientset, node *v1.Node) {
+func drain(client *kubernetes.Clientset, node *v1.Node) error {
 	nodename := node.GetName()
 
 	log.Infof("Draining node %s", nodename)
@@ -433,23 +433,18 @@ func drain(client *kubernetes.Clientset, node *v1.Node) {
 	}
 
 	if err := kubectldrain.RunCordonOrUncordon(drainer, node, true); err != nil {
-		if !forceReboot {
-			log.Fatalf("Error cordonning %s: %v", nodename, err)
-		}
-		log.Errorf("Error cordonning %s: %v, continuing with reboot anyway", nodename, err)
-		return
+		log.Errorf("Error cordonning %s: %v", nodename, err)
+		return err
 	}
 
 	if err := kubectldrain.RunNodeDrain(drainer, nodename); err != nil {
-		if !forceReboot {
-			log.Fatalf("Error draining %s: %v", nodename, err)
-		}
-		log.Errorf("Error draining %s: %v, continuing with reboot anyway", nodename, err)
-		return
+		log.Errorf("Error draining %s: %v", nodename, err)
+		return err
 	}
+	return nil
 }
 
-func uncordon(client *kubernetes.Clientset, node *v1.Node) {
+func uncordon(client *kubernetes.Clientset, node *v1.Node) error {
 	nodename := node.GetName()
 	log.Infof("Uncordoning node %s", nodename)
 	drainer := &kubectldrain.Helper{
@@ -460,7 +455,9 @@ func uncordon(client *kubernetes.Clientset, node *v1.Node) {
 	}
 	if err := kubectldrain.RunCordonOrUncordon(drainer, node, false); err != nil {
 		log.Fatalf("Error uncordonning %s: %v", nodename, err)
+		return err
 	}
+	return nil
 }
 
 func invokeReboot(nodeID string, rebootCommand []string) {
@@ -493,10 +490,11 @@ type nodeMeta struct {
 	Unschedulable bool `json:"unschedulable"`
 }
 
-func addNodeAnnotations(client *kubernetes.Clientset, nodeID string, annotations map[string]string) {
+func addNodeAnnotations(client *kubernetes.Clientset, nodeID string, annotations map[string]string) error {
 	node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
 	if err != nil {
-		log.Fatalf("Error retrieving node object via k8s API: %s", err)
+		log.Errorf("Error retrieving node object via k8s API: %s", err)
+		return err
 	}
 	for k, v := range annotations {
 		node.Annotations[k] = v
@@ -505,7 +503,8 @@ func addNodeAnnotations(client *kubernetes.Clientset, nodeID string, annotations
 
 	bytes, err := json.Marshal(node)
 	if err != nil {
-		log.Fatalf("Error marshalling node object into JSON: %v", err)
+		log.Errorf("Error marshalling node object into JSON: %v", err)
+		return err
 	}
 
 	_, err = client.CoreV1().Nodes().Patch(context.TODO(), node.GetName(), types.StrategicMergePatchType, bytes, metav1.PatchOptions{})
@@ -514,11 +513,13 @@ func addNodeAnnotations(client *kubernetes.Clientset, nodeID string, annotations
 		for k, v := range annotations {
 			annotationsErr += fmt.Sprintf("%s=%s ", k, v)
 		}
-		log.Fatalf("Error adding node annotations %s via k8s API: %v", annotationsErr, err)
+		log.Errorf("Error adding node annotations %s via k8s API: %v", annotationsErr, err)
+		return err
 	}
+	return nil
 }
 
-func deleteNodeAnnotation(client *kubernetes.Clientset, nodeID, key string) {
+func deleteNodeAnnotation(client *kubernetes.Clientset, nodeID, key string) error {
 	log.Infof("Deleting node %s annotation %s", nodeID, key)
 
 	// JSON Patch takes as path input a JSON Pointer, defined in RFC6901
@@ -527,8 +528,10 @@ func deleteNodeAnnotation(client *kubernetes.Clientset, nodeID, key string) {
 	patch := []byte(fmt.Sprintf("[{\"op\":\"remove\",\"path\":\"/metadata/annotations/%s\"}]", strings.ReplaceAll(key, "/", "~1")))
 	_, err := client.CoreV1().Nodes().Patch(context.TODO(), nodeID, types.JSONPatchType, patch, metav1.PatchOptions{})
 	if err != nil {
-		log.Fatalf("Error deleting node annotation %s via k8s API: %v", key, err)
+		log.Errorf("Error deleting node annotation %s via k8s API: %v", key, err)
+		return err
 	}
+	return nil
 }
 
 func rebootAsRequired(nodeID string, rebootCommand []string, sentinelCommand []string, window *timewindow.TimeWindow, TTL time.Duration, releaseDelay time.Duration) {
@@ -545,26 +548,41 @@ func rebootAsRequired(nodeID string, rebootCommand []string, sentinelCommand []s
 	lock := daemonsetlock.New(client, nodeID, dsNamespace, dsName, lockAnnotation)
 
 	nodeMeta := nodeMeta{}
-	if holding(lock, &nodeMeta) {
-		node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
-		if err != nil {
-			log.Fatalf("Error retrieving node object via k8s API: %v", err)
-		}
-		if !nodeMeta.Unschedulable {
-			uncordon(client, node)
-		}
-		// If we're holding the lock we know we've tried, in a prior run, to reboot
-		// So (1) we want to confirm that the reboot succeeded practically ( !rebootRequired() )
-		// And (2) check if we previously annotated the node that it was in the process of being rebooted,
-		// And finally (3) if it has that annotation, to delete it.
-		// This indicates to other node tools running on the cluster that this node may be a candidate for maintenance
-		if annotateNodes && !rebootRequired(sentinelCommand) {
-			if _, ok := node.Annotations[KuredRebootInProgressAnnotation]; ok {
-				deleteNodeAnnotation(client, nodeID, KuredRebootInProgressAnnotation)
+	source := rand.NewSource(time.Now().UnixNano())
+	tick := delaytick.New(source, 1*time.Minute)
+	for range tick {
+		if holding(lock, &nodeMeta) {
+			node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
+			if err != nil {
+				log.Errorf("Error retrieving node object via k8s API: %v", err)
+				continue
+			}
+			if !nodeMeta.Unschedulable {
+				err = uncordon(client, node)
+				if err != nil {
+					log.Errorf("Unable to uncordon %s: %v, will continue to hold lock and retry uncordon", node.GetName(), err)
+					continue
+				}
+			}
+			// If we're holding the lock we know we've tried, in a prior run, to reboot
+			// So (1) we want to confirm that the reboot succeeded practically ( !rebootRequired() )
+			// And (2) check if we previously annotated the node that it was in the process of being rebooted,
+			// And finally (3) if it has that annotation, to delete it.
+			// This indicates to other node tools running on the cluster that this node may be a candidate for maintenance
+			if annotateNodes && !rebootRequired(sentinelCommand) {
+				if _, ok := node.Annotations[KuredRebootInProgressAnnotation]; ok {
+					err := deleteNodeAnnotation(client, nodeID, KuredRebootInProgressAnnotation)
+					if err != nil {
+						continue
+					}
+				}
 			}
+			throttle(releaseDelay)
+			release(lock)
+			break
+		} else {
+			break
 		}
-		throttle(releaseDelay)
-		release(lock)
 	}
 
 	preferNoScheduleTaint := taints.New(client, nodeID, preferNoScheduleTaintName, v1.TaintEffectPreferNoSchedule)
@@ -580,8 +598,8 @@ func rebootAsRequired(nodeID string, rebootCommand []string, sentinelCommand []s
 		log.Fatal("Unable to create prometheus client: ", err)
 	}
 
-	source := rand.NewSource(time.Now().UnixNano())
-	tick := delaytick.New(source, period)
+	source = rand.NewSource(time.Now().UnixNano())
+	tick = delaytick.New(source, period)
 	for range tick {
 		if !window.Contains(time.Now()) {
 			// Remove taint outside the reboot time window to allow for normal operation.
@@ -623,17 +641,29 @@ func rebootAsRequired(nodeID string, rebootCommand []string, sentinelCommand []s
 				annotations := map[string]string{KuredRebootInProgressAnnotation: timeNowString}
 				// & annotate this node with a timestamp so that other node maintenance tools know how long it's been since this node has been marked for reboot
 				annotations[KuredMostRecentRebootNeededAnnotation] = timeNowString
-				addNodeAnnotations(client, nodeID, annotations)
+				err := addNodeAnnotations(client, nodeID, annotations)
+				if err != nil {
+					continue
+				}
 			}
 		}
 
-		if !acquire(lock, &nodeMeta, TTL) {
+		if !holding(lock, &nodeMeta) && !acquire(lock, &nodeMeta, TTL) {
 			// Prefer to not schedule pods onto this node to avoid draing the same pod multiple times.
 			preferNoScheduleTaint.Enable()
 			continue
 		}
 
-		drain(client, node)
+		err = drain(client, node)
+		if err != nil {
+			if !forceReboot {
+				log.Errorf("Unable to cordon or drain %s: %v, will release lock and retry cordon and drain before rebooting when lock is next acquired", node.GetName(), err)
+				release(lock)
+				log.Infof("Performing a best-effort uncordon after failed cordon and drain")
+				uncordon(client, node)
+				continue
+			}
+		}
 
 		if rebootDelay > 0 {
 			log.Infof("Delaying reboot for %v", rebootDelay)