diff --git a/cache/stream.go b/cache/stream.go index 196b599b9..636dcdea0 100644 --- a/cache/stream.go +++ b/cache/stream.go @@ -5,8 +5,7 @@ package cache // State is protected by mutex allowing concurent usage. // All state manipulation is contained in this file with goal to be brief and release mutex asap. type StreamCache struct { - Segmenting SegmentingCache - Transcoding TranscodingCache + Segmenting SegmentingCache } var DefaultStreamCache = NewStreamCache() @@ -17,8 +16,5 @@ func NewStreamCache() *StreamCache { Segmenting: SegmentingCache{ cache: make(map[string]StreamInfo), }, - Transcoding: TranscodingCache{ - pushes: make(map[string]*SegmentInfo), - }, } } diff --git a/cache/transcoding.go b/cache/transcoding.go deleted file mode 100644 index 2f5b52dd2..000000000 --- a/cache/transcoding.go +++ /dev/null @@ -1,133 +0,0 @@ -package cache - -import ( - "sync" - "time" - - "github.com/kylelemons/godebug/pretty" - "github.com/livepeer/catalyst-api/clients" - "github.com/livepeer/catalyst-api/log" -) - -type TranscodingCache struct { - pushes map[string]*SegmentInfo - mutex sync.Mutex -} - -type SegmentInfo struct { - CallbackUrl string - Source string // S3 input we are transcoding - UploadDir string // S3 destination url for multiple renditions - Profiles []clients.EncodedProfile // Requested encoding profiles to produce - Destinations []string // Rendition URLS go here on push start and removed on push end - Outputs []clients.OutputVideo // Information about the final transcoded outputs we've created - updatedAt time.Time // Time at which this object was last updated in cache -} - -// Send "keepalive" callbacks to ensure the caller (Studio) knows transcoding is still ongoing and hasn't failed -func (t *TranscodingCache) SendTranscodingHeartbeats(interval time.Duration, maxAge time.Duration, quit chan bool) { - for { - // Stop the infinite loop if we receive a quit message - select { - case <-quit: - return - default: - } - - jobs := t.GetAll() - for id, job := range jobs { - // If the job is past the expiry time then we've probably failed to remove it from the cache when it completed / errored - if job.updatedAt.Add(maxAge).Before(time.Now()) { - t.Remove(id) - continue - } - - err := clients.DefaultCallbackClient.SendTranscodeStatus(job.CallbackUrl, clients.TranscodeStatusTranscoding, 0.5) - if err == nil { - log.LogNoRequestID("Sent Transcode Status heartbeat", "id", id, "callback_url", job.CallbackUrl) - } else { - log.LogNoRequestID("failed to send Transcode Status heartbeat", "id", id, "callback_url", job.CallbackUrl, "error", err) - } - } - time.Sleep(interval) - } -} - -func (si SegmentInfo) ContainsDestination(destination string) bool { - for _, existing := range si.Destinations { - if existing == destination { - return true - } - } - return false -} - -func (c *TranscodingCache) AddDestination(streamName, destination string) { - c.mutex.Lock() - defer c.mutex.Unlock() - info, ok := c.pushes[streamName] - if ok { - info.Destinations = append(info.Destinations, destination) - } -} - -func (c *TranscodingCache) AreDestinationsEmpty(streamName string) bool { - c.mutex.Lock() - defer c.mutex.Unlock() - info, ok := c.pushes[streamName] - if ok { - return len(info.Destinations) == 0 - } - return true -} - -func (c *TranscodingCache) RemovePushDestination(streamName, destination string) { - c.mutex.Lock() - defer c.mutex.Unlock() - info, ok := c.pushes[streamName] - if ok { - for i := 0; i < len(info.Destinations); i++ { - if info.Destinations[i] == destination { - info.Destinations[i] = info.Destinations[len(info.Destinations)-1] - info.Destinations = info.Destinations[:len(info.Destinations)-1] - break - } - } - } -} - -func (c *TranscodingCache) Remove(streamName string) { - c.mutex.Lock() - defer c.mutex.Unlock() - delete(c.pushes, streamName) - c.debugPrint("remove", streamName) -} - -func (c *TranscodingCache) Get(streamName string) *SegmentInfo { - c.mutex.Lock() - defer c.mutex.Unlock() - info, ok := c.pushes[streamName] - if ok { - return info - } - return nil -} - -func (c *TranscodingCache) GetAll() map[string]*SegmentInfo { - c.mutex.Lock() - defer c.mutex.Unlock() - return c.pushes -} - -func (c *TranscodingCache) Store(streamName string, info SegmentInfo) { - c.mutex.Lock() - defer c.mutex.Unlock() - info.updatedAt = time.Now() - c.pushes[streamName] = &info - c.debugPrint("add", streamName) -} - -func (c *TranscodingCache) debugPrint(action, streamName string) { - var id string = action + " " + streamName + ": TranscodingCache" - pretty.Print(id, c.pushes) -} diff --git a/cache/transcoding_test.go b/cache/transcoding_test.go deleted file mode 100644 index 39fe2892b..000000000 --- a/cache/transcoding_test.go +++ /dev/null @@ -1,164 +0,0 @@ -package cache - -import ( - "encoding/json" - "io" - "net/http" - "net/http/httptest" - "strings" - "sync" - "testing" - "time" - - "github.com/livepeer/catalyst-api/clients" - "github.com/stretchr/testify/require" -) - -func TestStoreAndRetrieveTranscoding(t *testing.T) { - c := NewStreamCache() - c.Transcoding.Store("some-stream-name", SegmentInfo{ - CallbackUrl: "some-callback-url", - Source: "s3://source", - UploadDir: "upload-dir", - Destinations: []string{ - "s3://destination-1", - "s3://destination-2", - }, - }) - - si := c.Transcoding.Get("some-stream-name") - require.NotNil(t, si) - require.Equal(t, "some-callback-url", si.CallbackUrl) - require.Equal(t, "s3://source", si.Source) - require.Equal(t, "upload-dir", si.UploadDir) - require.Equal(t, []string{"s3://destination-1", "s3://destination-2"}, si.Destinations) -} - -func TestStoreAndRemoveTranscoding(t *testing.T) { - c := NewStreamCache() - c.Transcoding.Store("some-stream-name", SegmentInfo{ - CallbackUrl: "some-callback-url", - Source: "s3://source", - UploadDir: "upload-dir", - Destinations: []string{ - "s3://destination-1", - "s3://destination-2", - }, - }) - require.NotNil(t, c.Transcoding.Get("some-stream-name")) - - c.Transcoding.Remove("some-stream-name") - require.Nil(t, c.Transcoding.Get("some-stream-name")) -} - -func TestHeartbeatsAreFiredWithInterval(t *testing.T) { - // Create a stub server to receive the callbacks and a variable to track how many we get - var requests = map[string]int{} - var requestsMutex = &sync.RWMutex{} - - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Check the message is a valid TranscodeStatusMessage - body, err := io.ReadAll(r.Body) - require.NoError(t, err) - var tsm clients.TranscodeStatusMessage - require.NoError(t, json.Unmarshal(body, &tsm)) - - // Increment our counter for the stream ID, which comes on the final part of our URL - parts := strings.Split(r.URL.Path, "/") - require.NotZero(t, len(parts), 0, "Expected "+r.URL.Path+" to have some slashes in") - id := parts[len(parts)-1] - - requestsMutex.Lock() - requests[id] += 1 - requestsMutex.Unlock() - })) - defer ts.Close() - - // Add 2 jobs into the stream cache with different names - c := NewStreamCache() - c.Transcoding.Store("some-stream-name", SegmentInfo{ - CallbackUrl: ts.URL + "/some-stream-name", - Source: "s3://source", - UploadDir: "upload-dir", - Destinations: []string{ - "s3://destination-1", - "s3://destination-2", - }, - }) - c.Transcoding.Store("some-stream-name-2", SegmentInfo{ - CallbackUrl: ts.URL + "/some-stream-name-2", - Source: "s3://source", - UploadDir: "upload-dir", - Destinations: []string{ - "s3://destination-1", - "s3://destination-2", - }, - }) - - // Start the callback loop - heartbeatStop := make(chan bool) - go c.Transcoding.SendTranscodingHeartbeats(200*time.Millisecond, time.Hour, heartbeatStop) - defer func() { heartbeatStop <- true }() - - // Wait for a few iterations - time.Sleep(time.Second) - - // Check that we got roughly the amount of callbacks we'd expect - requestsMutex.RLock() - defer requestsMutex.RUnlock() - require.GreaterOrEqual(t, requests["some-stream-name"], 3) - require.LessOrEqual(t, requests["some-stream-name"], 10) - require.GreaterOrEqual(t, requests["some-stream-name-2"], 3) - require.LessOrEqual(t, requests["some-stream-name-2"], 10) -} - -func TestUpdatedAtFieldSetWhenCacheWritten(t *testing.T) { - c := NewStreamCache() - c.Transcoding.Store("some-stream-name", SegmentInfo{ - CallbackUrl: "some-callback-url", - Source: "s3://source", - UploadDir: "upload-dir", - Destinations: []string{ - "s3://destination-1", - "s3://destination-2", - }, - }) - - si := c.Transcoding.Get("some-stream-name") - require.NotNil(t, si) - updated1 := si.updatedAt - - c.Transcoding.Store("some-stream-name", *si) - - si = c.Transcoding.Get("some-stream-name") - require.NotNil(t, si) - updated2 := si.updatedAt - - require.True(t, updated1.Before(updated2), "Expected the timestamp from the second write to be after the one from the first") -} - -func TestExpiredEntriesAreRemovedFromCache(t *testing.T) { - c := NewStreamCache() - c.Transcoding.Store("some-stream-name", SegmentInfo{ - CallbackUrl: "some-callback-url", - Source: "s3://source", - UploadDir: "upload-dir", - Destinations: []string{ - "s3://destination-1", - "s3://destination-2", - }, - }) - - // Check the item is there when we immediately check - si := c.Transcoding.Get("some-stream-name") - require.NotNil(t, si) - - // Start the callback loop - heartbeatStop := make(chan bool) - go c.Transcoding.SendTranscodingHeartbeats(200*time.Millisecond, 400*time.Millisecond, heartbeatStop) - defer func() { heartbeatStop <- true }() - - // Check the item has been removed once we wait past the expiry time - time.Sleep(500 * time.Millisecond) - require.Nil(t, c.Transcoding.Get("some-stream-name")) -} diff --git a/handlers/misttriggers/live_tracklist.go b/handlers/misttriggers/live_tracklist.go deleted file mode 100644 index c7909914e..000000000 --- a/handlers/misttriggers/live_tracklist.go +++ /dev/null @@ -1,194 +0,0 @@ -package misttriggers - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "log" - "net/http" - "net/url" - "sort" - "strconv" - "strings" - "time" - - "github.com/livepeer/catalyst-api/cache" - "github.com/livepeer/catalyst-api/clients" - "github.com/livepeer/catalyst-api/config" - "github.com/livepeer/catalyst-api/errors" - "github.com/livepeer/go-tools/drivers" -) - -type MistTrack struct { - // populated by mist when trigger is received - Id int32 `json:"trackid"` - ByteRate int32 `json:"bps"` - Fpks int32 `json:"fpks"` - Height int32 `json:"height"` - Width int32 `json:"width"` - Index int32 `json:"idx"` - Type string `json:"type"` - Codec string `json:"codec"` - StartTimeMs int32 `json:"firstms"` - EndTimeMs int32 `json:"lastms"` - // populated by us when processing trigger - manifestDestPath string -} - -type LiveTrackListTriggerJson = map[string]MistTrack - -// create ByBitrate type which is a MistTrack slice -type ByBitrate []MistTrack - -func (a ByBitrate) Len() int { - return len(a) -} - -func (a ByBitrate) Less(i, j int) bool { - if a[i].ByteRate == a[j].ByteRate { - // if two tracks have the same byterate, then sort by resolution - return a[i].Width*a[i].Height < a[j].Width*a[j].Height - } else { - return a[i].ByteRate < a[j].ByteRate - } -} - -func (a ByBitrate) Swap(i, j int) { - a[i], a[j] = a[j], a[i] -} - -func createPlaylist(multivariantPlaylist string, tracks []MistTrack) string { - for _, track := range tracks { - multivariantPlaylist += fmt.Sprintf("#EXT-X-STREAM-INF:BANDWIDTH=%d,RESOLUTION=%dx%d\r\n%s\r\n", track.ByteRate*8, track.Width, track.Height, track.manifestDestPath) - } - return multivariantPlaylist -} - -func uploadPlaylist(uploadPath, manifest string) error { - storageDriver, err := drivers.ParseOSURL(uploadPath, true) - if err != nil { - return fmt.Errorf("error parsing multivariant playlist's upload directory: %s, error: %s", uploadPath, err) - } - session := storageDriver.NewSession("") - ctx := context.Background() - _, err = session.SaveData(ctx, "", bytes.NewBuffer([]byte(manifest)), nil, 3*time.Second) - if err != nil { - return fmt.Errorf("failed to upload multivariant playlist to: %s, error: %s", uploadPath, err) - } - return nil -} - -// TriggerLiveTrackList responds to LIVE_TRACK_LIST trigger. -// It is stream-specific and must be blocking. The payload for this trigger is multiple lines, -// each separated by a single newline character (without an ending newline), containing data: -// -// stream name -// track list (JSON) -// -// TriggerLiveTrackList is used only by transcoding. -func (d *MistCallbackHandlersCollection) TriggerLiveTrackList(w http.ResponseWriter, req *http.Request, payload []byte) { - lines := strings.Split(strings.TrimSuffix(string(payload), "\n"), "\n") - streamName := lines[0] - encodedTracks := lines[1] - - // Check that the name looks right for a stream we've completed as part of the Transcode workflow - if !config.IsTranscodeStream(streamName) { - errors.WriteHTTPBadRequest(w, "PUSH_END trigger invoked for something that isn't a transcode stream: "+streamName, nil) - return - } - uniqueName := streamName[len(config.RENDITION_PREFIX):] - - // Fetch the stream info from cache (cached when we kicked off the transcode process) - info := cache.DefaultStreamCache.Transcoding.Get(streamName) - if info == nil { - errors.WriteHTTPInternalServerError(w, "LIVE_TRACK_LIST unknown push source: "+streamName, nil) - return - } - - // Check if LIVE_TRACK_LIST trigger is being fired *after* the push-from-Mist-to-S3 is complete - var streamEnded = (encodedTracks == "null") - if streamEnded { - // SOURCE_PREFIX stream is no longer needed - suffix := strings.TrimPrefix(streamName, config.RENDITION_PREFIX) - inputStream := fmt.Sprintf("%s%s", config.SOURCE_PREFIX, suffix) - if err := d.MistClient.DeleteStream(inputStream); err != nil { - log.Printf("ERROR LIVE_TRACK_LIST DeleteStream(%s) %v", inputStream, err) - } - // Multiple pushes from RENDITION_PREFIX are in progress. - return - } - - var tracks LiveTrackListTriggerJson - if err := json.Unmarshal([]byte(encodedTracks), &tracks); err != nil { - errors.WriteHTTPInternalServerError(w, "LiveTrackListTriggerJson json decode error: "+streamName, err) - return - } - - multivariantPlaylist := "#EXTM3U\r\n" - - trackList := []MistTrack{} - - // Build the full URL path that will be sent to Mist as the target upload location - rootPathUrl, err := url.Parse(info.UploadDir) - if err != nil { - errors.WriteHTTPInternalServerError(w, "Failed to parse root path URL: "+streamName, err) - return - } - - // upload each track (transcoded rendition) returned by Mist to S3 - for i := range tracks { - // Only produce a rendition for each video track, selecting best audio track - if tracks[i].Type != "video" { - continue - } - - dirPath := fmt.Sprintf("_%s_%dx%d/stream.m3u8", uniqueName, tracks[i].Width, tracks[i].Height) - dirPathUrl, err := url.JoinPath(info.UploadDir, dirPath) - if err != nil { - errors.WriteHTTPInternalServerError(w, "Failed to generate the upload directory path: "+streamName, err) - return - } - - fullPathUrl, err := url.Parse(dirPathUrl) - if err != nil { - errors.WriteHTTPInternalServerError(w, "Failed to parse the upload directory url: "+streamName, err) - return - } - - // Add URL query parameters (e.g. ?video=0&audio=maxbps) used by Mist to select - // the correct trancoded rendtion track(s) - urlParams := fullPathUrl.Query() - urlParams.Add("video", strconv.FormatInt(int64(tracks[i].Index), 10)) - urlParams.Add("audio", "maxbps") - fullPathUrl.RawQuery = urlParams.Encode() - - destination := fullPathUrl.String() - - if err := d.MistClient.PushStart(streamName, destination); err != nil { - log.Printf("> ERROR push to %s %v", destination, err) - } else { - cache.DefaultStreamCache.Transcoding.AddDestination(streamName, destination) - trackList = append(trackList, tracks[i]) - trackList[len(trackList)-1].manifestDestPath = dirPath - } - } - - // Generate a sorted list for multivariant playlist (reverse order of bitrate then resolution): - sort.Sort(sort.Reverse(ByBitrate(trackList))) - manifest := createPlaylist(multivariantPlaylist, trackList) - path := fmt.Sprintf("%s/%s-master.m3u8", rootPathUrl.String(), uniqueName) - - err = uploadPlaylist(path, manifest) - if err != nil { - errors.WriteHTTPInternalServerError(w, "Failed to upload multivariant master playlist: "+streamName, err) - return - } - - // Store the path back into our cached object to allow us to populate the final "Transcode Success" metadata callback - info.Outputs = append(info.Outputs, clients.OutputVideo{ - Type: "google-s3", // TODO: Stop hardcoding this once we support other schemes - Manifest: path, - }) - cache.DefaultStreamCache.Transcoding.Store(streamName, *info) -} diff --git a/handlers/misttriggers/triggers.go b/handlers/misttriggers/triggers.go index b9a0b8f98..55d0e8612 100644 --- a/handlers/misttriggers/triggers.go +++ b/handlers/misttriggers/triggers.go @@ -14,10 +14,9 @@ import ( ) const ( - TRIGGER_PUSH_END = "PUSH_END" - TRIGGER_PUSH_OUT_START = "PUSH_OUT_START" - TRIGGER_LIVE_TRACK_LIST = "LIVE_TRACK_LIST" - TRIGGER_RECORDING_END = "RECORDING_END" + TRIGGER_PUSH_END = "PUSH_END" + TRIGGER_PUSH_OUT_START = "PUSH_OUT_START" + TRIGGER_RECORDING_END = "RECORDING_END" ) type MistCallbackHandlersCollection struct { @@ -48,8 +47,6 @@ func (d *MistCallbackHandlersCollection) Trigger() httprouter.Handle { d.TriggerPushOutStart(w, req, payload) case TRIGGER_PUSH_END: d.TriggerPushEnd(w, req, payload) - case TRIGGER_LIVE_TRACK_LIST: - d.TriggerLiveTrackList(w, req, payload) case TRIGGER_RECORDING_END: d.TriggerRecordingEnd(w, req, payload) default: diff --git a/main.go b/main.go index 148c23798..9d457aeaa 100644 --- a/main.go +++ b/main.go @@ -3,10 +3,8 @@ package main import ( "flag" "log" - "time" "github.com/livepeer/catalyst-api/api" - "github.com/livepeer/catalyst-api/cache" "github.com/livepeer/catalyst-api/config" "github.com/livepeer/livepeer-data/pkg/mistconnector" ) @@ -25,11 +23,6 @@ func main() { return } - // Send "keepalive" heartbeats while transcodes are ongoing - heartbeatStop := make(chan bool) - go cache.DefaultStreamCache.Transcoding.SendTranscodingHeartbeats(15*time.Second, time.Hour, heartbeatStop) - defer func() { heartbeatStop <- true }() - if err := api.ListenAndServe(*port, *mistPort, *mistHttpPort, *apiToken); err != nil { log.Fatal(err) }