Skip to content

Commit

Permalink
transcode: retreive T segments, sort, and concatenate into single .ts…
Browse files Browse the repository at this point in the history
… file

The segments returned from the T are first stored in the order they are
received, and then sorted according to the segment index, and then
concatenated into a single .ts file. A nested map with an outer and
inner table is used to temporarily store the segment data streams.
  • Loading branch information
emranemran committed Mar 1, 2023
1 parent c95dc31 commit e0bfd82
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 1 deletion.
37 changes: 36 additions & 1 deletion transcode/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net/url"
"path"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -109,9 +110,20 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st
// transcodedStats hold actual info from transcoded results within requested constraints (this usually differs from requested profiles)
transcodedStats := statsFromProfiles(transcodeProfiles)

renditionList := video.TRenditionList{RenditionSegmentTable: make(map[string]*video.TSegmentList)}
// only populate video.TRenditionList map if MP4 is enabled via override or short-form video detection
if transcodeRequest.GenerateMP4 {
for _, profile := range transcodeProfiles {
renditionList.AddRenditionSegment(profile.Name,
&video.TSegmentList{
SegmentDataTable: make(map[int][]byte),
})
}
}

var jobs *ParallelTranscoding
jobs = NewParallelTranscoding(sourceSegmentURLs, func(segment segmentInfo) error {
err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, targetTranscodedRenditionOutputURL, transcodedStats)
err := transcodeSegment(segment, streamName, manifestID, transcodeRequest, transcodeProfiles, targetTranscodedRenditionOutputURL, transcodedStats, renditionList)
segmentsCount++
if err != nil {
return err
Expand All @@ -129,6 +141,20 @@ func RunTranscodeProcess(transcodeRequest TranscodeSegmentRequest, streamName st
return outputs, segmentsCount, err
}

// TODO: remove this in follow-up PR to write files to disk for transmuxing
/*
if transcodeRequest.GenerateMP4 {
for rlist, slist := range renditionList.RenditionSegmentTable {
table := slist.SegmentDataTable
for s, d := range table {
fmt.Println("debug-segments:", rlist, ":", strconv.Itoa(s)+".ts", "size:", len(d))
}
for _, k := range slist.GetSortedSegments() {
fmt.Println("debug-segments:", k, len(slist.SegmentDataTable[k]))
}
}
}
*/
// Build the manifests and push them to storage
manifestURL, err := GenerateAndUploadManifests(sourceManifest, targetTranscodedRenditionOutputURL.String(), transcodedStats)
if err != nil {
Expand Down Expand Up @@ -157,6 +183,7 @@ func transcodeSegment(
transcodeProfiles []video.EncodedProfile,
targetOSURL *url.URL,
transcodedStats []*RenditionStats,
renditionList video.TRenditionList,
) error {
start := time.Now()

Expand Down Expand Up @@ -207,12 +234,20 @@ func transcodeSegment(
return fmt.Errorf("error building rendition segment URL %q: %s", targetRenditionURL, err)
}

if transcodeRequest.GenerateMP4 {
// get inner segments table from outer rendition table
segmentsList := renditionList.GetSegmentList(transcodedSegment.Name)
// add new entry for segment # and corresponding byte stream
segmentsList.AddSegmentData(segment.Index, transcodedSegment.MediaData)
}

err = backoff.Retry(func() error {
return clients.UploadToOSURL(targetRenditionURL, fmt.Sprintf("%d.ts", segment.Index), bytes.NewReader(transcodedSegment.MediaData), UPLOAD_TIMEOUT)
}, clients.UploadRetryBackoff())
if err != nil {
return fmt.Errorf("failed to upload master playlist: %s", err)
}

// bitrate calculation
transcodedStats[renditionIndex].Bytes += int64(len(transcodedSegment.MediaData))
transcodedStats[renditionIndex].DurationMs += float64(segment.Input.DurationMillis)
Expand Down
73 changes: 73 additions & 0 deletions video/media.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package video

import (
"sort"
"sync"
)

/* The struct definitions here aims to represent the transcoded stream(s)
and it's segment data in a table of nested maps as follows.
____________________________________________
| Rendition | | Segment # | | Data |
|___________|______|___________|______|______|
360p0 ---> 0.ts ---> [...]
1.ts ---> [...]
1080p0 ---> 0.ts ---> [...]
1.ts ---> [...]
The inner map is accessed via TSegmentList representing the
segments returned by the T for a given rendition (e.g. 360p0).
It maps the segment index to the byte stream.
The outer map is accessed via TRenditionList representing the
renditions returned by the T. It maps the rendition name to the
list of segments referenced by the inner map above.
Since parallel jobs are used to transcode, all r/w accesses to
these structs are protected to allow for atomic ops.
*/

type TSegmentList struct {
mu sync.Mutex
SegmentDataTable map[int][]byte
}

func (s *TSegmentList) AddSegmentData(segIdx int, data []byte) {
s.mu.Lock()
s.SegmentDataTable[segIdx] = data
s.mu.Unlock()
}

func (s *TSegmentList) GetSegment(segIdx int) []byte {
s.mu.Lock()
defer s.mu.Unlock()
return s.SegmentDataTable[segIdx]
}

func (s *TSegmentList) GetSortedSegments() []int {
segmentsTable := s.SegmentDataTable
segments := make([]int, 0, len(segmentsTable))
for k := range segmentsTable {
segments = append(segments, k)
}
sort.Ints(segments)
return segments
}

type TRenditionList struct {
mu sync.Mutex
RenditionSegmentTable map[string]*TSegmentList
}

func (r *TRenditionList) AddRenditionSegment(rendName string, sList *TSegmentList) {
r.mu.Lock()
r.RenditionSegmentTable[rendName] = sList
r.mu.Unlock()
}

func (r *TRenditionList) GetSegmentList(rendName string) *TSegmentList {
r.mu.Lock()
defer r.mu.Unlock()
return r.RenditionSegmentTable[rendName]
}

0 comments on commit e0bfd82

Please sign in to comment.