Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update directory tree of output segments from VoD pipeline #133

Merged
merged 3 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions handlers/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestSuccessfulVODUploadHandler(t *testing.T) {
"output_locations": [
{
"type": "object_store",
"url": "memory://localhost/output",
"url": "memory://localhost/output.m3u8",
"outputs": {
"source_segments": true
}
Expand All @@ -150,7 +150,6 @@ func TestSuccessfulVODUploadHandler(t *testing.T) {
rr := httptest.NewRecorder()
router.POST("/api/vod", catalystApiHandlers.UploadVOD())
router.ServeHTTP(rr, req)

require.Equal(http.StatusOK, rr.Result().StatusCode)

var uvr UploadVODResponse
Expand Down
28 changes: 25 additions & 3 deletions handlers/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"io"
"mime"
"net/http"
"net/url"
"path"
"strings"

"github.com/julienschmidt/httprouter"
Expand Down Expand Up @@ -96,20 +98,40 @@ func (d *CatalystAPIHandlersCollection) UploadVOD() httprouter.Handle {
errors.WriteHTTPBadRequest(w, "Invalid request payload", fmt.Errorf("no source segment URL in request"))
return
}
log.AddContext(requestID, "segmented_url", tURL)

// Create a separate subdirectory for the source segments
// Use the output directory specified in request as the output directory of transcoded renditions
targetURL, err := url.Parse(tURL)
if err != nil {
errors.WriteHTTPBadRequest(w, "Invalid request payload", fmt.Errorf("target output file shoul d end in .m3u8 extension"))
}
targetDirPath := path.Dir(targetURL.Path)
targetManifestFilename := path.Base(targetURL.String())
targetExtension := path.Ext(targetManifestFilename)
if targetExtension != ".m3u8" {
errors.WriteHTTPBadRequest(w, "Invalid request payload", fmt.Errorf("target output file should end in .m3u8 extension"))
}
targetSegmentedOutputPath := path.Join(targetDirPath, "source", targetManifestFilename)
sout, err := url.Parse(targetSegmentedOutputPath)
if err != nil {
errors.WriteHTTPInternalServerError(w, "Cannot parse targetSegmentedOutputPath", err)
}
targetSegmentedOutputURL := targetURL.ResolveReference(sout)

log.AddContext(requestID, "segmented_url", targetSegmentedOutputURL.String())

streamName := config.RandomStreamName(config.SEGMENTING_PREFIX)
cache.DefaultStreamCache.Segmenting.Store(streamName, cache.StreamInfo{
SourceFile: uploadVODRequest.Url,
CallbackURL: uploadVODRequest.CallbackUrl,
UploadURL: uploadVODRequest.OutputLocations[0].URL,
UploadURL: targetSegmentedOutputURL.String(),
AccessToken: uploadVODRequest.AccessToken,
TranscodeAPIUrl: uploadVODRequest.TranscodeAPIUrl,
RequestID: requestID,
})

// process the request
if err := d.processUploadVOD(streamName, uploadVODRequest.Url, tURL); err != nil {
if err := d.processUploadVOD(streamName, uploadVODRequest.Url, targetSegmentedOutputURL.String()); err != nil {
errors.WriteHTTPInternalServerError(w, "Cannot process upload VOD request", err)
}

Expand Down
4 changes: 2 additions & 2 deletions transcode/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func GenerateAndUploadManifests(sourceManifest m3u8.MediaPlaylist, targetOSURL s
// For each profile, add a new entry to the master manifest
bitsPerSecond := uint32(float64(profile.Bytes) * 8000.0 / float64(profile.DurationMs))
masterPlaylist.Append(
fmt.Sprintf("rendition-%d/rendition.m3u8", i),
fmt.Sprintf("rendition-%d/index.m3u8", i),
&m3u8.MediaPlaylist{
TargetDuration: sourceManifest.TargetDuration,
},
Expand Down Expand Up @@ -115,7 +115,7 @@ func GenerateAndUploadManifests(sourceManifest m3u8.MediaPlaylist, targetOSURL s
renditionPlaylist.Close()

renditionManifestBaseURL := fmt.Sprintf("%s/rendition-%d", targetOSURL, i)
err = clients.UploadToOSURL(renditionManifestBaseURL, "rendition.m3u8", strings.NewReader(renditionPlaylist.String()))
err = clients.UploadToOSURL(renditionManifestBaseURL, "index.m3u8", strings.NewReader(renditionPlaylist.String()))
if err != nil {
return "", fmt.Errorf("failed to upload rendition playlist: %s", err)
}
Expand Down
10 changes: 5 additions & 5 deletions transcode/manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,14 @@ func TestItCanGenerateAndWriteManifests(t *testing.T) {
const expectedMasterManifest = `#EXTM3U
#EXT-X-VERSION:3
#EXT-X-STREAM-INF:PROGRAM-ID=0,BANDWIDTH=1,RESOLUTION=800x600,NAME="0-lowlowlow",FRAME-RATE=60.000
rendition-0/rendition.m3u8
rendition-0/index.m3u8
#EXT-X-STREAM-INF:PROGRAM-ID=0,BANDWIDTH=1,RESOLUTION=1080x720,NAME="1-super-high-def",FRAME-RATE=30.000
rendition-1/rendition.m3u8
rendition-1/index.m3u8
`
require.Equal(t, expectedMasterManifest, string(masterManifestContents))

// Confirm we wrote out the rendition manifests that we expected
require.FileExists(t, filepath.Join(outputDir, "rendition-0/rendition.m3u8"))
require.FileExists(t, filepath.Join(outputDir, "rendition-1/rendition.m3u8"))
require.NoFileExists(t, filepath.Join(outputDir, "rendition-2/rendition.m3u8"))
require.FileExists(t, filepath.Join(outputDir, "rendition-0/index.m3u8"))
require.FileExists(t, filepath.Join(outputDir, "rendition-1/index.m3u8"))
require.NoFileExists(t, filepath.Join(outputDir, "rendition-2/index.m3u8"))
}
39 changes: 26 additions & 13 deletions transcode/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"net/url"
"path"
"sync"
"time"

Expand Down Expand Up @@ -68,17 +69,31 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st

outputs := []clients.OutputVideo{}

// Create a separate subdirectory for the transcoded renditions
segmentedUploadURL, err := url.Parse(transcodeRequest.UploadURL)
// Parse the manifest destination of the segmented output specified in the request
segmentedOutputManifestURL, err := url.Parse(transcodeRequest.UploadURL)
if err != nil {
return outputs, fmt.Errorf("failed to parse transcodeRequest.UploadURL: %s", err)
}
relativeTranscodeURL, err := url.Parse("transcoded/")
// Go back to the root directory to set as the output for transcode renditions
targetTranscodedPath := path.Dir(path.Dir(segmentedOutputManifestURL.Path))
// Use the same manifest filename that was used for the segmented manifest
targetTranscodedManifestFilename := path.Base(segmentedOutputManifestURL.String())
// Generate the new output path of the transcoded manifest
targetTranscodedOutputPath := path.Join(targetTranscodedPath, targetTranscodedManifestFilename)
// Generate the manifest output URL from the manifest output path (e.g. s3+https://USER:[email protected]/user/hls/index.m3u8)
tpath, err := url.Parse(targetTranscodedOutputPath)
if err != nil {
return outputs, fmt.Errorf("failed to parse relativeTranscodeURL: %s", err)
return outputs, fmt.Errorf("failed to parse targetTranscodedOutputPath: %s", err)
}
targetTranscodedOutputURL := segmentedOutputManifestURL.ResolveReference(tpath)
fmt.Println(targetTranscodedOutputURL)
// Generate the rendition output URL (e.g. s3+https://USER:[email protected]/user/hls/)
tout, err := url.Parse(targetTranscodedPath)
if err != nil {
return outputs, fmt.Errorf("failed to parse targetTranscodedPath: %s", err)
}
targetTranscodedRenditionOutputURL := segmentedOutputManifestURL.ResolveReference(tout)

targetOSURL := segmentedUploadURL.ResolveReference(relativeTranscodeURL)
// Grab some useful parameters to be used later from the TranscodeSegmentRequest
sourceManifestOSURL := transcodeRequest.UploadURL
// transcodeProfiles are desired constraints for transcoding process
Expand Down Expand Up @@ -135,7 +150,7 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st
go func() {
defer completed.Done()
for segment := range queue {
err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, targetOSURL, transcodedStats)
err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, targetTranscodedRenditionOutputURL, transcodedStats)
if err != nil {
errors <- err
return
Expand All @@ -157,14 +172,14 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st
}

// Build the manifests and push them to storage
manifestManifestURL, err := GenerateAndUploadManifests(sourceManifest, targetOSURL.String(), transcodedStats)
manifestManifestURL, err := GenerateAndUploadManifests(sourceManifest, targetTranscodedRenditionOutputURL.String(), transcodedStats)
if err != nil {
return outputs, err
}

outputs = []clients.OutputVideo{
{
Type: "google-s3",
Type: "object_store",
Manifest: manifestManifestURL,
},
}
Expand Down Expand Up @@ -208,14 +223,12 @@ func transcodeSegment(segment segmentInfo, streamName, manifestID string, transc
return fmt.Errorf("failed to find profile with name %q while parsing rendition segment", transcodedSegment.Name)
}

relativeRenditionPath := fmt.Sprintf("rendition-%d/", renditionIndex)
relativeRenditionURL, err := url.Parse(relativeRenditionPath)
targetRenditionURL, err := url.JoinPath(targetOSURL.String(), fmt.Sprintf("rendition-%d/", renditionIndex))
if err != nil {
return fmt.Errorf("error building rendition segment URL %q: %s", relativeRenditionPath, err)
return fmt.Errorf("error building rendition segment URL %q: %s", targetRenditionURL, err)
}
renditionURL := targetOSURL.ResolveReference(relativeRenditionURL)

err = clients.UploadToOSURL(renditionURL.String(), fmt.Sprintf("%d.ts", segment.Index), bytes.NewReader(transcodedSegment.MediaData))
err = clients.UploadToOSURL(targetRenditionURL, fmt.Sprintf("%d.ts", segment.Index), bytes.NewReader(transcodedSegment.MediaData))
if err != nil {
return fmt.Errorf("failed to upload master playlist: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion transcode/transcode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestItCanTranscode(t *testing.T) {
dir := os.TempDir()

// Create temporary manifest + segment files on the local filesystem
manifestFile, err := os.CreateTemp(dir, "manifest-*.m3u8")
manifestFile, err := os.CreateTemp(dir+"/path/to/", "index.m3u8")
require.NoError(t, err)

segment0, err := os.Create(dir + "/0.ts")
Expand Down