Skip to content

Commit

Permalink
fix: cleanup affiliates
Browse files Browse the repository at this point in the history
Cleanup affiliates when registries are disabled

Fixes #4290

Signed-off-by: Noel Georgi <[email protected]>
  • Loading branch information
frezbo committed Dec 10, 2021
1 parent 2dd0b5b commit 79f213e
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 33 deletions.
32 changes: 32 additions & 0 deletions internal/app/machined/pkg/controllers/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,35 @@

// Package cluster provides controllers which manage Talos cluster resources.
package cluster

import (
"context"
"fmt"

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"

"github.com/talos-systems/talos/pkg/machinery/resources/cluster"
)

func cleanupAffiliates(ctx context.Context, ctrl controller.Controller, r controller.Runtime, touchedIDs map[resource.ID]struct{}) error {
// list keys for cleanup
list, err := r.List(ctx, resource.NewMetadata(cluster.RawNamespaceName, cluster.AffiliateType, "", resource.VersionUndefined))
if err != nil {
return fmt.Errorf("error listing resources: %w", err)
}

for _, res := range list.Items {
if res.Metadata().Owner() != ctrl.Name() {
continue
}

if _, ok := touchedIDs[res.Metadata().ID()]; !ok {
if err = r.Destroy(ctx, res.Metadata()); err != nil {
return fmt.Errorf("error cleaning up specs: %w", err)
}
}
}

return nil
}
23 changes: 7 additions & 16 deletions internal/app/machined/pkg/controllers/cluster/discovery_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ func (ctrl *DiscoveryServiceController) Run(ctx context.Context, r controller.Ru
}

if !discoveryConfig.(*cluster.Config).TypedSpec().RegistryServiceEnabled {
// if discovery is disabled cleanup existing resources
if err = cleanupAffiliates(ctx, ctrl, r, nil); err != nil {
return err
}

if clientCtxCancel != nil {
clientCtxCancel()

Expand Down Expand Up @@ -267,22 +272,8 @@ func (ctrl *DiscoveryServiceController) Run(ctx context.Context, r controller.Ru
touchedIDs[id] = struct{}{}
}

// list keys for cleanup
list, err := r.List(ctx, resource.NewMetadata(cluster.RawNamespaceName, cluster.AffiliateType, "", resource.VersionUndefined))
if err != nil {
return fmt.Errorf("error listing resources: %w", err)
}

for _, res := range list.Items {
if res.Metadata().Owner() != ctrl.Name() {
continue
}

if _, ok := touchedIDs[res.Metadata().ID()]; !ok {
if err = r.Destroy(ctx, res.Metadata()); err != nil {
return fmt.Errorf("error cleaning up specs: %w", err)
}
}
if err := cleanupAffiliates(ctx, ctrl, r, touchedIDs); err != nil {
return err
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,108 @@ func (suite *DiscoveryServiceSuite) TestReconcile() {
suite.Assert().NoError(<-errCh)
}

func (suite *DiscoveryServiceSuite) TestDisable() {
suite.startRuntime()

suite.Require().NoError(suite.runtime.RegisterController(&clusterctrl.DiscoveryServiceController{}))

serviceEndpoint, err := url.Parse(constants.DefaultDiscoveryServiceEndpoint)
suite.Require().NoError(err)

if serviceEndpoint.Port() == "" {
serviceEndpoint.Host += ":443"
}

clusterIDRaw := make([]byte, constants.DefaultClusterIDSize)
_, err = io.ReadFull(rand.Reader, clusterIDRaw)
suite.Require().NoError(err)

clusterID := base64.StdEncoding.EncodeToString(clusterIDRaw)

encryptionKey := make([]byte, constants.DefaultClusterSecretSize)
_, err = io.ReadFull(rand.Reader, encryptionKey)
suite.Require().NoError(err)

// regular discovery affiliate
discoveryConfig := cluster.NewConfig(config.NamespaceName, cluster.ConfigID)
discoveryConfig.TypedSpec().DiscoveryEnabled = true
discoveryConfig.TypedSpec().RegistryServiceEnabled = true
discoveryConfig.TypedSpec().ServiceEndpoint = serviceEndpoint.Host
discoveryConfig.TypedSpec().ServiceClusterID = clusterID
discoveryConfig.TypedSpec().ServiceEncryptionKey = encryptionKey
suite.Require().NoError(suite.state.Create(suite.ctx, discoveryConfig))

nodeIdentity := cluster.NewIdentity(cluster.NamespaceName, cluster.LocalIdentity)
suite.Require().NoError(clusteradapter.IdentitySpec(nodeIdentity.TypedSpec()).Generate())
suite.Require().NoError(suite.state.Create(suite.ctx, nodeIdentity))

localAffiliate := cluster.NewAffiliate(cluster.NamespaceName, nodeIdentity.TypedSpec().NodeID)
*localAffiliate.TypedSpec() = cluster.AffiliateSpec{
NodeID: nodeIdentity.TypedSpec().NodeID,
Hostname: "foo.com",
Nodename: "bar",
MachineType: machine.TypeControlPlane,
Addresses: []netaddr.IP{netaddr.MustParseIP("192.168.3.4")},
}
suite.Require().NoError(suite.state.Create(suite.ctx, localAffiliate))

// create a test client connected to the same cluster but under different affiliate ID
cipher, err := aes.NewCipher(discoveryConfig.TypedSpec().ServiceEncryptionKey)
suite.Require().NoError(err)

cli, err := client.NewClient(client.Options{
Cipher: cipher,
Endpoint: serviceEndpoint.Host,
ClusterID: discoveryConfig.TypedSpec().ServiceClusterID,
AffiliateID: "7x1SuC8Ege5BGXdAfTEff5iQnlWZLfv9h1LGMxA2pYkC",
TTL: 5 * time.Minute,
})
suite.Require().NoError(err)

errCh := make(chan error, 1)
notifyCh := make(chan struct{}, 1)

cliCtx, cliCtxCancel := context.WithCancel(suite.ctx)
defer cliCtxCancel()

go func() {
errCh <- cli.Run(cliCtx, logging.Wrap(log.Writer()), notifyCh)
}()

// inject some affiliate via our client, controller should publish it as an affiliate
suite.Require().NoError(cli.SetLocalData(&client.Affiliate{
Affiliate: &pb.Affiliate{
NodeId: "7x1SuC8Ege5BGXdAfTEff5iQnlWZLfv9h1LGMxA2pYkC",
},
}, nil))

suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
suite.assertResource(*cluster.NewAffiliate(cluster.RawNamespaceName, "service/7x1SuC8Ege5BGXdAfTEff5iQnlWZLfv9h1LGMxA2pYkC").Metadata(), func(r resource.Resource) error {
spec := r.(*cluster.Affiliate).TypedSpec()

suite.Assert().Equal("7x1SuC8Ege5BGXdAfTEff5iQnlWZLfv9h1LGMxA2pYkC", spec.NodeID)

return nil
}),
))

// now disable the service registry
_, err = suite.state.UpdateWithConflicts(suite.ctx, discoveryConfig.Metadata(), func(r resource.Resource) error {
r.(*cluster.Config).TypedSpec().RegistryServiceEnabled = false

return nil
})

suite.Require().NoError(err)

suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
suite.assertNoResource(*cluster.NewAffiliate(cluster.RawNamespaceName, "service/7x1SuC8Ege5BGXdAfTEff5iQnlWZLfv9h1LGMxA2pYkC").Metadata()),
))

cliCtxCancel()
suite.Assert().NoError(<-errCh)
}

func TestDiscoveryServiceSuite(t *testing.T) {
suite.Run(t, new(DiscoveryServiceSuite))
}
24 changes: 7 additions & 17 deletions internal/app/machined/pkg/controllers/cluster/kubernetes_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func (ctrl *KubernetesPullController) Run(ctx context.Context, r controller.Runt
}

if !discoveryConfig.(*cluster.Config).TypedSpec().RegistryKubernetesEnabled {
// if discovery is disabled cleanup existing resources
if err = cleanupAffiliates(ctx, ctrl, r, nil); err != nil {
return err
}

continue
}

Expand Down Expand Up @@ -127,7 +132,6 @@ func (ctrl *KubernetesPullController) Run(ctx context.Context, r controller.Runt

if notifyCh == nil {
var watchCtx context.Context

watchCtx, watchCtxCancel = context.WithCancel(ctx) //nolint:govet

notifyCh, err = kubernetesRegistry.Watch(watchCtx, logger)
Expand Down Expand Up @@ -159,22 +163,8 @@ func (ctrl *KubernetesPullController) Run(ctx context.Context, r controller.Runt
touchedIDs[id] = struct{}{}
}

// list keys for cleanup
list, err := r.List(ctx, resource.NewMetadata(cluster.RawNamespaceName, cluster.AffiliateType, "", resource.VersionUndefined))
if err != nil {
return fmt.Errorf("error listing resources: %w", err)
}

for _, res := range list.Items {
if res.Metadata().Owner() != ctrl.Name() {
continue
}

if _, ok := touchedIDs[res.Metadata().ID()]; !ok {
if err = r.Destroy(ctx, res.Metadata()); err != nil {
return fmt.Errorf("error cleaning up specs: %w", err)
}
}
if err := cleanupAffiliates(ctx, ctrl, r, touchedIDs); err != nil {
return err
}
}
}

0 comments on commit 79f213e

Please sign in to comment.