Skip to content

Commit

Permalink
Support for multiple PUSH_END causes
Browse files Browse the repository at this point in the history
  • Loading branch information
thomshutt authored and emranemran committed Sep 8, 2022
1 parent 1c3a78e commit 3f4614f
Show file tree
Hide file tree
Showing 14 changed files with 328 additions and 78 deletions.
6 changes: 3 additions & 3 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/handlers"
"github.com/livepeer/catalyst-api/handlers/misttriggers"
"github.com/livepeer/catalyst-api/middleware"
)

Expand All @@ -33,9 +34,8 @@ func NewCatalystAPIRouter(mc *clients.MistClient) *httprouter.Router {
withLogging := middleware.LogRequest()
withAuth := middleware.IsAuthorized

sc := make(map[string]handlers.StreamInfo)
catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{MistClient: mc, StreamCache: sc}
mistCallbackHandlers := &handlers.MistCallbackHandlersCollection{MistClient: mc, StreamCache: sc}
catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{MistClient: mc}
mistCallbackHandlers := &misttriggers.MistCallbackHandlersCollection{MistClient: mc}

// Simple endpoint for healthchecks
router.GET("/ok", withLogging(catalystApiHandlers.Ok()))
Expand Down
36 changes: 36 additions & 0 deletions cache/segmenting.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package cache

import (
"sync"
)

type SegmentingCache struct {
cache map[string]StreamInfo
mutex sync.Mutex
}

type StreamInfo struct {
callbackUrl string
}

func (c *SegmentingCache) Remove(streamName string) {
c.mutex.Lock()
defer c.mutex.Unlock()
delete(c.cache, streamName)
}

func (c *SegmentingCache) GetCallbackUrl(streamName string) string {
c.mutex.Lock()
defer c.mutex.Unlock()
info, ok := c.cache[streamName]
if ok {
return info.callbackUrl
}
return ""
}

func (c *SegmentingCache) Store(streamName, callbackUrl string) {
c.mutex.Lock()
c.cache[streamName] = StreamInfo{callbackUrl: callbackUrl}
c.mutex.Unlock()
}
22 changes: 22 additions & 0 deletions cache/segmenting_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package cache

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestStoreAndRetrieveSegmenting(t *testing.T) {
c := NewStreamCache()
c.Segmenting.Store("some-stream-name", "http://some-callback-url.com")
require.Equal(t, "http://some-callback-url.com", c.Segmenting.GetCallbackUrl("some-stream-name"))
}

func TestStoreAndRemoveSegmenting(t *testing.T) {
c := NewStreamCache()
c.Segmenting.Store("some-stream-name", "http://some-callback-url.com")
require.Equal(t, "http://some-callback-url.com", c.Segmenting.GetCallbackUrl("some-stream-name"))

c.Segmenting.Remove("some-stream-name")
require.Equal(t, "", c.Segmenting.GetCallbackUrl("some-stream-name"))
}
24 changes: 24 additions & 0 deletions cache/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package cache

// StreamCache is per server shared state.
// Each pipeline (usually endpoint) having separate structure for keeping state between HTTP calls.
// State is protected by mutex allowing concurent usage.
// All state manipulation is contained in this file with goal to be brief and release mutex asap.
type StreamCache struct {
Segmenting SegmentingCache
Transcoding TranscodingCache
}

var DefaultStreamCache = NewStreamCache()

// NewStreamCache returns pointer so each handler would refer to same object (kind of singleton)
func NewStreamCache() *StreamCache {
return &StreamCache{
Segmenting: SegmentingCache{
cache: make(map[string]StreamInfo),
},
Transcoding: TranscodingCache{
pushes: make(map[string]*SegmentInfo),
},
}
}
67 changes: 67 additions & 0 deletions cache/transcoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package cache

import (
"sync"
)

type TranscodingCache struct {
pushes map[string]*SegmentInfo
mutex sync.Mutex
}

type SegmentInfo struct {
CallbackUrl string
Source string // S3 input we are transcoding
UploadDir string // S3 destination url for multiple renditions
Destinations []string // Rendition URLS go here on push start and removed on push end
}

type IsEmpty = bool

func (c *TranscodingCache) AddDestination(streamName, destination string) {
c.mutex.Lock()
defer c.mutex.Unlock()
info, ok := c.pushes[streamName]
if ok {
info.Destinations = append(info.Destinations, destination)
}
}

func (c *TranscodingCache) RemovePushDestination(streamName, destination string) IsEmpty {
c.mutex.Lock()
defer c.mutex.Unlock()
info, ok := c.pushes[streamName]
if ok {
for i := 0; i < len(info.Destinations); i++ {
if info.Destinations[i] == destination {
info.Destinations[i] = info.Destinations[len(info.Destinations)-1]
info.Destinations = info.Destinations[:len(info.Destinations)-1]
break
}
}
return len(info.Destinations) == 0
}
return false
}

func (c *TranscodingCache) Remove(streamName string) {
c.mutex.Lock()
defer c.mutex.Unlock()
delete(c.pushes, streamName)
}

func (c *TranscodingCache) Get(streamName string) *SegmentInfo {
c.mutex.Lock()
defer c.mutex.Unlock()
info, ok := c.pushes[streamName]
if ok {
return info
}
return nil
}

func (c *TranscodingCache) Store(streamName string, info SegmentInfo) {
c.mutex.Lock()
c.pushes[streamName] = &info
c.mutex.Unlock()
}
44 changes: 44 additions & 0 deletions cache/transcoding_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package cache

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestStoreAndRetrieveTranscoding(t *testing.T) {
c := NewStreamCache()
c.Transcoding.Store("some-stream-name", SegmentInfo{
CallbackUrl: "some-callback-url",
Source: "s3://source",
UploadDir: "upload-dir",
Destinations: []string{
"s3://destination-1",
"s3://destination-2",
},
})

si := c.Transcoding.Get("some-stream-name")
require.NotNil(t, si)
require.Equal(t, "some-callback-url", si.CallbackUrl)
require.Equal(t, "s3://source", si.Source)
require.Equal(t, "upload-dir", si.UploadDir)
require.Equal(t, []string{"s3://destination-1", "s3://destination-2"}, si.Destinations)
}

func TestStoreAndRemoveTranscoding(t *testing.T) {
c := NewStreamCache()
c.Transcoding.Store("some-stream-name", SegmentInfo{
CallbackUrl: "some-callback-url",
Source: "s3://source",
UploadDir: "upload-dir",
Destinations: []string{
"s3://destination-1",
"s3://destination-2",
},
})
require.NotNil(t, c.Transcoding.Get("some-stream-name"))

c.Transcoding.Remove("some-stream-name")
require.Nil(t, c.Transcoding.Get("some-stream-name"))
}
2 changes: 2 additions & 0 deletions clients/callback_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type CallbackClient struct {
httpClient *http.Client
}

var DefaultCallbackClient = NewCallbackClient()

func NewCallbackClient() CallbackClient {
client := retryablehttp.NewClient()
client.RetryMax = 2 // Retry a maximum of this+1 times
Expand Down
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,8 @@ var Logger log.Logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
func init() {
Logger = log.With(Logger, "ts", log.DefaultTimestampUTC)
}

// Prefixes used in Mist stream names to let us determine whether a given "stream" in Mist is being used
// for the segmenting or transcoding phase
const SOURCE_PREFIX = "tr_src_"
const RENDITION_PREFIX = "tr_rend_+"
30 changes: 1 addition & 29 deletions handlers/handlers.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,9 @@
package handlers

import (
"fmt"
"math/rand"
"time"

"github.com/livepeer/catalyst-api/clients"
)

var CallbackClient = clients.NewCallbackClient()

type StreamInfo struct {
callbackUrl string
}

type CatalystAPIHandlersCollection struct {
MistClient clients.MistAPIClient
StreamCache map[string]StreamInfo
}

type MistCallbackHandlersCollection struct {
MistClient clients.MistAPIClient
StreamCache map[string]StreamInfo
}

func randomStreamName(prefix string) string {
const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
const length = 8
r := rand.New(rand.NewSource(time.Now().UnixNano()))

res := make([]byte, length)
for i := 0; i < length; i++ {
res[i] = charset[r.Intn(length)]
}
return fmt.Sprintf("%s%s", prefix, string(res))
MistClient clients.MistAPIClient
}
2 changes: 1 addition & 1 deletion handlers/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func TestSuccessfulVODUploadHandler(t *testing.T) {
callbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
defer callbackServer.Close()

catalystApiHandlers := CatalystAPIHandlersCollection{MistClient: clients.StubMistClient{}, StreamCache: make(map[string]StreamInfo)}
catalystApiHandlers := CatalystAPIHandlersCollection{MistClient: clients.StubMistClient{}}
var jsonData = `{
"url": "http://localhost/input",
"callback_url": "CALLBACK_URL",
Expand Down
76 changes: 76 additions & 0 deletions handlers/misttriggers/push_end.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package misttriggers

import (
"fmt"
"net/http"
"strings"

"github.com/livepeer/catalyst-api/cache"
"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/errors"
)

// TriggerPushEnd responds to PUSH_END trigger
// This trigger is run whenever an outgoing push stops, for any reason.
// This trigger is stream-specific and non-blocking. The payload for this trigger is multiple lines,
// each separated by a single newline character (without an ending newline), containing data:
//
// push ID (integer)
// stream name (string)
// target URI, before variables/triggers affected it (string)
// target URI, afterwards, as actually used (string)
// last 10 log messages (JSON array string)
// most recent push status (JSON object string)
func (d *MistCallbackHandlersCollection) TriggerPushEnd(w http.ResponseWriter, req *http.Request, payload []byte) {
lines := strings.Split(strings.TrimSuffix(string(payload), "\n"), "\n")
if len(lines) != 6 {
errors.WriteHTTPBadRequest(w, "Bad request payload", fmt.Errorf("unknown payload '%s'", string(payload)))
return
}
// stream name is the second line in the Mist Trigger payload
streamName := lines[1]
// TODO: Left commented as these will all be used by the next piece we'll pull out of https://github.com/livepeer/catalyst-api/pull/30
// destination := lines[2]
// actualDestination := lines[3]
// pushStatus := lines[5]

// Determine if the PUSH that has finished was happening during the segmenting or transcoding phase
if isTranscodeStream(streamName) {
// TODO: Left commented for illustration of the alternate code path here as this is the next piece we'll pull out of https://github.com/livepeer/catalyst-api/pull/30
// d.TranscodingPushEnd(w, req, streamName, destination, actualDestination, pushStatus)
} else {
d.SegmentingPushEnd(w, req, streamName)
}
}

func isTranscodeStream(streamName string) bool {
return strings.HasPrefix(streamName, config.RENDITION_PREFIX)
}

func (d *MistCallbackHandlersCollection) SegmentingPushEnd(w http.ResponseWriter, req *http.Request, streamName string) {
// when uploading is done, remove trigger and stream from Mist
defer cache.DefaultStreamCache.Segmenting.Remove(streamName)

callbackUrl := cache.DefaultStreamCache.Segmenting.GetCallbackUrl(streamName)
if callbackUrl == "" {
errors.WriteHTTPBadRequest(w, "PUSH_END trigger invoked for unknown stream: "+streamName, nil)
}

// Try to clean up the trigger and stream from Mist. If these fail then we only log, since we still want to do any
// further cleanup stages and callbacks
if err := d.MistClient.DeleteTrigger(streamName, TRIGGER_PUSH_END); err != nil {
_ = config.Logger.Log("msg", "Failed to delete PUSH_END trigger", "err", err.Error(), "stream_name", streamName)
}
if err := d.MistClient.DeleteStream(streamName); err != nil {
_ = config.Logger.Log("msg", "Failed to delete stream", "err", err.Error(), "stream_name", streamName)
}

// Let Studio know that we've finished the Segmenting phase
if err := clients.DefaultCallbackClient.SendTranscodeStatus(callbackUrl, clients.TranscodeStatusPreparingCompleted, 1); err != nil {
_ = config.Logger.Log("msg", "Failed to send transcode status callback", "err", err.Error(), "stream_name", streamName)
}

// TODO: Start Transcoding (stubbed for now with below method)
stubTranscodingCallbacksForStudio(callbackUrl)
}
Loading

0 comments on commit 3f4614f

Please sign in to comment.