diff --git a/cmd/argocd-application-controller/commands/argocd_application_controller.go b/cmd/argocd-application-controller/commands/argocd_application_controller.go
index 58efccaff19de..4e97946cd220c 100644
--- a/cmd/argocd-application-controller/commands/argocd_application_controller.go
+++ b/cmd/argocd-application-controller/commands/argocd_application_controller.go
@@ -147,7 +147,8 @@ func NewCommand() *cobra.Command {
 				appController.InvalidateProjectsCache()
 			}))
 			kubectl := kubeutil.NewKubectl()
-			clusterSharding := getClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution)
+			clusterSharding, err := getClusterSharding(kubeClient, settingsMgr, shardingAlgorithm, enableDynamicClusterDistribution)
+			errors.CheckError(err)
 			appController, err = controller.NewApplicationController(
 				namespace,
 				settingsMgr,
@@ -170,6 +171,7 @@ func NewCommand() *cobra.Command {
 				applicationNamespaces,
 				&workqueueRateLimit,
 				serverSideDiff,
+				enableDynamicClusterDistribution,
 			)
 			errors.CheckError(err)
 			cacheutil.CollectMetrics(redisClient, appController.GetMetricsServer())
@@ -238,18 +240,23 @@ func NewCommand() *cobra.Command {
 	return &command
 }
 
-func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) sharding.ClusterShardingCache {
+func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.SettingsManager, shardingAlgorithm string, enableDynamicClusterDistribution bool) (sharding.ClusterShardingCache, error) {
 	var replicasCount int
-	applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
-	appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{})
+	if enableDynamicClusterDistribution {
+		applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
+		appControllerDeployment, err := kubeClient.AppsV1().Deployments(settingsMgr.GetNamespace()).Get(context.Background(), applicationControllerName, metav1.GetOptions{})
 
-	// if the application controller deployment was not found, the Get() call returns an empty Deployment object. So, set the variable to nil explicitly
-	if err != nil && kubeerrors.IsNotFound(err) {
-		appControllerDeployment = nil
-	}
+		// if the app controller deployment is not found while dynamic cluster distribution is enabled, error out
+		if err != nil {
+			return nil, fmt.Errorf("(dynamic cluster distribution) failed to get app controller deployment: %v", err)
+		}
+
+		if appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil {
+			replicasCount = int(*appControllerDeployment.Spec.Replicas)
+		} else {
+			return nil, fmt.Errorf("(dynamic cluster distribution) failed to get app controller deployment replica count")
+		}
 
-	if enableDynamicClusterDistribution && appControllerDeployment != nil && appControllerDeployment.Spec.Replicas != nil {
-		replicasCount = int(*appControllerDeployment.Spec.Replicas)
 	} else {
 		replicasCount = env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32)
 	}
@@ -257,7 +264,7 @@ func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.
 	if replicasCount > 1 {
 		// check for shard mapping using configmap if application-controller is a deployment
 		// else use existing logic to infer shard from pod name if application-controller is a statefulset
-		if enableDynamicClusterDistribution && appControllerDeployment != nil {
+		if enableDynamicClusterDistribution {
 			var err error
 			// retry 3 times if we find a conflict while updating shard mapping configMap.
 			// If we still see conflicts after the retries, wait for next iteration of heartbeat process.
@@ -286,5 +293,5 @@ func getClusterSharding(kubeClient *kubernetes.Clientset, settingsMgr *settings.
 		shardNumber = 0
 	}
 	db := db.NewDB(settingsMgr.GetNamespace(), settingsMgr, kubeClient)
-	return sharding.NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm)
+	return sharding.NewClusterSharding(db, shardNumber, replicasCount, shardingAlgorithm), nil
 }
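For readers skimming the hunks above: the replica count used for sharding is now resolved in one of two ways. Below is a minimal sketch of that decision, not part of the diff, reusing the `env` and `common` helpers the hunk already references; the `github.com/argoproj/argo-cd/v2` import paths are assumed from the upstream module layout.

```go
package example

import (
	"fmt"
	"math"

	appsv1 "k8s.io/api/apps/v1"

	"github.com/argoproj/argo-cd/v2/common"
	"github.com/argoproj/argo-cd/v2/util/env"
)

// replicaCount mirrors the branch added to getClusterSharding: when dynamic
// cluster distribution is enabled, the controller Deployment must exist and
// carry .spec.replicas; otherwise the count falls back to the
// ARGOCD_CONTROLLER_REPLICAS environment variable.
func replicaCount(dynamicDistributionEnabled bool, deploy *appsv1.Deployment) (int, error) {
	if dynamicDistributionEnabled {
		if deploy == nil || deploy.Spec.Replicas == nil {
			return 0, fmt.Errorf("(dynamic cluster distribution) failed to get app controller deployment replica count")
		}
		return int(*deploy.Spec.Replicas), nil
	}
	return env.ParseNumFromEnv(common.EnvControllerReplicas, 0, 0, math.MaxInt32), nil
}
```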
diff --git a/controller/appcontroller.go b/controller/appcontroller.go
index e6dee507caa2e..f038b770c29c4 100644
--- a/controller/appcontroller.go
+++ b/controller/appcontroller.go
@@ -113,7 +113,6 @@ type ApplicationController struct {
 	appInformer                   cache.SharedIndexInformer
 	appLister                     applisters.ApplicationLister
 	projInformer                  cache.SharedIndexInformer
-	deploymentInformer            informerv1.DeploymentInformer
 	appStateManager               AppStateManager
 	stateCache                    statecache.LiveStateCache
 	statusRefreshTimeout          time.Duration
@@ -130,6 +129,10 @@ type ApplicationController struct {
 	clusterSharding               sharding.ClusterShardingCache
 	projByNameCache               sync.Map
 	applicationNamespaces         []string
+
+	// dynamicClusterDistributionEnabled; if disabled, deploymentInformer is never initialized
+	dynamicClusterDistributionEnabled bool
+	deploymentInformer                informerv1.DeploymentInformer
 }
 
 // NewApplicationController creates new instance of ApplicationController.
@@ -155,6 +158,7 @@ func NewApplicationController(
 	applicationNamespaces []string,
 	rateLimiterConfig *ratelimiter.AppControllerRateLimiterConfig,
 	serverSideDiff bool,
+	dynamicClusterDistributionEnabled bool,
 ) (*ApplicationController, error) {
 	log.Infof("appResyncPeriod=%v, appHardResyncPeriod=%v, appResyncJitter=%v", appResyncPeriod, appHardResyncPeriod, appResyncJitter)
 	db := db.NewDB(namespace, settingsMgr, kubeClientset)
@@ -163,28 +167,29 @@ func NewApplicationController(
 		log.Info("Using default workqueue rate limiter config")
 	}
 	ctrl := ApplicationController{
-		cache:                         argoCache,
-		namespace:                     namespace,
-		kubeClientset:                 kubeClientset,
-		kubectl:                       kubectl,
-		applicationClientset:          applicationClientset,
-		repoClientset:                 repoClientset,
-		appRefreshQueue:               workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_reconciliation_queue"),
-		appOperationQueue:             workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_operation_processing_queue"),
-		projectRefreshQueue:           workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "project_reconciliation_queue"),
-		appComparisonTypeRefreshQueue: workqueue.NewRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig)),
-		db:                            db,
-		statusRefreshTimeout:          appResyncPeriod,
-		statusHardRefreshTimeout:      appHardResyncPeriod,
-		statusRefreshJitter:           appResyncJitter,
-		refreshRequestedApps:          make(map[string]CompareWith),
-		refreshRequestedAppsMutex:     &sync.Mutex{},
-		auditLogger:                   argo.NewAuditLogger(namespace, kubeClientset, common.ApplicationController),
-		settingsMgr:                   settingsMgr,
-		selfHealTimeout:               selfHealTimeout,
-		clusterSharding:               clusterSharding,
-		projByNameCache:               sync.Map{},
-		applicationNamespaces:         applicationNamespaces,
+		cache:                             argoCache,
+		namespace:                         namespace,
+		kubeClientset:                     kubeClientset,
+		kubectl:                           kubectl,
+		applicationClientset:              applicationClientset,
+		repoClientset:                     repoClientset,
+		appRefreshQueue:                   workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_reconciliation_queue"),
+		appOperationQueue:                 workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_operation_processing_queue"),
+		projectRefreshQueue:               workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "project_reconciliation_queue"),
+		appComparisonTypeRefreshQueue:     workqueue.NewRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig)),
+		db:                                db,
+		statusRefreshTimeout:              appResyncPeriod,
+		statusHardRefreshTimeout:          appHardResyncPeriod,
+		statusRefreshJitter:               appResyncJitter,
+		refreshRequestedApps:              make(map[string]CompareWith),
+		refreshRequestedAppsMutex:         &sync.Mutex{},
+		auditLogger:                       argo.NewAuditLogger(namespace, kubeClientset, common.ApplicationController),
+		settingsMgr:                       settingsMgr,
+		selfHealTimeout:                   selfHealTimeout,
+		clusterSharding:                   clusterSharding,
+		projByNameCache:                   sync.Map{},
+		applicationNamespaces:             applicationNamespaces,
+		dynamicClusterDistributionEnabled: dynamicClusterDistributionEnabled,
 	}
 	if kubectlParallelismLimit > 0 {
 		ctrl.kubectlSemaphore = semaphore.NewWeighted(kubectlParallelismLimit)
@@ -227,25 +232,33 @@ func NewApplicationController(
 	}
 
 	factory := informers.NewSharedInformerFactoryWithOptions(ctrl.kubeClientset, defaultDeploymentInformerResyncDuration, informers.WithNamespace(settingsMgr.GetNamespace()))
-	deploymentInformer := factory.Apps().V1().Deployments()
+
+	var deploymentInformer informerv1.DeploymentInformer
+
+	// only initialize deployment informer if dynamic distribution is enabled
+	if dynamicClusterDistributionEnabled {
+		deploymentInformer = factory.Apps().V1().Deployments()
+	}
 
 	readinessHealthCheck := func(r *http.Request) error {
-		applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
-		appControllerDeployment, err := deploymentInformer.Lister().Deployments(settingsMgr.GetNamespace()).Get(applicationControllerName)
-		if err != nil {
-			if kubeerrors.IsNotFound(err) {
-				appControllerDeployment = nil
-			} else {
-				return fmt.Errorf("error retrieving Application Controller Deployment: %s", err)
-			}
-		}
-		if appControllerDeployment != nil {
-			if appControllerDeployment.Spec.Replicas != nil && int(*appControllerDeployment.Spec.Replicas) <= 0 {
-				return fmt.Errorf("application controller deployment replicas is not set or is less than 0, replicas: %d", appControllerDeployment.Spec.Replicas)
+		if dynamicClusterDistributionEnabled {
+			applicationControllerName := env.StringFromEnv(common.EnvAppControllerName, common.DefaultApplicationControllerName)
+			appControllerDeployment, err := deploymentInformer.Lister().Deployments(settingsMgr.GetNamespace()).Get(applicationControllerName)
+			if err != nil {
+				if kubeerrors.IsNotFound(err) {
+					appControllerDeployment = nil
+				} else {
+					return fmt.Errorf("error retrieving Application Controller Deployment: %s", err)
+				}
 			}
-			shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
-			if _, err := sharding.GetOrUpdateShardFromConfigMap(kubeClientset.(*kubernetes.Clientset), settingsMgr, int(*appControllerDeployment.Spec.Replicas), shard); err != nil {
-				return fmt.Errorf("error while updating the heartbeat for to the Shard Mapping ConfigMap: %s", err)
+			if appControllerDeployment != nil {
+				if appControllerDeployment.Spec.Replicas != nil && int(*appControllerDeployment.Spec.Replicas) <= 0 {
+					return fmt.Errorf("application controller deployment replicas is not set or is less than 0, replicas: %d", appControllerDeployment.Spec.Replicas)
+				}
+				shard := env.ParseNumFromEnv(common.EnvControllerShard, -1, -math.MaxInt32, math.MaxInt32)
+				if _, err := sharding.GetOrUpdateShardFromConfigMap(kubeClientset.(*kubernetes.Clientset), settingsMgr, int(*appControllerDeployment.Spec.Replicas), shard); err != nil {
+					return fmt.Errorf("error while updating the heartbeat to the Shard Mapping ConfigMap: %s", err)
+				}
 			}
 		}
 		return nil
@@ -773,7 +786,11 @@ func (ctrl *ApplicationController) Run(ctx context.Context, statusProcessors int
 
 	go ctrl.appInformer.Run(ctx.Done())
 	go ctrl.projInformer.Run(ctx.Done())
-	go ctrl.deploymentInformer.Informer().Run(ctx.Done())
+
+	if ctrl.dynamicClusterDistributionEnabled {
+		// only start deployment informer if dynamic distribution is enabled
+		go ctrl.deploymentInformer.Informer().Run(ctx.Done())
+	}
 
 	clusters, err := ctrl.db.ListClusters(ctx)
 	if err != nil {
diff --git a/controller/appcontroller_test.go b/controller/appcontroller_test.go
index 4162a9983e941..33a29bc5ca3f8 100644
--- a/controller/appcontroller_test.go
+++ b/controller/appcontroller_test.go
@@ -157,6 +157,7 @@ func newFakeController(data *fakeData, repoErr error) *ApplicationController {
 		nil,
 		false,
+		false,
 	)
 	db := &dbmocks.ArgoDB{}
 	db.On("GetApplicationControllerReplicas").Return(1)
diff --git a/controller/sharding/cache.go b/controller/sharding/cache.go
index 7e2c37a2c37ee..69f7e4c851775 100644
--- a/controller/sharding/cache.go
+++ b/controller/sharding/cache.go
@@ -111,8 +111,8 @@ func (sharding *ClusterSharding) Update(c *v1alpha1.Cluster) {
 
 func (sharding *ClusterSharding) GetDistribution() map[string]int {
 	sharding.lock.RLock()
-	defer sharding.lock.RUnlock()
 	shards := sharding.Shards
+	sharding.lock.RUnlock()
 
 	distribution := make(map[string]int, len(shards))
 	for k, v := range shards {
diff --git a/docs/operator-manual/notifications/functions.md b/docs/operator-manual/notifications/functions.md
index 3d614e4e53a55..c50d122024b76 100644
--- a/docs/operator-manual/notifications/functions.md
+++ b/docs/operator-manual/notifications/functions.md
@@ -48,6 +48,16 @@ Transforms given GIT URL into HTTPs format.
 
 Returns repository URL full name `(<owner>/<repoName>)`. Currently supports only Github, GitLab and Bitbucket.
 
+
+**`repo.QueryEscape(s string) string`**
+
+QueryEscape escapes the string so it can be safely placed inside a URL.
+
+Example:
+```
+/projects/{{ call .repo.QueryEscape (call .repo.FullNameByRepoURL .app.status.RepoURL) }}/merge_requests
+```
+
 **`repo.GetCommitMetadata(sha string) CommitMetadata`**
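As the `util/notification/expression/repo/repo.go` hunk at the end of this diff shows, the new template function is simply `net/url.QueryEscape` exposed to notification templates. A tiny standalone sketch of the escaping it performs on a repository full name (the value is illustrative):

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	// A GitLab project path contains a slash, which must be percent-encoded
	// before it can be embedded as a single path segment such as /projects/{id}.
	fullName := "argoproj/argo-cd"
	fmt.Println(url.QueryEscape(fullName)) // prints: argoproj%2Fargo-cd
}
```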
diff --git a/util/cache/cache.go b/util/cache/cache.go
index d34fba5d38f7b..b632824e9c96b 100644
--- a/util/cache/cache.go
+++ b/util/cache/cache.go
@@ -27,6 +27,10 @@ const (
 	envRedisRetryCount = "REDIS_RETRY_COUNT"
 	// defaultRedisRetryCount holds default number of retries
 	defaultRedisRetryCount = 3
+	// envRedisSentinelPassword is an env variable name which stores redis sentinel password
+	envRedisSentinelPassword = "REDIS_SENTINEL_PASSWORD"
+	// envRedisSentinelUsername is an env variable name which stores redis sentinel username
+	envRedisSentinelUsername = "REDIS_SENTINEL_USERNAME"
 )
 
 const (
@@ -57,21 +61,23 @@ func buildRedisClient(redisAddress, password, username string, redisDB, maxRetri
 	return client
 }
 
-func buildFailoverRedisClient(sentinelMaster, password, username string, redisDB, maxRetries int, tlsConfig *tls.Config, sentinelAddresses []string) *redis.Client {
+func buildFailoverRedisClient(sentinelMaster, sentinelUsername, sentinelPassword, password, username string, redisDB, maxRetries int, tlsConfig *tls.Config, sentinelAddresses []string) *redis.Client {
 	opts := &redis.FailoverOptions{
-		MasterName:    sentinelMaster,
-		SentinelAddrs: sentinelAddresses,
-		DB:            redisDB,
-		Password:      password,
-		MaxRetries:    maxRetries,
-		TLSConfig:     tlsConfig,
-		Username:      username,
+		MasterName:       sentinelMaster,
+		SentinelAddrs:    sentinelAddresses,
+		DB:               redisDB,
+		Password:         password,
+		MaxRetries:       maxRetries,
+		TLSConfig:        tlsConfig,
+		Username:         username,
+		SentinelUsername: sentinelUsername,
+		SentinelPassword: sentinelPassword,
 	}
 
 	client := redis.NewFailoverClient(opts)
 
 	client.AddHook(redis.Hook(NewArgoRedisHook(func() {
-		*client = *buildFailoverRedisClient(sentinelMaster, password, username, redisDB, maxRetries, tlsConfig, sentinelAddresses)
+		*client = *buildFailoverRedisClient(sentinelMaster, sentinelUsername, sentinelPassword, password, username, redisDB, maxRetries, tlsConfig, sentinelAddresses)
 	})))
 
 	return client
@@ -199,6 +205,8 @@ func AddCacheFlagsToCmd(cmd *cobra.Command, opts ...Options) func() (*Cache, err
 	}
 	password := os.Getenv(envRedisPassword)
 	username := os.Getenv(envRedisUsername)
+	sentinelUsername := os.Getenv(envRedisSentinelUsername)
+	sentinelPassword := os.Getenv(envRedisSentinelPassword)
 	if opt.FlagPrefix != "" {
 		if val := os.Getenv(opt.getEnvPrefix() + envRedisUsername); val != "" {
 			username = val
@@ -206,14 +214,21 @@ func AddCacheFlagsToCmd(cmd *cobra.Command, opts ...Options) func() (*Cache, err
 		if val := os.Getenv(opt.getEnvPrefix() + envRedisPassword); val != "" {
 			password = val
 		}
+		if val := os.Getenv(opt.getEnvPrefix() + envRedisSentinelUsername); val != "" {
+			sentinelUsername = val
+		}
+		if val := os.Getenv(opt.getEnvPrefix() + envRedisSentinelPassword); val != "" {
+			sentinelPassword = val
+		}
 	}
+
 	maxRetries := env.ParseNumFromEnv(envRedisRetryCount, defaultRedisRetryCount, 0, math.MaxInt32)
 	compression, err := CompressionTypeFromString(compressionStr)
 	if err != nil {
 		return nil, err
 	}
 	if len(sentinelAddresses) > 0 {
-		client := buildFailoverRedisClient(sentinelMaster, password, username, redisDB, maxRetries, tlsConfig, sentinelAddresses)
+		client := buildFailoverRedisClient(sentinelMaster, sentinelUsername, sentinelPassword, password, username, redisDB, maxRetries, tlsConfig, sentinelAddresses)
 		opt.callOnClientCreated(client)
 		return NewCache(NewRedisCache(client, defaultCacheExpiration, compression)), nil
 	}
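For context on the `util/cache` change above: the two new environment variables authenticate against the Sentinel processes themselves, while the existing `REDIS_USERNAME`/`REDIS_PASSWORD` continue to authenticate against the Redis server the sentinels point at. A minimal sketch of that wiring with go-redis follows; the `v9` import path is an assumption, as Argo CD's pinned client version may differ.

```go
package example

import (
	"os"

	"github.com/redis/go-redis/v9"
)

// newFailoverClient builds a Sentinel-backed client the same way the updated
// buildFailoverRedisClient does, but reads the credentials directly from the
// environment for brevity.
func newFailoverClient(masterName string, sentinelAddrs []string) *redis.Client {
	return redis.NewFailoverClient(&redis.FailoverOptions{
		MasterName:       masterName,
		SentinelAddrs:    sentinelAddrs,
		Username:         os.Getenv("REDIS_USERNAME"),
		Password:         os.Getenv("REDIS_PASSWORD"),
		SentinelUsername: os.Getenv("REDIS_SENTINEL_USERNAME"),
		SentinelPassword: os.Getenv("REDIS_SENTINEL_PASSWORD"),
	})
}
```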
diff --git a/util/notification/expression/repo/repo.go b/util/notification/expression/repo/repo.go
index 060060cbccd68..110c278cb486b 100644
--- a/util/notification/expression/repo/repo.go
+++ b/util/notification/expression/repo/repo.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"errors"
+	"net/url"
 	"regexp"
 	"strings"
 
@@ -90,6 +91,7 @@ func NewExprs(argocdService service.Service, app *unstructured.Unstructured) map
 	return map[string]interface{}{
 		"RepoURLToHTTPS":    repoURLToHTTPS,
 		"FullNameByRepoURL": FullNameByRepoURL,
+		"QueryEscape":       url.QueryEscape,
 		"GetCommitMetadata": func(commitSHA string) interface{} {
 			meta, err := getCommitMetadata(commitSHA, app, argocdService)
 			if err != nil {