Skip to content

Commit

Permalink
fix: write etcd PKI files in a controller
Browse files Browse the repository at this point in the history
Instead of writing PKI "once" around the startup time, keep writing PKI
files as the certificates get updated. `etcd` is able to reload
certificates, so we should keep updating them e.g. if the hostname/IPs
change over time.

Signed-off-by: Andrey Smirnov <[email protected]>
  • Loading branch information
smira committed Jul 21, 2022
1 parent bb4abc0 commit a2aea97
Show file tree
Hide file tree
Showing 17 changed files with 345 additions and 136 deletions.
3 changes: 1 addition & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,7 @@ COPY --from=generate-build /api/storage/*.pb.go /pkg/machinery/api/storage/
COPY --from=generate-build /api/resource/*.pb.go /pkg/machinery/api/resource/
COPY --from=generate-build /api/resource/secrets/*.pb.go /pkg/machinery/api/resource/secrets/
COPY --from=generate-build /api/inspect/*.pb.go /pkg/machinery/api/inspect/
COPY --from=go-generate /src/pkg/machinery/resources/kubespan/ /pkg/machinery/resources/kubespan/
COPY --from=go-generate /src/pkg/machinery/resources/network/ /pkg/machinery/resources/network/
COPY --from=go-generate /src/pkg/machinery/resources/ /pkg/machinery/resources/
COPY --from=go-generate /src/pkg/machinery/config/types/v1alpha1/ /pkg/machinery/config/types/v1alpha1/
COPY --from=go-generate /src/pkg/machinery/nethelpers/ /pkg/machinery/nethelpers/
COPY --from=go-generate /src/pkg/machinery/extensions/ /pkg/machinery/extensions/
Expand Down
6 changes: 6 additions & 0 deletions internal/app/machined/pkg/controllers/etcd/etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

// Package etcd provides controllers which manage etcd resources.
package etcd
148 changes: 148 additions & 0 deletions internal/app/machined/pkg/controllers/etcd/pki.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package etcd

import (
"context"
"fmt"
"os"

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/go-pointer"
"github.com/talos-systems/crypto/x509"
"go.uber.org/zap"

"github.com/talos-systems/talos/pkg/filetree"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/machinery/resources/etcd"
"github.com/talos-systems/talos/pkg/machinery/resources/secrets"
)

// PKIController writes the etcd PKI files (CA, etcd, peer and admin
// certificates and keys) to disk each time it is woken up with both the etcd
// root secrets and the etcd secrets available, and publishes a PKIStatus
// resource once the files are in place.
type PKIController struct{}

// Name reports the controller name, satisfying the controller.Controller
// interface.
func (ctrl *PKIController) Name() string {
	const controllerName = "etcd.PKIController"

	return controllerName
}

// Inputs implements controller.Controller interface.
//
// The controller declares two weak inputs: the etcd root secrets and the
// etcd secrets, both in the secrets namespace.
func (ctrl *PKIController) Inputs() []controller.Input {
	rootSecretsInput := controller.Input{
		Namespace: secrets.NamespaceName,
		Type:      secrets.EtcdRootType,
		ID:        pointer.To(secrets.EtcdRootID),
		Kind:      controller.InputWeak,
	}

	etcdSecretsInput := controller.Input{
		Namespace: secrets.NamespaceName,
		Type:      secrets.EtcdType,
		ID:        pointer.To(secrets.EtcdID),
		Kind:      controller.InputWeak,
	}

	return []controller.Input{rootSecretsInput, etcdSecretsInput}
}

// Outputs implements controller.Controller interface.
//
// The only output is the PKIStatus resource type, declared as exclusive.
func (ctrl *PKIController) Outputs() []controller.Output {
	pkiStatusOutput := controller.Output{
		Type: etcd.PKIStatusType,
		Kind: controller.OutputExclusive,
	}

	return []controller.Output{pkiStatusOutput}
}

// Run implements controller.Controller interface.
//
// On every runtime event it reads the etcd root secrets and the etcd secrets.
// If either resource does not exist yet, the iteration is skipped and the
// controller waits for the next event. Otherwise it:
//
//   - creates constants.EtcdPKIPath (mode 0o700) if needed;
//   - writes the CA certificate and key, and the etcd, peer and admin
//     key/certificate pairs (all mode 0o400);
//   - recursively changes ownership of constants.EtcdPKIPath to the etcd user;
//   - marks the PKIStatus resource as ready, recording the version of the etcd
//     secrets resource the files were written from.
//
// Run returns nil when ctx is cancelled and a non-nil error on any failure,
// including failure to change ownership of the PKI directory.
//
//nolint:gocyclo
func (ctrl *PKIController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-r.EventCh():
		}

		rootScrts, err := safe.ReaderGet[*secrets.EtcdRoot](ctx, r, resource.NewMetadata(secrets.NamespaceName, secrets.EtcdRootType, secrets.EtcdRootID, resource.VersionUndefined))
		if err != nil {
			if state.IsNotFoundError(err) {
				// root secrets are not available yet, wait for the next event
				continue
			}

			return fmt.Errorf("error getting root secrets: %w", err)
		}

		scrts, err := safe.ReaderGet[*secrets.Etcd](ctx, r, resource.NewMetadata(secrets.NamespaceName, secrets.EtcdType, secrets.EtcdID, resource.VersionUndefined))
		if err != nil {
			if state.IsNotFoundError(err) {
				// etcd certificates are not generated yet, wait for the next event
				continue
			}

			return fmt.Errorf("error getting secrets: %w", err)
		}

		if err = os.MkdirAll(constants.EtcdPKIPath, 0o700); err != nil {
			return fmt.Errorf("failed to create etcd PKI directory: %w", err)
		}

		if err = os.WriteFile(constants.KubernetesEtcdCACert, rootScrts.TypedSpec().EtcdCA.Crt, 0o400); err != nil {
			return fmt.Errorf("failed to write CA certificate: %w", err)
		}

		if err = os.WriteFile(constants.KubernetesEtcdCAKey, rootScrts.TypedSpec().EtcdCA.Key, 0o400); err != nil {
			return fmt.Errorf("failed to write CA key: %w", err)
		}

		etcdCerts := scrts.TypedSpec()

		for _, keypair := range []struct {
			certAndKey *x509.PEMEncodedCertificateAndKey
			keyPath    string
			certPath   string
		}{
			{
				certAndKey: etcdCerts.Etcd,
				keyPath:    constants.KubernetesEtcdKey,
				certPath:   constants.KubernetesEtcdCert,
			},
			{
				certAndKey: etcdCerts.EtcdPeer,
				keyPath:    constants.KubernetesEtcdPeerKey,
				certPath:   constants.KubernetesEtcdPeerCert,
			},
			{
				certAndKey: etcdCerts.EtcdAdmin,
				keyPath:    constants.KubernetesEtcdAdminKey,
				certPath:   constants.KubernetesEtcdAdminCert,
			},
		} {
			if err = os.WriteFile(keypair.keyPath, keypair.certAndKey.Key, 0o400); err != nil {
				return fmt.Errorf("failed to write key %q: %w", keypair.keyPath, err)
			}

			if err = os.WriteFile(keypair.certPath, keypair.certAndKey.Crt, 0o400); err != nil {
				return fmt.Errorf("failed to write certificate %q: %w", keypair.certPath, err)
			}
		}

		// A chown failure must be reported: returning nil here would stop the
		// controller silently and PKIStatus would never be published.
		if err = filetree.ChownRecursive(constants.EtcdPKIPath, constants.EtcdUserID, constants.EtcdUserID); err != nil {
			return fmt.Errorf("failed to change ownership of etcd PKI directory: %w", err)
		}

		if err = safe.WriterModify(ctx, r, etcd.NewPKIStatus(etcd.NamespaceName, etcd.PKIID), func(status *etcd.PKIStatus) error {
			status.TypedSpec().Ready = true
			status.TypedSpec().Version = scrts.Metadata().Version().String()

			return nil
		}); err != nil {
			return fmt.Errorf("error updating PKI status: %w", err)
		}
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/talos-systems/talos/internal/app/machined/pkg/controllers/cluster"
"github.com/talos-systems/talos/internal/app/machined/pkg/controllers/config"
"github.com/talos-systems/talos/internal/app/machined/pkg/controllers/etcd"
"github.com/talos-systems/talos/internal/app/machined/pkg/controllers/files"
"github.com/talos-systems/talos/internal/app/machined/pkg/controllers/hardware"
"github.com/talos-systems/talos/internal/app/machined/pkg/controllers/k8s"
Expand Down Expand Up @@ -103,6 +104,7 @@ func (ctrl *Controller) Run(ctx context.Context, drainer *runtime.Drainer) error
&config.MachineTypeController{},
&config.K8sAddressFilterController{},
&config.K8sControlPlaneController{},
&etcd.PKIController{},
&files.CRIConfigPartsController{},
&files.CRIRegistryConfigController{},
&files.EtcFileController{
Expand Down
3 changes: 3 additions & 0 deletions internal/app/machined/pkg/runtime/v1alpha2/v1alpha2_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
talosconfig "github.com/talos-systems/talos/pkg/machinery/config"
"github.com/talos-systems/talos/pkg/machinery/resources/cluster"
"github.com/talos-systems/talos/pkg/machinery/resources/config"
"github.com/talos-systems/talos/pkg/machinery/resources/etcd"
"github.com/talos-systems/talos/pkg/machinery/resources/files"
"github.com/talos-systems/talos/pkg/machinery/resources/hardware"
"github.com/talos-systems/talos/pkg/machinery/resources/k8s"
Expand Down Expand Up @@ -63,6 +64,7 @@ func NewState() (*State, error) {
{cluster.NamespaceName, "Cluster configuration and discovery resources."},
{cluster.RawNamespaceName, "Cluster unmerged raw resources."},
{config.NamespaceName, "Talos node configuration."},
{etcd.NamespaceName, "etcd resources."},
{files.NamespaceName, "Files and file-like resources."},
{hardware.NamespaceName, "Hardware resources."},
{k8s.NamespaceName, "Kubernetes all node types resources."},
Expand All @@ -87,6 +89,7 @@ func NewState() (*State, error) {
&cluster.Member{},
&config.MachineConfig{},
&config.MachineType{},
&etcd.PKIStatus{},
&files.EtcFileSpec{},
&files.EtcFileStatus{},
&hardware.Processor{},
Expand Down
109 changes: 11 additions & 98 deletions internal/app/machined/pkg/system/services/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"os"
goruntime "runtime"
Expand All @@ -23,7 +22,6 @@ import (
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/talos-systems/crypto/x509"
"github.com/talos-systems/go-retry/retry"
"github.com/talos-systems/net"
clientv3 "go.etcd.io/etcd/client/v3"
Expand All @@ -42,13 +40,14 @@ import (
"github.com/talos-systems/talos/internal/pkg/etcd"
"github.com/talos-systems/talos/pkg/argsbuilder"
"github.com/talos-systems/talos/pkg/conditions"
"github.com/talos-systems/talos/pkg/filetree"
"github.com/talos-systems/talos/pkg/logging"
machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
etcdresource "github.com/talos-systems/talos/pkg/machinery/resources/etcd"
"github.com/talos-systems/talos/pkg/machinery/resources/k8s"
"github.com/talos-systems/talos/pkg/machinery/resources/network"
"github.com/talos-systems/talos/pkg/machinery/resources/secrets"
timeresource "github.com/talos-systems/talos/pkg/machinery/resources/time"
)

Expand Down Expand Up @@ -87,7 +86,7 @@ func (e *Etcd) PreFunc(ctx context.Context, r runtime.Runtime) (err error) {
}

// Make sure etcd user can access files in the data directory.
if err = chownRecursive(constants.EtcdDataPath, constants.EtcdUserID, constants.EtcdUserID); err != nil {
if err = filetree.ChownRecursive(constants.EtcdDataPath, constants.EtcdUserID, constants.EtcdUserID); err != nil {
return err
}

Expand Down Expand Up @@ -126,7 +125,7 @@ func (e *Etcd) PreFunc(ctx context.Context, r runtime.Runtime) (err error) {
panic(fmt.Sprintf("unexpected machine type %v", t))
}

if err = generatePKI(ctx, r); err != nil {
if err = waitPKI(ctx, r); err != nil {
return fmt.Errorf("failed to generate etcd PKI: %w", err)
}

Expand Down Expand Up @@ -243,102 +242,16 @@ func (e *Etcd) HealthSettings(runtime.Runtime) *health.Settings {
}
}

//nolint:gocyclo
func generatePKI(ctx context.Context, r runtime.Runtime) (err error) {
if err = os.MkdirAll(constants.EtcdPKIPath, 0o700); err != nil {
return err
}

if err = ioutil.WriteFile(constants.KubernetesEtcdCACert, r.Config().Cluster().Etcd().CA().Crt, 0o400); err != nil {
return fmt.Errorf("failed to write CA certificate: %w", err)
}

if err = ioutil.WriteFile(constants.KubernetesEtcdCAKey, r.Config().Cluster().Etcd().CA().Key, 0o400); err != nil {
return fmt.Errorf("failed to write CA key: %w", err)
}

// wait for etcd certificates to be generated in the controller
watchCh := make(chan state.Event)

if err = r.State().V1Alpha2().Resources().Watch(ctx, resource.NewMetadata(secrets.NamespaceName, secrets.EtcdType, secrets.EtcdID, resource.VersionUndefined), watchCh); err != nil {
return err
}

var event state.Event

for {
select {
case <-ctx.Done():
return ctx.Err()
case event = <-watchCh:
}

if event.Type == state.Created || event.Type == state.Updated {
break
}
}

// wait for additional events with 500ms timeout and absorb new versions of the resource
const settleDownTimeout = 500 * time.Millisecond

timer := time.NewTimer(settleDownTimeout)
defer timer.Stop()

waitLoop:
for {
select {
case event = <-watchCh:
if !timer.Stop() {
<-timer.C
}

timer.Reset(settleDownTimeout)
case <-timer.C:
break waitLoop
}
}

etcdCerts := event.Resource.(*secrets.Etcd).TypedSpec()

for _, keypair := range []struct {
getter func() *x509.PEMEncodedCertificateAndKey
keyPath string
certPath string
}{
{
getter: func() *x509.PEMEncodedCertificateAndKey { return etcdCerts.Etcd },
keyPath: constants.KubernetesEtcdKey,
certPath: constants.KubernetesEtcdCert,
},
{
getter: func() *x509.PEMEncodedCertificateAndKey { return etcdCerts.EtcdPeer },
keyPath: constants.KubernetesEtcdPeerKey,
certPath: constants.KubernetesEtcdPeerCert,
},
{
getter: func() *x509.PEMEncodedCertificateAndKey { return etcdCerts.EtcdAdmin },
keyPath: constants.KubernetesEtcdAdminKey,
certPath: constants.KubernetesEtcdAdminCert,
},
} {
if err = ioutil.WriteFile(keypair.keyPath, keypair.getter().Key, 0o400); err != nil {
return err
}

if err = ioutil.WriteFile(keypair.certPath, keypair.getter().Crt, 0o400); err != nil {
return err
}
}
func waitPKI(ctx context.Context, r runtime.Runtime) error {
_, err := r.State().V1Alpha2().Resources().WatchFor(ctx,
resource.NewMetadata(etcdresource.NamespaceName, etcdresource.PKIStatusType, etcdresource.PKIID, resource.VersionUndefined),
state.WithEventTypes(state.Created, state.Updated),
)

return chownRecursive(constants.EtcdPKIPath, constants.EtcdUserID, constants.EtcdUserID)
return err
}

func addMember(ctx context.Context, r runtime.Runtime, addrs []string, name string) (*clientv3.MemberListResponse, uint64, error) {
// update PKI on each join attempt
if err := generatePKI(ctx, r); err != nil {
return nil, 0, fmt.Errorf("failed to generate etcd PKI: %w", err)
}

client, err := etcd.NewClientFromControlPlaneIPsNoDiscovery(ctx, r.State().V1Alpha2().Resources())
if err != nil {
return nil, 0, err
Expand Down Expand Up @@ -672,7 +585,7 @@ func (e *Etcd) recoverFromSnapshot(hostname, primaryAddr string) error {
return fmt.Errorf("error deleting snapshot: %w", err)
}

return chownRecursive(constants.EtcdDataPath, constants.EtcdUserID, constants.EtcdUserID)
return filetree.ChownRecursive(constants.EtcdDataPath, constants.EtcdUserID, constants.EtcdUserID)
}

func promoteMember(ctx context.Context, r runtime.Runtime, memberID uint64) error {
Expand Down
Loading

0 comments on commit a2aea97

Please sign in to comment.