diff --git a/controller/sharding/cache.go b/controller/sharding/cache.go index f2d92116a9bf5..dd6a41cfd179e 100644 --- a/controller/sharding/cache.go +++ b/controller/sharding/cache.go @@ -26,7 +26,7 @@ type ClusterSharding struct { getClusterShard DistributionFunction } -func NewClusterSharding(db db.ArgoDB, shard, replicas int, shardingAlgorithm string) ClusterShardingCache { +func NewClusterSharding(_ db.ArgoDB, shard, replicas int, shardingAlgorithm string) ClusterShardingCache { log.Debugf("Processing clusters from shard %d: Using filter function: %s", shard, shardingAlgorithm) clusterSharding := &ClusterSharding{ Shard: shard, @@ -67,7 +67,8 @@ func (sharding *ClusterSharding) Init(clusters *v1alpha1.ClusterList) { defer sharding.lock.Unlock() newClusters := make(map[string]*v1alpha1.Cluster, len(clusters.Items)) for _, c := range clusters.Items { - newClusters[c.Server] = &c + cluster := c + newClusters[c.Server] = &cluster } sharding.Clusters = newClusters sharding.updateDistribution() @@ -122,9 +123,7 @@ func (sharding *ClusterSharding) GetDistribution() map[string]int { } func (sharding *ClusterSharding) updateDistribution() { - log.Info("Updating cluster shards") - - for _, c := range sharding.Clusters { + for k, c := range sharding.Clusters { shard := 0 if c.Shard != nil { requestedShard := int(*c.Shard) @@ -136,17 +135,32 @@ func (sharding *ClusterSharding) updateDistribution() { } else { shard = sharding.getClusterShard(c) } - var shard64 int64 = int64(shard) - c.Shard = &shard64 - sharding.Shards[c.Server] = shard + + existingShard, ok := sharding.Shards[k] + if ok && existingShard != shard { + log.Infof("Cluster %s has changed shard from %d to %d", k, existingShard, shard) + } else if !ok { + log.Infof("Cluster %s has been assigned to shard %d", k, shard) + } else { + log.Debugf("Cluster %s has not changed shard", k) + } + sharding.Shards[k] = shard } } -// hasShardingUpdates returns true if the sharding distribution has been updated. -// nil checking is done for the corner case of the in-cluster cluster which may -// have a nil shard assigned +// hasShardingUpdates returns true if the sharding distribution has explicitly changed func hasShardingUpdates(old, new *v1alpha1.Cluster) bool { - if old == nil || new == nil || (old.Shard == nil && new.Shard == nil) { + if old == nil || new == nil { + return false + } + + // returns true if the cluster id has changed because some sharding algorithms depend on it. + if old.ID != new.ID { + return true + } + + // return false if the shard field has not been modified + if old.Shard == nil && new.Shard == nil { return false } return old.Shard == nil || new.Shard == nil || int64(*old.Shard) != int64(*new.Shard) diff --git a/controller/sharding/cache_test.go b/controller/sharding/cache_test.go new file mode 100644 index 0000000000000..ab431d39de44b --- /dev/null +++ b/controller/sharding/cache_test.go @@ -0,0 +1,341 @@ +package sharding + +import ( + "testing" + + "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + dbmocks "github.com/argoproj/argo-cd/v2/util/db/mocks" + "github.com/stretchr/testify/assert" +) + +func setupTestSharding(shard int, replicas int) *ClusterSharding { + shardingAlgorithm := "legacy" // we are using the legacy algorithm as it is deterministic based on the cluster id + + db := &dbmocks.ArgoDB{} + + return NewClusterSharding(db, shard, replicas, shardingAlgorithm).(*ClusterSharding) +} + +func TestNewClusterSharding(t *testing.T) { + shard := 1 + replicas := 2 + sharding := setupTestSharding(shard, replicas) + + assert.NotNil(t, sharding) + assert.Equal(t, shard, sharding.Shard) + assert.Equal(t, replicas, sharding.Replicas) + assert.NotNil(t, sharding.Shards) + assert.NotNil(t, sharding.Clusters) +} + +func TestClusterSharding_Add(t *testing.T) { + shard := 1 + replicas := 2 + sharding := setupTestSharding(shard, replicas) // Implement a helper function to setup the test environment + + cluster := &v1alpha1.Cluster{ + ID: "2", + Server: "https://127.0.0.1:6443", + } + + sharding.Add(cluster) + + myCluster := v1alpha1.Cluster{ + ID: "1", + Server: "https://kubernetes.default.svc", + } + + sharding.Add(&myCluster) + + distribution := sharding.GetDistribution() + + assert.Contains(t, sharding.Clusters, cluster.Server) + assert.Contains(t, sharding.Clusters, myCluster.Server) + + clusterDistribution, ok := distribution[cluster.Server] + assert.True(t, ok) + assert.Equal(t, 1, clusterDistribution) + + myClusterDistribution, ok := distribution[myCluster.Server] + assert.True(t, ok) + assert.Equal(t, 0, myClusterDistribution) + + assert.Equal(t, 2, len(distribution)) +} + +func TestClusterSharding_AddRoundRobin(t *testing.T) { + shard := 1 + replicas := 2 + + db := &dbmocks.ArgoDB{} + + sharding := NewClusterSharding(db, shard, replicas, "round-robin").(*ClusterSharding) + + firstCluster := &v1alpha1.Cluster{ + ID: "1", + Server: "https://127.0.0.1:6443", + } + sharding.Add(firstCluster) + + secondCluster := v1alpha1.Cluster{ + ID: "2", + Server: "https://kubernetes.default.svc", + } + sharding.Add(&secondCluster) + + distribution := sharding.GetDistribution() + + assert.Contains(t, sharding.Clusters, firstCluster.Server) + assert.Contains(t, sharding.Clusters, secondCluster.Server) + + clusterDistribution, ok := distribution[firstCluster.Server] + assert.True(t, ok) + assert.Equal(t, 0, clusterDistribution) + + myClusterDistribution, ok := distribution[secondCluster.Server] + assert.True(t, ok) + assert.Equal(t, 1, myClusterDistribution) + + assert.Equal(t, 2, len(distribution)) +} + +func TestClusterSharding_Delete(t *testing.T) { + shard := 1 + replicas := 2 + sharding := setupTestSharding(shard, replicas) + + sharding.Init( + &v1alpha1.ClusterList{ + Items: []v1alpha1.Cluster{ + { + ID: "2", + Server: "https://127.0.0.1:6443", + }, + { + ID: "1", + Server: "https://kubernetes.default.svc", + }, + }, + }, + ) + + sharding.Delete("https://kubernetes.default.svc") + distribution := sharding.GetDistribution() + assert.Equal(t, 1, len(distribution)) +} + +func TestClusterSharding_Update(t *testing.T) { + shard := 1 + replicas := 2 + sharding := setupTestSharding(shard, replicas) + + sharding.Init( + &v1alpha1.ClusterList{ + Items: []v1alpha1.Cluster{ + { + ID: "2", + Server: "https://127.0.0.1:6443", + }, + { + ID: "1", + Server: "https://kubernetes.default.svc", + }, + }, + }, + ) + + distribution := sharding.GetDistribution() + assert.Equal(t, 2, len(distribution)) + + myClusterDistribution, ok := distribution["https://kubernetes.default.svc"] + assert.True(t, ok) + assert.Equal(t, 0, myClusterDistribution) + + sharding.Update(&v1alpha1.Cluster{ + ID: "4", + Server: "https://kubernetes.default.svc", + }) + + distribution = sharding.GetDistribution() + assert.Equal(t, 2, len(distribution)) + + myClusterDistribution, ok = distribution["https://kubernetes.default.svc"] + assert.True(t, ok) + assert.Equal(t, 1, myClusterDistribution) +} + +func TestClusterSharding_IsManagedCluster(t *testing.T) { + replicas := 2 + sharding0 := setupTestSharding(0, replicas) + + sharding0.Init( + &v1alpha1.ClusterList{ + Items: []v1alpha1.Cluster{ + { + ID: "1", + Server: "https://kubernetes.default.svc", + }, + { + ID: "2", + Server: "https://127.0.0.1:6443", + }, + }, + }, + ) + + assert.True(t, sharding0.IsManagedCluster(&v1alpha1.Cluster{ + ID: "1", + Server: "https://kubernetes.default.svc", + })) + + assert.False(t, sharding0.IsManagedCluster(&v1alpha1.Cluster{ + ID: "2", + Server: "https://127.0.0.1:6443", + })) + + sharding1 := setupTestSharding(1, replicas) + + sharding1.Init( + &v1alpha1.ClusterList{ + Items: []v1alpha1.Cluster{ + { + ID: "2", + Server: "https://127.0.0.1:6443", + }, + { + ID: "1", + Server: "https://kubernetes.default.svc", + }, + }, + }, + ) + + assert.False(t, sharding1.IsManagedCluster(&v1alpha1.Cluster{ + ID: "1", + Server: "https://kubernetes.default.svc", + })) + + assert.True(t, sharding1.IsManagedCluster(&v1alpha1.Cluster{ + ID: "2", + Server: "https://127.0.0.1:6443", + })) + +} + +func Int64Ptr(i int64) *int64 { + return &i +} + +func TestHasShardingUpdates(t *testing.T) { + testCases := []struct { + name string + old *v1alpha1.Cluster + new *v1alpha1.Cluster + expected bool + }{ + { + name: "No updates", + old: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(1), + }, + new: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(1), + }, + expected: false, + }, + { + name: "Updates", + old: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(1), + }, + new: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + expected: true, + }, + { + name: "Old is nil", + old: nil, + new: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + expected: false, + }, + { + name: "New is nil", + old: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + new: nil, + expected: false, + }, + { + name: "Both are nil", + old: nil, + new: nil, + expected: false, + }, + { + name: "Both shards are nil", + old: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: nil, + }, + new: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: nil, + }, + expected: false, + }, + { + name: "Old shard is nil", + old: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: nil, + }, + new: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + expected: true, + }, + { + name: "New shard is nil", + old: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + new: &v1alpha1.Cluster{ + Server: "https://kubernetes.default.svc", + Shard: nil, + }, + expected: true, + }, + { + name: "Cluster ID has changed", + old: &v1alpha1.Cluster{ + ID: "1", + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + new: &v1alpha1.Cluster{ + ID: "2", + Server: "https://kubernetes.default.svc", + Shard: Int64Ptr(2), + }, + expected: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expected, hasShardingUpdates(tc.old, tc.new)) + }) + } +}