Merge remote-tracking branch 'origin/master' into fix-processing-shards
Signed-off-by: Lukas Wöhrl <[email protected]>
woehrl01 committed Feb 12, 2024
2 parents 1bb3db9 + 82433ff commit d586b63
Showing 7 changed files with 115 additions and 63 deletions.
@@ -147,7 +147,8 @@ func NewCommand() *cobra.Command {
appController.InvalidateProjectsCache()
}))
kubectl := kubeutil.NewKubectl()
clusterSharding := getClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution)
clusterSharding, err := getClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution)
errors.CheckError(err)
appController, err = controller.NewApplicationController(
namespace,
settingsMgr,
@@ -170,6 +171,7 @@ func NewCommand() *cobra.Command {
applicationNamespaces,
&workqueueRateLimit,
serverSideDiff,
enableDynamicClusterDistribution,
)
errors.CheckError(err)
cacheutil.CollectMetrics(redisClient, appController.GetMetricsServer())
@@ -238,26 +240,31 @@ func NewCommand() *cobra.Command {
return &command
}

func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) sharding.ClusterShardingCache {
func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) (sharding.ClusterShardingCache, error) {
var replicasCount int
applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{})
if enableDynamicClusterDistribution {
applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{})

// if the application controller deployment was not found, the Get() call returns an empty Deployment object. So, set the variable to nil explicitly
if err != nil && kubeerrors.IsNotFound(err) {
appControllerDeployment = nil
}
// if the app controller deployment is not found while dynamic cluster distribution is enabled, error out
if err != nil {
return nil, fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment: %v", err)
}

if appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil {
replicasCount = int(*appControllerDeployment.Spec.Replicas)
} else {
return nil, fmt.Errorf("(dymanic cluster distribution) failed to get app controller deployment replica count")
}

if enableDynamicClusterDistribution && appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil {
replicasCount = int(*appControllerDeployment.Spec.Replicas)
} else {
replicasCount = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
}
shardNumber := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
if replicasCount > 1 {
// check for shard mapping using configmap if application-controller is a deployment
// else use existing logic to infer shard from pod name if application-controller is a statefulset
if enableDynamicClusterDistribution && appControllerDeployment != nil {
if enableDynamicClusterDistribution {
var err error
// retry 3 times if we find a conflict while updating shard mapping configMap.
// If we still see conflicts after the retries, wait for next iteration of heartbeat process.
@@ -286,5 +293,5 @@ func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.
shardNumber = 0
}
db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient)
return sharding.NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm)
return sharding.NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm), nil
}
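
Note on the new control flow above: with dynamic cluster distribution enabled, the replica count must come from the application controller Deployment and any failure to read it is now fatal; only the static case still falls back to the replicas environment variable. A minimal, self-contained sketch of that resolution order (hypothetical helper name, plain-stdlib env parsing in place of Argo CD's env.ParseNumFromEnv, and the ARGOCD_CONTROLLER_REPLICAS name assumed for common.EnvControllerReplicas):

```go
package shardingutil

import (
	"context"
	"fmt"
	"os"
	"strconv"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// resolveReplicaCount mirrors the resolution order introduced above: when dynamic
// distribution is on, the Deployment's .spec.replicas is authoritative and any
// error is returned to the caller; otherwise an environment variable supplies it.
func resolveReplicaCount(ctx context.Context, kubeClient kubernetes.Interface, namespace, deployName string, dynamicDistribution bool) (int, error) {
	if dynamicDistribution {
		deploy, err := kubeClient.AppsV1().Deployments(namespace).Get(ctx, deployName, metav1.GetOptions{})
		if err != nil {
			return 0, fmt.Errorf("(dynamic cluster distribution) failed to get app controller deployment: %w", err)
		}
		if deploy.Spec.Replicas == nil {
			return 0, fmt.Errorf("(dynamic cluster distribution) deployment has no replica count set")
		}
		return int(*deploy.Spec.Replicas), nil
	}
	// Static sharding: fall back to the replicas env var, defaulting to 0 if unset or invalid.
	if v, err := strconv.Atoi(os.Getenv("ARGOCD_CONTROLLER_REPLICAS")); err == nil && v >= 0 {
		return v, nil
	}
	return 0, nil
}
```
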
97 changes: 57 additions & 40 deletions controller/appcontroller.go
@@ -113,7 +113,6 @@ type ApplicationController struct {
appInformer cache.SharedIndexInformer
appLister applisters.ApplicationLister
projInformer cache.SharedIndexInformer
deploymentInformer informerv1.DeploymentInformer
appStateManager AppStateManager
stateCache statecache.LiveStateCache
statusRefreshTimeout time.Duration
@@ -130,6 +129,10 @@ type ApplicationController struct {
clusterSharding sharding.ClusterShardingCache
projByNameCache sync.Map
applicationNamespaces []string

// dynamicClusterDistributionEnabled if disabled, the deploymentInformer is never initialized
dynamicClusterDistributionEnabled bool
deploymentInformer informerv1.DeploymentInformer
}

// NewApplicationController creates a new instance of ApplicationController.
@@ -155,6 +158,7 @@ func NewApplicationController(
applicationNamespaces []string,
rateLimiterConfig *ratelimiter.AppControllerRateLimiterConfig,
serverSideDiff bool,
dynamicClusterDistributionEnabled bool,
) (*ApplicationController, error) {
log.Infof("appResyncPeriod=%v, appHardResyncPeriod=%v, appResyncJitter=%v", appResyncPeriod, appHardResyncPeriod, appResyncJitter)
db := db.NewDB(namespace, settingsMgr, kubeClientset)
@@ -163,28 +167,29 @@ func NewApplicationController(
log.Info("Using default workqueue rate limiter config")
}
ctrl := ApplicationController{
cache: argoCache,
namespace: namespace,
kubeClientset: kubeClientset,
kubectl: kubectl,
applicationClientset: applicationClientset,
repoClientset: repoClientset,
appRefreshQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_reconciliation_queue"),
appOperationQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_operation_processing_queue"),
projectRefreshQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "project_reconciliation_queue"),
appComparisonTypeRefreshQueue: workqueue.NewRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig)),
db: db,
statusRefreshTimeout: appResyncPeriod,
statusHardRefreshTimeout: appHardResyncPeriod,
statusRefreshJitter: appResyncJitter,
refreshRequestedApps: make(map[string]CompareWith),
refreshRequestedAppsMutex: &sync.Mutex{},
auditLogger: argo.NewAuditLogger(namespace, kubeClientset, common.ApplicationController),
settingsMgr: settingsMgr,
selfHealTimeout: selfHealTimeout,
clusterSharding: clusterSharding,
projByNameCache: sync.Map{},
applicationNamespaces: applicationNamespaces,
cache: argoCache,
namespace: namespace,
kubeClientset: kubeClientset,
kubectl: kubectl,
applicationClientset: applicationClientset,
repoClientset: repoClientset,
appRefreshQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_reconciliation_queue"),
appOperationQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_operation_processing_queue"),
projectRefreshQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "project_reconciliation_queue"),
appComparisonTypeRefreshQueue: workqueue.NewRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig)),
db: db,
statusRefreshTimeout: appResyncPeriod,
statusHardRefreshTimeout: appHardResyncPeriod,
statusRefreshJitter: appResyncJitter,
refreshRequestedApps: make(map[string]CompareWith),
refreshRequestedAppsMutex: &sync.Mutex{},
auditLogger: argo.NewAuditLogger(namespace, kubeClientset, common.ApplicationController),
settingsMgr: settingsMgr,
selfHealTimeout: selfHealTimeout,
clusterSharding: clusterSharding,
projByNameCache: sync.Map{},
applicationNamespaces: applicationNamespaces,
dynamicClusterDistributionEnabled: dynamicClusterDistributionEnabled,
}
if kubectlParallelismLimit > 0 {
ctrl.kubectlSemaphore = semaphore.NewWeighted(kubectlParallelismLimit)
@@ -227,25 +232,33 @@ func NewApplicationController(
}

factory := informers.NewSharedInformerFactoryWithOptions(ctrl.kubeClientset, defaultDeploymentInformerResyncDuration, informers.WithNamespace(settingsMgr.GetNamespace()))
deploymentInformer := factory.Apps().V1().Deployments()

var deploymentInformer informerv1.DeploymentInformer

// only initialize deployment informer if dynamic distribution is enabled
if dynamicClusterDistributionEnabled {
deploymentInformer = factory.Apps().V1().Deployments()
}

readinessHealthCheck := func(r *http.Request) error {
applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
appControllerDeployment, err := deploymentInformer.Lister().Deployments(settingsMgr.GetNamespace()).Get(applicationControllerName)
if err != nil {
if kubeerrors.IsNotFound(err) {
appControllerDeployment = nil
} else {
return fmt.Errorf("error retrieving Application Controller Deployment: %s", err)
}
}
if appControllerDeployment != nil {
if appControllerDeployment.Spec.Replicas != nil && int(*appControllerDeployment.Spec.Replicas) <= 0 {
return fmt.Errorf("application controller deployment replicas is not set or is less than 0, replicas: %d", appControllerDeployment.Spec.Replicas)
if dynamicClusterDistributionEnabled {
applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
appControllerDeployment, err := deploymentInformer.Lister().Deployments(settingsMgr.GetNamespace()).Get(applicationControllerName)
if err != nil {
if kubeerrors.IsNotFound(err) {
appControllerDeployment = nil
} else {
return fmt.Errorf("error retrieving Application Controller Deployment: %s", err)
}
}
shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
if _, err := sharding.GetOrUpdateShardFromConfigMap(kubeClientset.(*kubernetes.Clientset), settingsMgr, int(*appControllerDeployment.Spec.Replicas), shard); err != nil {
return fmt.Errorf("error while updating the heartbeat for to the Shard Mapping ConfigMap: %s", err)
if appControllerDeployment != nil {
if appControllerDeployment.Spec.Replicas != nil && int(*appControllerDeployment.Spec.Replicas) <= 0 {
return fmt.Errorf("application controller deployment replicas is not set or is less than 0, replicas: %d", appControllerDeployment.Spec.Replicas)
}
shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
if _, err := sharding.GetOrUpdateShardFromConfigMap(kubeClientset.(*kubernetes.Clientset), settingsMgr, int(*appControllerDeployment.Spec.Replicas), shard); err != nil {
return fmt.Errorf("error while updating the heartbeat for to the Shard Mapping ConfigMap: %s", err)
}
}
}
return nil
@@ -773,7 +786,11 @@ func (ctrl *ApplicationController) Run(ctx context.Context, statusProcessors int

go ctrl.appInformer.Run(ctx.Done())
go ctrl.projInformer.Run(ctx.Done())
go ctrl.deploymentInformer.Informer().Run(ctx.Done())

if ctrl.dynamicClusterDistributionEnabled {
// only start deployment informer if dynamic distribution is enabled
go ctrl.deploymentInformer.Informer().Run(ctx.Done())
}

clusters, err := ctrl.db.ListClusters(ctx)
if err != nil {
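
The hunks above gate every use of the Deployment informer behind the same feature flag, so a controller running without dynamic cluster distribution never constructs or dereferences it. A trimmed-down sketch of that guard pattern (hypothetical types, not the actual controller struct):

```go
package controller

import (
	"context"

	"k8s.io/client-go/informers"
	informerv1 "k8s.io/client-go/informers/apps/v1"
)

type miniController struct {
	dynamicClusterDistributionEnabled bool
	deploymentInformer                informerv1.DeploymentInformer
}

func newMiniController(factory informers.SharedInformerFactory, enabled bool) *miniController {
	c := &miniController{dynamicClusterDistributionEnabled: enabled}
	if enabled {
		// The informer only exists when the feature is on; every later use
		// is guarded by the same flag rather than by a nil check alone.
		c.deploymentInformer = factory.Apps().V1().Deployments()
	}
	return c
}

func (c *miniController) run(ctx context.Context) {
	if c.dynamicClusterDistributionEnabled {
		go c.deploymentInformer.Informer().Run(ctx.Done())
	}
}
```

Guarding on the flag keeps the intent explicit and avoids starting an informer whose lister nothing would ever read.
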
1 change: 1 addition & 0 deletions controller/appcontroller_test.go
@@ -157,6 +157,7 @@ func newFakeController(data *fakeData, repoErr error) *ApplicationController {
nil,

false,
false,
)
db := &dbmocks.ArgoDB{}
db.On("GetApplicationControllerReplicas").Return(1)
2 changes: 1 addition & 1 deletion controller/sharding/cache.go
@@ -111,8 +111,8 @@ func (sharding *ClusterSharding) Update(c *v1alpha1.Cluster) {

func (sharding *ClusterSharding) GetDistribution() map[string]int {
sharding.lock.RLock()
defer sharding.lock.RUnlock()
shards := sharding.Shards
sharding.lock.RUnlock()

distribution := make(map[string]int, len(shards))
for k, v := range shards {
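
The sharding/cache.go hunk replaces a deferred RUnlock with an early release: the shard map reference is captured while the read lock is held, and the distribution map is built outside the critical section. A minimal sketch of that snapshot-then-release pattern (hypothetical type; it assumes writers always swap in a fresh map rather than mutating the shared one in place, otherwise the unlocked iteration would race):

```go
package sharding

import "sync"

// shardCache is a stand-in for ClusterSharding: a read-mostly map guarded by an RWMutex.
type shardCache struct {
	lock   sync.RWMutex
	shards map[string]int
}

// Distribution holds the read lock only long enough to capture the current map
// reference; the copy is built after the lock is released. Safe only if Set
// always swaps in a fresh map instead of mutating the shared one.
func (c *shardCache) Distribution() map[string]int {
	c.lock.RLock()
	shards := c.shards
	c.lock.RUnlock()

	out := make(map[string]int, len(shards))
	for k, v := range shards {
		out[k] = v
	}
	return out
}

// Set replaces the map under the write lock so readers never observe partial updates.
func (c *shardCache) Set(next map[string]int) {
	c.lock.Lock()
	c.shards = next
	c.lock.Unlock()
}
```

Keeping the allocation and copy loop outside the lock shortens the read-side critical section.
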
10 changes: 10 additions & 0 deletions docs/operator-manual/notifications/functions.md
@@ -48,6 +48,16 @@ Transforms given GIT URL into HTTPs format.

Returns repository URL full name `(<owner>/<repoName>)`. Currently supports only Github, GitLab and Bitbucket.

<hr>
**`repo.QueryEscape(s string) string`**

QueryEscape escapes the string so it can be safely placed inside a URL.

Example:
```
/projects/{{ call .repo.QueryEscape (call .repo.FullNameByRepoURL .app.status.RepoURL) }}/merge_requests
```

<hr>
**`repo.GetCommitMetadata(sha string) CommitMetadata`**

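
The new notifications helper is a thin wrapper over Go's net/url.QueryEscape (see the repo.go hunk further below). A quick illustration of what the documented template call would produce for a GitLab-style repository full name (the input value is illustrative):

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	// A repo "full name" as returned by repo.FullNameByRepoURL, e.g. "<owner>/<repoName>".
	fullName := "my-group/my-project" // illustrative value
	escaped := url.QueryEscape(fullName)

	fmt.Println(escaped)                                    // my-group%2Fmy-project
	fmt.Printf("/projects/%s/merge_requests\n", escaped)    // /projects/my-group%2Fmy-project/merge_requests
}
```
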
35 changes: 25 additions & 10 deletions util/cache/cache.go
@@ -27,6 +27,10 @@ const (
envRedisRetryCount = "REDIS_RETRY_COUNT"
// defaultRedisRetryCount holds default number of retries
defaultRedisRetryCount = 3
// envRedisSentinelPassword is an env variable name which stores redis sentinel password
envRedisSentinelPassword = "REDIS_SENTINEL_PASSWORD"
// envRedisSentinelUsername is an env variable name which stores redis sentinel username
envRedisSentinelUsername = "REDIS_SENTINEL_USERNAME"
)

const (
@@ -57,21 +61,23 @@ func buildRedisClient(redisAddress, password, username string, redisDB, maxRetri
return client
}

func buildFailoverRedisClient(sentinelMaster, password, username string, redisDB, maxRetries int, tlsConfig *tls.Config, sentinelAddresses []string) *redis.Client {
func buildFailoverRedisClient(sentinelMaster, sentinelUsername, sentinelPassword, password, username string, redisDB, maxRetries int, tlsConfig *tls.Config, sentinelAddresses []string) *redis.Client {
opts := &redis.FailoverOptions{
MasterName: sentinelMaster,
SentinelAddrs: sentinelAddresses,
DB: redisDB,
Password: password,
MaxRetries: maxRetries,
TLSConfig: tlsConfig,
Username: username,
MasterName: sentinelMaster,
SentinelAddrs: sentinelAddresses,
DB: redisDB,
Password: password,
MaxRetries: maxRetries,
TLSConfig: tlsConfig,
Username: username,
SentinelUsername: sentinelUsername,
SentinelPassword: sentinelPassword,
}

client := redis.NewFailoverClient(opts)

client.AddHook(redis.Hook(NewArgoRedisHook(func() {
*client = *buildFailoverRedisClient(sentinelMaster, password, username, redisDB, maxRetries, tlsConfig, sentinelAddresses)
*client = *buildFailoverRedisClient(sentinelMaster, sentinelUsername, sentinelPassword, password, username, redisDB, maxRetries, tlsConfig, sentinelAddresses)
})))

return client
@@ -199,21 +205,30 @@ func AddCacheFlagsToCmd(cmd *cobra.Command, opts ...Options) func() (*Cache, err
}
password := os.Getenv(envRedisPassword)
username := os.Getenv(envRedisUsername)
sentinelUsername := os.Getenv(envRedisSentinelUsername)
sentinelPassword := os.Getenv(envRedisSentinelPassword)
if opt.FlagPrefix != "" {
if val := os.Getenv(opt.getEnvPrefix() + envRedisUsername); val != "" {
username = val
}
if val := os.Getenv(opt.getEnvPrefix() + envRedisPassword); val != "" {
password = val
}
if val := os.Getenv(opt.getEnvPrefix() + envRedisSentinelUsername); val != "" {
sentinelUsername = val
}
if val := os.Getenv(opt.getEnvPrefix() + envRedisSentinelPassword); val != "" {
sentinelPassword = val
}
}

maxRetries := env.ParseNumFromEnv(envRedisRetryCount, defaultRedisRetryCount, 0, math.MaxInt32)
compression, err := CompressionTypeFromString(compressionStr)
if err != nil {
return nil, err
}
if len(sentinelAddresses) > 0 {
client := buildFailoverRedisClient(sentinelMaster, password, username, redisDB, maxRetries, tlsConfig, sentinelAddresses)
client := buildFailoverRedisClient(sentinelMaster, sentinelUsername, sentinelPassword, password, username, redisDB, maxRetries, tlsConfig, sentinelAddresses)
opt.callOnClientCreated(client)
return NewCache(NewRedisCache(client, defaultCacheExpiration, compression)), nil
}
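
For context on the util/cache/cache.go change: the failover client now carries dedicated Sentinel credentials, read from REDIS_SENTINEL_USERNAME and REDIS_SENTINEL_PASSWORD (optionally with a per-component prefix). A minimal sketch of the resulting go-redis configuration, assuming the go-redis v9 client and the unprefixed REDIS_USERNAME / REDIS_PASSWORD names for the existing Redis credentials:

```go
package cacheutil

import (
	"os"

	"github.com/redis/go-redis/v9"
)

// newFailoverClientSketch shows which credentials end up where: Username/Password
// authenticate against the Redis masters and replicas, while SentinelUsername and
// SentinelPassword authenticate against the Sentinel processes themselves.
func newFailoverClientSketch(masterName string, sentinelAddrs []string) *redis.Client {
	return redis.NewFailoverClient(&redis.FailoverOptions{
		MasterName:       masterName,
		SentinelAddrs:    sentinelAddrs,
		Username:         os.Getenv("REDIS_USERNAME"),
		Password:         os.Getenv("REDIS_PASSWORD"),
		SentinelUsername: os.Getenv("REDIS_SENTINEL_USERNAME"),
		SentinelPassword: os.Getenv("REDIS_SENTINEL_PASSWORD"),
	})
}
```
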
2 changes: 2 additions & 0 deletions util/notification/expression/repo/repo.go
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"net/url"
"regexp"
"strings"

@@ -90,6 +91,7 @@ func NewExprs(argocdService service.Service, app *unstructured.Unstructured) map
return map[string]interface{}{
"RepoURLToHTTPS": repoURLToHTTPS,
"FullNameByRepoURL": FullNameByRepoURL,
"QueryEscape": url.QueryEscape,
"GetCommitMetadata": func(commitSHA string) interface{} {
meta, err := getCommitMetadata(commitSHA, app, argocdService)
if err != nil {
