Skip to content

Commit

Permalink
feat: implement D-Bus systemd-compatible shutdown for kubelet
Browse files Browse the repository at this point in the history
Add a mock D-Bus daemon and a mock logind implementation over D-Bus.

Kubelet gets a handle to the D-Bus socket, connects over it to our
logind mock and negotiates shutdown activities.

Signed-off-by: Andrey Smirnov <[email protected]>
  • Loading branch information
smira committed Mar 16, 2022
1 parent 6bec084 commit caf800f
Show file tree
Hide file tree
Showing 17 changed files with 858 additions and 8 deletions.
9 changes: 7 additions & 2 deletions hack/release.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,19 @@ with a single `--mode` flag that can take the following values:
title = "Kubelet"
description="""\
Kubelet configuration can now be overridden with the `.machine.kubelet.extraConfig` machine configuration field.
As most of the kubelet command line arguments are being depreacted, it is recommended to migrate to `extraConfig`
As most of the kubelet command line arguments are being deprecated, it is recommended to migrate to `extraConfig`
instead of using `extraArgs`.
A number of conformance tweaks have been made to the `kubelet` to allow it to run without
`protectKernelDefaults`.
This includes both kubelet configuration options and sysctls.
Of particular note is that Talos now sets the `kernel.panic` reboot interval to 10s instead of 1s.
If your kubelet fails to start after the upgrade, please check the `kubelet` logs to determine the problem.
Talos now performs graceful kubelet shutdown by default on node reboot/shutdown.
Default shutdown timeouts: 20s for regular priority pods and 10s for critical priority pods.
Timeouts can be overridden with the `.machine.kubelet.extraConfig` machine configuration key:
`shutdownGracePeriod` and `shutdownGracePeriodCriticalPods`.
"""

[notes.auditlog]
Expand Down Expand Up @@ -144,7 +149,7 @@ Old behavior can be achieved by specifying empty flag value: `--kubernetes-versi
[notes.admission]
title = "Admission Plugin Configuration"
description="""\
Talos now supports Kubernetes API server admission plugin configuration via the `.cluster.apiServer.admissonControl` machine configuration field.
Talos now supports Kubernetes API server admission plugin configuration via the `.cluster.apiServer.admissionControl` machine configuration field.
This configuration can be used to enable [Pod Security Admission](https://kubernetes.io/docs/concepts/security/pod-security-admission/) plugin and
define cluster-wide default [Pod Security Standards](https://kubernetes.io/docs/concepts/security/pod-security-standards/).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (ctrl *K8sControlPlaneController) Run(ctx context.Context, r controller.Run

for _, f := range []func(context.Context, controller.Runtime, *zap.Logger, talosconfig.Provider) error{
ctrl.manageAPIServerConfig,
ctrl.manageAdmissonControlConfig,
ctrl.manageAdmissionControlConfig,
ctrl.manageControllerManagerConfig,
ctrl.manageSchedulerConfig,
ctrl.manageManifestsConfig,
Expand Down Expand Up @@ -159,7 +159,7 @@ func (ctrl *K8sControlPlaneController) manageAPIServerConfig(ctx context.Context
})
}

func (ctrl *K8sControlPlaneController) manageAdmissonControlConfig(ctx context.Context, r controller.Runtime, logger *zap.Logger, cfgProvider talosconfig.Provider) error {
func (ctrl *K8sControlPlaneController) manageAdmissionControlConfig(ctx context.Context, r controller.Runtime, logger *zap.Logger, cfgProvider talosconfig.Provider) error {
spec := config.K8sAdmissionControlSpec{}

for _, cfg := range cfgProvider.Cluster().APIServer().AdmissionControl() {
Expand Down
8 changes: 8 additions & 0 deletions internal/app/machined/pkg/controllers/k8s/kubelet_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,14 @@ func NewKubeletConfiguration(clusterDNS []string, dnsDomain string, extraConfig
config.Logging.Format = "json"
}

if config.ShutdownGracePeriod.Duration == 0 {
config.ShutdownGracePeriod = metav1.Duration{Duration: constants.KubeletShutdownGracePeriod}
}

if config.ShutdownGracePeriodCriticalPods.Duration == 0 {
config.ShutdownGracePeriodCriticalPods = metav1.Duration{Duration: constants.KubeletShutdownGracePeriodCriticalPods}
}

if config.StreamingConnectionIdleTimeout.Duration == 0 {
config.StreamingConnectionIdleTimeout = metav1.Duration{Duration: 5 * time.Minute}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,11 @@ func TestNewKubeletConfigurationSuccess(t *testing.T) {
Logging: v1alpha1.LoggingConfiguration{
Format: "json",
},

StreamingConnectionIdleTimeout: metav1.Duration{Duration: 5 * time.Minute},
TLSMinVersion: "VersionTLS13",
EnableDebuggingHandlers: pointer.ToBool(true),
ShutdownGracePeriod: metav1.Duration{Duration: constants.KubeletShutdownGracePeriod},
ShutdownGracePeriodCriticalPods: metav1.Duration{Duration: constants.KubeletShutdownGracePeriodCriticalPods},
StreamingConnectionIdleTimeout: metav1.Duration{Duration: 5 * time.Minute},
TLSMinVersion: "VersionTLS13",
EnableDebuggingHandlers: pointer.ToBool(true),
},
config)
}
10 changes: 10 additions & 0 deletions internal/app/machined/pkg/runtime/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package runtime

import (
"context"

"github.com/cosi-project/runtime/pkg/state"
"github.com/cosi-project/runtime/pkg/state/registry"
"github.com/talos-systems/go-blockdevice/blockdevice/probe"
Expand Down Expand Up @@ -37,6 +39,7 @@ type MachineState interface {
StagedInstallOptions() []byte
KexecPrepared(bool)
IsKexecPrepared() bool
DBus() DBusState
}

// ClusterState defines the cluster state.
Expand All @@ -51,3 +54,10 @@ type V1Alpha2State interface {

SetConfig(config.Provider) error
}

// DBusState defines the D-Bus logind mock.
//
// It is reachable through MachineState.DBus().
type DBusState interface {
	// Start brings up the D-Bus broker and the logind mock.
	Start() error
	// Stop tears down the D-Bus broker and the logind mock.
	Stop() error
	// WaitShutdown signals the shutdown over the D-Bus and waits for the
	// inhibit lock to be released; the wait is governed by ctx.
	WaitShutdown(ctx context.Context) error
}
83 changes: 83 additions & 0 deletions internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_dbus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package v1alpha1

import (
"context"
"fmt"
"os"
"path/filepath"
"time"

"github.com/talos-systems/talos/internal/pkg/logind"
"github.com/talos-systems/talos/pkg/machinery/constants"
)

// DBusState implements the logind mock.
//
// All fields are populated by Start; the zero value holds nil for each of them.
type DBusState struct {
	// broker is the D-Bus broker created by Start via logind.NewBroker.
	broker *logind.DBusBroker
	// logindMock is the logind service mock created by Start via logind.NewServiceMock.
	logindMock *logind.ServiceMock
	// errCh carries the result of broker.Run from the goroutine launched in Start.
	errCh chan error
	// cancel cancels the context passed to broker.Run.
	cancel context.CancelFunc
}

// Start the D-Bus broker and logind mock.
//
// Start creates the parent directories of both D-Bus socket paths (mode 0700),
// creates the broker, runs it in a background goroutine under a cancellable
// context and finally connects the logind mock to the service socket.
//
// The result of the broker's Run is delivered on errCh, which Stop reads.
// If an error is returned after the broker goroutine was launched, the broker
// keeps running until Stop is called.
func (dbus *DBusState) Start() error {
	for _, path := range []string{constants.DBusServiceSocketPath, constants.DBusClientSocketPath} {
		if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil {
			return fmt.Errorf("creating D-Bus socket directory for %q: %w", path, err)
		}
	}

	broker, err := logind.NewBroker(constants.DBusServiceSocketPath, constants.DBusClientSocketPath)
	if err != nil {
		return fmt.Errorf("creating D-Bus broker: %w", err)
	}

	dbus.broker = broker

	ctx, cancel := context.WithCancel(context.Background())
	dbus.cancel = cancel

	// Buffered with capacity 1 so that the broker goroutine can always deliver
	// its result and exit, even when Stop gives up waiting or is never called.
	errCh := make(chan error, 1)
	dbus.errCh = errCh

	// The goroutine uses the captured locals rather than reading the fields
	// through the receiver, so it does not depend on later field writes.
	go func() {
		errCh <- broker.Run(ctx)
	}()

	dbus.logindMock, err = logind.NewServiceMock(constants.DBusServiceSocketPath)
	if err != nil {
		return fmt.Errorf("creating logind mock: %w", err)
	}

	return nil
}

// Stop the D-Bus broker and logind mock.
//
// Stop is a no-op returning nil when Start was never called or failed before
// the broker context was created. It also tolerates a partially started state
// in which the logind mock or the broker is missing.
//
// Teardown is best-effort: the context is cancelled, then the logind mock and
// the broker are both closed even if one of them fails, and finally Stop waits
// up to one second for the broker goroutine to report its result. The first
// error encountered along the way is returned.
func (dbus *DBusState) Stop() error {
	if dbus.cancel == nil {
		// Start never launched the broker, there is nothing to stop.
		return nil
	}

	dbus.cancel()

	var firstErr error

	if dbus.logindMock != nil {
		if err := dbus.logindMock.Close(); err != nil {
			firstErr = err
		}
	}

	// Keep going after a failed mock close: returning early would leave the
	// broker open and its goroutine unattended.
	if dbus.broker != nil {
		if err := dbus.broker.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	select {
	case <-time.After(time.Second):
		if firstErr == nil {
			firstErr = fmt.Errorf("timed out stopping D-Bus broker")
		}
	case err := <-dbus.errCh:
		if firstErr == nil {
			firstErr = err
		}
	}

	return firstErr
}

// WaitShutdown signals the shutdown over the D-Bus and waits for the inhibit lock to be released.
//
// When the logind mock was never started (Start not called, or it failed),
// there is nothing to signal or wait for and WaitShutdown returns nil
// immediately. Otherwise the wait is bounded by ctx.
func (dbus *DBusState) WaitShutdown(ctx context.Context) error {
	if dbus.logindMock == nil {
		return nil
	}

	if err := dbus.logindMock.EmitShutdown(); err != nil {
		return fmt.Errorf("emitting shutdown signal: %w", err)
	}

	return dbus.logindMock.WaitLockRelease(ctx)
}
18 changes: 18 additions & 0 deletions internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ func (*Sequencer) Boot(r runtime.Runtime) []runtime.Phase {
).Append(
"containerd",
StartContainerd,
).Append(
"dbus",
StartDBus,
).AppendWhen(
r.State().Platform().Mode() == runtime.ModeContainer,
"sharedFilesystems",
Expand Down Expand Up @@ -263,6 +266,9 @@ func (*Sequencer) Reboot(r runtime.Runtime) []runtime.Phase {
phases := PhaseList{}.Append(
"cleanup",
StopAllPods,
).Append(
"dbus",
StopDBus,
).
AppendList(stopAllPhaselist(r, true)).
Append("reboot", Reboot)
Expand Down Expand Up @@ -294,6 +300,9 @@ func (*Sequencer) Reset(r runtime.Runtime, in runtime.ResetOptions) []runtime.Ph
!in.GetGraceful(),
"cleanup",
StopAllPods,
).Append(
"dbus",
StopDBus,
).AppendWhen(
in.GetGraceful() && (r.Config().Machine().Type() != machine.TypeWorker),
"leave",
Expand Down Expand Up @@ -331,6 +340,9 @@ func (*Sequencer) Shutdown(r runtime.Runtime, in *machineapi.ShutdownRequest) []
).Append(
"cleanup",
StopAllPods,
).Append(
"dbus",
StopDBus,
).
AppendList(stopAllPhaselist(r, false)).
Append("shutdown", Shutdown)
Expand All @@ -349,6 +361,9 @@ func (*Sequencer) StageUpgrade(r runtime.Runtime, in *machineapi.UpgradeRequest)
phases = phases.Append(
"cleanup",
StopAllPods,
).Append(
"dbus",
StopDBus,
).AppendWhen(
!in.GetPreserve() && (r.Config().Machine().Type() != machine.TypeWorker),
"leave",
Expand Down Expand Up @@ -383,6 +398,9 @@ func (*Sequencer) Upgrade(r runtime.Runtime, in *machineapi.UpgradeRequest) []ru
in.GetPreserve(),
"cleanup",
StopAllPods,
).Append(
"dbus",
StopDBus,
).AppendWhen(
!in.GetPreserve() && (r.Config().Machine().Type() != machine.TypeWorker),
"leave",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1237,6 +1237,15 @@ func StopAllPods(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionF

func stopAndRemoveAllPods(stopAction cri.StopAction) runtime.TaskExecutionFunc {
return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) (err error) {
logger.Printf("shutting down kubelet gracefully")

shutdownCtx, shutdownCtxCancel := context.WithTimeout(ctx, constants.KubeletShutdownGracePeriod*2)
defer shutdownCtxCancel()

if err = r.State().Machine().DBus().WaitShutdown(shutdownCtx); err != nil {
logger.Printf("failed waiting for inhibit shutdown lock: %s", err)
}

if err = system.Services(nil).Stop(ctx, "kubelet"); err != nil {
return err
}
Expand Down Expand Up @@ -1821,3 +1830,21 @@ func KexecPrepare(seq runtime.Sequence, data interface{}) (runtime.TaskExecution
return nil
}, "kexecPrepare"
}

// StartDBus returns the task that brings up the D-Bus mock held in the machine state.
func StartDBus(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) {
	task := func(ctx context.Context, logger *log.Logger, r runtime.Runtime) error {
		dbusState := r.State().Machine().DBus()

		return dbusState.Start()
	}

	return task, "startDBus"
}

// StopDBus returns the task that shuts down the D-Bus mock held in the machine state.
//
// A failure to stop is only logged; the task itself always succeeds.
func StopDBus(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) {
	task := func(ctx context.Context, logger *log.Logger, r runtime.Runtime) error {
		stopErr := r.State().Machine().DBus().Stop()
		if stopErr != nil {
			logger.Printf("error stopping D-Bus: %s, ignored", stopErr)
		}

		return nil
	}

	return task, "stopDBus"
}
7 changes: 7 additions & 0 deletions internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type MachineState struct {
stagedInstallOptions []byte

kexecPrepared bool

dbus DBusState
}

// ClusterState represents the cluster's state.
Expand Down Expand Up @@ -229,3 +231,8 @@ func (s *MachineState) KexecPrepared(prepared bool) {
func (s *MachineState) IsKexecPrepared() bool {
return s.kexecPrepared
}

// DBus implements the machine state interface.
//
// It hands out a pointer to the D-Bus state embedded in the machine state.
func (s *MachineState) DBus() runtime.DBusState {
	dbusState := &s.dbus

	return dbusState
}
1 change: 1 addition & 0 deletions internal/app/machined/pkg/system/services/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (k *Kubelet) Runner(r runtime.Runtime) (runner.Runner, error) {
{Type: "bind", Destination: "/etc/cni", Source: "/etc/cni", Options: []string{"rbind", "rshared", "rw"}},
{Type: "bind", Destination: "/usr/libexec/kubernetes", Source: "/usr/libexec/kubernetes", Options: []string{"rbind", "rshared", "rw"}},
{Type: "bind", Destination: "/var/run", Source: "/run", Options: []string{"rbind", "rshared", "rw"}},
{Type: "bind", Destination: "/var/run/dbus/system_bus_socket", Source: constants.DBusClientSocketPath, Options: []string{"bind", "rw"}},
{Type: "bind", Destination: "/var/lib/containerd", Source: "/var/lib/containerd", Options: []string{"rbind", "rshared", "rw"}},
{Type: "bind", Destination: "/var/lib/kubelet", Source: "/var/lib/kubelet", Options: []string{"rbind", "rshared", "rw"}},
{Type: "bind", Destination: "/var/log/containers", Source: "/var/log/containers", Options: []string{"rbind", "rshared", "rw"}},
Expand Down
Loading

0 comments on commit caf800f

Please sign in to comment.