From 14774783505948ac6d6970640af750099d69a790 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Thu, 11 May 2023 00:06:10 +0200 Subject: [PATCH] support publishing with WebRTC (#1659) --- README.md | 10 + internal/core/hls_muxer.go | 4 +- internal/core/path.go | 2 + internal/core/path_manager.go | 13 +- internal/core/rtmp_conn.go | 22 +- internal/core/rtmp_source.go | 4 +- internal/core/rtsp_session.go | 4 +- internal/core/rtsp_source.go | 6 +- internal/core/udp_source.go | 4 +- internal/core/webrtc_conn.go | 908 +++++++----------- internal/core/webrtc_incoming_track.go | 104 ++ internal/core/webrtc_outgoing_track.go | 333 +++++++ internal/core/webrtc_pc.go | 193 ++++ internal/core/webrtc_publish_index.html | 199 ++++ ...brtc_index.html => webrtc_read_index.html} | 4 +- internal/core/webrtc_server.go | 94 +- internal/core/webrtc_server_test.go | 2 +- 17 files changed, 1274 insertions(+), 632 deletions(-) create mode 100644 internal/core/webrtc_incoming_track.go create mode 100644 internal/core/webrtc_outgoing_track.go create mode 100644 internal/core/webrtc_pc.go create mode 100644 internal/core/webrtc_publish_index.html rename internal/core/{webrtc_index.html => webrtc_read_index.html} (97%) diff --git a/README.md b/README.md index eb99c5db580..5d04b03eac3 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ Live streams can be published to the server with: |RTMP servers and cameras|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-2 Audio (MP3)| |HLS servers and cameras|Low-Latency HLS, MP4-based HLS, legacy HLS|H265, H264|Opus, MPEG-4 Audio (AAC)| |UDP/MPEG-TS streams|Unicast, broadcast, multicast|H265, H264|Opus, MPEG-4 Audio (AAC)| +|WebRTC||VP8|Opus| |Raspberry Pi Cameras||H264|| And can be read from the server with: @@ -86,6 +87,7 @@ In the next months, the repository name and the Docker image name will be change * [From OBS Studio](#from-obs-studio) * [From OpenCV](#from-opencv) * [From a UDP stream](#from-a-udp-stream) + * [From the browser](#from-the-browser) * [Read from the server](#read-from-the-server) * [From VLC and Ubuntu](#from-vlc-and-ubuntu) * [RTSP protocol](#rtsp-protocol) @@ -800,6 +802,14 @@ paths: After starting the server, the stream can be reached on `rtsp://localhost:8554/udp`. +### From the browser + +Open the page into the browser: + +``` +http://localhost:8889/mystream/publish +``` + ## Read from the server ### From VLC and Ubuntu diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index f089d04d751..c0f04a19ac8 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -275,9 +275,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) m.path = res.path - defer func() { - m.path.readerRemove(pathReaderRemoveReq{author: m}) - }() + defer m.path.readerRemove(pathReaderRemoveReq{author: m}) m.ringBuffer, _ = ringbuffer.New(uint64(m.readBufferCount)) diff --git a/internal/core/path.go b/internal/core/path.go index a3927703050..fef552cc29c 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -90,6 +90,7 @@ type pathGetPathConfRes struct { type pathGetPathConfReq struct { name string + publish bool credentials authCredentials res chan pathGetPathConfRes } @@ -130,6 +131,7 @@ type pathPublisherAnnounceRes struct { type pathPublisherAddReq struct { author publisher pathName string + skipAuth bool credentials authCredentials res chan pathPublisherAnnounceRes } diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index 31972bd6f0b..ce06f7bbed9 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -209,7 +209,8 @@ outer: continue } - err = authenticate(pm.externalAuthenticationURL, pm.authMethods, req.name, pathConf, false, req.credentials) + err = authenticate(pm.externalAuthenticationURL, pm.authMethods, + req.name, pathConf, req.publish, req.credentials) if err != nil { req.res <- pathGetPathConfRes{err: pathErrAuth{wrapped: err}} continue @@ -266,10 +267,12 @@ outer: continue } - err = authenticate(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, true, req.credentials) - if err != nil { - req.res <- pathPublisherAnnounceRes{err: pathErrAuth{wrapped: err}} - continue + if !req.skipAuth { + err = authenticate(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, true, req.credentials) + if err != nil { + req.res <- pathPublisherAnnounceRes{err: pathErrAuth{wrapped: err}} + continue + } } // create path if it doesn't exist diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index c0197c05745..f2c47f5212e 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -377,11 +377,7 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { return res.err } - path := res.path - - defer func() { - path.readerRemove(pathReaderRemoveReq{author: c}) - }() + defer res.path.readerRemove(pathReaderRemoveReq{author: c}) c.stateMutex.Lock() c.state = rtmpConnStateRead @@ -417,9 +413,9 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { defer res.stream.readerRemove(c) c.Log(logger.Info, "is reading from path '%s', %s", - path.name, sourceMediaInfo(medias)) + res.path.name, sourceMediaInfo(medias)) - pathConf := path.safeConf() + pathConf := res.path.safeConf() if pathConf.RunOnRead != "" { c.Log(logger.Info, "runOnRead command started") @@ -427,7 +423,7 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { c.externalCmdPool, pathConf.RunOnRead, pathConf.RunOnReadRestart, - path.externalCmdEnv(), + res.path.externalCmdEnv(), func(co int) { c.Log(logger.Info, "runOnRead command exited with code %d", co) }) @@ -733,11 +729,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { return res.err } - path := res.path - - defer func() { - path.publisherRemove(pathPublisherRemoveReq{author: c}) - }() + defer res.path.publisherRemove(pathPublisherRemoveReq{author: c}) c.stateMutex.Lock() c.state = rtmpConnStatePublish @@ -768,7 +760,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { medias = append(medias, audioMedia) } - rres := path.publisherStart(pathPublisherStartReq{ + rres := res.path.publisherStart(pathPublisherStartReq{ author: c, medias: medias, generateRTPPackets: true, @@ -778,7 +770,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { } c.Log(logger.Info, "is publishing to path '%s', %s", - path.name, + res.path.name, sourceMediaInfo(medias)) // disable write deadline to allow outgoing acknowledges diff --git a/internal/core/rtmp_source.go b/internal/core/rtmp_source.go index 1a0ae55bd90..5acf0a6a6e6 100644 --- a/internal/core/rtmp_source.go +++ b/internal/core/rtmp_source.go @@ -148,9 +148,7 @@ func (s *rtmpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha s.Log(logger.Info, "ready: %s", sourceMediaInfo(medias)) - defer func() { - s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{}) - }() + defer s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{}) videoWriteFunc := getRTMPWriteFunc(videoMedia, videoFormat, res.stream) audioWriteFunc := getRTMPWriteFunc(audioMedia, audioFormat, res.stream) diff --git a/internal/core/rtsp_session.go b/internal/core/rtsp_session.go index 20344f37f06..2dc703c48c5 100644 --- a/internal/core/rtsp_session.go +++ b/internal/core/rtsp_session.go @@ -24,7 +24,7 @@ import ( type rtspWriteFunc func(*rtp.Packet) -func getRTSPWriteFunc(medi *media.Media, forma formats.Format, stream *stream) rtspWriteFunc { +func getRTPWriteFunc(medi *media.Media, forma formats.Format, stream *stream) rtspWriteFunc { switch forma.(type) { case *formats.H264: return func(pkt *rtp.Packet) { @@ -387,7 +387,7 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R for _, medi := range s.session.AnnouncedMedias() { for _, forma := range medi.Formats { - writeFunc := getRTSPWriteFunc(medi, forma, s.stream) + writeFunc := getRTPWriteFunc(medi, forma, s.stream) ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { writeFunc(pkt) diff --git a/internal/core/rtsp_source.go b/internal/core/rtsp_source.go index 3e59e3f43c3..2210a0ed3ed 100644 --- a/internal/core/rtsp_source.go +++ b/internal/core/rtsp_source.go @@ -131,13 +131,11 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha s.Log(logger.Info, "ready: %s", sourceMediaInfo(medias)) - defer func() { - s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{}) - }() + defer s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{}) for _, medi := range medias { for _, forma := range medi.Formats { - writeFunc := getRTSPWriteFunc(medi, forma, res.stream) + writeFunc := getRTPWriteFunc(medi, forma, res.stream) c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { writeFunc(pkt) diff --git a/internal/core/udp_source.go b/internal/core/udp_source.go index 9a2a75316fc..35550455cf3 100644 --- a/internal/core/udp_source.go +++ b/internal/core/udp_source.go @@ -304,9 +304,7 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan return res.err } - defer func() { - s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{}) - }() + defer s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{}) s.Log(logger.Info, "ready: %s", sourceMediaInfo(medias)) diff --git a/internal/core/webrtc_conn.go b/internal/core/webrtc_conn.go index 953dbc7b417..4a760ca5dc6 100644 --- a/internal/core/webrtc_conn.go +++ b/internal/core/webrtc_conn.go @@ -14,16 +14,11 @@ import ( "sync" "time" - "github.com/bluenviron/gortsplib/v3/pkg/formats" - "github.com/bluenviron/gortsplib/v3/pkg/formats/rtpav1" - "github.com/bluenviron/gortsplib/v3/pkg/formats/rtph264" - "github.com/bluenviron/gortsplib/v3/pkg/formats/rtpvp8" - "github.com/bluenviron/gortsplib/v3/pkg/formats/rtpvp9" "github.com/bluenviron/gortsplib/v3/pkg/media" "github.com/bluenviron/gortsplib/v3/pkg/ringbuffer" "github.com/google/uuid" "github.com/pion/ice/v2" - "github.com/pion/interceptor" + "github.com/pion/sdp/v3" "github.com/pion/webrtc/v3" "github.com/aler9/mediamtx/internal/formatprocessor" @@ -32,67 +27,58 @@ import ( ) const ( - webrtcHandshakeDeadline = 10 * time.Second - webrtcWsWriteDeadline = 2 * time.Second - webrtcPayloadMaxSize = 1188 // 1200 - 12 (RTP header) + webrtcHandshakeTimeout = 10 * time.Second + webrtcTrackGatherTimeout = 2 * time.Second + webrtcPayloadMaxSize = 1188 // 1200 - 12 (RTP header) ) -// newPeerConnection creates a PeerConnection with the default codecs and -// interceptors. See RegisterDefaultCodecs and RegisterDefaultInterceptors. -// -// This function is a copy of webrtc/peerconnection.go -// unlike the original one, allows you to add additional custom options -func newPeerConnection(configuration webrtc.Configuration, - options ...func(*webrtc.API), -) (*webrtc.PeerConnection, error) { - m := &webrtc.MediaEngine{} - - if err := m.RegisterDefaultCodecs(); err != nil { - return nil, err - } +type trackRecvPair struct { + track *webrtc.TrackRemote + receiver *webrtc.RTPReceiver +} - err := m.RegisterCodec(webrtc.RTPCodecParameters{ - RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypeAV1, - ClockRate: 90000, - }, - PayloadType: 96, - }, - webrtc.RTPCodecTypeVideo) - if err != nil { - return nil, err +func mediasOfOutgoingTracks(tracks []*webRTCOutgoingTrack) media.Medias { + ret := make(media.Medias, len(tracks)) + for i, track := range tracks { + ret[i] = track.media } + return ret +} - i := &interceptor.Registry{} - if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil { - return nil, err +func mediasOfIncomingTracks(tracks []*webRTCIncomingTrack) media.Medias { + ret := make(media.Medias, len(tracks)) + for i, track := range tracks { + ret[i] = track.media } - - options = append(options, webrtc.WithMediaEngine(m)) - options = append(options, webrtc.WithInterceptorRegistry(i)) - - api := webrtc.NewAPI(options...) - return api.NewPeerConnection(configuration) + return ret } -type webRTCTrack struct { - media *media.Media - format formats.Format - webRTCTrack *webrtc.TrackLocalStaticRTP - cb func(formatprocessor.Unit, context.Context, chan error) -} +func insertTias(offer *webrtc.SessionDescription) { + var sd sdp.SessionDescription + err := sd.Unmarshal([]byte(offer.SDP)) + if err != nil { + return + } -func gatherMedias(tracks []*webRTCTrack) media.Medias { - var ret media.Medias + for _, media := range sd.MediaDescriptions { + if media.MediaName.Media == "video" { + media.Bandwidth = append(media.Bandwidth, sdp.Bandwidth{ + Type: "TIAS", + Bandwidth: 40000000, + }) + } + } - for _, track := range tracks { - ret = append(ret, track.media) + enc, err := sd.Marshal() + if err != nil { + return } - return ret + offer.SDP = string(enc) } type webRTCConnPathManager interface { + publisherAdd(req pathPublisherAddReq) pathPublisherAnnounceRes readerAdd(req pathReaderAddReq) pathReaderSetupPlayRes } @@ -104,7 +90,8 @@ type webRTCConnParent interface { type webRTCConn struct { readBufferCount int pathName string - wsconn *websocket.ServerConn + publish bool + ws *websocket.ServerConn iceServers []string wg *sync.WaitGroup pathManager webRTCConnPathManager @@ -117,7 +104,7 @@ type webRTCConn struct { ctxCancel func() uuid uuid.UUID created time.Time - curPC *webrtc.PeerConnection + pc *peerConnection mutex sync.RWMutex closed chan struct{} @@ -127,7 +114,8 @@ func newWebRTCConn( parentCtx context.Context, readBufferCount int, pathName string, - wsconn *websocket.ServerConn, + publish bool, + ws *websocket.ServerConn, iceServers []string, wg *sync.WaitGroup, pathManager webRTCConnPathManager, @@ -141,7 +129,8 @@ func newWebRTCConn( c := &webRTCConn{ readBufferCount: readBufferCount, pathName: pathName, - wsconn: wsconn, + publish: publish, + ws: ws, iceServers: iceServers, wg: wg, pathManager: pathManager, @@ -173,100 +162,17 @@ func (c *webRTCConn) wait() { } func (c *webRTCConn) remoteAddr() net.Addr { - return c.wsconn.RemoteAddr() -} - -func (c *webRTCConn) peerConnectionEstablished() bool { - c.mutex.RLock() - defer c.mutex.RUnlock() - - return c.curPC != nil -} - -func (c *webRTCConn) localCandidate() string { - c.mutex.RLock() - defer c.mutex.RUnlock() - - if c.curPC != nil { - var cid string - for _, stats := range c.curPC.GetStats() { - if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated { - cid = tstats.LocalCandidateID - break - } - } - - if cid != "" { - for _, stats := range c.curPC.GetStats() { - if tstats, ok := stats.(webrtc.ICECandidateStats); ok && tstats.ID == cid { - return tstats.CandidateType.String() + "/" + tstats.Protocol + "/" + - tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10) - } - } - } - } - return "" -} - -func (c *webRTCConn) remoteCandidate() string { - c.mutex.RLock() - defer c.mutex.RUnlock() - - if c.curPC != nil { - var cid string - for _, stats := range c.curPC.GetStats() { - if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated { - cid = tstats.RemoteCandidateID - break - } - } - - if cid != "" { - for _, stats := range c.curPC.GetStats() { - if tstats, ok := stats.(webrtc.ICECandidateStats); ok && tstats.ID == cid { - return tstats.CandidateType.String() + "/" + tstats.Protocol + "/" + - tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10) - } - } - } - } - return "" + return c.ws.RemoteAddr() } -func (c *webRTCConn) bytesReceived() uint64 { +func (c *webRTCConn) safePC() *peerConnection { c.mutex.RLock() defer c.mutex.RUnlock() - - if c.curPC != nil { - for _, stats := range c.curPC.GetStats() { - if tstats, ok := stats.(webrtc.TransportStats); ok { - if tstats.ID == "iceTransport" { - return tstats.BytesReceived - } - } - } - } - return 0 -} - -func (c *webRTCConn) bytesSent() uint64 { - c.mutex.RLock() - defer c.mutex.RUnlock() - - if c.curPC != nil { - for _, stats := range c.curPC.GetStats() { - if tstats, ok := stats.(webrtc.TransportStats); ok { - if tstats.ID == "iceTransport" { - return tstats.BytesSent - } - } - } - } - return 0 + return c.pc } func (c *webRTCConn) Log(level logger.Level, format string, args ...interface{}) { - c.parent.Log(level, "[conn %v] "+format, append([]interface{}{c.wsconn.RemoteAddr()}, args...)...) + c.parent.Log(level, "[conn %v] "+format, append([]interface{}{c.ws.RemoteAddr()}, args...)...) } func (c *webRTCConn) run() { @@ -298,7 +204,14 @@ func (c *webRTCConn) run() { } func (c *webRTCConn) runInner(ctx context.Context) error { - res := c.pathManager.readerAdd(pathReaderAddReq{ + if c.publish { + return c.runPublish(ctx) + } + return c.runRead(ctx) +} + +func (c *webRTCConn) runPublish(ctx context.Context) error { + res := c.pathManager.publisherAdd(pathPublisherAddReq{ author: c, pathName: c.pathName, skipAuth: true, @@ -307,132 +220,162 @@ func (c *webRTCConn) runInner(ctx context.Context) error { return res.err } - path := res.path - - defer func() { - path.readerRemove(pathReaderRemoveReq{author: c}) - }() - - var tracks []*webRTCTrack + defer res.path.publisherRemove(pathPublisherRemoveReq{author: c}) - videoTrack, err := c.createVideoTrack(res.stream.medias()) + err := c.writeICEServers() if err != nil { return err } - if videoTrack != nil { - tracks = append(tracks, videoTrack) + pc, err := newPeerConnection( + c.genICEServers(), + c.iceHostNAT1To1IPs, + c.iceUDPMux, + c.iceTCPMux, + c) + if err != nil { + return err } + defer pc.close() - audioTrack, err := c.createAudioTrack(res.stream.medias()) + _, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RtpTransceiverInit{ + Direction: webrtc.RTPTransceiverDirectionRecvonly, + }) if err != nil { return err } - if audioTrack != nil { - tracks = append(tracks, audioTrack) + _, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RtpTransceiverInit{ + Direction: webrtc.RTPTransceiverDirectionRecvonly, + }) + if err != nil { + return err } - if tracks == nil { - return fmt.Errorf( - "the stream doesn't contain any supported codec, which are currently H264, VP8, VP9, G711, G722, Opus") - } + trackRecv := make(chan trackRecvPair) + + pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { + select { + case trackRecv <- trackRecvPair{track, receiver}: + case <-pc.closed: + } + }) - err = c.wsconn.WriteJSON(c.genICEServers()) + offer, err := pc.CreateOffer(nil) if err != nil { return err } - offer, err := c.readOffer() + err = pc.SetLocalDescription(offer) if err != nil { return err } - configuration := webrtc.Configuration{ICEServers: c.genICEServers()} - settingsEngine := webrtc.SettingEngine{} + insertTias(&offer) - if len(c.iceHostNAT1To1IPs) != 0 { - settingsEngine.SetNAT1To1IPs(c.iceHostNAT1To1IPs, webrtc.ICECandidateTypeHost) + err = c.writeOffer(&offer) + if err != nil { + return err } - if c.iceUDPMux != nil { - settingsEngine.SetICEUDPMux(c.iceUDPMux) + answer, err := c.readAnswer() + if err != nil { + return err } - if c.iceTCPMux != nil { - settingsEngine.SetICETCPMux(c.iceTCPMux) - settingsEngine.SetNetworkTypes([]webrtc.NetworkType{webrtc.NetworkTypeTCP4}) + err = pc.SetRemoteDescription(*answer) + if err != nil { + return err } - pc, err := newPeerConnection(configuration, webrtc.WithSettingEngine(settingsEngine)) + wsReadError := make(chan error) + + err = c.establishConnection(ctx, pc, wsReadError) if err != nil { return err } - pcConnected := make(chan struct{}) - pcDisconnected := make(chan struct{}) - pcClosed := make(chan struct{}) - var stateChangeMutex sync.Mutex + tracks, err := c.gatherTracks(ctx, pc, wsReadError, trackRecv) + if err != nil { + return err + } + medias := mediasOfIncomingTracks(tracks) - pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { - stateChangeMutex.Lock() - defer stateChangeMutex.Unlock() + rres := res.path.publisherStart(pathPublisherStartReq{ + author: c, + medias: medias, + generateRTPPackets: false, + }) + if rres.err != nil { + return rres.err + } - select { - case <-pcClosed: - return - default: - } + c.Log(logger.Info, "is publishing to path '%s', %s", + res.path.name, + sourceMediaInfo(medias)) - c.Log(logger.Debug, "peer connection state: "+state.String()) + for _, track := range tracks { + track.start(rres.stream) + } - switch state { - case webrtc.PeerConnectionStateConnected: - close(pcConnected) + select { + case <-pc.disconnected: + return fmt.Errorf("peer connection closed") - case webrtc.PeerConnectionStateDisconnected: - close(pcDisconnected) + case err := <-wsReadError: + return fmt.Errorf("websocket error: %v", err) - case webrtc.PeerConnectionStateClosed: - close(pcClosed) - } + case <-ctx.Done(): + return fmt.Errorf("terminated") + } +} + +func (c *webRTCConn) runRead(ctx context.Context) error { + res := c.pathManager.readerAdd(pathReaderAddReq{ + author: c, + pathName: c.pathName, + skipAuth: true, }) + if res.err != nil { + return res.err + } - defer func() { - pc.Close() - <-pcClosed - }() + defer res.path.readerRemove(pathReaderRemoveReq{author: c}) + + tracks, err := c.findTracks(res.stream.medias()) + if err != nil { + return err + } + + err = c.writeICEServers() + if err != nil { + return err + } + + offer, err := c.readOffer() + if err != nil { + return err + } + + pc, err := newPeerConnection( + c.genICEServers(), + c.iceHostNAT1To1IPs, + c.iceUDPMux, + c.iceTCPMux, + c) + if err != nil { + return err + } + defer pc.close() for _, track := range tracks { - rtpSender, err := pc.AddTrack(track.webRTCTrack) + var err error + track.sender, err = pc.AddTrack(track.track) if err != nil { return err } - - // read incoming RTCP packets in order to make interceptors work - go func() { - buf := make([]byte, 1500) - for { - _, _, err := rtpSender.Read(buf) - if err != nil { - return - } - } - }() } - localCandidate := make(chan *webrtc.ICECandidateInit) - - pc.OnICECandidate(func(i *webrtc.ICECandidate) { - if i != nil { - v := i.ToJSON() - select { - case localCandidate <- &v: - case <-pcConnected: - case <-ctx.Done(): - } - } - }) err = pc.SetRemoteDescription(*offer) if err != nil { return err @@ -448,79 +391,21 @@ func (c *webRTCConn) runInner(ctx context.Context) error { return err } - err = c.wsconn.WriteJSON(&answer) + err = c.writeAnswer(&answer) if err != nil { return err } wsReadError := make(chan error) - remoteCandidate := make(chan *webrtc.ICECandidateInit) - go func() { - for { - candidate, err := c.readCandidate() - if err != nil { - select { - case wsReadError <- err: - case <-ctx.Done(): - } - return - } - - select { - case remoteCandidate <- candidate: - case <-pcConnected: - case <-ctx.Done(): - } - } - }() - - t := time.NewTimer(webrtcHandshakeDeadline) - defer t.Stop() - -outer: - for { - select { - case candidate := <-localCandidate: - c.Log(logger.Debug, "local candidate: %+v", candidate.Candidate) - err := c.wsconn.WriteJSON(candidate) - if err != nil { - return err - } - - case candidate := <-remoteCandidate: - c.Log(logger.Debug, "remote candidate: %+v", candidate.Candidate) - err := pc.AddICECandidate(*candidate) - if err != nil { - return err - } - - case err := <-wsReadError: - return err - - case <-t.C: - return fmt.Errorf("deadline exceeded") - - case <-pcConnected: - break outer - - case <-ctx.Done(): - return fmt.Errorf("terminated") - } + err = c.establishConnection(ctx, pc, wsReadError) + if err != nil { + return err } - // Keep WebSocket connection open and use it to notify shutdowns. - // This is because pion/webrtc doesn't write yet a WebRTC shutdown - // message to clients (like a DTLS close alert or a RTCP BYE), - // therefore browsers do not properly detect shutdowns and do not - // attempt to restart the connection immediately. - - c.mutex.Lock() - c.curPC = pc - c.mutex.Unlock() - - c.Log(logger.Info, "peer connection established, local candidate: %v, remote candidate: %v", - c.localCandidate(), c.remoteCandidate()) + for _, track := range tracks { + track.start() + } ringBuffer, _ := ringbuffer.New(uint64(c.readBufferCount)) defer ringBuffer.Close() @@ -538,7 +423,7 @@ outer: defer res.stream.readerRemove(c) c.Log(logger.Info, "is reading from path '%s', %s", - path.name, sourceMediaInfo(gatherMedias(tracks))) + res.path.name, sourceMediaInfo(mediasOfOutgoingTracks(tracks))) go func() { for { @@ -551,7 +436,7 @@ outer: }() select { - case <-pcDisconnected: + case <-pc.disconnected: return fmt.Errorf("peer connection closed") case err := <-wsReadError: @@ -565,300 +450,72 @@ outer: } } -func (c *webRTCConn) createVideoTrack(medias media.Medias) (*webRTCTrack, error) { - var av1Format *formats.AV1 - av1Media := medias.FindFormat(&av1Format) - - if av1Format != nil { - webRTCTrak, err := webrtc.NewTrackLocalStaticRTP( - webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypeAV1, - ClockRate: 90000, - }, - "av1", - "rtspss", - ) - if err != nil { - return nil, err - } - - encoder := &rtpav1.Encoder{ - PayloadType: 96, - PayloadMaxSize: webrtcPayloadMaxSize, - } - encoder.Init() - - return &webRTCTrack{ - media: av1Media, - format: av1Format, - webRTCTrack: webRTCTrak, - cb: func(unit formatprocessor.Unit, ctx context.Context, writeError chan error) { - tunit := unit.(*formatprocessor.UnitAV1) +func (c *webRTCConn) findTracks(medias media.Medias) ([]*webRTCOutgoingTrack, error) { + var tracks []*webRTCOutgoingTrack - if tunit.OBUs == nil { - return - } - - packets, err := encoder.Encode(tunit.OBUs, tunit.PTS) - if err != nil { - panic(err) - } - - for _, pkt := range packets { - webRTCTrak.WriteRTP(pkt) - } - }, - }, nil - } - - var vp9Format *formats.VP9 - vp9Media := medias.FindFormat(&vp9Format) - - if vp9Format != nil { - webRTCTrak, err := webrtc.NewTrackLocalStaticRTP( - webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypeVP9, - ClockRate: uint32(vp9Format.ClockRate()), - }, - "vp9", - "rtspss", - ) - if err != nil { - return nil, err - } - - encoder := &rtpvp9.Encoder{ - PayloadType: 96, - PayloadMaxSize: webrtcPayloadMaxSize, - } - encoder.Init() - - return &webRTCTrack{ - media: vp9Media, - format: vp9Format, - webRTCTrack: webRTCTrak, - cb: func(unit formatprocessor.Unit, ctx context.Context, writeError chan error) { - tunit := unit.(*formatprocessor.UnitVP9) - - if tunit.Frame == nil { - return - } - - packets, err := encoder.Encode(tunit.Frame, tunit.PTS) - if err != nil { - return - } - - for _, pkt := range packets { - webRTCTrak.WriteRTP(pkt) - } - }, - }, nil - } - - var vp8Format *formats.VP8 - vp8Media := medias.FindFormat(&vp8Format) - - if vp8Format != nil { - webRTCTrak, err := webrtc.NewTrackLocalStaticRTP( - webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypeVP8, - ClockRate: uint32(vp8Format.ClockRate()), - }, - "vp8", - "rtspss", - ) - if err != nil { - return nil, err - } - - encoder := &rtpvp8.Encoder{ - PayloadType: 96, - PayloadMaxSize: webrtcPayloadMaxSize, - } - encoder.Init() - - return &webRTCTrack{ - media: vp8Media, - format: vp8Format, - webRTCTrack: webRTCTrak, - cb: func(unit formatprocessor.Unit, ctx context.Context, writeError chan error) { - tunit := unit.(*formatprocessor.UnitVP8) - - if tunit.Frame == nil { - return - } - - packets, err := encoder.Encode(tunit.Frame, tunit.PTS) - if err != nil { - return - } - - for _, pkt := range packets { - webRTCTrak.WriteRTP(pkt) - } - }, - }, nil - } - - var h264Format *formats.H264 - h264Media := medias.FindFormat(&h264Format) - - if h264Format != nil { - webRTCTrak, err := webrtc.NewTrackLocalStaticRTP( - webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypeH264, - ClockRate: uint32(h264Format.ClockRate()), - }, - "h264", - "rtspss", - ) - if err != nil { - return nil, err - } - - encoder := &rtph264.Encoder{ - PayloadType: 96, - PayloadMaxSize: webrtcPayloadMaxSize, - } - encoder.Init() - - var lastPTS time.Duration - firstNALUReceived := false - - return &webRTCTrack{ - media: h264Media, - format: h264Format, - webRTCTrack: webRTCTrak, - cb: func(unit formatprocessor.Unit, ctx context.Context, writeError chan error) { - tunit := unit.(*formatprocessor.UnitH264) + videoTrack, err := newWebRTCOutgoingTrackVideo(medias) + if err != nil { + return nil, err + } - if tunit.AU == nil { - return - } + if videoTrack != nil { + tracks = append(tracks, videoTrack) + } - if !firstNALUReceived { - firstNALUReceived = true - lastPTS = tunit.PTS - } else { - if tunit.PTS < lastPTS { - select { - case writeError <- fmt.Errorf("WebRTC doesn't support H264 streams with B-frames"): - case <-ctx.Done(): - } - return - } - lastPTS = tunit.PTS - } + audioTrack, err := newWebRTCOutgoingTrackAudio(medias) + if err != nil { + return nil, err + } - packets, err := encoder.Encode(tunit.AU, tunit.PTS) - if err != nil { - return - } + if audioTrack != nil { + tracks = append(tracks, audioTrack) + } - for _, pkt := range packets { - webRTCTrak.WriteRTP(pkt) - } - }, - }, nil + if tracks == nil { + return nil, fmt.Errorf( + "the stream doesn't contain any supported codec, which are currently H264, VP8, VP9, G711, G722, Opus") } - return nil, nil + return tracks, nil } -func (c *webRTCConn) createAudioTrack(medias media.Medias) (*webRTCTrack, error) { - var opusFormat *formats.Opus - opusMedia := medias.FindFormat(&opusFormat) - - if opusFormat != nil { - webRTCTrak, err := webrtc.NewTrackLocalStaticRTP( - webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypeOpus, - ClockRate: uint32(opusFormat.ClockRate()), - }, - "opus", - "rtspss", - ) - if err != nil { - return nil, err - } +func (c *webRTCConn) gatherTracks( + ctx context.Context, + pc *peerConnection, + wsReadError chan error, + trackRecv chan trackRecvPair, +) ([]*webRTCIncomingTrack, error) { + var tracks []*webRTCIncomingTrack - return &webRTCTrack{ - media: opusMedia, - format: opusFormat, - webRTCTrack: webRTCTrak, - cb: func(unit formatprocessor.Unit, ctx context.Context, writeError chan error) { - for _, pkt := range unit.GetRTPPackets() { - webRTCTrak.WriteRTP(pkt) - } - }, - }, nil - } - - var g722Format *formats.G722 - g722Media := medias.FindFormat(&g722Format) - - if g722Format != nil { - webRTCTrak, err := webrtc.NewTrackLocalStaticRTP( - webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypeG722, - ClockRate: uint32(g722Format.ClockRate()), - }, - "g722", - "rtspss", - ) - if err != nil { - return nil, err - } + t := time.NewTimer(webrtcTrackGatherTimeout) + defer t.Stop() - return &webRTCTrack{ - media: g722Media, - format: g722Format, - webRTCTrack: webRTCTrak, - cb: func(unit formatprocessor.Unit, ctx context.Context, writeError chan error) { - for _, pkt := range unit.GetRTPPackets() { - webRTCTrak.WriteRTP(pkt) - } - }, - }, nil - } + for { + select { + case <-t.C: + return tracks, nil - var g711Format *formats.G711 - g711Media := medias.FindFormat(&g711Format) + case pair := <-trackRecv: + track, err := newWebRTCIncomingTrack(pair.track, pair.receiver, pc.WriteRTCP) + if err != nil { + return nil, err + } + tracks = append(tracks, track) - if g711Format != nil { - var mtyp string - if g711Format.MULaw { - mtyp = webrtc.MimeTypePCMU - } else { - mtyp = webrtc.MimeTypePCMA - } + if len(tracks) == 2 { + return tracks, nil + } - webRTCTrak, err := webrtc.NewTrackLocalStaticRTP( - webrtc.RTPCodecCapability{ - MimeType: mtyp, - ClockRate: uint32(g711Format.ClockRate()), - }, - "g711", - "rtspss", - ) - if err != nil { - return nil, err - } + case <-pc.disconnected: + return nil, fmt.Errorf("peer connection closed") - return &webRTCTrack{ - media: g711Media, - format: g711Format, - webRTCTrack: webRTCTrak, - cb: func(unit formatprocessor.Unit, ctx context.Context, writeError chan error) { - for _, pkt := range unit.GetRTPPackets() { - webRTCTrak.WriteRTP(pkt) - } - }, - }, nil - } + case err := <-wsReadError: + return nil, fmt.Errorf("websocket error: %v", err) - return nil, nil + case <-ctx.Done(): + return nil, fmt.Errorf("terminated") + } + } } func (c *webRTCConn) genICEServers() []webrtc.ICEServer { @@ -904,9 +561,71 @@ func (c *webRTCConn) genICEServers() []webrtc.ICEServer { return ret } +func (c *webRTCConn) establishConnection( + ctx context.Context, + pc *peerConnection, + wsReadError chan error, +) error { + remoteCandidate := make(chan *webrtc.ICECandidateInit) + go c.readCandidates(ctx, pc.connected, wsReadError, remoteCandidate) + + t := time.NewTimer(webrtcHandshakeTimeout) + defer t.Stop() + +outer: + for { + select { + case candidate := <-pc.localCandidateRecv: + c.Log(logger.Debug, "local candidate: %+v", candidate.Candidate) + err := c.ws.WriteJSON(candidate) + if err != nil { + return err + } + + case candidate := <-remoteCandidate: + c.Log(logger.Debug, "remote candidate: %+v", candidate.Candidate) + err := pc.AddICECandidate(*candidate) + if err != nil { + return err + } + + case err := <-wsReadError: + return err + + case <-t.C: + return fmt.Errorf("deadline exceeded") + + case <-pc.connected: + break outer + + case <-ctx.Done(): + return fmt.Errorf("terminated") + } + } + + // Keep WebSocket connection open and use it to notify shutdowns. + // This is because pion/webrtc doesn't write yet a WebRTC shutdown + // message to clients (like a DTLS close alert or a RTCP BYE), + // therefore browsers do not properly detect shutdowns and do not + // attempt to restart the connection immediately. + + c.mutex.Lock() + c.pc = pc + c.mutex.Unlock() + + c.Log(logger.Info, "peer connection established, local candidate: %v, remote candidate: %v", + pc.localCandidate(), pc.remoteCandidate()) + + return nil +} + +func (c *webRTCConn) writeICEServers() error { + return c.ws.WriteJSON(c.genICEServers()) +} + func (c *webRTCConn) readOffer() (*webrtc.SessionDescription, error) { var offer webrtc.SessionDescription - err := c.wsconn.ReadJSON(&offer) + err := c.ws.ReadJSON(&offer) if err != nil { return nil, err } @@ -918,9 +637,55 @@ func (c *webRTCConn) readOffer() (*webrtc.SessionDescription, error) { return &offer, nil } +func (c *webRTCConn) writeOffer(offer *webrtc.SessionDescription) error { + return c.ws.WriteJSON(offer) +} + +func (c *webRTCConn) readAnswer() (*webrtc.SessionDescription, error) { + var answer webrtc.SessionDescription + err := c.ws.ReadJSON(&answer) + if err != nil { + return nil, err + } + + if answer.Type != webrtc.SDPTypeAnswer { + return nil, fmt.Errorf("received SDP is not an offer") + } + + return &answer, nil +} + +func (c *webRTCConn) writeAnswer(answer *webrtc.SessionDescription) error { + return c.ws.WriteJSON(answer) +} + +func (c *webRTCConn) readCandidates( + ctx context.Context, + pcConnected chan struct{}, + wsReadError chan error, + remoteCandidate chan *webrtc.ICECandidateInit, +) { + for { + candidate, err := c.readCandidate() + if err != nil { + select { + case wsReadError <- err: + case <-ctx.Done(): + } + return + } + + select { + case remoteCandidate <- candidate: + case <-pcConnected: + case <-ctx.Done(): + } + } +} + func (c *webRTCConn) readCandidate() (*webrtc.ICECandidateInit, error) { var candidate webrtc.ICECandidateInit - err := c.wsconn.ReadJSON(&candidate) + err := c.ws.ReadJSON(&candidate) if err != nil { return nil, err } @@ -928,6 +693,11 @@ func (c *webRTCConn) readCandidate() (*webrtc.ICECandidateInit, error) { return &candidate, err } +// apiSourceDescribe implements sourceStaticImpl. +func (c *webRTCConn) apiSourceDescribe() interface{} { + return c.apiReaderDescribe() +} + // apiReaderDescribe implements reader. func (c *webRTCConn) apiReaderDescribe() interface{} { return struct { diff --git a/internal/core/webrtc_incoming_track.go b/internal/core/webrtc_incoming_track.go new file mode 100644 index 00000000000..e468d065f11 --- /dev/null +++ b/internal/core/webrtc_incoming_track.go @@ -0,0 +1,104 @@ +package core + +import ( + "fmt" + "time" + + "github.com/bluenviron/gortsplib/v3/pkg/formats" + "github.com/bluenviron/gortsplib/v3/pkg/media" + "github.com/pion/rtcp" + "github.com/pion/webrtc/v3" +) + +const ( + keyFrameInterval = 2 * time.Second +) + +type webRTCIncomingTrack struct { + track *webrtc.TrackRemote + receiver *webrtc.RTPReceiver + writeRTCP func([]rtcp.Packet) error + + mediaType media.Type + format formats.Format + media *media.Media +} + +func newWebRTCIncomingTrack( + track *webrtc.TrackRemote, + receiver *webrtc.RTPReceiver, + writeRTCP func([]rtcp.Packet) error, +) (*webRTCIncomingTrack, error) { + t := &webRTCIncomingTrack{ + track: track, + receiver: receiver, + writeRTCP: writeRTCP, + } + + switch track.Codec().MimeType { + case webrtc.MimeTypeVP8: + t.mediaType = media.TypeVideo + t.format = &formats.VP8{ + PayloadTyp: uint8(track.PayloadType()), + } + + case webrtc.MimeTypeOpus: + t.mediaType = media.TypeAudio + t.format = &formats.Opus{ + PayloadTyp: uint8(track.PayloadType()), + } + + default: + return nil, fmt.Errorf("unsupported codec: %v", track.Codec()) + } + + t.media = &media.Media{ + Type: t.mediaType, + Formats: []formats.Format{t.format}, + } + + return t, nil +} + +func (t *webRTCIncomingTrack) start(stream *stream) { + writeFunc := getRTPWriteFunc(t.media, t.format, stream) + + go func() { + for { + pkt, _, err := t.track.ReadRTP() + if err != nil { + return + } + + writeFunc(pkt) + } + }() + + // read incoming RTCP packets to make interceptors work + go func() { + buf := make([]byte, 1500) + for { + _, _, err := t.receiver.Read(buf) + if err != nil { + return + } + } + }() + + if t.mediaType == media.TypeVideo { + go func() { + keyframeTicker := time.NewTicker(keyFrameInterval) + + for range keyframeTicker.C { + err := t.writeRTCP([]rtcp.Packet{ + &rtcp.PictureLossIndication{ + MediaSSRC: uint32(t.track.SSRC()), + }, + }) + if err != nil { + return + } + } + }() + } +} diff --git a/internal/core/webrtc_outgoing_track.go b/internal/core/webrtc_outgoing_track.go new file mode 100644 index 00000000000..3aecf5545b9 --- /dev/null +++ b/internal/core/webrtc_outgoing_track.go @@ -0,0 +1,333 @@ +package core + +import ( + "context" + "fmt" + "time" + + "github.com/aler9/mediamtx/internal/formatprocessor" + "github.com/bluenviron/gortsplib/v3/pkg/formats" + "github.com/bluenviron/gortsplib/v3/pkg/formats/rtpav1" + "github.com/bluenviron/gortsplib/v3/pkg/formats/rtph264" + "github.com/bluenviron/gortsplib/v3/pkg/formats/rtpvp8" + "github.com/bluenviron/gortsplib/v3/pkg/formats/rtpvp9" + "github.com/bluenviron/gortsplib/v3/pkg/media" + "github.com/pion/webrtc/v3" +) + +type webRTCOutgoingTrack struct { + sender *webrtc.RTPSender + media *media.Media + format formats.Format + track *webrtc.TrackLocalStaticRTP + cb func(formatprocessor.Unit, context.Context, chan error) +} + +func newWebRTCOutgoingTrackVideo(medias media.Medias) (*webRTCOutgoingTrack, error) { + var av1Format *formats.AV1 + av1Media := medias.FindFormat(&av1Format) + + if av1Format != nil { + webRTCTrak, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeAV1, + ClockRate: 90000, + }, + "av1", + "rtspss", + ) + if err != nil { + return nil, err + } + + encoder := &rtpav1.Encoder{ + PayloadType: 105, + PayloadMaxSize: webrtcPayloadMaxSize, + } + encoder.Init() + + return &webRTCOutgoingTrack{ + media: av1Media, + format: av1Format, + track: webRTCTrak, + cb: func(unit formatprocessor.Unit, ctx context.Context, writeError chan error) { + tunit := unit.(*formatprocessor.UnitAV1) + + if tunit.OBUs == nil { + return + } + + packets, err := encoder.Encode(tunit.OBUs, tunit.PTS) + if err != nil { + return + } + + for _, pkt := range packets { + webRTCTrak.WriteRTP(pkt) + } + }, + }, nil + } + + var vp9Format *formats.VP9 + vp9Media := medias.FindFormat(&vp9Format) + + if vp9Format != nil { + webRTCTrak, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeVP9, + ClockRate: uint32(vp9Format.ClockRate()), + }, + "vp9", + "rtspss", + ) + if err != nil { + return nil, err + } + + encoder := &rtpvp9.Encoder{ + PayloadType: 96, + PayloadMaxSize: webrtcPayloadMaxSize, + } + encoder.Init() + + return &webRTCOutgoingTrack{ + media: vp9Media, + format: vp9Format, + track: webRTCTrak, + cb: func(unit formatprocessor.Unit, ctx context.Context, writeError chan error) { + tunit := unit.(*formatprocessor.UnitVP9) + + if tunit.Frame == nil { + return + } + + packets, err := encoder.Encode(tunit.Frame, tunit.PTS) + if err != nil { + return + } + + for _, pkt := range packets { + webRTCTrak.WriteRTP(pkt) + } + }, + }, nil + } + + var vp8Format *formats.VP8 + vp8Media := medias.FindFormat(&vp8Format) + + if vp8Format != nil { + webRTCTrak, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeVP8, + ClockRate: uint32(vp8Format.ClockRate()), + }, + "vp8", + "rtspss", + ) + if err != nil { + return nil, err + } + + encoder := &rtpvp8.Encoder{ + PayloadType: 96, + PayloadMaxSize: webrtcPayloadMaxSize, + } + encoder.Init() + + return &webRTCOutgoingTrack{ + media: vp8Media, + format: vp8Format, + track: webRTCTrak, + cb: func(unit formatprocessor.Unit, ctx context.Context, writeError chan error) { + tunit := unit.(*formatprocessor.UnitVP8) + + if tunit.Frame == nil { + return + } + + packets, err := encoder.Encode(tunit.Frame, tunit.PTS) + if err != nil { + return + } + + for _, pkt := range packets { + webRTCTrak.WriteRTP(pkt) + } + }, + }, nil + } + + var h264Format *formats.H264 + h264Media := medias.FindFormat(&h264Format) + + if h264Format != nil { + webRTCTrak, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeH264, + ClockRate: uint32(h264Format.ClockRate()), + }, + "h264", + "rtspss", + ) + if err != nil { + return nil, err + } + + encoder := &rtph264.Encoder{ + PayloadType: 96, + PayloadMaxSize: webrtcPayloadMaxSize, + } + encoder.Init() + + var lastPTS time.Duration + firstNALUReceived := false + + return &webRTCOutgoingTrack{ + media: h264Media, + format: h264Format, + track: webRTCTrak, + cb: func(unit formatprocessor.Unit, ctx context.Context, writeError chan error) { + tunit := unit.(*formatprocessor.UnitH264) + + if tunit.AU == nil { + return + } + + if !firstNALUReceived { + firstNALUReceived = true + lastPTS = tunit.PTS + } else { + if tunit.PTS < lastPTS { + select { + case writeError <- fmt.Errorf("WebRTC doesn't support H264 streams with B-frames"): + case <-ctx.Done(): + } + return + } + lastPTS = tunit.PTS + } + + packets, err := encoder.Encode(tunit.AU, tunit.PTS) + if err != nil { + return + } + + for _, pkt := range packets { + webRTCTrak.WriteRTP(pkt) + } + }, + }, nil + } + + return nil, nil +} + +func newWebRTCOutgoingTrackAudio(medias media.Medias) (*webRTCOutgoingTrack, error) { + var opusFormat *formats.Opus + opusMedia := medias.FindFormat(&opusFormat) + + if opusFormat != nil { + webRTCTrak, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeOpus, + ClockRate: uint32(opusFormat.ClockRate()), + }, + "opus", + "rtspss", + ) + if err != nil { + return nil, err + } + + return &webRTCOutgoingTrack{ + media: opusMedia, + format: opusFormat, + track: webRTCTrak, + cb: func(unit formatprocessor.Unit, ctx context.Context, writeError chan error) { + for _, pkt := range unit.GetRTPPackets() { + webRTCTrak.WriteRTP(pkt) + } + }, + }, nil + } + + var g722Format *formats.G722 + g722Media := medias.FindFormat(&g722Format) + + if g722Format != nil { + webRTCTrak, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeG722, + ClockRate: uint32(g722Format.ClockRate()), + }, + "g722", + "rtspss", + ) + if err != nil { + return nil, err + } + + return &webRTCOutgoingTrack{ + media: g722Media, + format: g722Format, + track: webRTCTrak, + cb: func(unit formatprocessor.Unit, ctx context.Context, writeError chan error) { + for _, pkt := range unit.GetRTPPackets() { + webRTCTrak.WriteRTP(pkt) + } + }, + }, nil + } + + var g711Format *formats.G711 + g711Media := medias.FindFormat(&g711Format) + + if g711Format != nil { + var mtyp string + if g711Format.MULaw { + mtyp = webrtc.MimeTypePCMU + } else { + mtyp = webrtc.MimeTypePCMA + } + + webRTCTrak, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{ + MimeType: mtyp, + ClockRate: uint32(g711Format.ClockRate()), + }, + "g711", + "rtspss", + ) + if err != nil { + return nil, err + } + + return &webRTCOutgoingTrack{ + media: g711Media, + format: g711Format, + track: webRTCTrak, + cb: func(unit formatprocessor.Unit, ctx context.Context, writeError chan error) { + for _, pkt := range unit.GetRTPPackets() { + webRTCTrak.WriteRTP(pkt) + } + }, + }, nil + } + + return nil, nil +} + +func (t *webRTCOutgoingTrack) start() { + // read incoming RTCP packets to make interceptors work + go func() { + buf := make([]byte, 1500) + for { + _, _, err := t.sender.Read(buf) + if err != nil { + return + } + } + }() +} diff --git a/internal/core/webrtc_pc.go b/internal/core/webrtc_pc.go new file mode 100644 index 00000000000..344df689fe2 --- /dev/null +++ b/internal/core/webrtc_pc.go @@ -0,0 +1,193 @@ +package core + +import ( + "strconv" + "sync" + + "github.com/pion/ice/v2" + "github.com/pion/interceptor" + "github.com/pion/webrtc/v3" + + "github.com/aler9/mediamtx/internal/logger" +) + +type peerConnection struct { + *webrtc.PeerConnection + stateChangeMutex sync.Mutex + localCandidateRecv chan *webrtc.ICECandidateInit + connected chan struct{} + disconnected chan struct{} + closed chan struct{} +} + +func newPeerConnection( + iceServers []webrtc.ICEServer, + iceHostNAT1To1IPs []string, + iceUDPMux ice.UDPMux, + iceTCPMux ice.TCPMux, + log logger.Writer, +) (*peerConnection, error) { + configuration := webrtc.Configuration{ICEServers: iceServers} + settingsEngine := webrtc.SettingEngine{} + + if len(iceHostNAT1To1IPs) != 0 { + settingsEngine.SetNAT1To1IPs(iceHostNAT1To1IPs, webrtc.ICECandidateTypeHost) + } + + if iceUDPMux != nil { + settingsEngine.SetICEUDPMux(iceUDPMux) + } + + if iceTCPMux != nil { + settingsEngine.SetICETCPMux(iceTCPMux) + settingsEngine.SetNetworkTypes([]webrtc.NetworkType{webrtc.NetworkTypeTCP4}) + } + + mediaEngine := &webrtc.MediaEngine{} + + if err := mediaEngine.RegisterDefaultCodecs(); err != nil { + return nil, err + } + + err := mediaEngine.RegisterCodec( + webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeAV1, + ClockRate: 90000, + }, + PayloadType: 105, + }, + webrtc.RTPCodecTypeVideo) + if err != nil { + return nil, err + } + + interceptorRegistry := &interceptor.Registry{} + if err := webrtc.RegisterDefaultInterceptors(mediaEngine, interceptorRegistry); err != nil { + return nil, err + } + + api := webrtc.NewAPI( + webrtc.WithSettingEngine(settingsEngine), + webrtc.WithMediaEngine(mediaEngine), + webrtc.WithInterceptorRegistry(interceptorRegistry)) + + pc, err := api.NewPeerConnection(configuration) + if err != nil { + return nil, err + } + + co := &peerConnection{ + PeerConnection: pc, + localCandidateRecv: make(chan *webrtc.ICECandidateInit), + connected: make(chan struct{}), + disconnected: make(chan struct{}), + closed: make(chan struct{}), + } + + pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + co.stateChangeMutex.Lock() + defer co.stateChangeMutex.Unlock() + + select { + case <-co.closed: + return + default: + } + + log.Log(logger.Debug, "peer connection state: "+state.String()) + + switch state { + case webrtc.PeerConnectionStateConnected: + close(co.connected) + + case webrtc.PeerConnectionStateDisconnected: + close(co.disconnected) + + case webrtc.PeerConnectionStateClosed: + close(co.closed) + } + }) + + pc.OnICECandidate(func(i *webrtc.ICECandidate) { + if i != nil { + v := i.ToJSON() + select { + case co.localCandidateRecv <- &v: + case <-co.connected: + case <-co.closed: + } + } + }) + + return co, nil +} + +func (co *peerConnection) close() { + co.PeerConnection.Close() + <-co.closed +} + +func (co *peerConnection) localCandidate() string { + var cid string + for _, stats := range co.GetStats() { + if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated { + cid = tstats.LocalCandidateID + break + } + } + + if cid != "" { + for _, stats := range co.GetStats() { + if tstats, ok := stats.(webrtc.ICECandidateStats); ok && tstats.ID == cid { + return tstats.CandidateType.String() + "/" + tstats.Protocol + "/" + + tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10) + } + } + } + + return "" +} + +func (co *peerConnection) remoteCandidate() string { + var cid string + for _, stats := range co.GetStats() { + if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated { + cid = tstats.RemoteCandidateID + break + } + } + + if cid != "" { + for _, stats := range co.GetStats() { + if tstats, ok := stats.(webrtc.ICECandidateStats); ok && tstats.ID == cid { + return tstats.CandidateType.String() + "/" + tstats.Protocol + "/" + + tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10) + } + } + } + + return "" +} + +func (co *peerConnection) bytesReceived() uint64 { + for _, stats := range co.GetStats() { + if tstats, ok := stats.(webrtc.TransportStats); ok { + if tstats.ID == "iceTransport" { + return tstats.BytesReceived + } + } + } + return 0 +} + +func (co *peerConnection) bytesSent() uint64 { + for _, stats := range co.GetStats() { + if tstats, ok := stats.(webrtc.TransportStats); ok { + if tstats.ID == "iceTransport" { + return tstats.BytesSent + } + } + } + return 0 +} diff --git a/internal/core/webrtc_publish_index.html b/internal/core/webrtc_publish_index.html new file mode 100644 index 00000000000..c33d2ce4587 --- /dev/null +++ b/internal/core/webrtc_publish_index.html @@ -0,0 +1,199 @@ + + +
+ + + + + + + +