Skip to content

Commit

Permalink
hls: synchronize track reproduction around a leading one
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Oct 23, 2022
1 parent 19a369a commit 6718406
Show file tree
Hide file tree
Showing 12 changed files with 479 additions and 276 deletions.
91 changes: 62 additions & 29 deletions internal/hls/client_downloader_primary.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,29 @@ func allCodecsAreSupported(codecs string) bool {
return true
}

func pickLeadingPlaylist(variants []*gm3u8.Variant) *gm3u8.Variant {
var candidates []*gm3u8.Variant //nolint:prealloc
for _, v := range variants {
if !allCodecsAreSupported(v.Codecs) {
continue
}
candidates = append(candidates, v)
}
if candidates == nil {
return nil
}

// pick the variant with the greatest bandwidth
var leadingPlaylist *gm3u8.Variant
for _, v := range candidates {
if leadingPlaylist == nil ||
v.VariantParams.Bandwidth > leadingPlaylist.VariantParams.Bandwidth {
leadingPlaylist = v
}
}
return leadingPlaylist
}

func pickAudioPlaylist(alternatives []*gm3u8.Alternative, groupID string) *gm3u8.Alternative {
candidates := func() []*gm3u8.Alternative {
var ret []*gm3u8.Alternative
Expand All @@ -78,6 +101,8 @@ func pickAudioPlaylist(alternatives []*gm3u8.Alternative, groupID string) *gm3u8
return candidates[0]
}

type clientTimeSync interface{}

type clientDownloaderPrimary struct {
primaryPlaylistURL *url.URL
logger ClientLogger
Expand All @@ -86,13 +111,15 @@ type clientDownloaderPrimary struct {
onAudioData func(time.Duration, []byte)
rp *clientRoutinePool

httpClient *http.Client
httpClient *http.Client
leadingTimeSync clientTimeSync

// in
streamTracks chan []gortsplib.Track

// out
startStreaming chan struct{}
startStreaming chan struct{}
leadingTimeSyncReady chan struct{}
}

func newClientDownloaderPrimary(
Expand Down Expand Up @@ -136,8 +163,9 @@ func newClientDownloaderPrimary(
TLSClientConfig: tlsConfig,
},
},
streamTracks: make(chan []gortsplib.Track),
startStreaming: make(chan struct{}),
streamTracks: make(chan []gortsplib.Track),
startStreaming: make(chan struct{}),
leadingTimeSyncReady: make(chan struct{}),
}
}

Expand All @@ -155,63 +183,50 @@ func (d *clientDownloaderPrimary) run(ctx context.Context) error {
case *m3u8.MediaPlaylist:
d.logger.Log(logger.Debug, "primary playlist is a stream playlist")
ds := newClientDownloaderStream(
true,
d.httpClient,
d.primaryPlaylistURL,
plt,
d.logger,
d.rp,
d.onStreamTracks,
d.onSetLeadingTimeSync,
d.onGetLeadingTimeSync,
d.onVideoData,
d.onAudioData)
d.rp.add(ds)
streamCount++

case *m3u8.MasterPlaylist:
// gather variants with supported codecs
var supportedVariants []*gm3u8.Variant
for _, v := range plt.Variants {
if !allCodecsAreSupported(v.Codecs) {
continue
}
supportedVariants = append(supportedVariants, v)
}
if supportedVariants == nil {
leadingPlaylist := pickLeadingPlaylist(plt.Variants)
if leadingPlaylist == nil {
return fmt.Errorf("no variants with supported codecs found")
}

// choose the variant with the greatest bandwidth
var chosenVariant *gm3u8.Variant
for _, v := range supportedVariants {
if chosenVariant == nil ||
v.VariantParams.Bandwidth > chosenVariant.VariantParams.Bandwidth {
chosenVariant = v
}
}
if chosenVariant == nil {
return fmt.Errorf("no variants found")
}

u, err := clientAbsoluteURL(d.primaryPlaylistURL, chosenVariant.URI)
u, err := clientAbsoluteURL(d.primaryPlaylistURL, leadingPlaylist.URI)
if err != nil {
return err
}

ds := newClientDownloaderStream(
true,
d.httpClient,
u,
nil,
d.logger,
d.rp,
d.onStreamTracks,
d.onSetLeadingTimeSync,
d.onGetLeadingTimeSync,
d.onVideoData,
d.onAudioData)
d.rp.add(ds)
streamCount++

if chosenVariant.Audio != "" {
audioPlaylist := pickAudioPlaylist(plt.Alternatives, chosenVariant.Audio)
if leadingPlaylist.Audio != "" {
audioPlaylist := pickAudioPlaylist(plt.Alternatives, leadingPlaylist.Audio)
if audioPlaylist == nil {
return fmt.Errorf("audio playlist with id \"%s\" not found", chosenVariant.Audio)
return fmt.Errorf("audio playlist with id \"%s\" not found", leadingPlaylist.Audio)
}

u, err := clientAbsoluteURL(d.primaryPlaylistURL, audioPlaylist.URI)
Expand All @@ -220,12 +235,15 @@ func (d *clientDownloaderPrimary) run(ctx context.Context) error {
}

ds := newClientDownloaderStream(
false,
d.httpClient,
u,
nil,
d.logger,
d.rp,
d.onStreamTracks,
d.onSetLeadingTimeSync,
d.onGetLeadingTimeSync,
d.onVideoData,
d.onAudioData)
d.rp.add(ds)
Expand Down Expand Up @@ -272,6 +290,7 @@ func (d *clientDownloaderPrimary) run(ctx context.Context) error {
}

close(d.startStreaming)

return nil
}

Expand All @@ -290,3 +309,17 @@ func (d *clientDownloaderPrimary) onStreamTracks(ctx context.Context, tracks []g

return true
}

func (d *clientDownloaderPrimary) onSetLeadingTimeSync(ts clientTimeSync) {
d.leadingTimeSync = ts
close(d.leadingTimeSyncReady)
}

func (d *clientDownloaderPrimary) onGetLeadingTimeSync(ctx context.Context) (clientTimeSync, bool) {
select {
case <-d.leadingTimeSyncReady:
case <-ctx.Done():
return nil, false
}
return d.leadingTimeSync, true
}
65 changes: 40 additions & 25 deletions internal/hls/client_downloader_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,39 +44,46 @@ func findSegmentWithID(seqNo uint64, segments []*gm3u8.MediaSegment, id uint64)
}

type clientDownloaderStream struct {
httpClient *http.Client
playlistURL *url.URL
initialPlaylist *m3u8.MediaPlaylist
logger ClientLogger
rp *clientRoutinePool
onStreamTracks func(context.Context, []gortsplib.Track) bool
onVideoData func(time.Duration, [][]byte)
onAudioData func(time.Duration, []byte)

segmentQueue *clientSegmentQueue
isLeading bool
httpClient *http.Client
playlistURL *url.URL
initialPlaylist *m3u8.MediaPlaylist
logger ClientLogger
rp *clientRoutinePool
onStreamTracks func(context.Context, []gortsplib.Track) bool
onSetLeadingTimeSync func(clientTimeSync)
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool)
onVideoData func(time.Duration, [][]byte)
onAudioData func(time.Duration, []byte)

curSegmentID *uint64
}

func newClientDownloaderStream(
isLeading bool,
httpClient *http.Client,
playlistURL *url.URL,
initialPlaylist *m3u8.MediaPlaylist,
logger ClientLogger,
rp *clientRoutinePool,
onStreamTracks func(context.Context, []gortsplib.Track) bool,
onSetLeadingTimeSync func(clientTimeSync),
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool),
onVideoData func(time.Duration, [][]byte),
onAudioData func(time.Duration, []byte),
) *clientDownloaderStream {
return &clientDownloaderStream{
httpClient: httpClient,
playlistURL: playlistURL,
initialPlaylist: initialPlaylist,
logger: logger,
rp: rp,
segmentQueue: newClientSegmentQueue(),
onStreamTracks: onStreamTracks,
onVideoData: onVideoData,
onAudioData: onAudioData,
isLeading: isLeading,
httpClient: httpClient,
playlistURL: playlistURL,
initialPlaylist: initialPlaylist,
logger: logger,
rp: rp,
onStreamTracks: onStreamTracks,
onSetLeadingTimeSync: onSetLeadingTimeSync,
onGetLeadingTimeSync: onGetLeadingTimeSync,
onVideoData: onVideoData,
onAudioData: onAudioData,
}
}

Expand All @@ -91,6 +98,8 @@ func (d *clientDownloaderStream) run(ctx context.Context) error {
}
}

segmentQueue := newClientSegmentQueue()

if initialPlaylist.Map != nil && initialPlaylist.Map.URI != "" {
byts, err := d.downloadSegment(ctx, initialPlaylist.Map.URI, initialPlaylist.Map.Offset, initialPlaylist.Map.Limit)
if err != nil {
Expand All @@ -99,11 +108,14 @@ func (d *clientDownloaderStream) run(ctx context.Context) error {

proc, err := newClientProcessorFMP4(
ctx,
d.isLeading,
byts,
d.segmentQueue,
segmentQueue,
d.logger,
d.rp,
d.onStreamTracks,
d.onSetLeadingTimeSync,
d.onGetLeadingTimeSync,
d.onVideoData,
d.onAudioData,
)
Expand All @@ -114,23 +126,26 @@ func (d *clientDownloaderStream) run(ctx context.Context) error {
d.rp.add(proc)
} else {
proc := newClientProcessorMPEGTS(
d.segmentQueue,
d.isLeading,
segmentQueue,
d.logger,
d.rp,
d.onStreamTracks,
d.onSetLeadingTimeSync,
d.onGetLeadingTimeSync,
d.onVideoData,
d.onAudioData,
)
d.rp.add(proc)
}

for {
ok := d.segmentQueue.waitUntilSizeIsBelow(ctx, 1)
ok := segmentQueue.waitUntilSizeIsBelow(ctx, 1)
if !ok {
return fmt.Errorf("terminated")
}

err := d.fillSegmentQueue(ctx)
err := d.fillSegmentQueue(ctx, segmentQueue)
if err != nil {
return err
}
Expand Down Expand Up @@ -189,7 +204,7 @@ func (d *clientDownloaderStream) downloadSegment(ctx context.Context,
return byts, nil
}

func (d *clientDownloaderStream) fillSegmentQueue(ctx context.Context) error {
func (d *clientDownloaderStream) fillSegmentQueue(ctx context.Context, segmentQueue *clientSegmentQueue) error {
pl, err := d.downloadPlaylist(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -233,7 +248,7 @@ func (d *clientDownloaderStream) fillSegmentQueue(ctx context.Context) error {
return err
}

d.segmentQueue.push(byts)
segmentQueue.push(byts)

if pl.Closed && pl.Segments[len(pl.Segments)-1] == seg {
<-ctx.Done()
Expand Down
Loading

0 comments on commit 6718406

Please sign in to comment.