diff --git a/internal/asyncwriter/async_writer.go b/internal/asyncwriter/async_writer.go index 36761e3658f..2c80f0d3032 100644 --- a/internal/asyncwriter/async_writer.go +++ b/internal/asyncwriter/async_writer.go @@ -50,6 +50,7 @@ func (w *Writer) Error() chan error { func (w *Writer) run() { w.err <- w.runInner() + close(w.err) } func (w *Writer) runInner() error { diff --git a/internal/asyncwriter/async_writer_test.go b/internal/asyncwriter/async_writer_test.go new file mode 100644 index 00000000000..139dd44a593 --- /dev/null +++ b/internal/asyncwriter/async_writer_test.go @@ -0,0 +1,22 @@ +package asyncwriter + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestAsyncWriter(t *testing.T) { + w := New(512, nil) + + w.Start() + defer w.Stop() + + w.Push(func() error { + return fmt.Errorf("testerror") + }) + + err := <-w.Error() + require.EqualError(t, err, "testerror") +} diff --git a/internal/servers/hls/muxer.go b/internal/servers/hls/muxer.go index 6a319a5fb4e..ac103641194 100644 --- a/internal/servers/hls/muxer.go +++ b/internal/servers/hls/muxer.go @@ -165,6 +165,12 @@ func (m *muxer) runInner() error { recreateTimer = emptyTimer() } + defer func() { + if mi != nil { + mi.close() + } + }() + var activityCheckTimer *time.Timer if m.remoteAddr != "" { activityCheckTimer = time.NewTimer(closeCheckPeriod) @@ -178,13 +184,12 @@ func (m *muxer) runInner() error { req.res <- mi case err := <-instanceError: - mi.close() - if m.remoteAddr != "" { return err } m.Log(logger.Error, err.Error()) + mi.close() mi = nil instanceError = make(chan error) recreateTimer = time.NewTimer(recreatePause) @@ -215,17 +220,11 @@ func (m *muxer) runInner() error { case <-activityCheckTimer.C: t := time.Unix(0, atomic.LoadInt64(m.lastRequestTime)) if time.Since(t) >= closeAfterInactivity { - if mi != nil { - mi.close() - } return fmt.Errorf("not used anymore") } activityCheckTimer = time.NewTimer(closeCheckPeriod) case <-m.ctx.Done(): - if mi != nil { - mi.close() - } return errors.New("terminated") } } diff --git a/internal/servers/rtmp/conn.go b/internal/servers/rtmp/conn.go index 2b7b8c02c94..4d78b737e52 100644 --- a/internal/servers/rtmp/conn.go +++ b/internal/servers/rtmp/conn.go @@ -237,10 +237,10 @@ func (c *conn) runRead(conn *rtmp.Conn, u *url.URL) error { c.nconn.SetReadDeadline(time.Time{}) writer.Start() + defer writer.Stop() select { case <-c.ctx.Done(): - writer.Stop() return fmt.Errorf("terminated") case err := <-writer.Error(): diff --git a/internal/servers/srt/conn.go b/internal/servers/srt/conn.go index b227fe4fcd9..971fb5afcdc 100644 --- a/internal/servers/srt/conn.go +++ b/internal/servers/srt/conn.go @@ -327,10 +327,10 @@ func (c *conn) runRead(req srtNewConnReq, streamID *streamID) (bool, error) { sconn.SetReadDeadline(time.Time{}) writer.Start() + defer writer.Stop() select { case <-c.ctx.Done(): - writer.Stop() return true, fmt.Errorf("terminated") case err := <-writer.Error(): diff --git a/internal/servers/webrtc/session.go b/internal/servers/webrtc/session.go index 67e3a2a8583..96d0f15ea50 100644 --- a/internal/servers/webrtc/session.go +++ b/internal/servers/webrtc/session.go @@ -612,17 +612,16 @@ func (s *session) runRead() (int, error) { defer onUnreadHook() writer.Start() + defer writer.Stop() select { case <-pc.Disconnected(): - writer.Stop() return 0, fmt.Errorf("peer connection closed") case err := <-writer.Error(): return 0, err case <-s.ctx.Done(): - writer.Stop() return 0, fmt.Errorf("terminated") } } diff --git a/internal/test/source_tester.go b/internal/test/source_tester.go index 475e653332d..7a52501aa15 100644 --- a/internal/test/source_tester.go +++ b/internal/test/source_tester.go @@ -69,6 +69,7 @@ func (t *SourceTester) SetReady(req defs.PathSourceStaticSetReadyReq) defs.PathS ) t.writer = asyncwriter.New(2048, t) + t.stream.AddReader(t.writer, req.Desc.Medias[0], req.Desc.Medias[0].Formats[0], func(u unit.Unit) error { t.Unit <- u close(t.Unit)