From bbf3d0c578ae0d574a1aef3c9f1dd20fbb1987ab Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 19 May 2024 19:53:31 +0200 Subject: [PATCH] webrtc: support reading and publishing multichannel Opus (#3371) --- internal/protocols/webrtc/incoming_track.go | 62 ++++- internal/protocols/webrtc/outgoing_track.go | 25 +- internal/protocols/webrtc/peer_connection.go | 12 +- .../protocols/webrtc/peer_connection_test.go | 218 +++++++++++++++++- internal/servers/webrtc/read_index.html | 40 ++++ internal/servers/webrtc/session.go | 2 +- 6 files changed, 346 insertions(+), 13 deletions(-) diff --git a/internal/protocols/webrtc/incoming_track.go b/internal/protocols/webrtc/incoming_track.go index 9146e80884ca..785a432753fa 100644 --- a/internal/protocols/webrtc/incoming_track.go +++ b/internal/protocols/webrtc/incoming_track.go @@ -93,6 +93,60 @@ var incomingVideoCodecs = []webrtc.RTPCodecParameters{ } var incomingAudioCodecs = []webrtc.RTPCodecParameters{ + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: mimeMultiopus, + ClockRate: 48000, + Channels: 3, + SDPFmtpLine: "channel_mapping=0,2,1;num_streams=2;coupled_streams=1", + }, + PayloadType: 112, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: mimeMultiopus, + ClockRate: 48000, + Channels: 4, + SDPFmtpLine: "channel_mapping=0,1,2,3;num_streams=2;coupled_streams=2", + }, + PayloadType: 113, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: mimeMultiopus, + ClockRate: 48000, + Channels: 5, + SDPFmtpLine: "channel_mapping=0,4,1,2,3;num_streams=3;coupled_streams=2", + }, + PayloadType: 114, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: mimeMultiopus, + ClockRate: 48000, + Channels: 6, + SDPFmtpLine: "channel_mapping=0,4,1,2,3,5;num_streams=4;coupled_streams=2", + }, + PayloadType: 115, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: mimeMultiopus, + ClockRate: 48000, + Channels: 7, + SDPFmtpLine: "channel_mapping=0,4,1,2,3,5,6;num_streams=4;coupled_streams=4", + }, + PayloadType: 116, + }, + { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: mimeMultiopus, + ClockRate: 48000, + Channels: 8, + SDPFmtpLine: "channel_mapping=0,6,1,4,5,2,3,7;num_streams=5;coupled_streams=4", + }, + PayloadType: 117, + }, { RTPCodecCapability: webrtc.RTPCodecCapability{ MimeType: webrtc.MimeTypeOpus, @@ -191,6 +245,12 @@ func newIncomingTrack( PacketizationMode: 1, } + case strings.ToLower(mimeMultiopus): + t.format = &format.Opus{ + PayloadTyp: uint8(track.PayloadType()), + ChannelCount: int(track.Codec().Channels), + } + case strings.ToLower(webrtc.MimeTypeOpus): t.format = &format.Opus{ PayloadTyp: uint8(track.PayloadType()), @@ -242,7 +302,7 @@ func newIncomingTrack( } default: - return nil, fmt.Errorf("unsupported codec: %v", track.Codec()) + return nil, fmt.Errorf("unsupported codec: %+v", track.Codec()) } // read incoming RTCP packets to make interceptors work diff --git a/internal/protocols/webrtc/outgoing_track.go b/internal/protocols/webrtc/outgoing_track.go index 9c67d0d7e41f..3f790ba19a22 100644 --- a/internal/protocols/webrtc/outgoing_track.go +++ b/internal/protocols/webrtc/outgoing_track.go @@ -8,6 +8,10 @@ import ( "github.com/pion/webrtc/v3" ) +const ( + mimeMultiopus = "audio/multiopus" +) + // OutgoingTrack is a WebRTC outgoing track type OutgoingTrack struct { Format format.Format @@ -29,9 +33,8 @@ func (t *OutgoingTrack) codecParameters() (webrtc.RTPCodecParameters, error) { case *format.VP9: return webrtc.RTPCodecParameters{ RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypeVP9, - ClockRate: 90000, - SDPFmtpLine: "profile-id=1", + MimeType: webrtc.MimeTypeVP9, + ClockRate: 90000, }, PayloadType: 96, }, nil @@ -57,7 +60,14 @@ func (t *OutgoingTrack) codecParameters() (webrtc.RTPCodecParameters, error) { case *format.Opus: if forma.ChannelCount > 2 { - return webrtc.RTPCodecParameters{}, fmt.Errorf("unsupported Opus channel count: %d", forma.ChannelCount) + return webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: mimeMultiopus, + ClockRate: 48000, + Channels: uint16(forma.ChannelCount), + }, + PayloadType: 96, + }, nil } return webrtc.RTPCodecParameters{ @@ -65,6 +75,13 @@ func (t *OutgoingTrack) codecParameters() (webrtc.RTPCodecParameters, error) { MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 2, + SDPFmtpLine: func() string { + s := "minptime=10;useinbandfec=1" + if forma.ChannelCount == 2 { + s += ";stereo=1;sprop-stereo=1" + } + return s + }(), }, PayloadType: 96, }, nil diff --git a/internal/protocols/webrtc/peer_connection.go b/internal/protocols/webrtc/peer_connection.go index bb984fd77a27..db1f1cb72589 100644 --- a/internal/protocols/webrtc/peer_connection.go +++ b/internal/protocols/webrtc/peer_connection.go @@ -2,6 +2,7 @@ package webrtc import ( "context" + "errors" "fmt" "strconv" "sync" @@ -261,8 +262,8 @@ func (co *PeerConnection) SetAnswer(answer *webrtc.SessionDescription) error { } // AddRemoteCandidate adds a remote candidate. -func (co *PeerConnection) AddRemoteCandidate(candidate webrtc.ICECandidateInit) error { - return co.wr.AddICECandidate(candidate) +func (co *PeerConnection) AddRemoteCandidate(candidate *webrtc.ICECandidateInit) error { + return co.wr.AddICECandidate(*candidate) } // CreateFullAnswer creates a full answer. @@ -277,7 +278,7 @@ func (co *PeerConnection) CreateFullAnswer( answer, err := co.wr.CreateAnswer(nil) if err != nil { - if err.Error() == "unable to populate media section, RTPSender created with no codecs" { + if errors.Is(err, webrtc.ErrSenderWithNoCodecs) { return nil, fmt.Errorf("track codecs are not supported by remote") } return nil, err @@ -288,7 +289,7 @@ func (co *PeerConnection) CreateFullAnswer( return nil, err } - err = co.WaitGatheringDone(ctx) + err = co.waitGatheringDone(ctx) if err != nil { return nil, err } @@ -296,8 +297,7 @@ func (co *PeerConnection) CreateFullAnswer( return co.wr.LocalDescription(), nil } -// WaitGatheringDone waits until candidate gathering is complete. -func (co *PeerConnection) WaitGatheringDone(ctx context.Context) error { +func (co *PeerConnection) waitGatheringDone(ctx context.Context) error { for { select { case <-co.NewLocalCandidate(): diff --git a/internal/protocols/webrtc/peer_connection_test.go b/internal/protocols/webrtc/peer_connection_test.go index b9ed8ba88c67..c537da04af67 100644 --- a/internal/protocols/webrtc/peer_connection_test.go +++ b/internal/protocols/webrtc/peer_connection_test.go @@ -1,15 +1,18 @@ package webrtc import ( + "context" "testing" "time" + "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/test" + "github.com/pion/rtp" "github.com/stretchr/testify/require" ) -func TestPeerConnectionCloseAfterError(t *testing.T) { +func TestPeerConnectionCloseImmediately(t *testing.T) { pc := &PeerConnection{ HandshakeTimeout: conf.StringDuration(10 * time.Second), TrackGatherTimeout: conf.StringDuration(2 * time.Second), @@ -20,6 +23,7 @@ func TestPeerConnectionCloseAfterError(t *testing.T) { } err := pc.Start() require.NoError(t, err) + defer pc.Close() _, err = pc.CreatePartialOffer() require.NoError(t, err) @@ -29,3 +33,215 @@ func TestPeerConnectionCloseAfterError(t *testing.T) { pc.Close() } + +func TestPeerConnectionPublishRead(t *testing.T) { + for _, ca := range []struct { + name string + in format.Format + out format.Format + }{ + { + "av1", + &format.AV1{ + PayloadTyp: 96, + }, + &format.AV1{ + PayloadTyp: 96, + }, + }, + { + "vp9", + &format.VP9{ + PayloadTyp: 96, + }, + &format.VP9{ + PayloadTyp: 96, + }, + }, + { + "vp8", + &format.VP8{ + PayloadTyp: 96, + }, + &format.VP8{ + PayloadTyp: 96, + }, + }, + { + "h264", + test.FormatH264, + &format.H264{ + PayloadTyp: 96, + PacketizationMode: 1, + }, + }, + { + "opus multichannel", + &format.Opus{ + PayloadTyp: 112, + ChannelCount: 6, + }, + &format.Opus{ + PayloadTyp: 96, + ChannelCount: 6, + }, + }, + { + "opus stereo", + &format.Opus{ + PayloadTyp: 111, + ChannelCount: 2, + }, + &format.Opus{ + PayloadTyp: 96, + ChannelCount: 2, + }, + }, + { + "opus mono", + &format.Opus{ + PayloadTyp: 111, + ChannelCount: 1, + }, + &format.Opus{ + PayloadTyp: 96, + ChannelCount: 1, + }, + }, + { + "g722", + &format.G722{}, + &format.G722{}, + }, + { + "g711 pcma stereo", + &format.G711{ + PayloadTyp: 96, + SampleRate: 8000, + ChannelCount: 2, + }, + &format.G711{ + PayloadTyp: 119, + SampleRate: 8000, + ChannelCount: 2, + }, + }, + { + "g711 pcmu stereo", + &format.G711{ + MULaw: true, + PayloadTyp: 96, + SampleRate: 8000, + ChannelCount: 2, + }, + &format.G711{ + MULaw: true, + PayloadTyp: 118, + SampleRate: 8000, + ChannelCount: 2, + }, + }, + { + "g711 pcma mono", + &format.G711{ + PayloadTyp: 8, + SampleRate: 8000, + ChannelCount: 1, + }, + &format.G711{ + PayloadTyp: 8, + SampleRate: 8000, + ChannelCount: 1, + }, + }, + // TODO: check why this fails + /* { + "g711 pcmu mono", + &format.G711{ + MULaw: true, + PayloadTyp: 0, + SampleRate: 8000, + ChannelCount: 1, + }, + &format.G711{ + MULaw: true, + PayloadTyp: 0, + SampleRate: 8000, + ChannelCount: 1, + }, + }, */ + } { + t.Run(ca.name, func(t *testing.T) { + pc1 := &PeerConnection{ + HandshakeTimeout: conf.StringDuration(10 * time.Second), + TrackGatherTimeout: conf.StringDuration(2 * time.Second), + LocalRandomUDP: true, + IPsFromInterfaces: true, + Publish: true, + OutgoingTracks: []*OutgoingTrack{{ + Format: ca.in, + }}, + Log: test.NilLogger, + } + err := pc1.Start() + require.NoError(t, err) + defer pc1.Close() + + pc2 := &PeerConnection{ + HandshakeTimeout: conf.StringDuration(10 * time.Second), + TrackGatherTimeout: conf.StringDuration(2 * time.Second), + LocalRandomUDP: true, + IPsFromInterfaces: true, + Publish: false, + Log: test.NilLogger, + } + err = pc2.Start() + require.NoError(t, err) + defer pc2.Close() + + offer, err := pc1.CreatePartialOffer() + require.NoError(t, err) + + answer, err := pc2.CreateFullAnswer(context.Background(), offer) + require.NoError(t, err) + + err = pc1.SetAnswer(answer) + require.NoError(t, err) + + go func() { + for { + select { + case cnd := <-pc1.NewLocalCandidate(): + pc2.AddRemoteCandidate(cnd) + + case <-pc1.Connected(): + return + } + } + }() + + err = pc1.WaitUntilConnected(context.Background()) + require.NoError(t, err) + + err = pc2.WaitUntilConnected(context.Background()) + require.NoError(t, err) + + pc1.OutgoingTracks[0].WriteRTP(&rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 111, + SequenceNumber: 1123, + Timestamp: 45343, + SSRC: 563424, + }, + Payload: []byte{5, 2}, + }) + + inc, err := pc2.GatherIncomingTracks(context.Background(), 1) + require.NoError(t, err) + + require.Equal(t, ca.out, inc[0].Format()) + }) + } +} diff --git a/internal/servers/webrtc/read_index.html b/internal/servers/webrtc/read_index.html index edaa489a11c1..55a2409652d7 100644 --- a/internal/servers/webrtc/read_index.html +++ b/internal/servers/webrtc/read_index.html @@ -122,6 +122,42 @@ return lines.join('\r\n'); }; +const enableMultichannelOpus = (section) => { + let lines = section.split('\r\n'); + + lines[0] += " 112"; + lines.splice(lines.length - 1, 0, "a=rtpmap:112 multiopus/48000/3"); + lines.splice(lines.length - 1, 0, "a=fmtp:112 channel_mapping=0,2,1;num_streams=2;coupled_streams=1"); + lines.splice(lines.length - 1, 0, "a=rtcp-fb:112 transport-cc"); + + lines[0] += " 113"; + lines.splice(lines.length - 1, 0, "a=rtpmap:113 multiopus/48000/4"); + lines.splice(lines.length - 1, 0, "a=fmtp:113 channel_mapping=0,1,2,3;num_streams=2;coupled_streams=2"); + lines.splice(lines.length - 1, 0, "a=rtcp-fb:113 transport-cc"); + + lines[0] += " 114"; + lines.splice(lines.length - 1, 0, "a=rtpmap:114 multiopus/48000/5"); + lines.splice(lines.length - 1, 0, "a=fmtp:114 channel_mapping=0,4,1,2,3;num_streams=3;coupled_streams=2"); + lines.splice(lines.length - 1, 0, "a=rtcp-fb:114 transport-cc"); + + lines[0] += " 115"; + lines.splice(lines.length - 1, 0, "a=rtpmap:115 multiopus/48000/6"); + lines.splice(lines.length - 1, 0, "a=fmtp:115 channel_mapping=0,4,1,2,3,5;num_streams=4;coupled_streams=2"); + lines.splice(lines.length - 1, 0, "a=rtcp-fb:115 transport-cc"); + + lines[0] += " 116"; + lines.splice(lines.length - 1, 0, "a=rtpmap:116 multiopus/48000/7"); + lines.splice(lines.length - 1, 0, "a=fmtp:116 channel_mapping=0,4,1,2,3,5,6;num_streams=4;coupled_streams=4"); + lines.splice(lines.length - 1, 0, "a=rtcp-fb:116 transport-cc"); + + lines[0] += " 117"; + lines.splice(lines.length - 1, 0, "a=rtpmap:117 multiopus/48000/8"); + lines.splice(lines.length - 1, 0, "a=fmtp:117 channel_mapping=0,6,1,4,5,2,3,7;num_streams=5;coupled_streams=4"); + lines.splice(lines.length - 1, 0, "a=rtcp-fb:117 transport-cc"); + + return lines.join('\r\n'); +}; + const enableStereoOpus = (section) => { let opusPayloadFormat = ''; let lines = section.split('\r\n'); @@ -162,6 +198,10 @@ sections[i] = enableStereoPcmau(sections[i]); } + if (nonAdvertisedCodecs.includes('multiopus/48000/6')) { + sections[i] = enableMultichannelOpus(sections[i]); + } + break; } } diff --git a/internal/servers/webrtc/session.go b/internal/servers/webrtc/session.go index 4d582582063a..5a7fc416a56f 100644 --- a/internal/servers/webrtc/session.go +++ b/internal/servers/webrtc/session.go @@ -652,7 +652,7 @@ func (s *session) readRemoteCandidates(pc *webrtc.PeerConnection) { select { case req := <-s.chAddCandidates: for _, candidate := range req.candidates { - err := pc.AddRemoteCandidate(*candidate) + err := pc.AddRemoteCandidate(candidate) if err != nil { req.res <- webRTCAddSessionCandidatesRes{err: err} }