Skip to content

Commit

Permalink
fix(controller): leader election preventing two controllers running a…
Browse files Browse the repository at this point in the history
…nd gracefully shutting down (argoproj#2291)

* WIP on fixing leader election fix

Signed-off-by: zachaller <[email protected]>

* Start and stop informers as well

Signed-off-by: zachaller <[email protected]>

* lint

Signed-off-by: zachaller <[email protected]>

* Remove tests that do not test anything

Signed-off-by: zachaller <[email protected]>

* fix lint

Signed-off-by: zachaller <[email protected]>

* github trigger re-run

Signed-off-by: zachaller <[email protected]>

* Cleanup

Signed-off-by: zachaller <[email protected]>

* cleanup

Signed-off-by: zachaller <[email protected]>

* Add back one test

Signed-off-by: zachaller <[email protected]>

* remove secondary metric server

Signed-off-by: zachaller <[email protected]>

* Remove secondary metric test

Signed-off-by: zachaller <[email protected]>

* Add single instance test to catch log lines

Signed-off-by: zachaller <[email protected]>

* We should shut down if we cannot sync

Signed-off-by: zachaller <[email protected]>

* fix lint

Signed-off-by: zachaller <[email protected]>

* Redo for loop; will have another PR that stops via context

Signed-off-by: zachaller <[email protected]>

* Fix comment

Signed-off-by: zachaller <[email protected]>

* Add context and graceful shutdown

Signed-off-by: zachaller <[email protected]>

* lint

Signed-off-by: zachaller <[email protected]>

* Fix test

Signed-off-by: zachaller <[email protected]>

* github trigger re-run

Signed-off-by: zachaller <[email protected]>

* add more time for startup

Signed-off-by: zachaller <[email protected]>

* add individual controller startup tests

Signed-off-by: zachaller <[email protected]>

* standardize shutdown

Signed-off-by: zachaller <[email protected]>

* Standardize leader test

Signed-off-by: zachaller <[email protected]>

* fix test

Signed-off-by: zachaller <[email protected]>

* We cannot turn on release on cancel

Signed-off-by: zachaller <[email protected]>

* fix release on cancel

Signed-off-by: zachaller <[email protected]>

Signed-off-by: zachaller <[email protected]>
  • Loading branch information
zachaller authored and jandersen-plaid committed Nov 8, 2022
1 parent 2bd87eb commit 657bde2
Show file tree
Hide file tree
Showing 25 changed files with 431 additions and 380 deletions.
19 changes: 13 additions & 6 deletions analysis/controller.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package analysis

import (
"context"
"sync"
"time"

unstructuredutil "github.com/argoproj/argo-rollouts/utils/unstructured"
Expand Down Expand Up @@ -131,21 +133,26 @@ func NewController(cfg ControllerConfig) *Controller {
return controller
}

func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
func (c *Controller) Run(ctx context.Context, threadiness int) error {
log.Info("Starting analysis workers")
wg := sync.WaitGroup{}
for i := 0; i < threadiness; i++ {
wg.Add(1)
go wait.Until(func() {
controllerutil.RunWorker(c.analysisRunWorkQueue, logutil.AnalysisRunKey, c.syncHandler, c.metricsServer)
}, time.Second, stopCh)
controllerutil.RunWorker(ctx, c.analysisRunWorkQueue, logutil.AnalysisRunKey, c.syncHandler, c.metricsServer)
log.Debug("Analysis worker has stopped")
wg.Done()
}, time.Second, ctx.Done())
}
log.Infof("Started %d analysis workers", threadiness)
<-stopCh
log.Info("Shutting down analysis workers")
<-ctx.Done()
wg.Wait()
log.Info("All analysis workers have stopped")

return nil
}

func (c *Controller) syncHandler(key string) error {
func (c *Controller) syncHandler(ctx context.Context, key string) error {
startTime := timeutil.Now()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
Expand Down
21 changes: 19 additions & 2 deletions analysis/controller_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package analysis

import (
"context"
"encoding/json"
"reflect"
"testing"
Expand Down Expand Up @@ -97,7 +98,7 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share
metricsServer := metrics.NewMetricsServer(metrics.ServerConfig{
Addr: "localhost:8080",
K8SRequestProvider: &metrics.K8sRequestsCountProvider{},
}, true)
})

c := NewController(ControllerConfig{
KubeClientSet: f.kubeclient,
Expand Down Expand Up @@ -159,7 +160,7 @@ func (f *fixture) runController(analysisRunName string, startInformers bool, exp
assert.True(f.t, cache.WaitForCacheSync(stopCh, c.analysisRunSynced))
}

err := c.syncHandler(analysisRunName)
err := c.syncHandler(context.Background(), analysisRunName)
if !expectError && err != nil {
f.t.Errorf("error syncing experiment: %v", err)
} else if expectError && err == nil {
Expand Down Expand Up @@ -314,3 +315,19 @@ func TestNoReconcileForAnalysisRunWithDeletionTimestamp(t *testing.T) {

f.run(getKey(ar, t))
}

// TestRun makes sure the analysis controller can be started and stopped:
// Run is expected to block until the context is cancelled and then return
// without an error.
func TestRun(t *testing.T) {
	f := newFixture(t)
	defer f.Close()

	c, _, _ := f.newController(noResyncPeriodFunc)
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	// Give the worker a moment to start, then shut the queue down and cancel
	// the context so that the blocking Run call below can return.
	go func() {
		time.Sleep(1000 * time.Millisecond)
		c.analysisRunWorkQueue.ShutDownWithDrain()
		cancel()
	}()

	// Run's error was previously discarded; fail the test if it reports one.
	if err := c.Run(ctx, 1); err != nil {
		t.Fatalf("Run returned an unexpected error: %v", err)
	}
}
27 changes: 10 additions & 17 deletions cmd/rollouts-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func newCommand() *cobra.Command {
log.WithField("version", version.GetVersion()).Info("Argo Rollouts starting")

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
ctx := signals.SetupSignalHandlerContext()

defaults.SetVerifyTargetGroup(awsVerifyTargetGroup)
defaults.SetIstioAPIVersion(istioVersion)
Expand Down Expand Up @@ -190,23 +190,16 @@ func newCommand() *cobra.Command {
healthzPort,
k8sRequestProvider,
nginxIngressClasses,
albIngressClasses)
// Notice that there is no need to run Start methods in a separate goroutine (i.e. go kubeInformerFactory.Start(stopCh)).
// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
dynamicInformerFactory.Start(stopCh)
if !namespaced {
clusterDynamicInformerFactory.Start(stopCh)
}
kubeInformerFactory.Start(stopCh)
controllerNamespaceInformerFactory.Start(stopCh)
jobInformerFactory.Start(stopCh)

// Check if Istio installed on cluster before starting dynamicInformerFactory
if istioutil.DoesIstioExist(istioPrimaryDynamicClient, namespace) {
istioDynamicInformerFactory.Start(stopCh)
}
albIngressClasses,
dynamicInformerFactory,
clusterDynamicInformerFactory,
istioDynamicInformerFactory,
namespaced,
kubeInformerFactory,
controllerNamespaceInformerFactory,
jobInformerFactory)

if err = cm.Run(rolloutThreads, serviceThreads, ingressThreads, experimentThreads, analysisThreads, electOpts, stopCh); err != nil {
if err = cm.Run(ctx, rolloutThreads, serviceThreads, ingressThreads, experimentThreads, analysisThreads, electOpts); err != nil {
log.Fatalf("Error running controller: %s", err.Error())
}
return nil
Expand Down
Loading

0 comments on commit 657bde2

Please sign in to comment.