diff --git a/hack/release.toml b/hack/release.toml index 4b10fa5702..ef77e9b5b3 100644 --- a/hack/release.toml +++ b/hack/release.toml @@ -45,7 +45,7 @@ with a single `--mode` flag that can take the following values: title = "Kubelet" description="""\ Kubelet configuration can now be overridden with the `.machine.kubelet.extraConfig` machine configuration field. -As most of the kubelet command line arguments are being depreacted, it is recommended to migrate to `extraConfig` +As most of the kubelet command line arguments are being deprecated, it is recommended to migrate to `extraConfig` instead of using `extraArgs`. A number of conformance tweaks have been made to the `kubelet` to allow it to run without @@ -53,6 +53,11 @@ A number of conformance tweaks have been made to the `kubelet` to allow it to ru This includes both kubelet configuration options and sysctls. Of particular note is that Talos now sets the `kernel.panic` reboot interval to 10s instead of 1s. If your kubelet fails to start after the upgrade, please check the `kubelet` logs to determine the problem. + +Talos now performs graceful kubelet shutdown by default on node reboot/shutdown. +Default shutdown timeouts: 20s for regular priority pods and 10s for critical priority pods. +Timeouts can be overridden with the `.machine.kubelet.extraConfig` machine configuration key: +`shutdownGracePeriod` and `shutdownGracePeriodCriticalPods`. """ [notes.auditlog] @@ -144,7 +149,7 @@ Old behavior can be achieved by specifying empty flag value: `--kubernetes-versi [notes.admission] title = "Admission Plugin Configuration" description="""\ -Talos now supports Kubernetes API server admission plugin configuration via the `.cluster.apiServer.admissonControl` machine configuration field. +Talos now supports Kubernetes API server admission plugin configuration via the `.cluster.apiServer.admissionControl` machine configuration field. 
This configuration can be used to enable [Pod Security Admission](https://kubernetes.io/docs/concepts/security/pod-security-admission/) plugin and define cluster-wide default [Pod Security Standards](https://kubernetes.io/docs/concepts/security/pod-security-standards/). diff --git a/internal/app/machined/pkg/controllers/config/k8s_control_plane.go b/internal/app/machined/pkg/controllers/config/k8s_control_plane.go index d8bdd675b5..59cf6a73db 100644 --- a/internal/app/machined/pkg/controllers/config/k8s_control_plane.go +++ b/internal/app/machined/pkg/controllers/config/k8s_control_plane.go @@ -107,7 +107,7 @@ func (ctrl *K8sControlPlaneController) Run(ctx context.Context, r controller.Run for _, f := range []func(context.Context, controller.Runtime, *zap.Logger, talosconfig.Provider) error{ ctrl.manageAPIServerConfig, - ctrl.manageAdmissonControlConfig, + ctrl.manageAdmissionControlConfig, ctrl.manageControllerManagerConfig, ctrl.manageSchedulerConfig, ctrl.manageManifestsConfig, @@ -159,7 +159,7 @@ func (ctrl *K8sControlPlaneController) manageAPIServerConfig(ctx context.Context }) } -func (ctrl *K8sControlPlaneController) manageAdmissonControlConfig(ctx context.Context, r controller.Runtime, logger *zap.Logger, cfgProvider talosconfig.Provider) error { +func (ctrl *K8sControlPlaneController) manageAdmissionControlConfig(ctx context.Context, r controller.Runtime, logger *zap.Logger, cfgProvider talosconfig.Provider) error { spec := config.K8sAdmissionControlSpec{} for _, cfg := range cfgProvider.Cluster().APIServer().AdmissionControl() { diff --git a/internal/app/machined/pkg/controllers/k8s/kubelet_spec.go b/internal/app/machined/pkg/controllers/k8s/kubelet_spec.go index 53a3dcbf22..fa79533937 100644 --- a/internal/app/machined/pkg/controllers/k8s/kubelet_spec.go +++ b/internal/app/machined/pkg/controllers/k8s/kubelet_spec.go @@ -294,6 +294,14 @@ func NewKubeletConfiguration(clusterDNS []string, dnsDomain string, extraConfig config.Logging.Format = "json" } + if 
config.ShutdownGracePeriod.Duration == 0 { + config.ShutdownGracePeriod = metav1.Duration{Duration: constants.KubeletShutdownGracePeriod} + } + + if config.ShutdownGracePeriodCriticalPods.Duration == 0 { + config.ShutdownGracePeriodCriticalPods = metav1.Duration{Duration: constants.KubeletShutdownGracePeriodCriticalPods} + } + if config.StreamingConnectionIdleTimeout.Duration == 0 { config.StreamingConnectionIdleTimeout = metav1.Duration{Duration: 5 * time.Minute} } diff --git a/internal/app/machined/pkg/controllers/k8s/kubelet_spec_test.go b/internal/app/machined/pkg/controllers/k8s/kubelet_spec_test.go index 3776c40860..2978120a6a 100644 --- a/internal/app/machined/pkg/controllers/k8s/kubelet_spec_test.go +++ b/internal/app/machined/pkg/controllers/k8s/kubelet_spec_test.go @@ -333,10 +333,11 @@ func TestNewKubeletConfigurationSuccess(t *testing.T) { Logging: v1alpha1.LoggingConfiguration{ Format: "json", }, - - StreamingConnectionIdleTimeout: metav1.Duration{Duration: 5 * time.Minute}, - TLSMinVersion: "VersionTLS13", - EnableDebuggingHandlers: pointer.ToBool(true), + ShutdownGracePeriod: metav1.Duration{Duration: constants.KubeletShutdownGracePeriod}, + ShutdownGracePeriodCriticalPods: metav1.Duration{Duration: constants.KubeletShutdownGracePeriodCriticalPods}, + StreamingConnectionIdleTimeout: metav1.Duration{Duration: 5 * time.Minute}, + TLSMinVersion: "VersionTLS13", + EnableDebuggingHandlers: pointer.ToBool(true), }, config) } diff --git a/internal/app/machined/pkg/runtime/state.go b/internal/app/machined/pkg/runtime/state.go index 3ed16528d1..1bc82844ed 100644 --- a/internal/app/machined/pkg/runtime/state.go +++ b/internal/app/machined/pkg/runtime/state.go @@ -5,6 +5,8 @@ package runtime import ( + "context" + "github.com/cosi-project/runtime/pkg/state" "github.com/cosi-project/runtime/pkg/state/registry" "github.com/talos-systems/go-blockdevice/blockdevice/probe" @@ -37,6 +39,7 @@ type MachineState interface { StagedInstallOptions() []byte 
KexecPrepared(bool) IsKexecPrepared() bool + DBus() DBusState } // ClusterState defines the cluster state. @@ -51,3 +54,10 @@ type V1Alpha2State interface { SetConfig(config.Provider) error } + +// DBusState defines the D-Bus logind mock. +type DBusState interface { + Start() error + Stop() error + WaitShutdown(ctx context.Context) error +} diff --git a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_dbus.go b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_dbus.go new file mode 100644 index 0000000000..e424e379ff --- /dev/null +++ b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_dbus.go @@ -0,0 +1,83 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package v1alpha1 + +import ( + "context" + "fmt" + "os" + "path/filepath" + "time" + + "github.com/talos-systems/talos/internal/pkg/logind" + "github.com/talos-systems/talos/pkg/machinery/constants" +) + +// DBusState implements the logind mock. +type DBusState struct { + broker *logind.DBusBroker + logindMock *logind.ServiceMock + errCh chan error + cancel context.CancelFunc +} + +// Start the D-Bus broker and logind mock. +func (dbus *DBusState) Start() error { + for _, path := range []string{constants.DBusServiceSocketPath, constants.DBusClientSocketPath} { + if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil { + return err + } + } + + var err error + + dbus.broker, err = logind.NewBroker(constants.DBusServiceSocketPath, constants.DBusClientSocketPath) + if err != nil { + return err + } + + var ctx context.Context + + ctx, dbus.cancel = context.WithCancel(context.Background()) + + dbus.errCh = make(chan error) + + go func() { + dbus.errCh <- dbus.broker.Run(ctx) + }() + + dbus.logindMock, err = logind.NewServiceMock(constants.DBusServiceSocketPath) + + return err +} + +// Stop the D-Bus broker and logind mock. 
+func (dbus *DBusState) Stop() error { + dbus.cancel() + + if err := dbus.logindMock.Close(); err != nil { + return err + } + + if err := dbus.broker.Close(); err != nil { + return err + } + + select { + case <-time.After(time.Second): + return fmt.Errorf("timed out stopping D-Bus broker") + case err := <-dbus.errCh: + return err + } +} + +// WaitShutdown signals the shutdown over the D-Bus and waits for the inhibit lock to be released. +func (dbus *DBusState) WaitShutdown(ctx context.Context) error { + if err := dbus.logindMock.EmitShutdown(); err != nil { + return err + } + + return dbus.logindMock.WaitLockRelease(ctx) +} diff --git a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer.go b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer.go index 6ecbb7ee72..f7517b65b5 100644 --- a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer.go +++ b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer.go @@ -192,6 +192,9 @@ func (*Sequencer) Boot(r runtime.Runtime) []runtime.Phase { ).Append( "containerd", StartContainerd, + ).Append( + "dbus", + StartDBus, ).AppendWhen( r.State().Platform().Mode() == runtime.ModeContainer, "sharedFilesystems", @@ -263,6 +266,9 @@ func (*Sequencer) Reboot(r runtime.Runtime) []runtime.Phase { phases := PhaseList{}.Append( "cleanup", StopAllPods, + ).Append( + "dbus", + StopDBus, ). AppendList(stopAllPhaselist(r, true)). Append("reboot", Reboot) @@ -294,6 +300,9 @@ func (*Sequencer) Reset(r runtime.Runtime, in runtime.ResetOptions) []runtime.Ph !in.GetGraceful(), "cleanup", StopAllPods, + ).Append( + "dbus", + StopDBus, ).AppendWhen( in.GetGraceful() && (r.Config().Machine().Type() != machine.TypeWorker), "leave", @@ -331,6 +340,9 @@ func (*Sequencer) Shutdown(r runtime.Runtime, in *machineapi.ShutdownRequest) [] ).Append( "cleanup", StopAllPods, + ).Append( + "dbus", + StopDBus, ). AppendList(stopAllPhaselist(r, false)). 
Append("shutdown", Shutdown) @@ -349,6 +361,9 @@ func (*Sequencer) StageUpgrade(r runtime.Runtime, in *machineapi.UpgradeRequest) phases = phases.Append( "cleanup", StopAllPods, + ).Append( + "dbus", + StopDBus, ).AppendWhen( !in.GetPreserve() && (r.Config().Machine().Type() != machine.TypeWorker), "leave", @@ -383,6 +398,9 @@ func (*Sequencer) Upgrade(r runtime.Runtime, in *machineapi.UpgradeRequest) []ru in.GetPreserve(), "cleanup", StopAllPods, + ).Append( + "dbus", + StopDBus, ).AppendWhen( !in.GetPreserve() && (r.Config().Machine().Type() != machine.TypeWorker), "leave", diff --git a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go index 82c19070c8..c18c19976b 100644 --- a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go +++ b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go @@ -1237,6 +1237,15 @@ func StopAllPods(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionF func stopAndRemoveAllPods(stopAction cri.StopAction) runtime.TaskExecutionFunc { return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) (err error) { + logger.Printf("shutting down kubelet gracefully") + + shutdownCtx, shutdownCtxCancel := context.WithTimeout(ctx, constants.KubeletShutdownGracePeriod*2) + defer shutdownCtxCancel() + + if err = r.State().Machine().DBus().WaitShutdown(shutdownCtx); err != nil { + logger.Printf("failed waiting for inhibit shutdown lock: %s", err) + } + if err = system.Services(nil).Stop(ctx, "kubelet"); err != nil { return err } @@ -1821,3 +1830,21 @@ func KexecPrepare(seq runtime.Sequence, data interface{}) (runtime.TaskExecution return nil }, "kexecPrepare" } + +// StartDBus starts the D-Bus mock. 
+func StartDBus(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) { + return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) error { + return r.State().Machine().DBus().Start() + }, "startDBus" +} + +// StopDBus stops the D-Bus mock. +func StopDBus(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) { + return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) error { + if err := r.State().Machine().DBus().Stop(); err != nil { + logger.Printf("error stopping D-Bus: %s, ignored", err) + } + + return nil + }, "stopDBus" +} diff --git a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_state.go b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_state.go index a88d75358f..99ed2d571c 100644 --- a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_state.go +++ b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_state.go @@ -39,6 +39,8 @@ type MachineState struct { stagedInstallOptions []byte kexecPrepared bool + + dbus DBusState } // ClusterState represents the cluster's state. @@ -229,3 +231,8 @@ func (s *MachineState) KexecPrepared(prepared bool) { func (s *MachineState) IsKexecPrepared() bool { return s.kexecPrepared } + +// DBus implements the machine state interface. 
+func (s *MachineState) DBus() runtime.DBusState { + return &s.dbus +} diff --git a/internal/app/machined/pkg/system/services/kubelet.go b/internal/app/machined/pkg/system/services/kubelet.go index d96667b2e4..8c94c307a6 100644 --- a/internal/app/machined/pkg/system/services/kubelet.go +++ b/internal/app/machined/pkg/system/services/kubelet.go @@ -113,6 +113,7 @@ func (k *Kubelet) Runner(r runtime.Runtime) (runner.Runner, error) { {Type: "bind", Destination: "/etc/cni", Source: "/etc/cni", Options: []string{"rbind", "rshared", "rw"}}, {Type: "bind", Destination: "/usr/libexec/kubernetes", Source: "/usr/libexec/kubernetes", Options: []string{"rbind", "rshared", "rw"}}, {Type: "bind", Destination: "/var/run", Source: "/run", Options: []string{"rbind", "rshared", "rw"}}, + {Type: "bind", Destination: "/var/run/dbus/system_bus_socket", Source: constants.DBusClientSocketPath, Options: []string{"bind", "rw"}}, {Type: "bind", Destination: "/var/lib/containerd", Source: "/var/lib/containerd", Options: []string{"rbind", "rshared", "rw"}}, {Type: "bind", Destination: "/var/lib/kubelet", Source: "/var/lib/kubelet", Options: []string{"rbind", "rshared", "rw"}}, {Type: "bind", Destination: "/var/log/containers", Source: "/var/log/containers", Options: []string{"rbind", "rshared", "rw"}}, diff --git a/internal/pkg/logind/broker.go b/internal/pkg/logind/broker.go new file mode 100644 index 0000000000..346d2af0be --- /dev/null +++ b/internal/pkg/logind/broker.go @@ -0,0 +1,264 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package logind + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "log" + "net" + "strings" + "sync" + "syscall" + "time" + + "golang.org/x/sync/errgroup" +) + +// DBusBroker implements simplified D-Bus broker which allows to connect +// kubelet D-Bus connection with Talos logind mock. 
+// +// Broker doesn't actually implement auth, and it is supposed that service +// connects on one socket, while a client connects on another socket. +type DBusBroker struct { + listenService, listenClient net.Listener +} + +// NewBroker initializes new broker. +func NewBroker(serviceSocketPath, clientSocketPath string) (*DBusBroker, error) { + broker := &DBusBroker{} + + var err error + + broker.listenService, err = net.Listen("unix", serviceSocketPath) + if err != nil { + return nil, err + } + + broker.listenClient, err = net.Listen("unix", clientSocketPath) + if err != nil { + return nil, err + } + + return broker, nil +} + +// Close the listen sockets. +func (broker *DBusBroker) Close() error { + if err := broker.listenClient.Close(); err != nil { + return err + } + + return broker.listenService.Close() +} + +// Run the broker. +func (broker *DBusBroker) Run(ctx context.Context) error { + eg, ctx := errgroup.WithContext(ctx) + + var ( + connClient, connService net.Conn + mu sync.Mutex + ) + + eg.Go(func() error { return broker.run(ctx, broker.listenService, &mu, &connService, &connClient) }) + eg.Go(func() error { return broker.run(ctx, broker.listenClient, &mu, &connClient, &connService) }) + + return eg.Wait() +} + +func (broker *DBusBroker) run(ctx context.Context, l net.Listener, mu *sync.Mutex, ours, theirs *net.Conn) error { + for ctx.Err() == nil { + conn, err := l.Accept() + if err != nil { + if errors.Is(err, net.ErrClosed) { + return nil + } + + return err + } + + handleConn(ctx, conn.(*net.UnixConn), mu, ours, theirs) + } + + return nil +} + +func extractFiles(oob []byte) []int { + if len(oob) == 0 { + return nil + } + + var fds []int + + scms, err := syscall.ParseSocketControlMessage(oob) + if err == nil { + for _, scm := range scms { + var files []int + + files, err = syscall.ParseUnixRights(&scm) + if err == nil { + fds = append(fds, files...) 
+ } + } + } + + return fds +} + +//nolint:gocyclo +func handleConn(ctx context.Context, conn *net.UnixConn, mu *sync.Mutex, ours, theirs *net.Conn) { + defer conn.Close() //nolint: errcheck + + r := bufio.NewReader(conn) + + if err := handleAuth(r, conn); err != nil { + log.Printf("auth failed: %s", err) + + return + } + + mu.Lock() + *ours = conn + mu.Unlock() + + defer func() { + mu.Lock() + *ours = nil + mu.Unlock() + }() + + buf := make([]byte, 4096) + oob := make([]byte, 4096) + + for ctx.Err() == nil { + var ( + n, oobn int + err error + ) + + if r.Buffered() > 0 { + // read remaining buffered data + n, err = r.Read(buf[:r.Buffered()]) + } else { + // read the message and OOB data from the UNIX socket + n, oobn, _, _, err = conn.ReadMsgUnix(buf, oob) + } + + if err != nil { + return + } + + // capture all file descriptors in the OOB message + // broker needs to close the file descriptors as they get passed to the other peer + fds := extractFiles(oob[:oobn]) + + // find the other side of the connection + var w net.Conn + + for i := 0; i < 10; i++ { + mu.Lock() + w = *theirs + mu.Unlock() + + if w != nil { + break + } + + select { + case <-time.After(time.Second): + case <-ctx.Done(): + return + } + } + + if w == nil { + // drop data, as there's no other connection + continue + } + + // send the message and OOB data + // this will pass the file descriptors if they are in the OOB data + if _, _, err = w.(*net.UnixConn).WriteMsgUnix(buf[:n], oob[:oobn], nil); err != nil { + return + } + + // close fds to make sure broker doesn't hold the fds on its side + for _, fd := range fds { + syscall.Close(fd) //nolint:errcheck + } + } +} + +//nolint:gocyclo +func handleAuth(r *bufio.Reader, w io.Writer) error { + readLine := func() (string, error) { + l, err := r.ReadString('\n') + if err != nil { + return l, err + } + + l = strings.TrimRight(l, "\r\n") + + return l, nil + } + + // first, should receive AUTH command preceded by zero byte + line, err := readLine() + if err 
!= nil { + return err + } + + if line != "\x00AUTH" { + return fmt.Errorf("unexpected line, expected AUTH: %q", line) + } + + if _, err = w.Write([]byte("REJECTED EXTERNAL\r\n")); err != nil { + return err + } + + // now real auth command + line, err = readLine() + if err != nil { + return err + } + + if !strings.HasPrefix(line, "AUTH EXTERNAL") { + return fmt.Errorf("unexpected line, expected AUTH EXTERNAL: %q", line) + } + + if _, err = w.Write([]byte("OK 1234deadbeef\r\n")); err != nil { + return err + } + + // negotiate unix FDs + line, err = readLine() + if err != nil { + return err + } + + if line != "NEGOTIATE_UNIX_FD" { + return fmt.Errorf("unexpected line, expected NEGOTIATE_UNIX_FD: %q", line) + } + + if _, err = w.Write([]byte("AGREE_UNIX_FD\r\n")); err != nil { + return err + } + + // BEGIN + line, err = readLine() + if err != nil { + return err + } + + if line != "BEGIN" { + return fmt.Errorf("unexpected line, expected BEGIN: %q", line) + } + + return nil +} diff --git a/internal/pkg/logind/dbus.go b/internal/pkg/logind/dbus.go new file mode 100644 index 0000000000..831e202eb0 --- /dev/null +++ b/internal/pkg/logind/dbus.go @@ -0,0 +1,22 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ +package logind + +import "github.com/godbus/dbus/v5" + +const ( + dbusPath = dbus.ObjectPath("/org/freedesktop/DBus") + dbusInterface = "org.freedesktop.DBus" +) + +type dbusMock struct{} + +func (dbusMock) Hello() (string, *dbus.Error) { + return "id", nil +} + +func (dbusMock) AddMatch(_ string) *dbus.Error { + return nil +} diff --git a/internal/pkg/logind/kubelet_mock_test.go b/internal/pkg/logind/kubelet_mock_test.go new file mode 100644 index 0000000000..93ed9dea32 --- /dev/null +++ b/internal/pkg/logind/kubelet_mock_test.go @@ -0,0 +1,147 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package logind_test + +import ( + "fmt" + "log" + "syscall" + "time" + + "github.com/godbus/dbus/v5" +) + +const ( + logindService = "org.freedesktop.login1" + logindObject = dbus.ObjectPath("/org/freedesktop/login1") + logindInterface = "org.freedesktop.login1.Manager" +) + +type dBusConnector interface { + Object(dest string, path dbus.ObjectPath) dbus.BusObject + AddMatchSignal(options ...dbus.MatchOption) error + Signal(ch chan<- *dbus.Signal) + Close() error +} + +// DBusCon has functions that can be used to interact with systemd and logind over dbus. +type DBusCon struct { + SystemBus dBusConnector +} + +func NewDBusCon(path string) (*DBusCon, error) { + conn, err := dbus.Connect(path) + if err != nil { + return nil, err + } + + return &DBusCon{ + SystemBus: conn, + }, nil +} + +func (bus *DBusCon) Close() error { + return bus.SystemBus.Close() +} + +// InhibitLock is a lock obtained after creating a systemd inhibitor by calling InhibitShutdown(). +type InhibitLock uint32 + +// CurrentInhibitDelay returns the current delay inhibitor timeout value as configured in logind.conf(5). +// see https://www.freedesktop.org/software/systemd/man/logind.conf.html for more details. 
+func (bus *DBusCon) CurrentInhibitDelay() (time.Duration, error) { + obj := bus.SystemBus.Object(logindService, logindObject) + + res, err := obj.GetProperty(logindInterface + ".InhibitDelayMaxUSec") + if err != nil { + return 0, fmt.Errorf("failed reading InhibitDelayMaxUSec property from logind: %w", err) + } + + delay, ok := res.Value().(uint64) + if !ok { + return 0, fmt.Errorf("InhibitDelayMaxUSec from logind is not a uint64 as expected") + } + + // InhibitDelayMaxUSec is in microseconds + duration := time.Duration(delay) * time.Microsecond + + return duration, nil +} + +// InhibitShutdown creates a systemd inhibitor by calling logind's Inhibit() and returns the inhibitor lock +// see https://www.freedesktop.org/wiki/Software/systemd/inhibit/ for more details. +func (bus *DBusCon) InhibitShutdown() (InhibitLock, error) { + obj := bus.SystemBus.Object(logindService, logindObject) + what := "shutdown" + who := "kubelet" + why := "Kubelet needs time to handle node shutdown" + mode := "delay" + + call := obj.Call("org.freedesktop.login1.Manager.Inhibit", 0, what, who, why, mode) + if call.Err != nil { + return InhibitLock(0), fmt.Errorf("failed creating systemd inhibitor: %w", call.Err) + } + + var fd uint32 + + err := call.Store(&fd) + if err != nil { + return InhibitLock(0), fmt.Errorf("failed storing inhibit lock file descriptor: %w", err) + } + + return InhibitLock(fd), nil +} + +// ReleaseInhibitLock will release the underlying inhibit lock which will cause the shutdown to start. +func (bus *DBusCon) ReleaseInhibitLock(lock InhibitLock) error { + err := syscall.Close(int(lock)) + if err != nil { + return fmt.Errorf("unable to close systemd inhibitor lock: %w", err) + } + + return nil +} + +// MonitorShutdown detects a node shutdown by watching for "PrepareForShutdown" logind events. +// see https://www.freedesktop.org/wiki/Software/systemd/inhibit/ for more details. 
+func (bus *DBusCon) MonitorShutdown() (<-chan bool, error) { + err := bus.SystemBus.AddMatchSignal(dbus.WithMatchInterface(logindInterface), dbus.WithMatchMember("PrepareForShutdown"), dbus.WithMatchObjectPath("/org/freedesktop/login1")) + if err != nil { + return nil, err + } + + busChan := make(chan *dbus.Signal, 1) + bus.SystemBus.Signal(busChan) + + shutdownChan := make(chan bool, 1) + + go func() { + for { + event, ok := <-busChan + if !ok { + close(shutdownChan) + + return + } + + if event == nil || len(event.Body) == 0 { + log.Printf("failed obtaining shutdown event, PrepareForShutdown event was empty") + + continue + } + + shutdownActive, ok := event.Body[0].(bool) + if !ok { + log.Printf("Failed obtaining shutdown event, PrepareForShutdown event was not bool type as expected") + + continue + } + + shutdownChan <- shutdownActive + } + }() + + return shutdownChan, nil +} diff --git a/internal/pkg/logind/logind.go b/internal/pkg/logind/logind.go new file mode 100644 index 0000000000..e307e04bde --- /dev/null +++ b/internal/pkg/logind/logind.go @@ -0,0 +1,62 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// Package logind provides D-Bus logind mock to facilitate graceful kubelet shutdown. 
+package logind + +import ( + "sync" + "syscall" + "time" + + "github.com/godbus/dbus/v5" + "github.com/godbus/dbus/v5/prop" + + "github.com/talos-systems/talos/pkg/machinery/constants" +) + +const ( + logindService = "org.freedesktop.login1" + logindObject = dbus.ObjectPath("/org/freedesktop/login1") + logindInterface = "org.freedesktop.login1.Manager" + + inhibitMaxDelay = 2 * constants.KubeletShutdownGracePeriod +) + +type logindMock struct { + mu sync.Mutex + inhibitPipe []int +} + +var logindProps = map[string]map[string]*prop.Prop{ + logindInterface: { + "InhibitDelayMaxUSec": { + Value: uint64(inhibitMaxDelay / time.Microsecond), + Writable: false, + }, + }, +} + +func (mock *logindMock) Inhibit(what, who, why, mode string) (dbus.UnixFD, *dbus.Error) { + mock.mu.Lock() + defer mock.mu.Unlock() + + for _, fd := range mock.inhibitPipe { + syscall.Close(fd) //nolint:errcheck + } + + mock.inhibitPipe = make([]int, 2) + if err := syscall.Pipe(mock.inhibitPipe); err != nil { + return dbus.UnixFD(0), dbus.MakeFailedError(err) + } + + return dbus.UnixFD(mock.inhibitPipe[1]), nil +} + +func (mock *logindMock) getPipe() []int { + mock.mu.Lock() + defer mock.mu.Unlock() + + return append([]int(nil), mock.inhibitPipe...) +} diff --git a/internal/pkg/logind/logind_test.go b/internal/pkg/logind/logind_test.go new file mode 100644 index 0000000000..20b5fa2097 --- /dev/null +++ b/internal/pkg/logind/logind_test.go @@ -0,0 +1,85 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ +package logind_test + +import ( + "context" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/talos-systems/talos/internal/pkg/logind" + "github.com/talos-systems/talos/pkg/machinery/constants" +) + +func TestIntegration(t *testing.T) { + dir := t.TempDir() + + socketPathService := filepath.Join(dir, "system_bus_service") + socketPathClient := filepath.Join(dir, "system_bus_client") + + broker, err := logind.NewBroker(socketPathService, socketPathClient) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + errCh := make(chan error, 1) + + go func() { + errCh <- broker.Run(ctx) + }() + + serviceConn, err := logind.NewServiceMock(socketPathService) + require.NoError(t, err) + + defer serviceConn.Close() //nolint:errcheck + + kubeletConn, err := NewDBusCon("unix:path=" + socketPathClient) + require.NoError(t, err) + + defer kubeletConn.Close() //nolint:errcheck + + t.Log("ready to go") + + d, err := kubeletConn.CurrentInhibitDelay() + require.NoError(t, err) + + assert.Equal(t, 2*constants.KubeletShutdownGracePeriod, d) + + t.Log("acquiring inhibit lock") + + l, err := kubeletConn.InhibitShutdown() + require.NoError(t, err) + + t.Log("monitoring shutdown signal") + + ch, err := kubeletConn.MonitorShutdown() + require.NoError(t, err) + + t.Log("emitting shutdown signal") + + require.NoError(t, serviceConn.EmitShutdown()) + + assert.True(t, <-ch) + + t.Log("releasing inhibit lock") + + require.NoError(t, kubeletConn.ReleaseInhibitLock(l)) + + t.Log("waiting for inhibit lock release") + + assert.NoError(t, serviceConn.WaitLockRelease(ctx)) + + assert.NoError(t, serviceConn.Close()) + assert.NoError(t, kubeletConn.Close()) + assert.NoError(t, broker.Close()) + + cancel() + + assert.NoError(t, <-errCh) +} diff --git a/internal/pkg/logind/service.go b/internal/pkg/logind/service.go new file mode 100644 index 0000000000..86b34d9bcf --- 
/dev/null +++ b/internal/pkg/logind/service.go @@ -0,0 +1,96 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package logind + +import ( + "context" + "syscall" + + "github.com/godbus/dbus/v5" + "github.com/godbus/dbus/v5/prop" +) + +// ServiceMock connects to the broker and mocks the D-Bus and logind. +type ServiceMock struct { + conn *dbus.Conn + logind logindMock +} + +// NewServiceMock initializes the D-Bus and logind mock. +func NewServiceMock(socketPath string) (*ServiceMock, error) { + var mock ServiceMock + + conn, err := dbus.Dial("unix:path=" + socketPath) + if err != nil { + return nil, err + } + + if err = conn.Auth(nil); err != nil { + return nil, err + } + + if err = conn.Export(dbusMock{}, dbusPath, dbusInterface); err != nil { + return nil, err + } + + if err = conn.Export(&mock.logind, logindObject, logindService); err != nil { + return nil, err + } + + if err = conn.Export(&mock.logind, logindObject, logindInterface); err != nil { + return nil, err + } + + _, err = prop.Export(conn, logindObject, logindProps) + if err != nil { + return nil, err + } + + mock.conn = conn + + return &mock, nil +} + +// Close the connection. +func (mock *ServiceMock) Close() error { + return mock.conn.Close() +} + +// EmitShutdown notifies about the shutdown. +func (mock *ServiceMock) EmitShutdown() error { + return mock.conn.Emit(logindObject, logindService+".PrepareForShutdown", true) +} + +// WaitLockRelease waits for the inhibit lock to be released. 
+func (mock *ServiceMock) WaitLockRelease(ctx context.Context) error { + pipe := mock.logind.getPipe() + + // no inhibit lock + if len(pipe) == 0 { + return nil + } + + // close the write side of the pipe, other fd to the write pipe is in the kubelet + if err := syscall.Close(pipe[1]); err != nil { + return err + } + + errCh := make(chan error, 1) + + go func() { + // attempt to read from the pipe, as soon as kubelet closes its end, read should return + buf := make([]byte, 1) + _, err := syscall.Read(pipe[0], buf) + + errCh <- err + }() + + select { + case err := <-errCh: + return err + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/pkg/machinery/constants/constants.go b/pkg/machinery/constants/constants.go index f8aa3291cc..9bd61f7425 100644 --- a/pkg/machinery/constants/constants.go +++ b/pkg/machinery/constants/constants.go @@ -241,6 +241,14 @@ const ( // SystemKubeletPKIDir is the path to the directory where Talos copies kubelet issued certificates and keys. SystemKubeletPKIDir = "/system/secrets/kubelet" + // KubeletShutdownGracePeriod is the kubelet shutdown grace period. + KubeletShutdownGracePeriod = 30 * time.Second + + // KubeletShutdownGracePeriodCriticalPods is the kubelet shutdown grace period for critical pods. + // + // Should be less than KubeletShutdownGracePeriod. + KubeletShutdownGracePeriodCriticalPods = 10 * time.Second + // DefaultKubernetesVersion is the default target version of the control plane. DefaultKubernetesVersion = "1.23.4" @@ -636,6 +644,12 @@ const ( // ExtensionServicesRootfsPath is the path to the extracted rootfs files of extension services. ExtensionServicesRootfsPath = "/usr/local/lib/containers" + + // DBusServiceSocketPath is the path to the D-Bus socket for the logind mock to connect to. + DBusServiceSocketPath = SystemRunPath + "/dbus/service.socket" + + // DBusClientSocketPath is the path to the D-Bus socket for the kubelet to connect to. 
+ DBusClientSocketPath = SystemRunPath + "/dbus/client.socket" ) // See https://linux.die.net/man/3/klogctl