Skip to content

Commit

Permalink
Merge pull request #160 from enocom/liveness
Browse files Browse the repository at this point in the history
Register liveness check in gameservers.Controller
  • Loading branch information
markmandel authored Apr 5, 2018
2 parents c25a323 + 111210f commit cbd5c86
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 8 deletions.
1 change: 1 addition & 0 deletions pkg/gameservers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func NewController(
c.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "gameserver-controller"})

c.workerqueue = workerqueue.NewWorkerQueue(c.syncGameServer, c.logger, stable.GroupName+".GameServerController")
health.AddLivenessCheck("game-server-worker-queue", healthcheck.Check(c.workerqueue.Healthy))

wh.AddHandler("/mutate", stablev1alpha1.Kind("GameServer"), admv1beta1.Create, c.creationMutationHandler)
wh.AddHandler("/validate", stablev1alpha1.Kind("GameServer"), admv1beta1.Create, c.creationValidationHandler)
Expand Down
5 changes: 4 additions & 1 deletion pkg/gameservers/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ func TestControllerHealthCheck(t *testing.T) {
}

stop, cancel := startInformers(m, c.gameServerSynced)
defer cancel()

go http.ListenAndServe("localhost:9090", health)

Expand All @@ -209,6 +208,10 @@ func TestControllerHealthCheck(t *testing.T) {
}()

testHTTPHealth(t, "http://localhost:9090/live", "{}\n", http.StatusOK)

cancel()

testHTTPHealth(t, "http://localhost:9090/live", "{}\n", http.StatusServiceUnavailable)
}

func TestControllerCreationMutationHandler(t *testing.T) {
Expand Down
68 changes: 61 additions & 7 deletions pkg/util/workerqueue/workerqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package workerqueue

import (
"fmt"
"sync"
"time"

"agones.dev/agones/pkg/util/runtime"
Expand All @@ -28,6 +30,10 @@ import (
"k8s.io/client-go/util/workqueue"
)

const (
workFx = time.Second
)

// Handler is the handler for processing the work queue
// This is usually a syncronisation handler for a controller or related
type Handler func(string) error
Expand All @@ -40,6 +46,10 @@ type WorkerQueue struct {
queue workqueue.RateLimitingInterface
// SyncHandler is exported to make testing easier (hack)
SyncHandler Handler

mu sync.Mutex
workers int
running int
}

// NewWorkerQueue returns a new worker queue for a given name
Expand Down Expand Up @@ -106,15 +116,59 @@ func (wq *WorkerQueue) processNextWorkItem() bool {
}

// Run the WorkerQueue processing via the Handler. Will block until stop is closed.
// Runs threadiness number workers to process the rate limited queue
func (wq *WorkerQueue) Run(threadiness int, stop <-chan struct{}) {
defer wq.queue.ShutDown()

wq.logger.WithField("threadiness", threadiness).Info("Starting workers...")
for i := 0; i < threadiness; i++ {
go wait.Until(wq.runWorker, time.Second, stop)
// Runs a certain number workers to process the rate limited queue
func (wq *WorkerQueue) Run(workers int, stop <-chan struct{}) {
wq.setWorkerCount(workers)
wq.logger.WithField("workers", workers).Info("Starting workers...")
for i := 0; i < workers; i++ {
go wq.run(stop)
}

<-stop
wq.logger.Info("...shutting down workers")
wq.queue.ShutDown()
}

func (wq *WorkerQueue) run(stop <-chan struct{}) {
wq.inc()
defer wq.dec()
wait.Until(wq.runWorker, workFx, stop)
}

// Healthy reports whether all the worker goroutines are running.
func (wq *WorkerQueue) Healthy() error {
wq.mu.Lock()
defer wq.mu.Unlock()
want := wq.workers
got := wq.running

if want != got {
return fmt.Errorf("want %d worker goroutine(s), got %d", want, got)
}
return nil
}

// RunCount reports the number of running worker goroutines started by Run.
func (wq *WorkerQueue) RunCount() int {
wq.mu.Lock()
defer wq.mu.Unlock()
return wq.running
}

func (wq *WorkerQueue) setWorkerCount(n int) {
wq.mu.Lock()
defer wq.mu.Unlock()
wq.workers = n
}

func (wq *WorkerQueue) inc() {
wq.mu.Lock()
defer wq.mu.Unlock()
wq.running++
}

func (wq *WorkerQueue) dec() {
wq.mu.Lock()
defer wq.mu.Unlock()
wq.running--
}
27 changes: 27 additions & 0 deletions pkg/util/workerqueue/workerqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,30 @@ func TestWorkerQueueRun(t *testing.T) {
assert.Fail(t, "should have received value")
}
}

func TestWorkerQueueHealthy(t *testing.T) {
done := make(chan struct{})
handler := func(string) error {
<-done
return nil
}
wq := NewWorkerQueue(handler, logrus.WithField("source", "test"), "test")
wq.Enqueue(cache.ExplicitKey("default/test"))

stop := make(chan struct{})
go wq.Run(1, stop)

// Yield to the scheduler to ensure the worker queue goroutine can run.
time.Sleep(10 * time.Millisecond)
assert.Equal(t, 1, wq.RunCount())
assert.Nil(t, wq.Healthy())

close(done) // Ensure the handler no longer blocks.
close(stop) // Stop the worker queue.

// Yield to the scheduler again to ensure the worker queue goroutine can
// finish.
time.Sleep(10 * time.Millisecond)
assert.Equal(t, 0, wq.RunCount())
assert.EqualError(t, wq.Healthy(), "want 1 worker goroutine(s), got 0")
}

0 comments on commit cbd5c86

Please sign in to comment.