Skip to content

Commit

Permalink
fix: give up virtual IPs before the kubelet workloads are shut down
Browse files Browse the repository at this point in the history
This basically introduces a shutdown lock, which makes sure that whatever
needs to be terminated before the kubelet shutdown sequence is initiated has
a chance to do so.

Without this fix, as the kubelet graceful shutdown terminates the API server
static pod, the VIP might still be on the node while the API server is
down. With this fix, the VIP gets moved away from the node before the
kubelet graceful shutdown sequence starts.

Signed-off-by: Andrey Smirnov <[email protected]>
  • Loading branch information
smira committed Mar 25, 2022
1 parent 856e133 commit 108fd03
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 0 deletions.
36 changes: 36 additions & 0 deletions internal/app/machined/pkg/controllers/network/operator/vip.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,23 @@ func (vip *VIP) waitForPreconditions(ctx context.Context) error {
return fmt.Errorf("etcd health wait failure: %w", err)
}

// wait for the kubelet lifecycle to be up, and not being torn down
_, err = vip.state.WatchFor(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.KubeletLifecycleType, k8s.KubeletLifecycleID, resource.VersionUndefined),
state.WithCondition(func(r resource.Resource) (bool, error) {
if resource.IsTombstone(r) {
return false, nil
}

if r.Metadata().Phase() == resource.PhaseTearingDown {
return false, nil
}

return true, nil
}))
if err != nil {
return fmt.Errorf("kubelet lifecycle wait failure: %w", err)
}

return nil
}

Expand All @@ -179,6 +196,16 @@ func (vip *VIP) campaign(ctx context.Context, notifyCh chan<- struct{}) error {
return fmt.Errorf("error waiting for preconditions: %w", err)
}

// put a finalizer on the kubelet lifecycle and remove once the campaign is done
kubeletLifecycle := resource.NewMetadata(k8s.NamespaceName, k8s.KubeletLifecycleType, k8s.KubeletLifecycleID, resource.VersionUndefined)
if err := vip.state.AddFinalizer(ctx, kubeletLifecycle, vip.Prefix()); err != nil {
return fmt.Errorf("error adding kubelet lifecycle finalizer: %w", err)
}

defer func() {
vip.state.RemoveFinalizer(ctx, kubeletLifecycle, vip.Prefix()) //nolint:errcheck
}()

hostname, err := os.Hostname() // TODO: this should be etcd nodename
if err != nil {
return fmt.Errorf("refusing to join election without a hostname")
Expand Down Expand Up @@ -258,6 +285,10 @@ func (vip *VIP) campaign(ctx context.Context, notifyCh chan<- struct{}) error {
return fmt.Errorf("error setting up etcd watch: %w", err)
}

if err = vip.state.Watch(ctx, kubeletLifecycle, watchCh); err != nil {
return fmt.Errorf("error setting up etcd watch: %w", err)
}

err = vip.state.WatchKind(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.StaticPodStatusType, "", resource.VersionUndefined), watchCh)
if err != nil {
return fmt.Errorf("kube-apiserver health wait failure: %w", err)
Expand Down Expand Up @@ -291,6 +322,11 @@ observeLoop:
break observeLoop
}
}

// break the loop if the kubelet lifecycle is entering teardown phase
if event.Resource.Metadata().Type() == kubeletLifecycle.Type() && event.Resource.Metadata().ID() == kubeletLifecycle.ID() && event.Resource.Metadata().Phase() == resource.PhaseTearingDown {
break observeLoop
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/AlekSi/pointer"
"github.com/containerd/cgroups"
cgroupsv2 "github.com/containerd/cgroups/v2"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
multierror "github.com/hashicorp/go-multierror"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/talos-systems/go-blockdevice/blockdevice"
Expand Down Expand Up @@ -67,6 +69,7 @@ import (
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/machinery/kernel"
"github.com/talos-systems/talos/pkg/machinery/resources/k8s"
resourceruntime "github.com/talos-systems/talos/pkg/machinery/resources/runtime"
"github.com/talos-systems/talos/pkg/version"
)
Expand Down Expand Up @@ -1230,8 +1233,39 @@ func StopAllPods(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionF
return stopAndRemoveAllPods(cri.StopOnly), "stopAllPods"
}

// waitForKubeletLifecycleFinalizers tears down the KubeletLifecycle resource and blocks
// until every finalizer other components placed on it has been released, then destroys it.
//
// This gives components (e.g. the VIP operator) a chance to perform their cleanup before
// the kubelet workloads are shut down. The whole wait is bounded by a 30-second timeout
// derived from ctx.
func waitForKubeletLifecycleFinalizers(ctx context.Context, logger *log.Logger, r runtime.Runtime) error {
	logger.Printf("waiting for kubelet lifecycle finalizers")

	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	lifecycle := resource.NewMetadata(k8s.NamespaceName, k8s.KubeletLifecycleType, k8s.KubeletLifecycleID, resource.VersionUndefined)

	for {
		torndown, err := r.State().V1Alpha2().Resources().Teardown(ctx, lifecycle)
		if err != nil {
			return err
		}

		if torndown {
			// no finalizers left, the resource can be destroyed
			return r.State().V1Alpha2().Resources().Destroy(ctx, lifecycle)
		}

		// block until the finalizer list becomes empty, then re-attempt the teardown
		if _, err = r.State().V1Alpha2().Resources().WatchFor(ctx, lifecycle, state.WithFinalizerEmpty()); err != nil {
			return err
		}
	}
}

func stopAndRemoveAllPods(stopAction cri.StopAction) runtime.TaskExecutionFunc {
return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) (err error) {
if err = waitForKubeletLifecycleFinalizers(ctx, logger, r); err != nil {
logger.Printf("failed waiting for kubelet lifecycle finalizers: %s", err)
}

logger.Printf("shutting down kubelet gracefully")

shutdownCtx, shutdownCtxCancel := context.WithTimeout(ctx, constants.KubeletShutdownGracePeriod*2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func NewState() (*State, error) {
&k8s.ConfigStatus{},
&k8s.Endpoint{},
&k8s.KubeletConfig{},
&k8s.KubeletLifecycle{},
&k8s.KubeletSpec{},
&k8s.Manifest{},
&k8s.ManifestStatus{},
Expand Down
7 changes: 7 additions & 0 deletions internal/app/machined/pkg/system/services/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/oci"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
specs "github.com/opencontainers/runtime-spec/specs-go"

"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
Expand Down Expand Up @@ -66,6 +67,12 @@ func (k *Kubelet) PreFunc(ctx context.Context, r runtime.Runtime) error {
return err
}

// Create lifecycle resource to signal that the kubelet is about to start.
err = r.State().V1Alpha2().Resources().Create(ctx, k8s.NewKubeletLifecycle(k8s.NamespaceName, k8s.KubeletLifecycleID))
if err != nil && !state.IsConflictError(err) { // ignore if the lifecycle resource already exists
return err
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/machinery/resources/k8s/k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func TestRegisterResource(t *testing.T) {
&k8s.ConfigStatus{},
&k8s.Endpoint{},
&k8s.KubeletConfig{},
&k8s.KubeletLifecycle{},
&k8s.KubeletSpec{},
&k8s.ManifestStatus{},
&k8s.Manifest{},
Expand Down
76 changes: 76 additions & 0 deletions pkg/machinery/resources/k8s/kubelet_lifecycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package k8s

import (
"fmt"

"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/resource/meta"
)

// KubeletLifecycleType is type of KubeletLifecycle resource.
const KubeletLifecycleType = resource.Type("KubeletLifecycles.kubernetes.talos.dev")

// KubeletLifecycleID is the singleton ID of the resource (there is only one kubelet per node).
const KubeletLifecycleID = resource.ID("kubelet")

// KubeletLifecycle resource exists to signal that the kubelet pods are running.
//
// Components might put finalizers on the KubeletLifecycle resource to signal that additional
// actions should be taken before the kubelet is about to be shut down.
//
// KubeletLifecycle is mostly about status of the workloads kubelet is running vs.
// the actual status of the kubelet service itself.
type KubeletLifecycle struct {
	// md holds the resource metadata (namespace, type, ID, version, phase, finalizers).
	md resource.Metadata
	// spec is intentionally empty: the resource carries no data, only lifecycle state.
	spec KubeletLifecycleSpec
}

// KubeletLifecycleSpec is empty: the resource is used purely as a lifecycle/finalizer anchor.
type KubeletLifecycleSpec struct{}

// NewKubeletLifecycle initializes an empty KubeletLifecycle resource.
func NewKubeletLifecycle(namespace resource.Namespace, id resource.ID) *KubeletLifecycle {
	// spec is left at its zero value: KubeletLifecycleSpec carries no data.
	lifecycle := KubeletLifecycle{
		md: resource.NewMetadata(namespace, KubeletLifecycleType, id, resource.VersionUndefined),
	}

	lifecycle.md.BumpVersion()

	return &lifecycle
}

// Metadata implements resource.Resource.
//
// The returned pointer aliases the resource's own metadata, so mutations are visible to the resource.
func (r *KubeletLifecycle) Metadata() *resource.Metadata {
	return &r.md
}

// Spec implements resource.Resource.
//
// Returns the (empty) KubeletLifecycleSpec by value.
func (r *KubeletLifecycle) Spec() interface{} {
	return r.spec
}

// String implements fmt.Stringer.
func (r *KubeletLifecycle) String() string {
	return fmt.Sprintf("k8s.KubeletLifecycle(%q)", r.md.ID())
}

// DeepCopy implements resource.Resource.
//
// A plain value copy is sufficient: the spec is empty and the metadata is copied by value.
func (r *KubeletLifecycle) DeepCopy() resource.Resource {
	cp := *r

	return &cp
}

// ResourceDefinition implements meta.ResourceDefinitionProvider interface.
func (r *KubeletLifecycle) ResourceDefinition() meta.ResourceDefinitionSpec {
	spec := meta.ResourceDefinitionSpec{
		Type:             KubeletLifecycleType,
		Aliases:          []resource.Type{},
		DefaultNamespace: NamespaceName,
	}

	return spec
}

0 comments on commit 108fd03

Please sign in to comment.