Skip to content

Commit

Permalink
hls: fix freeze in case of muxing errors (#3135)
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Mar 19, 2024
1 parent c7dbb95 commit e494947
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 12 deletions.
1 change: 1 addition & 0 deletions internal/asyncwriter/async_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (w *Writer) Error() chan error {

func (w *Writer) run() {
w.err <- w.runInner()
close(w.err)
}

func (w *Writer) runInner() error {
Expand Down
22 changes: 22 additions & 0 deletions internal/asyncwriter/async_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package asyncwriter

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func TestAsyncWriter(t *testing.T) {
w := New(512, nil)

w.Start()
defer w.Stop()

w.Push(func() error {
return fmt.Errorf("testerror")
})

err := <-w.Error()
require.EqualError(t, err, "testerror")
}
15 changes: 7 additions & 8 deletions internal/servers/hls/muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ func (m *muxer) runInner() error {
recreateTimer = emptyTimer()
}

defer func() {
if mi != nil {
mi.close()
}
}()

var activityCheckTimer *time.Timer
if m.remoteAddr != "" {
activityCheckTimer = time.NewTimer(closeCheckPeriod)
Expand All @@ -178,13 +184,12 @@ func (m *muxer) runInner() error {
req.res <- mi

case err := <-instanceError:
mi.close()

if m.remoteAddr != "" {
return err
}

m.Log(logger.Error, err.Error())
mi.close()

Check warning on line 192 in internal/servers/hls/muxer.go

View check run for this annotation

Codecov / codecov/patch

internal/servers/hls/muxer.go#L192

Added line #L192 was not covered by tests
mi = nil
instanceError = make(chan error)
recreateTimer = time.NewTimer(recreatePause)
Expand Down Expand Up @@ -215,17 +220,11 @@ func (m *muxer) runInner() error {
case <-activityCheckTimer.C:
t := time.Unix(0, atomic.LoadInt64(m.lastRequestTime))
if time.Since(t) >= closeAfterInactivity {
if mi != nil {
mi.close()
}
return fmt.Errorf("not used anymore")
}
activityCheckTimer = time.NewTimer(closeCheckPeriod)

case <-m.ctx.Done():
if mi != nil {
mi.close()
}
return errors.New("terminated")
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/servers/rtmp/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,10 @@ func (c *conn) runRead(conn *rtmp.Conn, u *url.URL) error {
c.nconn.SetReadDeadline(time.Time{})

writer.Start()
defer writer.Stop()

select {
case <-c.ctx.Done():
writer.Stop()
return fmt.Errorf("terminated")

case err := <-writer.Error():
Expand Down
2 changes: 1 addition & 1 deletion internal/servers/srt/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,10 @@ func (c *conn) runRead(req srtNewConnReq, streamID *streamID) (bool, error) {
sconn.SetReadDeadline(time.Time{})

writer.Start()
defer writer.Stop()

select {
case <-c.ctx.Done():
writer.Stop()
return true, fmt.Errorf("terminated")

case err := <-writer.Error():
Expand Down
3 changes: 1 addition & 2 deletions internal/servers/webrtc/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,17 +612,16 @@ func (s *session) runRead() (int, error) {
defer onUnreadHook()

writer.Start()
defer writer.Stop()

select {
case <-pc.Disconnected():
writer.Stop()
return 0, fmt.Errorf("peer connection closed")

case err := <-writer.Error():
return 0, err

case <-s.ctx.Done():
writer.Stop()
return 0, fmt.Errorf("terminated")
}
}
Expand Down
1 change: 1 addition & 0 deletions internal/test/source_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (t *SourceTester) SetReady(req defs.PathSourceStaticSetReadyReq) defs.PathS
)

t.writer = asyncwriter.New(2048, t)

t.stream.AddReader(t.writer, req.Desc.Medias[0], req.Desc.Medias[0].Formats[0], func(u unit.Unit) error {
t.Unit <- u
close(t.Unit)
Expand Down

0 comments on commit e494947

Please sign in to comment.