Skip to content

Commit

Permalink
fix: added tests and fixed some behaviour bugs
Browse files Browse the repository at this point in the history
Signed-off-by: Lukas Wöhrl <[email protected]>
  • Loading branch information
woehrl01 committed Feb 13, 2024
1 parent 08adee2 commit 5b52b6d
Show file tree
Hide file tree
Showing 2 changed files with 367 additions and 12 deletions.
38 changes: 26 additions & 12 deletions controller/sharding/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type ClusterSharding struct {
getClusterShard DistributionFunction
}

func NewClusterSharding(db db.ArgoDB, shard, replicas int, shardingAlgorithm string) ClusterShardingCache {
func NewClusterSharding(_ db.ArgoDB, shard, replicas int, shardingAlgorithm string) ClusterShardingCache {
log.Debugf("Processing clusters from shard %d: Using filter function: %s", shard, shardingAlgorithm)
clusterSharding := &ClusterSharding{
Shard: shard,
Expand Down Expand Up @@ -67,7 +67,8 @@ func (sharding *ClusterSharding) Init(clusters *v1alpha1.ClusterList) {
defer sharding.lock.Unlock()
newClusters := make(map[string]*v1alpha1.Cluster, len(clusters.Items))
for _, c := range clusters.Items {
newClusters[c.Server] = &c
cluster := c
newClusters[c.Server] = &cluster
}
sharding.Clusters = newClusters
sharding.updateDistribution()
Expand Down Expand Up @@ -122,9 +123,7 @@ func (sharding *ClusterSharding) GetDistribution() map[string]int {
}

func (sharding *ClusterSharding) updateDistribution() {
log.Info("Updating cluster shards")

for _, c := range sharding.Clusters {
for k, c := range sharding.Clusters {
shard := 0
if c.Shard != nil {
requestedShard := int(*c.Shard)
Expand All @@ -136,17 +135,32 @@ func (sharding *ClusterSharding) updateDistribution() {
} else {
shard = sharding.getClusterShard(c)
}
var shard64 int64 = int64(shard)
c.Shard = &shard64
sharding.Shards[c.Server] = shard

existingShard, ok := sharding.Shards[k]
if ok && existingShard != shard {
log.Infof("Cluster %s has changed shard from %d to %d", k, existingShard, shard)
} else if !ok {
log.Infof("Cluster %s has been assigned to shard %d", k, shard)
} else {
log.Debugf("Cluster %s has not changed shard", k)
}
sharding.Shards[k] = shard
}
}

// hasShardingUpdates returns true if the sharding distribution has been updated.
// nil checking is done for the corner case of the in-cluster cluster which may
// have a nil shard assigned
// hasShardingUpdates returns true if the sharding distribution has explicitly changed
func hasShardingUpdates(old, new *v1alpha1.Cluster) bool {
if old == nil || new == nil || (old.Shard == nil && new.Shard == nil) {
if old == nil || new == nil {
return false
}

// returns true if the cluster id has changed because some sharding algorithms depend on it.
if old.ID != new.ID {
return true
}

// return false if the shard field has not been modified
if old.Shard == nil && new.Shard == nil {
return false
}
return old.Shard == nil || new.Shard == nil || int64(*old.Shard) != int64(*new.Shard)
Expand Down
Loading

0 comments on commit 5b52b6d

Please sign in to comment.