diff --git a/cache/segmenting.go b/cache/segmenting.go index af09b7edf..b384bdcd0 100644 --- a/cache/segmenting.go +++ b/cache/segmenting.go @@ -11,8 +11,8 @@ type SegmentingCache struct { type StreamInfo struct { SourceFile string - CallbackUrl string - UploadDir string + CallbackURL string + UploadURL string AccessToken string TranscodeAPIUrl string HardcodedBroadcasters string @@ -29,7 +29,7 @@ func (c *SegmentingCache) GetCallbackUrl(streamName string) string { defer c.mutex.Unlock() info, ok := c.cache[streamName] if ok { - return info.CallbackUrl + return info.CallbackURL } return "" } @@ -48,8 +48,8 @@ func (c *SegmentingCache) Store(streamName string, streamInfo StreamInfo) { c.mutex.Lock() c.cache[streamName] = StreamInfo{ SourceFile: streamInfo.SourceFile, - CallbackUrl: streamInfo.CallbackUrl, - UploadDir: streamInfo.UploadDir, + CallbackURL: streamInfo.CallbackURL, + UploadURL: streamInfo.UploadURL, AccessToken: streamInfo.AccessToken, TranscodeAPIUrl: streamInfo.TranscodeAPIUrl, HardcodedBroadcasters: streamInfo.HardcodedBroadcasters, diff --git a/cache/segmenting_test.go b/cache/segmenting_test.go index 98add465c..20a3c99c7 100644 --- a/cache/segmenting_test.go +++ b/cache/segmenting_test.go @@ -11,7 +11,7 @@ func TestStoreAndRetrieveSegmenting(t *testing.T) { c.Segmenting.Store( "some-stream-name", StreamInfo{ - CallbackUrl: "http://some-callback-url.com", + CallbackURL: "http://some-callback-url.com", }, ) require.Equal(t, "http://some-callback-url.com", c.Segmenting.GetCallbackUrl("some-stream-name")) @@ -22,7 +22,7 @@ func TestStoreAndRemoveSegmenting(t *testing.T) { c.Segmenting.Store( "some-stream-name", StreamInfo{ - CallbackUrl: "http://some-callback-url.com", + CallbackURL: "http://some-callback-url.com", }, ) require.Equal(t, "http://some-callback-url.com", c.Segmenting.GetCallbackUrl("some-stream-name")) diff --git a/handlers/misttriggers/live_tracklist.go b/handlers/misttriggers/live_tracklist.go index e76564f92..c7909914e 100644 --- a/handlers/misttriggers/live_tracklist.go +++ b/handlers/misttriggers/live_tracklist.go @@ -178,6 +178,7 @@ func (d *MistCallbackHandlersCollection) TriggerLiveTrackList(w http.ResponseWri sort.Sort(sort.Reverse(ByBitrate(trackList))) manifest := createPlaylist(multivariantPlaylist, trackList) path := fmt.Sprintf("%s/%s-master.m3u8", rootPathUrl.String(), uniqueName) + err = uploadPlaylist(path, manifest) if err != nil { errors.WriteHTTPInternalServerError(w, "Failed to upload multivariant master playlist: "+streamName, err) diff --git a/handlers/misttriggers/push_end.go b/handlers/misttriggers/push_end.go index 5a8b62a2f..ff8c8bc41 100644 --- a/handlers/misttriggers/push_end.go +++ b/handlers/misttriggers/push_end.go @@ -43,7 +43,6 @@ func (d *MistCallbackHandlersCollection) TriggerPushEnd(w http.ResponseWriter, r switch streamNameToPipeline(streamName) { case Transcoding: - // TODO: Left commented for illustration of the alternate code path here as this is the next piece we'll pull out of https://github.com/livepeer/catalyst-api/pull/30 d.TranscodingPushEnd(w, req, streamName, destination, actualDestination, pushStatus) case Segmenting: d.SegmentingPushEnd(w, req, streamName) @@ -87,7 +86,6 @@ func (d *MistCallbackHandlersCollection) TranscodingPushEnd(w http.ResponseWrite // We do not delete triggers as source stream is wildcard stream: RENDITION_PREFIX cache.DefaultStreamCache.Transcoding.RemovePushDestination(streamName, destination) if cache.DefaultStreamCache.Transcoding.AreDestinationsEmpty(streamName) { - // TODO: Fill this in properly if err := clients.DefaultCallbackClient.SendTranscodeStatusCompleted(info.CallbackUrl, clients.InputVideo{}, []clients.OutputVideo{}); err != nil { _ = config.Logger.Log("msg", "Error sending transcode completed status in TranscodingPushEnd", "err", err) } @@ -145,11 +143,12 @@ func (d *MistCallbackHandlersCollection) SegmentingPushEnd(w http.ResponseWriter si := cache.DefaultStreamCache.Segmenting.Get(streamName) transcodeRequest := handlers.TranscodeSegmentRequest{ SourceFile: si.SourceFile, - CallbackUrl: si.CallbackUrl, + CallbackURL: si.CallbackURL, AccessToken: si.AccessToken, TranscodeAPIUrl: si.TranscodeAPIUrl, HardcodedBroadcasters: si.HardcodedBroadcasters, SourceStreamInfo: infoJson, + UploadURL: si.UploadURL, } go func() { err := handlers.RunTranscodeProcess(d.MistClient, transcodeRequest) diff --git a/handlers/misttriggers/triggers.go b/handlers/misttriggers/triggers.go index a98f44df1..20e5172cb 100644 --- a/handlers/misttriggers/triggers.go +++ b/handlers/misttriggers/triggers.go @@ -36,6 +36,7 @@ func (d *MistCallbackHandlersCollection) Trigger() httprouter.Handle { _ = config.Logger.Log( "msg", "Received Mist Trigger", "trigger_name", triggerName, + "payload", string(payload), ) switch triggerName { diff --git a/handlers/transcode.go b/handlers/transcode.go index ea78ff48c..2a134329d 100644 --- a/handlers/transcode.go +++ b/handlers/transcode.go @@ -22,18 +22,12 @@ import ( type TranscodeSegmentRequest struct { SourceFile string `json:"source_location"` - CallbackUrl string `json:"callback_url"` - ManifestID string `json:"manifestID"` - StreamID string `json:"streamID"` - SessionID string `json:"sessionID"` + CallbackURL string `json:"callback_url"` + UploadURL string `json:"upload_url"` StreamKey string `json:"streamKey"` AccessToken string `json:"accessToken"` TranscodeAPIUrl string `json:"transcodeAPIUrl"` HardcodedBroadcasters string `json:"hardcodedBroadcasters"` - Presets []string `json:"presets"` - ObjectStore string `json:"objectStore"` - RecordObjectStore string `json:"recordObjectStore"` - RecordObjectStoreURL string `json:"recordObjectStoreUrl"` Profiles []cache.EncodedProfile `json:"profiles"` Detection struct { Freq uint `json:"freq"` @@ -42,7 +36,6 @@ type TranscodeSegmentRequest struct { Name string `json:"name"` } `json:"sceneClassification"` } `json:"detection"` - VerificationFreq uint `json:"verificationFreq"` SourceStreamInfo string } @@ -79,7 +72,7 @@ func (d *CatalystAPIHandlersCollection) TranscodeSegment() httprouter.Handle { // RunTranscodeProcess starts `MistLivepeeerProc` as a subprocess to transcode inputStream into renditionsStream. func RunTranscodeProcess(mistClient clients.MistAPIClient, request TranscodeSegmentRequest) error { - inputUrl, err := url.Parse(request.SourceFile) + uploadURL, err := url.Parse(request.UploadURL) if err != nil { return fmt.Errorf("invalid request source location: %s, error: %s", request.SourceFile, err) } @@ -107,11 +100,11 @@ func RunTranscodeProcess(mistClient clients.MistAPIClient, request TranscodeSegm return fmt.Errorf("failed to start MistProcLivepeer: %s", err) } - dir, _ := url.Parse(".") - uploadDir := inputUrl.ResolveReference(dir) + dir, _ := url.Parse("transcoded/") + uploadDir := uploadURL.ResolveReference(dir) // Cache the stream data, later used in the trigger handlers called by Mist cache.DefaultStreamCache.Transcoding.Store(renditionsStream, cache.SegmentInfo{ - CallbackUrl: request.CallbackUrl, + CallbackUrl: request.CallbackURL, Source: request.SourceFile, Profiles: request.Profiles[:], UploadDir: uploadDir.String(), @@ -125,7 +118,7 @@ func RunTranscodeProcess(mistClient clients.MistAPIClient, request TranscodeSegm } // If we're here, then transcode completed successfully - if err := clients.DefaultCallbackClient.SendTranscodeStatus(request.CallbackUrl, clients.TranscodeStatusTranscoding, 1); err != nil { + if err := clients.DefaultCallbackClient.SendTranscodeStatus(request.CallbackURL, clients.TranscodeStatusTranscoding, 1); err != nil { _ = config.Logger.Log("msg", "Error in SendTranscodeStatus", "err", err) } @@ -148,7 +141,7 @@ func RunTranscodeProcess(mistClient clients.MistAPIClient, request TranscodeSegm } err = clients.DefaultCallbackClient.SendTranscodeStatusCompleted( - request.CallbackUrl, + request.CallbackURL, clients.InputVideo{ Format: "unknown", Duration: v.Duration, diff --git a/handlers/upload.go b/handlers/upload.go index be9e6cb9c..57b50d947 100644 --- a/handlers/upload.go +++ b/handlers/upload.go @@ -91,8 +91,8 @@ func (d *CatalystAPIHandlersCollection) UploadVOD() httprouter.Handle { streamName := config.RandomStreamName(config.SEGMENTING_PREFIX) cache.DefaultStreamCache.Segmenting.Store(streamName, cache.StreamInfo{ SourceFile: uploadVODRequest.Url, - CallbackUrl: uploadVODRequest.CallbackUrl, - UploadDir: uploadVODRequest.OutputLocations[0].URL, + CallbackURL: uploadVODRequest.CallbackUrl, + UploadURL: uploadVODRequest.OutputLocations[0].URL, AccessToken: uploadVODRequest.AccessToken, TranscodeAPIUrl: uploadVODRequest.TranscodeAPIUrl, HardcodedBroadcasters: uploadVODRequest.HardcodedBroadcasters,