Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload transcodes to output target location rather than input location #61

Merged
merged 1 commit into from
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions cache/segmenting.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ type SegmentingCache struct {

type StreamInfo struct {
SourceFile string
CallbackUrl string
UploadDir string
CallbackURL string
UploadURL string
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed Url -> URL because that's Go convention.

Changed "Dir" to URL because it's a URL and not a Dir

AccessToken string
TranscodeAPIUrl string
HardcodedBroadcasters string
Expand All @@ -29,7 +29,7 @@ func (c *SegmentingCache) GetCallbackUrl(streamName string) string {
defer c.mutex.Unlock()
info, ok := c.cache[streamName]
if ok {
return info.CallbackUrl
return info.CallbackURL
}
return ""
}
Expand All @@ -48,8 +48,8 @@ func (c *SegmentingCache) Store(streamName string, streamInfo StreamInfo) {
c.mutex.Lock()
c.cache[streamName] = StreamInfo{
SourceFile: streamInfo.SourceFile,
CallbackUrl: streamInfo.CallbackUrl,
UploadDir: streamInfo.UploadDir,
CallbackURL: streamInfo.CallbackURL,
UploadURL: streamInfo.UploadURL,
AccessToken: streamInfo.AccessToken,
TranscodeAPIUrl: streamInfo.TranscodeAPIUrl,
HardcodedBroadcasters: streamInfo.HardcodedBroadcasters,
Expand Down
4 changes: 2 additions & 2 deletions cache/segmenting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func TestStoreAndRetrieveSegmenting(t *testing.T) {
c.Segmenting.Store(
"some-stream-name",
StreamInfo{
CallbackUrl: "http://some-callback-url.com",
CallbackURL: "http://some-callback-url.com",
},
)
require.Equal(t, "http://some-callback-url.com", c.Segmenting.GetCallbackUrl("some-stream-name"))
Expand All @@ -22,7 +22,7 @@ func TestStoreAndRemoveSegmenting(t *testing.T) {
c.Segmenting.Store(
"some-stream-name",
StreamInfo{
CallbackUrl: "http://some-callback-url.com",
CallbackURL: "http://some-callback-url.com",
},
)
require.Equal(t, "http://some-callback-url.com", c.Segmenting.GetCallbackUrl("some-stream-name"))
Expand Down
1 change: 1 addition & 0 deletions handlers/misttriggers/live_tracklist.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func (d *MistCallbackHandlersCollection) TriggerLiveTrackList(w http.ResponseWri
sort.Sort(sort.Reverse(ByBitrate(trackList)))
manifest := createPlaylist(multivariantPlaylist, trackList)
path := fmt.Sprintf("%s/%s-master.m3u8", rootPathUrl.String(), uniqueName)

err = uploadPlaylist(path, manifest)
if err != nil {
errors.WriteHTTPInternalServerError(w, "Failed to upload multivariant master playlist: "+streamName, err)
Expand Down
5 changes: 2 additions & 3 deletions handlers/misttriggers/push_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ func (d *MistCallbackHandlersCollection) TriggerPushEnd(w http.ResponseWriter, r

switch streamNameToPipeline(streamName) {
case Transcoding:
// TODO: Left commented for illustration of the alternate code path here as this is the next piece we'll pull out of https://github.com/livepeer/catalyst-api/pull/30
d.TranscodingPushEnd(w, req, streamName, destination, actualDestination, pushStatus)
case Segmenting:
d.SegmentingPushEnd(w, req, streamName)
Expand Down Expand Up @@ -87,7 +86,6 @@ func (d *MistCallbackHandlersCollection) TranscodingPushEnd(w http.ResponseWrite
// We do not delete triggers as source stream is wildcard stream: RENDITION_PREFIX
cache.DefaultStreamCache.Transcoding.RemovePushDestination(streamName, destination)
if cache.DefaultStreamCache.Transcoding.AreDestinationsEmpty(streamName) {
// TODO: Fill this in properly
if err := clients.DefaultCallbackClient.SendTranscodeStatusCompleted(info.CallbackUrl, clients.InputVideo{}, []clients.OutputVideo{}); err != nil {
_ = config.Logger.Log("msg", "Error sending transcode completed status in TranscodingPushEnd", "err", err)
}
Expand Down Expand Up @@ -145,11 +143,12 @@ func (d *MistCallbackHandlersCollection) SegmentingPushEnd(w http.ResponseWriter
si := cache.DefaultStreamCache.Segmenting.Get(streamName)
transcodeRequest := handlers.TranscodeSegmentRequest{
SourceFile: si.SourceFile,
CallbackUrl: si.CallbackUrl,
CallbackURL: si.CallbackURL,
AccessToken: si.AccessToken,
TranscodeAPIUrl: si.TranscodeAPIUrl,
HardcodedBroadcasters: si.HardcodedBroadcasters,
SourceStreamInfo: infoJson,
UploadURL: si.UploadURL,
}
go func() {
err := handlers.RunTranscodeProcess(d.MistClient, transcodeRequest)
Expand Down
1 change: 1 addition & 0 deletions handlers/misttriggers/triggers.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (d *MistCallbackHandlersCollection) Trigger() httprouter.Handle {
_ = config.Logger.Log(
"msg", "Received Mist Trigger",
"trigger_name", triggerName,
"payload", string(payload),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea

)

switch triggerName {
Expand Down
23 changes: 8 additions & 15 deletions handlers/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,12 @@ import (

type TranscodeSegmentRequest struct {
SourceFile string `json:"source_location"`
CallbackUrl string `json:"callback_url"`
ManifestID string `json:"manifestID"`
StreamID string `json:"streamID"`
SessionID string `json:"sessionID"`
CallbackURL string `json:"callback_url"`
UploadURL string `json:"upload_url"`
StreamKey string `json:"streamKey"`
AccessToken string `json:"accessToken"`
TranscodeAPIUrl string `json:"transcodeAPIUrl"`
HardcodedBroadcasters string `json:"hardcodedBroadcasters"`
Presets []string `json:"presets"`
ObjectStore string `json:"objectStore"`
RecordObjectStore string `json:"recordObjectStore"`
RecordObjectStoreURL string `json:"recordObjectStoreUrl"`
Profiles []cache.EncodedProfile `json:"profiles"`
Detection struct {
Freq uint `json:"freq"`
Expand All @@ -42,7 +36,6 @@ type TranscodeSegmentRequest struct {
Name string `json:"name"`
} `json:"sceneClassification"`
} `json:"detection"`
VerificationFreq uint `json:"verificationFreq"`
SourceStreamInfo string
}

Expand Down Expand Up @@ -79,7 +72,7 @@ func (d *CatalystAPIHandlersCollection) TranscodeSegment() httprouter.Handle {
// RunTranscodeProcess starts `MistLivepeeerProc` as a subprocess to transcode inputStream into renditionsStream.
func RunTranscodeProcess(mistClient clients.MistAPIClient, request TranscodeSegmentRequest) error {

inputUrl, err := url.Parse(request.SourceFile)
uploadURL, err := url.Parse(request.UploadURL)
if err != nil {
return fmt.Errorf("invalid request source location: %s, error: %s", request.SourceFile, err)
}
Expand Down Expand Up @@ -107,11 +100,11 @@ func RunTranscodeProcess(mistClient clients.MistAPIClient, request TranscodeSegm
return fmt.Errorf("failed to start MistProcLivepeer: %s", err)
}

dir, _ := url.Parse(".")
uploadDir := inputUrl.ResolveReference(dir)
dir, _ := url.Parse("transcoded/")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it need to be hard-coded or can it be configurable in request payload?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least initially, hardcoded - can look at making it variable in the future if needed

uploadDir := uploadURL.ResolveReference(dir)
// Cache the stream data, later used in the trigger handlers called by Mist
cache.DefaultStreamCache.Transcoding.Store(renditionsStream, cache.SegmentInfo{
CallbackUrl: request.CallbackUrl,
CallbackUrl: request.CallbackURL,
Source: request.SourceFile,
Profiles: request.Profiles[:],
UploadDir: uploadDir.String(),
Expand All @@ -125,7 +118,7 @@ func RunTranscodeProcess(mistClient clients.MistAPIClient, request TranscodeSegm
}

// If we're here, then transcode completed successfully
if err := clients.DefaultCallbackClient.SendTranscodeStatus(request.CallbackUrl, clients.TranscodeStatusTranscoding, 1); err != nil {
if err := clients.DefaultCallbackClient.SendTranscodeStatus(request.CallbackURL, clients.TranscodeStatusTranscoding, 1); err != nil {
_ = config.Logger.Log("msg", "Error in SendTranscodeStatus", "err", err)
}

Expand All @@ -148,7 +141,7 @@ func RunTranscodeProcess(mistClient clients.MistAPIClient, request TranscodeSegm
}

err = clients.DefaultCallbackClient.SendTranscodeStatusCompleted(
request.CallbackUrl,
request.CallbackURL,
clients.InputVideo{
Format: "unknown",
Duration: v.Duration,
Expand Down
4 changes: 2 additions & 2 deletions handlers/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func (d *CatalystAPIHandlersCollection) UploadVOD() httprouter.Handle {
streamName := config.RandomStreamName(config.SEGMENTING_PREFIX)
cache.DefaultStreamCache.Segmenting.Store(streamName, cache.StreamInfo{
SourceFile: uploadVODRequest.Url,
CallbackUrl: uploadVODRequest.CallbackUrl,
UploadDir: uploadVODRequest.OutputLocations[0].URL,
CallbackURL: uploadVODRequest.CallbackUrl,
UploadURL: uploadVODRequest.OutputLocations[0].URL,
AccessToken: uploadVODRequest.AccessToken,
TranscodeAPIUrl: uploadVODRequest.TranscodeAPIUrl,
HardcodedBroadcasters: uploadVODRequest.HardcodedBroadcasters,
Expand Down