diff --git a/pkg/gameservers/controller.go b/pkg/gameservers/controller.go index 3ca1c11671..0b1b7b9631 100644 --- a/pkg/gameservers/controller.go +++ b/pkg/gameservers/controller.go @@ -109,6 +109,7 @@ func NewController( c.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "gameserver-controller"}) c.workerqueue = workerqueue.NewWorkerQueue(c.syncGameServer, c.logger, stable.GroupName+".GameServerController") + health.AddLivenessCheck("game-server-worker-queue", healthcheck.Check(c.workerqueue.Healthy)) wh.AddHandler("/mutate", stablev1alpha1.Kind("GameServer"), admv1beta1.Create, c.creationMutationHandler) wh.AddHandler("/validate", stablev1alpha1.Kind("GameServer"), admv1beta1.Create, c.creationValidationHandler) diff --git a/pkg/gameservers/controller_test.go b/pkg/gameservers/controller_test.go index a65476e6d4..517e07c725 100644 --- a/pkg/gameservers/controller_test.go +++ b/pkg/gameservers/controller_test.go @@ -199,7 +199,6 @@ func TestControllerHealthCheck(t *testing.T) { } stop, cancel := startInformers(m, c.gameServerSynced) - defer cancel() go http.ListenAndServe("localhost:9090", health) @@ -209,6 +208,10 @@ func TestControllerHealthCheck(t *testing.T) { }() testHTTPHealth(t, "http://localhost:9090/live", "{}\n", http.StatusOK) + + cancel() + + testHTTPHealth(t, "http://localhost:9090/live", "{}\n", http.StatusServiceUnavailable) } func TestControllerCreationMutationHandler(t *testing.T) { diff --git a/pkg/util/workerqueue/workerqueue.go b/pkg/util/workerqueue/workerqueue.go index 626903d715..33f242fff5 100644 --- a/pkg/util/workerqueue/workerqueue.go +++ b/pkg/util/workerqueue/workerqueue.go @@ -18,6 +18,8 @@ package workerqueue import ( + "fmt" + "sync" "time" "agones.dev/agones/pkg/util/runtime" @@ -28,6 +30,10 @@ import ( "k8s.io/client-go/util/workqueue" ) +const ( + workFx = time.Second +) + // Handler is the handler for processing the work queue // This is usually a syncronisation handler for a controller or 
related type Handler func(string) error @@ -40,6 +46,10 @@ type WorkerQueue struct { queue workqueue.RateLimitingInterface // SyncHandler is exported to make testing easier (hack) SyncHandler Handler + + mu sync.Mutex + workers int + running int } // NewWorkerQueue returns a new worker queue for a given name @@ -106,15 +116,59 @@ func (wq *WorkerQueue) processNextWorkItem() bool { } // Run the WorkerQueue processing via the Handler. Will block until stop is closed. -// Runs threadiness number workers to process the rate limited queue -func (wq *WorkerQueue) Run(threadiness int, stop <-chan struct{}) { - defer wq.queue.ShutDown() - - wq.logger.WithField("threadiness", threadiness).Info("Starting workers...") - for i := 0; i < threadiness; i++ { - go wait.Until(wq.runWorker, time.Second, stop) +// Runs a certain number of workers to process the rate limited queue +func (wq *WorkerQueue) Run(workers int, stop <-chan struct{}) { + wq.setWorkerCount(workers) + wq.logger.WithField("workers", workers).Info("Starting workers...") + for i := 0; i < workers; i++ { + go wq.run(stop) } <-stop wq.logger.Info("...shutting down workers") + wq.queue.ShutDown() +} + +func (wq *WorkerQueue) run(stop <-chan struct{}) { + wq.inc() + defer wq.dec() + wait.Until(wq.runWorker, workFx, stop) +} + +// Healthy reports whether all the worker goroutines are running. +func (wq *WorkerQueue) Healthy() error { + wq.mu.Lock() + defer wq.mu.Unlock() + want := wq.workers + got := wq.running + + if want != got { + return fmt.Errorf("want %d worker goroutine(s), got %d", want, got) + } + return nil +} + +// RunCount reports the number of running worker goroutines started by Run. 
+func (wq *WorkerQueue) RunCount() int { + wq.mu.Lock() + defer wq.mu.Unlock() + return wq.running +} + +func (wq *WorkerQueue) setWorkerCount(n int) { + wq.mu.Lock() + defer wq.mu.Unlock() + wq.workers = n +} + +func (wq *WorkerQueue) inc() { + wq.mu.Lock() + defer wq.mu.Unlock() + wq.running++ +} + +func (wq *WorkerQueue) dec() { + wq.mu.Lock() + defer wq.mu.Unlock() + wq.running-- } diff --git a/pkg/util/workerqueue/workerqueue_test.go b/pkg/util/workerqueue/workerqueue_test.go index 9792b38774..ca48d51bd6 100644 --- a/pkg/util/workerqueue/workerqueue_test.go +++ b/pkg/util/workerqueue/workerqueue_test.go @@ -54,3 +54,30 @@ func TestWorkerQueueRun(t *testing.T) { assert.Fail(t, "should have received value") } } + +func TestWorkerQueueHealthy(t *testing.T) { + done := make(chan struct{}) + handler := func(string) error { + <-done + return nil + } + wq := NewWorkerQueue(handler, logrus.WithField("source", "test"), "test") + wq.Enqueue(cache.ExplicitKey("default/test")) + + stop := make(chan struct{}) + go wq.Run(1, stop) + + // Yield to the scheduler to ensure the worker queue goroutine can run. + time.Sleep(10 * time.Millisecond) + assert.Equal(t, 1, wq.RunCount()) + assert.Nil(t, wq.Healthy()) + + close(done) // Ensure the handler no longer blocks. + close(stop) // Stop the worker queue. + + // Yield to the scheduler again to ensure the worker queue goroutine can + // finish. + time.Sleep(10 * time.Millisecond) + assert.Equal(t, 0, wq.RunCount()) + assert.EqualError(t, wq.Healthy(), "want 1 worker goroutine(s), got 0") +}