Skip to content

Commit

Permalink
feat: implement static pods via machine configuration
Browse files Browse the repository at this point in the history
Fixes #4727

On worker nodes, static pods are injected, but status can't be monitored
by Talos. On control plane nodes full status is available via
`StaticPodStatus`.

The pod definition is left as `Unstructured` in the machine configuration,
and no specific validation is performed, to avoid pulling Kubernetes
libraries into the Talos machinery package.

Signed-off-by: Andrey Smirnov <[email protected]>
  • Loading branch information
smira committed Feb 10, 2022
1 parent 6fadfa8 commit 492b156
Show file tree
Hide file tree
Showing 22 changed files with 804 additions and 139 deletions.
8 changes: 8 additions & 0 deletions hack/release.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ Talos now supports Jetson Nano SBC.
description="""\
`talosctl` commands which accept JSON patches (`gen config`, `cluster create`, `patch machineconfig`) now support multiple patches, loading patches
from files with `@file.json` syntax, and support loading from YAML format.
"""

[notes.staticpods]
title = "Static Pods in the Machine Configuration"
description="""\
Talos now accepts static pod definitions in the `.machine.pods` key of the machine configuration.
Please note that static pod definitions are not validated by Talos.
Static pod definitions can be updated without a node reboot.
"""

[make_deps]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ func (ctrl *ControlPlaneStaticPodController) Outputs() []controller.Output {
return []controller.Output{
{
Type: k8s.StaticPodType,
Kind: controller.OutputExclusive,
Kind: controller.OutputShared,
},
}
}

// Run implements controller.Controller interface.
//
//nolint:gocyclo
//nolint:gocyclo,cyclop
func (ctrl *ControlPlaneStaticPodController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
for {
select {
Expand Down Expand Up @@ -155,7 +155,7 @@ func (ctrl *ControlPlaneStaticPodController) Run(ctx context.Context, r controll

// clean up static pods which haven't been touched
{
list, err := r.List(ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.StaticPodType, "", resource.VersionUndefined))
list, err := r.List(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.StaticPodType, "", resource.VersionUndefined))
if err != nil {
return err
}
Expand All @@ -165,6 +165,10 @@ func (ctrl *ControlPlaneStaticPodController) Run(ctx context.Context, r controll
continue
}

if res.Metadata().Owner() != ctrl.Name() {
continue
}

if err = r.Destroy(ctx, res.Metadata()); err != nil {
return err
}
Expand All @@ -174,14 +178,16 @@ func (ctrl *ControlPlaneStaticPodController) Run(ctx context.Context, r controll
}

func (ctrl *ControlPlaneStaticPodController) teardownAll(ctx context.Context, r controller.Runtime) error {
list, err := r.List(ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.StaticPodType, "", resource.VersionUndefined))
list, err := r.List(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.StaticPodType, "", resource.VersionUndefined))
if err != nil {
return err
}

// TODO: change this to proper teardown sequence

for _, res := range list.Items {
if res.Metadata().Owner() != ctrl.Name() {
continue
}

if err = r.Destroy(ctx, res.Metadata()); err != nil {
return err
}
Expand Down Expand Up @@ -309,7 +315,7 @@ func (ctrl *ControlPlaneStaticPodController) manageAPIServer(ctx context.Context

args = append(args, builder.Args()...)

return config.K8sControlPlaneAPIServerID, r.Modify(ctx, k8s.NewStaticPod(k8s.ControlPlaneNamespaceName, config.K8sControlPlaneAPIServerID), func(r resource.Resource) error {
return config.K8sControlPlaneAPIServerID, r.Modify(ctx, k8s.NewStaticPod(k8s.NamespaceName, config.K8sControlPlaneAPIServerID), func(r resource.Resource) error {
return k8sadapter.StaticPod(r.(*k8s.StaticPod)).SetPod(&v1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Expand Down Expand Up @@ -435,7 +441,7 @@ func (ctrl *ControlPlaneStaticPodController) manageControllerManager(ctx context
args = append(args, builder.Args()...)

//nolint:dupl
return config.K8sControlPlaneControllerManagerID, r.Modify(ctx, k8s.NewStaticPod(k8s.ControlPlaneNamespaceName, config.K8sControlPlaneControllerManagerID), func(r resource.Resource) error {
return config.K8sControlPlaneControllerManagerID, r.Modify(ctx, k8s.NewStaticPod(k8s.NamespaceName, config.K8sControlPlaneControllerManagerID), func(r resource.Resource) error {
return k8sadapter.StaticPod(r.(*k8s.StaticPod)).SetPod(&v1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Expand Down Expand Up @@ -544,7 +550,7 @@ func (ctrl *ControlPlaneStaticPodController) manageScheduler(ctx context.Context
args = append(args, builder.Args()...)

//nolint:dupl
return config.K8sControlPlaneSchedulerID, r.Modify(ctx, k8s.NewStaticPod(k8s.ControlPlaneNamespaceName, config.K8sControlPlaneSchedulerID), func(r resource.Resource) error {
return config.K8sControlPlaneSchedulerID, r.Modify(ctx, k8s.NewStaticPod(k8s.NamespaceName, config.K8sControlPlaneSchedulerID), func(r resource.Resource) error {
return k8sadapter.StaticPod(r.(*k8s.StaticPod)).SetPod(&v1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (suite *ControlPlaneStaticPodSuite) startRuntime() {

//nolint:dupl
func (suite *ControlPlaneStaticPodSuite) assertControlPlaneStaticPods(manifests []string) error {
resources, err := suite.state.List(suite.ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.StaticPodType, "", resource.VersionUndefined))
resources, err := suite.state.List(suite.ctx, resource.NewMetadata(k8s.NamespaceName, k8s.StaticPodType, "", resource.VersionUndefined))
if err != nil {
return err
}
Expand Down Expand Up @@ -124,7 +124,7 @@ func (suite *ControlPlaneStaticPodSuite) TestReconcileDefaults() {

suite.Assert().NoError(retry.Constant(10*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
func() error {
list, err := suite.state.List(suite.ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.StaticPodType, "", resource.VersionUndefined))
list, err := suite.state.List(suite.ctx, resource.NewMetadata(k8s.NamespaceName, k8s.StaticPodType, "", resource.VersionUndefined))
if err != nil {
return err
}
Expand Down Expand Up @@ -172,7 +172,7 @@ func (suite *ControlPlaneStaticPodSuite) TestReconcileExtraMounts() {
},
))

r, err := suite.state.Get(suite.ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.StaticPodType, "kube-apiserver", resource.VersionUndefined))
r, err := suite.state.Get(suite.ctx, resource.NewMetadata(k8s.NamespaceName, k8s.StaticPodType, "kube-apiserver", resource.VersionUndefined))
suite.Require().NoError(err)

apiServerPod, err := k8sadapter.StaticPod(r.(*k8s.StaticPod)).Pod()
Expand Down Expand Up @@ -254,7 +254,7 @@ func (suite *ControlPlaneStaticPodSuite) TestReconcileExtraArgs() {
// wait for some time to ensure that controller has picked the input
time.Sleep(500 * time.Millisecond)

_, err := suite.state.Get(suite.ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.StaticPodType, "kube-apiserver", resource.VersionUndefined))
_, err := suite.state.Get(suite.ctx, resource.NewMetadata(k8s.NamespaceName, k8s.StaticPodType, "kube-apiserver", resource.VersionUndefined))
suite.Require().Error(err)

continue
Expand All @@ -270,7 +270,7 @@ func (suite *ControlPlaneStaticPodSuite) TestReconcileExtraArgs() {
},
))

r, err := suite.state.Get(suite.ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.StaticPodType, "kube-apiserver", resource.VersionUndefined))
r, err := suite.state.Get(suite.ctx, resource.NewMetadata(k8s.NamespaceName, k8s.StaticPodType, "kube-apiserver", resource.VersionUndefined))
suite.Require().NoError(err)

apiServerPod, err := k8sadapter.StaticPod(r.(*k8s.StaticPod)).Pod()
Expand All @@ -297,7 +297,7 @@ func (suite *ControlPlaneStaticPodSuite) TestReconcileExtraArgs() {

suite.Assert().NoError(retry.Constant(10*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
func() error {
list, err := suite.state.List(suite.ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.StaticPodType, "", resource.VersionUndefined))
list, err := suite.state.List(suite.ctx, resource.NewMetadata(k8s.NamespaceName, k8s.StaticPodType, "", resource.VersionUndefined))
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (ctrl *KubeletStaticPodController) Name() string {
func (ctrl *KubeletStaticPodController) Inputs() []controller.Input {
return []controller.Input{
{
Namespace: k8s.ControlPlaneNamespaceName,
Namespace: k8s.NamespaceName,
Type: k8s.StaticPodType,
Kind: controller.InputStrong,
},
Expand Down Expand Up @@ -131,12 +131,34 @@ func (ctrl *KubeletStaticPodController) Run(ctx context.Context, r controller.Ru
continue
}

staticPods, err := r.List(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.StaticPodType, "", resource.VersionUndefined))
if err != nil {
return fmt.Errorf("error listing static pods: %w", err)
}

for _, staticPod := range staticPods.Items {
switch staticPod.Metadata().Phase() {
case resource.PhaseRunning:
if err = ctrl.writePod(logger, staticPod); err != nil {
return fmt.Errorf("error running pod: %w", err)
}
case resource.PhaseTearingDown:
if err = ctrl.teardownPod(logger, staticPod); err != nil {
return fmt.Errorf("error tearing down pod: %w", err)
}
}
}

if err = ctrl.cleanupPods(logger, staticPods.Items); err != nil {
return fmt.Errorf("error cleaning up static pods: %w", err)
}

// on worker nodes, there's no way to connect to the kubelet to fetch the pod status (only API server can do that)
// on control plane nodes, use the API server's kubelet client certificate to fetch statuses
rootSecretResource, err := r.Get(ctx, resource.NewMetadata(secrets.NamespaceName, secrets.KubernetesRootType, secrets.KubernetesRootID, resource.VersionUndefined))
if err != nil {
if state.IsNotFoundError(err) {
if err = ctrl.cleanupPods(logger, nil); err != nil {
return fmt.Errorf("error cleaning up static pods: %w", err)
}
kubeletClient = nil

continue
}
Expand All @@ -149,9 +171,7 @@ func (ctrl *KubeletStaticPodController) Run(ctx context.Context, r controller.Ru
secretsResource, err := r.Get(ctx, resource.NewMetadata(secrets.NamespaceName, secrets.KubernetesType, secrets.KubernetesID, resource.VersionUndefined))
if err != nil {
if state.IsNotFoundError(err) {
if err = ctrl.cleanupPods(logger, nil); err != nil {
return fmt.Errorf("error cleaning up static pods: %w", err)
}
kubeletClient = nil

continue
}
Expand All @@ -169,28 +189,6 @@ func (ctrl *KubeletStaticPodController) Run(ctx context.Context, r controller.Ru

nodename := nodenameResource.(*k8s.Nodename).TypedSpec().Nodename

staticPods, err := r.List(ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.StaticPodType, "", resource.VersionUndefined))
if err != nil {
return fmt.Errorf("error listing static pods: %w", err)
}

for _, staticPod := range staticPods.Items {
switch staticPod.Metadata().Phase() {
case resource.PhaseRunning:
if err = ctrl.writePod(logger, staticPod); err != nil {
return fmt.Errorf("error running pod: %w", err)
}
case resource.PhaseTearingDown:
if err = ctrl.teardownPod(logger, staticPod); err != nil {
return fmt.Errorf("error tearing down pod: %w", err)
}
}
}

if err = ctrl.cleanupPods(logger, staticPods.Items); err != nil {
return fmt.Errorf("error cleaning up static pods: %w", err)
}

// render static pods first, and attempt to build kubelet client last,
// as if kubelet issues certs from the API server, API server should be launched first.
kubeletClient, err = kubelet.NewClient(nodename, secrets.APIServerKubeletClient.Crt, secrets.APIServerKubeletClient.Key, rootSecrets.CA.Crt)
Expand Down Expand Up @@ -295,7 +293,7 @@ func (ctrl *KubeletStaticPodController) cleanupPods(logger *zap.Logger, staticPo
}

func (ctrl *KubeletStaticPodController) teardownStatuses(ctx context.Context, r controller.Runtime) error {
statuses, err := r.List(ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.StaticPodStatusType, "", resource.VersionUndefined))
statuses, err := r.List(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.StaticPodStatusType, "", resource.VersionUndefined))
if err != nil {
return fmt.Errorf("error listing pod statuses: %w", err)
}
Expand Down Expand Up @@ -329,14 +327,14 @@ func (ctrl *KubeletStaticPodController) refreshPodStatus(ctx context.Context, r

podsSeen[statusID] = struct{}{}

if err = r.Modify(ctx, k8s.NewStaticPodStatus(k8s.ControlPlaneNamespaceName, statusID), func(r resource.Resource) error {
if err = r.Modify(ctx, k8s.NewStaticPodStatus(k8s.NamespaceName, statusID), func(r resource.Resource) error {
return k8sadapter.StaticPodStatus(r.(*k8s.StaticPodStatus)).SetStatus(&pod.Status)
}); err != nil {
return fmt.Errorf("error updating pod status: %w", err)
}
}

statuses, err := r.List(ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.StaticPodStatusType, "", resource.VersionUndefined))
statuses, err := r.List(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.StaticPodStatusType, "", resource.VersionUndefined))
if err != nil {
return fmt.Errorf("error listing pod statuses: %w", err)
}
Expand Down
131 changes: 131 additions & 0 deletions internal/app/machined/pkg/controllers/k8s/static_pod_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package k8s

import (
"context"
"fmt"

"github.com/AlekSi/pointer"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/talos-systems/talos/pkg/machinery/resources/config"
"github.com/talos-systems/talos/pkg/machinery/resources/k8s"
)

// StaticPodConfigController renders k8s.StaticPod resources out of the pod
// definitions found in the machine configuration.
type StaticPodConfigController struct{}

// Name implements controller.Controller interface.
//
// The returned value is also compared against resource owners when this
// controller decides which static pods it is allowed to destroy.
func (ctrl *StaticPodConfigController) Name() string {
	const controllerName = "k8s.StaticPodConfigController"

	return controllerName
}

// Inputs implements controller.Controller interface.
//
// The only input is the v1alpha1 machine configuration resource, declared
// as a weak input.
func (ctrl *StaticPodConfigController) Inputs() []controller.Input {
	machineConfig := controller.Input{
		Namespace: config.NamespaceName,
		Type:      config.MachineConfigType,
		ID:        pointer.ToString(config.V1Alpha1ID),
		Kind:      controller.InputWeak,
	}

	return []controller.Input{machineConfig}
}

// Outputs implements controller.Controller interface.
//
// k8s.StaticPod is declared as a shared output rather than an exclusive one.
func (ctrl *StaticPodConfigController) Outputs() []controller.Output {
	staticPods := controller.Output{
		Type: k8s.StaticPodType,
		Kind: controller.OutputShared,
	}

	return []controller.Output{staticPods}
}

// Run implements controller.Controller interface.
//
// On every runtime event the v1alpha1 machine configuration is re-read and the
// k8s.StaticPod resources are reconciled against the pods it lists:
//
//   - each configured pod is created or updated under the ID "<namespace>-<name>";
//   - StaticPod resources owned by this controller which were not touched in
//     this pass are destroyed.
//
// A missing machine configuration is handled like an empty pod list, so all
// static pods owned by this controller are removed. Run returns nil when ctx
// is done, and a wrapped error on any failure.
//
//nolint:gocyclo
func (ctrl *StaticPodConfigController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-r.EventCh():
		}

		cfg, err := r.Get(ctx, resource.NewMetadata(config.NamespaceName, config.MachineConfigType, config.V1Alpha1ID, resource.VersionUndefined))
		if err != nil && !state.IsNotFoundError(err) {
			return fmt.Errorf("error getting config: %w", err)
		}

		// IDs of the static pods rendered from the current machine configuration
		touchedIDs := map[string]struct{}{}

		// cfg stays nil when the machine configuration is not found
		if cfg != nil {
			cfgProvider := cfg.(*config.MachineConfig).Config()

			for _, pod := range cfgProvider.Machine().Pods() {
				name, ok, err := unstructured.NestedString(pod, "metadata", "name")
				if err != nil {
					return fmt.Errorf("error getting name from static pod: %w", err)
				}

				// an explicitly empty name is as unusable as a missing one
				if !ok || name == "" {
					return fmt.Errorf("name is missing in static pod metadata")
				}

				namespace, ok, err := unstructured.NestedString(pod, "metadata", "namespace")
				if err != nil {
					return fmt.Errorf("error getting namespace from static pod: %w", err)
				}

				// both a missing and an empty namespace mean the default namespace
				if !ok || namespace == "" {
					namespace = corev1.NamespaceDefault
				}

				id := fmt.Sprintf("%s-%s", namespace, name)

				if err = r.Modify(ctx, k8s.NewStaticPod(k8s.NamespaceName, id), func(res resource.Resource) error {
					res.(*k8s.StaticPod).TypedSpec().Pod = pod

					return nil
				}); err != nil {
					return fmt.Errorf("error modifying resource: %w", err)
				}

				touchedIDs[id] = struct{}{}
			}
		}

		// clean up static pods which haven't been touched
		list, err := r.List(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.StaticPodType, "", resource.VersionUndefined))
		if err != nil {
			return fmt.Errorf("error listing static pods: %w", err)
		}

		for _, res := range list.Items {
			if _, ok := touchedIDs[res.Metadata().ID()]; ok {
				continue
			}

			// the output is shared: never destroy static pods created by other controllers
			if res.Metadata().Owner() != ctrl.Name() {
				continue
			}

			if err = r.Destroy(ctx, res.Metadata()); err != nil {
				return fmt.Errorf("error destroying static pod %q: %w", res.Metadata().ID(), err)
			}
		}
	}
}
Loading

0 comments on commit 492b156

Please sign in to comment.