Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return an HTTP 429 when we're over capacity #232

Merged
merged 6 commits into from
Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func NewCatalystAPIRouter(vodEngine *pipeline.Coordinator, apiToken string) *htt
router := httprouter.New()
withLogging := middleware.LogRequest()
withAuth := middleware.IsAuthorized
withCapacityChecking := middleware.HasCapacity

catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine}
mistCallbackHandlers := &misttriggers.MistCallbackHandlersCollection{VODEngine: vodEngine}
Expand All @@ -49,8 +50,29 @@ func NewCatalystAPIRouter(vodEngine *pipeline.Coordinator, apiToken string) *htt
router.GET("/ok", withLogging(catalystApiHandlers.Ok()))

// Public Catalyst API
router.POST("/api/vod", withLogging(withAuth(apiToken, catalystApiHandlers.UploadVOD())))
router.POST("/api/transcode/file", withLogging(withAuth(apiToken, catalystApiHandlers.TranscodeSegment())))
router.POST("/api/vod",
withLogging(
withAuth(
apiToken,
withCapacityChecking(
vodEngine,
catalystApiHandlers.UploadVOD(),
),
),
),
)

router.POST("/api/transcode/file",
withLogging(
withAuth(
apiToken,
withCapacityChecking(
vodEngine,
catalystApiHandlers.TranscodeSegment(),
),
),
),
)

// Endpoint to receive "Triggers" (callbacks) from Mist
router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger()))
Expand Down
16 changes: 13 additions & 3 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

// Cache is a generic, concurrency-safe in-memory key/value store keyed by
// stream name. All methods acquire c.mutex, so a Cache must not be copied
// after first use.
type Cache[T any] struct {
	cache map[string]T
	// mutex is an RWMutex so read-only operations (Get, GetKeys) can run
	// concurrently while writers (Store, Remove) take the exclusive lock.
	mutex sync.RWMutex
}

func New[T interface{}]() *Cache[T] {
Expand All @@ -22,8 +22,8 @@ func (c *Cache[T]) Remove(streamName string) {
}

func (c *Cache[T]) Get(streamName string) T {
c.mutex.Lock()
defer c.mutex.Unlock()
c.mutex.RLock()
defer c.mutex.RUnlock()
info, ok := c.cache[streamName]
if ok {
return info
Expand All @@ -32,6 +32,16 @@ func (c *Cache[T]) Get(streamName string) T {
return zero
}

// GetKeys returns a snapshot of every key currently stored in the cache.
// The returned slice is a copy, so callers may mutate it freely; map
// iteration order is random, so the order of keys is unspecified.
func (c *Cache[T]) GetKeys() []string {
	c.mutex.RLock()
	defer c.mutex.RUnlock()

	snapshot := make([]string, 0, len(c.cache))
	for key := range c.cache {
		snapshot = append(snapshot, key)
	}
	return snapshot
}

func (c *Cache[T]) Store(streamName string, value T) {
c.mutex.Lock()
defer c.mutex.Unlock()
Expand Down
23 changes: 23 additions & 0 deletions middleware/capacity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package middleware

import (
"net/http"

"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/pipeline"
)

// Somewhat arbitrary and conservative number of maximum Catalyst VOD jobs in the system
// at one time. We can look at more sophisticated strategies for calculating capacity in
// the future.
// NOTE(review): Go naming convention favors MixedCaps (e.g. MaxJobsInFlight) over
// SCREAMING_SNAKE_CASE for constants; renaming this exported name would require
// updating every reference in the package, so it is left as-is here.
const MAX_JOBS_IN_FLIGHT = 5

// HasCapacity wraps next with an admission-control check: the request is
// forwarded only while the number of in-flight Mist pipeline jobs is below
// MAX_JOBS_IN_FLIGHT; otherwise it is rejected with HTTP 429 (Too Many
// Requests) and next is never invoked.
func HasCapacity(vodEngine *pipeline.Coordinator, next httprouter.Handle) httprouter.Handle {
	return func(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
		if vodEngine.InFlightMistPipelineJobs() < MAX_JOBS_IN_FLIGHT {
			// Capacity available: hand the request to the next handler.
			next(w, r, params)
			return
		}
		// At capacity: reject without touching the pipeline.
		w.WriteHeader(http.StatusTooManyRequests)
	}
}
60 changes: 60 additions & 0 deletions middleware/capacity_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package middleware

import (
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/pipeline"
"github.com/stretchr/testify/require"
)

// TestItCallsNextMiddlewareWhenCapacityAvailable verifies that HasCapacity
// forwards the request to the next handler when the coordinator has spare
// capacity (here: a fresh stub coordinator with no jobs started).
func TestItCallsNextMiddlewareWhenCapacityAvailable(t *testing.T) {
	// Create a next handler in the middleware chain, to confirm the request was passed onwards
	var nextCalled bool
	next := func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
		nextCalled = true
	}

	// Set up the HTTP handler
	handler := HasCapacity(pipeline.NewStubCoordinator(), next)

	// Call the handler
	responseRecorder := httptest.NewRecorder()
	handler(responseRecorder, nil, nil)

	// Confirm we got a success response and that the handler called the next middleware
	// (the recorder reports 200 because neither handler wrote an explicit status)
	require.Equal(t, http.StatusOK, responseRecorder.Code)
	require.True(t, nextCalled)
}

func TestItErrorsWhenNoCapacityAvailable(t *testing.T) {
// Create a next handler in the middleware chain, to confirm the request was passed onwards
var nextCalled bool
next := func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
nextCalled = true
}

// Create a lot of in-flight jobs
coordinator := pipeline.NewStubCoordinator()
for x := 0; x < 5; x++ {
coordinator.StartUploadJob(pipeline.UploadJobPayload{
RequestID: fmt.Sprintf("request-%d", x),
})
}

// Set up the HTTP handler
handler := HasCapacity(coordinator, next)
Comment on lines +43 to +49
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I wonder how racy this is since the jobs run in background and the stub handlers return immediately. Does it pass consistently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I think because the stub handlers never trigger the Mist callbacks right? So the jobs stay in flight

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure actually. I think they may return an error which finishes the job and sends a callback

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's the real handlers with mock clients instead


// Call the handler
responseRecorder := httptest.NewRecorder()
handler(responseRecorder, nil, nil)

// Confirm we got an HTTP 429 response
require.Equal(t, http.StatusTooManyRequests, responseRecorder.Code)

// Confirm the handler didn't call the next middleware
require.False(t, nextCalled)
}
11 changes: 11 additions & 0 deletions pipeline/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,17 @@ func (c *Coordinator) TriggerPushEnd(p PushEndPayload) {
})
}

// InFlightMistPipelineJobs returns the number of jobs currently tracked in
// c.Jobs whose handler is the Mist pipeline (c.pipeMist).
// NOTE(review): GetKeys returns a snapshot, so a job removed between GetKeys
// and Get yields the cache's zero value here; if the cached type is a pointer,
// the .handler access on that zero value would panic — confirm the element
// type is not a pointer, or guard the lookup.
func (c *Coordinator) InFlightMistPipelineJobs() int {
	keys := c.Jobs.GetKeys()
	count := 0
	for _, k := range keys {
		// Only count jobs routed to the Mist pipeline, not other handlers.
		if c.Jobs.Get(k).handler == c.pipeMist {
			count++
		}
	}
	return count
}

// runHandlerAsync starts a background go-routine to run the handler function
// safely. It locks on the JobInfo object to allow safe mutations inside the
// handler. It also handles panics and errors, turning them into a transcode
Expand Down