Skip to content

Commit

Permalink
Make locks more generic
Browse files Browse the repository at this point in the history
Implementation details of lock should not leak into the calling
methods.

Without this path, calls are a bit more complex
and error handling is harder to find.

This is a problem for long term maintenance, as it
is tougher to refactor the locks without impacting the main.

Decoupling the two (main usage of the lock, and the lock
themselves) will allow us to introduce other kinds of locks
easily.

I solve this by inlining into the daemonsetlock package:
- including all the methods for managing locks from the main.go
  functions. Those were mostly doing error handling
  where code became no-op by introducing multiple
  daemonsetlock types
- adding the lock release delay part of lock info

I also did not like the pattern include in Test method,
which added a reference to nodeMeta: It was not very clear
that Test was storing the current metadata of the node,
or was returning the current state. (Metadata here only means unschedulable).

The problem I saw was that the metadata was silently
mutated from a lock Test method, which was very not obvious.

Instead, I picked to explicitly return the lock data instead.

I also made it explicit that the Acquire lock method
is passing the node metadata as structured information,
rather than an interface{}. This is a bit more fragile
at runtime, but I prefer having very explicit errors if
the locks are incorrect, rather than having to deal with
unvalidated data.

For the lock release delay, it was part of the rebootasrequired
loop, where I believe it makes more sense to be part of the
Release method itself, for readability. Yet, it hides the
delay into the implementation detail, but it keeps the
reboot as required goroutine more readable.

Instead of passing the argument rebootDelay as parameter of the
rebootasrequired method, this refactor took creation of the lock
object in the main loop, close to all the variables, and then
pass the lock object to the rebootasrequired. This makes the
call for rebootasrequired more clear, and lock is now
encompassing everything needed to acquire, release, or get
info about the lock.

Signed-off-by: Jean-Philippe Evrard <[email protected]>
  • Loading branch information
evrardjp committed Oct 17, 2024
1 parent 97c8994 commit 1e8b592
Show file tree
Hide file tree
Showing 3 changed files with 262 additions and 229 deletions.
163 changes: 60 additions & 103 deletions cmd/kured/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,18 +224,6 @@ func main() {
log.Warnf(err.Error())
}

log.Infof("Lock Annotation: %s/%s:%s", dsNamespace, dsName, lockAnnotation)
if lockTTL > 0 {
log.Infof("Lock TTL set, lock will expire after: %v", lockTTL)
} else {
log.Info("Lock TTL not set, lock will remain until being released")
}
if lockReleaseDelay > 0 {
log.Infof("Lock release delay set, lock release will be delayed by: %v", lockReleaseDelay)
} else {
log.Info("Lock release delay not set, lock will be released immediately after rebooting")
}

log.Infof("PreferNoSchedule taint: %s", preferNoScheduleTaintName)

// This should be printed from blocker list instead of only blocking pod selectors
Expand Down Expand Up @@ -278,7 +266,30 @@ func main() {
checker = checkers.NewFileRebootChecker(rebootSentinelFile)
}

go rebootAsRequired(nodeID, rebooter, checker, window, lockTTL, lockReleaseDelay)
config, err := rest.InClusterConfig()
if err != nil {
log.Fatal(err)
}

client, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatal(err)
}

log.Infof("Lock Annotation: %s/%s:%s", dsNamespace, dsName, lockAnnotation)
if lockTTL > 0 {
log.Infof("Lock TTL set, lock will expire after: %v", lockTTL)
} else {
log.Info("Lock TTL not set, lock will remain until being released")
}
if lockReleaseDelay > 0 {
log.Infof("Lock release delay set, lock release will be delayed by: %v", lockReleaseDelay)
} else {
log.Info("Lock release delay not set, lock will be released immediately after rebooting")
}
lock := daemonsetlock.New(client, nodeID, dsNamespace, dsName, lockAnnotation, lockTTL, concurrency, lockReleaseDelay)

go rebootAsRequired(nodeID, rebooter, checker, window, lock, client)
go maintainRebootRequiredMetric(nodeID, checker)

http.Handle("/metrics", promhttp.Handler())
Expand Down Expand Up @@ -400,68 +411,6 @@ func stripQuotes(str string) string {
return str
}

func holding(lock *daemonsetlock.DaemonSetLock, metadata interface{}, isMultiLock bool) bool {
var holding bool
var err error
if isMultiLock {
holding, err = lock.TestMultiple()
} else {
holding, err = lock.Test(metadata)
}
if err != nil {
log.Fatalf("Error testing lock: %v", err)
}
if holding {
log.Infof("Holding lock")
}
return holding
}

func acquire(lock *daemonsetlock.DaemonSetLock, metadata interface{}, TTL time.Duration, maxOwners int) bool {
var holding bool
var holder string
var err error
if maxOwners > 1 {
var holders []string
holding, holders, err = lock.AcquireMultiple(metadata, TTL, maxOwners)
holder = strings.Join(holders, ",")
} else {
holding, holder, err = lock.Acquire(metadata, TTL)
}
switch {
case err != nil:
log.Fatalf("Error acquiring lock: %v", err)
return false
case !holding:
log.Warnf("Lock already held: %v", holder)
return false
default:
log.Infof("Acquired reboot lock")
return true
}
}

func throttle(releaseDelay time.Duration) {
if releaseDelay > 0 {
log.Infof("Delaying lock release by %v", releaseDelay)
time.Sleep(releaseDelay)
}
}

func release(lock *daemonsetlock.DaemonSetLock, isMultiLock bool) {
log.Infof("Releasing lock")

var err error
if isMultiLock {
err = lock.ReleaseMultiple()
} else {
err = lock.Release()
}
if err != nil {
log.Fatalf("Error releasing lock: %v", err)
}
}

func drain(client *kubernetes.Clientset, node *v1.Node) error {
nodename := node.GetName()

Expand Down Expand Up @@ -537,11 +486,6 @@ func maintainRebootRequiredMetric(nodeID string, checker checkers.Checker) {
}
}

// nodeMeta is used to remember information across reboots
type nodeMeta struct {
Unschedulable bool `json:"unschedulable"`
}

func addNodeAnnotations(client *kubernetes.Clientset, nodeID string, annotations map[string]string) error {
node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -616,30 +560,23 @@ func updateNodeLabels(client *kubernetes.Clientset, node *v1.Node, labels []stri
}
}

func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.Checker, window *timewindow.TimeWindow, TTL time.Duration, releaseDelay time.Duration) {
config, err := rest.InClusterConfig()
if err != nil {
log.Fatal(err)
}

client, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatal(err)
}

lock := daemonsetlock.New(client, nodeID, dsNamespace, dsName, lockAnnotation)
func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.Checker, window *timewindow.TimeWindow, lock daemonsetlock.Lock, client *kubernetes.Clientset) {

nodeMeta := nodeMeta{}
source := rand.NewSource(time.Now().UnixNano())
tick := delaytick.New(source, 1*time.Minute)
for range tick {
if holding(lock, &nodeMeta, concurrency > 1) {
holding, lockData, err := lock.Holding()
if err != nil {
log.Errorf("Error testing lock: %v", err)
}
if holding {
node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
if err != nil {
log.Errorf("Error retrieving node object via k8s API: %v", err)
continue
}
if !nodeMeta.Unschedulable {

if !lockData.Metadata.Unschedulable {
err = uncordon(client, node)
if err != nil {
log.Errorf("Unable to uncordon %s: %v, will continue to hold lock and retry uncordon", node.GetName(), err)
Expand All @@ -665,8 +602,12 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
}
}
}
throttle(releaseDelay)
release(lock, concurrency > 1)

err = lock.Release()
if err != nil {
log.Errorf("Error releasing lock, will retry: %v", err)
continue
}
break
} else {
break
Expand Down Expand Up @@ -705,7 +646,8 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
if err != nil {
log.Fatalf("Error retrieving node object via k8s API: %v", err)
}
nodeMeta.Unschedulable = node.Spec.Unschedulable

nodeMeta := daemonsetlock.NodeMeta{Unschedulable: node.Spec.Unschedulable}

var timeNowString string
if annotateNodes {
Expand Down Expand Up @@ -738,17 +680,32 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
}
log.Infof("Reboot required%s", rebootRequiredBlockCondition)

if !holding(lock, &nodeMeta, concurrency > 1) && !acquire(lock, &nodeMeta, TTL, concurrency) {
// Prefer to not schedule pods onto this node to avoid draing the same pod multiple times.
preferNoScheduleTaint.Enable()
continue
holding, _, err := lock.Holding()
if err != nil {
log.Errorf("Error testing lock: %v", err)
}

if !holding {
acquired, holder, err := lock.Acquire(nodeMeta)
if err != nil {
log.Errorf("Error acquiring lock: %v", err)
}
if !acquired {
log.Warnf("Lock already held: %v", holder)
// Prefer to not schedule pods onto this node to avoid draing the same pod multiple times.
preferNoScheduleTaint.Enable()
continue
}
}

err = drain(client, node)
if err != nil {
if !forceReboot {
log.Errorf("Unable to cordon or drain %s: %v, will release lock and retry cordon and drain before rebooting when lock is next acquired", node.GetName(), err)
release(lock, concurrency > 1)
err = lock.Release()
if err != nil {
log.Errorf("Error releasing lock: %v", err)
}
log.Infof("Performing a best-effort uncordon after failed cordon and drain")
uncordon(client, node)
continue
Expand Down
Loading

0 comments on commit 1e8b592

Please sign in to comment.