Register liveness check in gameservers.Controller #160

Merged 1 commit on Apr 5, 2018
1 change: 1 addition & 0 deletions pkg/gameservers/controller.go
@@ -109,6 +109,7 @@ func NewController(
c.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "gameserver-controller"})

c.workerqueue = workerqueue.NewWorkerQueue(c.syncGameServer, c.logger, stable.GroupName+".GameServerController")
health.AddLivenessCheck("game-server-worker-queue", healthcheck.Check(c.workerqueue.Healthy))

wh.AddHandler("/mutate", stablev1alpha1.Kind("GameServer"), admv1beta1.Create, c.creationMutationHandler)
wh.AddHandler("/validate", stablev1alpha1.Kind("GameServer"), admv1beta1.Create, c.creationValidationHandler)
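For context on the AddLivenessCheck line above: health is a healthcheck.Handler from github.com/heptiolabs/healthcheck, which serves /live and /ready and answers /live with 503 once any registered liveness check returns an error (and 200 with "{}\n" otherwise, as the test below expects). A minimal, self-contained sketch of that wiring, with an illustrative check name and port that are not part of this PR:

package main

import (
	"errors"
	"net/http"

	"github.com/heptiolabs/healthcheck"
)

func main() {
	// Build a handler and register a liveness check, the same pattern the
	// controller uses with c.workerqueue.Healthy.
	health := healthcheck.NewHandler()

	healthy := true // stand-in for real state, e.g. worker goroutine counts
	health.AddLivenessCheck("example-check", func() error {
		if !healthy {
			return errors.New("not healthy")
		}
		return nil // nil means the check passes and /live stays 200
	})

	// The handler serves /live and /ready; a Kubernetes liveness probe can
	// point at this port. Port 8080 is illustrative.
	http.ListenAndServe(":8080", health) // nolint: errcheck
}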
5 changes: 4 additions & 1 deletion pkg/gameservers/controller_test.go
@@ -199,7 +199,6 @@ func TestControllerHealthCheck(t *testing.T) {
}

stop, cancel := startInformers(m, c.gameServerSynced)
defer cancel()

go http.ListenAndServe("localhost:9090", health)

@@ -209,6 +208,10 @@
}()

testHTTPHealth(t, "http://localhost:9090/live", "{}\n", http.StatusOK)

cancel()

testHTTPHealth(t, "http://localhost:9090/live", "{}\n", http.StatusServiceUnavailable)
}

func TestControllerCreationMutationHandler(t *testing.T) {
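The health check test above relies on a testHTTPHealth helper that is not part of this diff. A rough sketch of what such a helper is assumed to do — fetch the URL, then assert on both the response body and status code; the real helper in the Agones tree may differ:

package sketch

import (
	"io/ioutil"
	"net/http"
	"testing"

	"github.com/stretchr/testify/assert"
)

// testHTTPHealthSketch is a hypothetical stand-in for the testHTTPHealth
// helper used above: GET the URL, then assert on both the body and the
// HTTP status code of the response.
func testHTTPHealthSketch(t *testing.T, url, wantBody string, wantStatus int) {
	resp, err := http.Get(url)
	if !assert.NoError(t, err) {
		return
	}
	defer resp.Body.Close() // nolint: errcheck

	body, err := ioutil.ReadAll(resp.Body)
	assert.NoError(t, err)
	assert.Equal(t, wantStatus, resp.StatusCode)
	assert.Equal(t, wantBody, string(body))
}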
68 changes: 61 additions & 7 deletions pkg/util/workerqueue/workerqueue.go
@@ -18,6 +18,8 @@
package workerqueue

import (
"fmt"
"sync"
"time"

"agones.dev/agones/pkg/util/runtime"
@@ -28,6 +30,10 @@ import (
"k8s.io/client-go/util/workqueue"
)

const (
workFx = time.Second
)

// Handler is the handler for processing the work queue
// This is usually a syncronisation handler for a controller or related
type Handler func(string) error
@@ -40,6 +46,10 @@ type WorkerQueue struct {
queue workqueue.RateLimitingInterface
// SyncHandler is exported to make testing easier (hack)
SyncHandler Handler

mu sync.Mutex
Member: Should this be a RWLock? I suppose there will only be a single thread hitting the Healthy() function, so there isn't much benefit to allow for multiple concurrent Healthy() requests. WDYT?

Contributor (author): I'm tempted to start with a plain mutex and leave open the possibility for improving performance of the WorkerQueue once we understand the bottle necks.

Member: SGTM!

workers int
running int
}
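The review thread above weighs a sync.RWMutex against the plain sync.Mutex this change keeps. For comparison only, a minimal sketch of the RWMutex variant under discussion — the type and methods below are illustrative and not part of this PR:

package sketch

import (
	"fmt"
	"sync"
)

// rwWorkerQueue mirrors only the health bookkeeping fields of WorkerQueue,
// swapping the plain Mutex for a RWMutex.
type rwWorkerQueue struct {
	mu      sync.RWMutex
	workers int
	running int
}

// Healthy is a pure reader, so it takes the read lock; multiple concurrent
// Healthy calls would not block one another.
func (wq *rwWorkerQueue) Healthy() error {
	wq.mu.RLock()
	defer wq.mu.RUnlock()
	if wq.workers != wq.running {
		return fmt.Errorf("want %d worker goroutine(s), got %d", wq.workers, wq.running)
	}
	return nil
}

// inc and dec mutate state, so they take the exclusive write lock.
func (wq *rwWorkerQueue) inc() {
	wq.mu.Lock()
	defer wq.mu.Unlock()
	wq.running++
}

func (wq *rwWorkerQueue) dec() {
	wq.mu.Lock()
	defer wq.mu.Unlock()
	wq.running--
}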

// NewWorkerQueue returns a new worker queue for a given name
@@ -106,15 +116,59 @@ func (wq *WorkerQueue) processNextWorkItem() bool {
}

// Run the WorkerQueue processing via the Handler. Will block until stop is closed.
// Runs threadiness number workers to process the rate limited queue
func (wq *WorkerQueue) Run(threadiness int, stop <-chan struct{}) {
defer wq.queue.ShutDown()

wq.logger.WithField("threadiness", threadiness).Info("Starting workers...")
for i := 0; i < threadiness; i++ {
go wait.Until(wq.runWorker, time.Second, stop)
// Runs a certain number workers to process the rate limited queue
func (wq *WorkerQueue) Run(workers int, stop <-chan struct{}) {
wq.setWorkerCount(workers)
wq.logger.WithField("workers", workers).Info("Starting workers...")
for i := 0; i < workers; i++ {
go wq.run(stop)
}

<-stop
wq.logger.Info("...shutting down workers")
wq.queue.ShutDown()
}

func (wq *WorkerQueue) run(stop <-chan struct{}) {
wq.inc()
Member:

    wq.inc()
    defer wq.dec()
    wait.Until(wq.runWorker, workFx, stop)

?

defer wq.dec()
wait.Until(wq.runWorker, workFx, stop)
}

// Healthy reports whether all the worker goroutines are running.
func (wq *WorkerQueue) Healthy() error {
wq.mu.Lock()
defer wq.mu.Unlock()
want := wq.workers
got := wq.running

if want != got {
return fmt.Errorf("want %d worker goroutine(s), got %d", want, got)
}
return nil
}

// RunCount reports the number of running worker goroutines started by Run.
func (wq *WorkerQueue) RunCount() int {
wq.mu.Lock()
defer wq.mu.Unlock()
return wq.running
}

func (wq *WorkerQueue) setWorkerCount(n int) {
wq.mu.Lock()
Member:

    wq.mu.Lock()
    defer wq.mu.Unlock()
    wq.workers = n

?
Really just being consistent in lock/unlock strategy through the code.

defer wq.mu.Unlock()
wq.workers = n
}

func (wq *WorkerQueue) inc() {
wq.mu.Lock()
defer wq.mu.Unlock()
wq.running++
}

func (wq *WorkerQueue) dec() {
wq.mu.Lock()
defer wq.mu.Unlock()
wq.running--
}
27 changes: 27 additions & 0 deletions pkg/util/workerqueue/workerqueue_test.go
@@ -54,3 +54,30 @@ func TestWorkerQueueRun(t *testing.T) {
assert.Fail(t, "should have received value")
}
}

func TestWorkerQueueHealthy(t *testing.T) {
done := make(chan struct{})
handler := func(string) error {
<-done
return nil
}
wq := NewWorkerQueue(handler, logrus.WithField("source", "test"), "test")
wq.Enqueue(cache.ExplicitKey("default/test"))

stop := make(chan struct{})
go wq.Run(1, stop)

// Yield to the scheduler to ensure the worker queue goroutine can run.
time.Sleep(10 * time.Millisecond)
assert.Equal(t, 1, wq.RunCount())
assert.Nil(t, wq.Healthy())

close(done) // Ensure the handler no longer blocks.
close(stop) // Stop the worker queue.

// Yield to the scheduler again to ensure the worker queue goroutine can
// finish.
time.Sleep(10 * time.Millisecond)
assert.Equal(t, 0, wq.RunCount())
assert.EqualError(t, wq.Healthy(), "want 1 worker goroutine(s), got 0")
}