Skip to content

Commit

Permalink
support publishing with WebRTC (#1659)
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed May 11, 2023
1 parent 6ea299f commit 1477478
Show file tree
Hide file tree
Showing 17 changed files with 1,274 additions and 632 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Live streams can be published to the server with:
|RTMP servers and cameras|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-2 Audio (MP3)|
|HLS servers and cameras|Low-Latency HLS, MP4-based HLS, legacy HLS|H265, H264|Opus, MPEG-4 Audio (AAC)|
|UDP/MPEG-TS streams|Unicast, broadcast, multicast|H265, H264|Opus, MPEG-4 Audio (AAC)|
|WebRTC||VP8|Opus|
|Raspberry Pi Cameras||H264||

And can be read from the server with:
Expand Down Expand Up @@ -86,6 +87,7 @@ In the next months, the repository name and the Docker image name will be change
* [From OBS Studio](#from-obs-studio)
* [From OpenCV](#from-opencv)
* [From a UDP stream](#from-a-udp-stream)
* [From the browser](#from-the-browser)
* [Read from the server](#read-from-the-server)
* [From VLC and Ubuntu](#from-vlc-and-ubuntu)
* [RTSP protocol](#rtsp-protocol)
Expand Down Expand Up @@ -800,6 +802,14 @@ paths:

After starting the server, the stream can be reached on `rtsp://localhost:8554/udp`.

### From the browser

Open the page into the browser:

```
http://localhost:8889/mystream/publish
```

## Read from the server

### From VLC and Ubuntu
Expand Down
4 changes: 1 addition & 3 deletions internal/core/hls_muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})

m.path = res.path

defer func() {
m.path.readerRemove(pathReaderRemoveReq{author: m})
}()
defer m.path.readerRemove(pathReaderRemoveReq{author: m})

m.ringBuffer, _ = ringbuffer.New(uint64(m.readBufferCount))

Expand Down
2 changes: 2 additions & 0 deletions internal/core/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type pathGetPathConfRes struct {

type pathGetPathConfReq struct {
name string
publish bool
credentials authCredentials
res chan pathGetPathConfRes
}
Expand Down Expand Up @@ -130,6 +131,7 @@ type pathPublisherAnnounceRes struct {
type pathPublisherAddReq struct {
author publisher
pathName string
skipAuth bool
credentials authCredentials
res chan pathPublisherAnnounceRes
}
Expand Down
13 changes: 8 additions & 5 deletions internal/core/path_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ outer:
continue
}

err = authenticate(pm.externalAuthenticationURL, pm.authMethods, req.name, pathConf, false, req.credentials)
err = authenticate(pm.externalAuthenticationURL, pm.authMethods,
req.name, pathConf, req.publish, req.credentials)
if err != nil {
req.res <- pathGetPathConfRes{err: pathErrAuth{wrapped: err}}
continue
Expand Down Expand Up @@ -266,10 +267,12 @@ outer:
continue
}

err = authenticate(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, true, req.credentials)
if err != nil {
req.res <- pathPublisherAnnounceRes{err: pathErrAuth{wrapped: err}}
continue
if !req.skipAuth {
err = authenticate(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, true, req.credentials)
if err != nil {
req.res <- pathPublisherAnnounceRes{err: pathErrAuth{wrapped: err}}
continue
}
}

// create path if it doesn't exist
Expand Down
22 changes: 7 additions & 15 deletions internal/core/rtmp_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,11 +377,7 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
return res.err
}

path := res.path

defer func() {
path.readerRemove(pathReaderRemoveReq{author: c})
}()
defer res.path.readerRemove(pathReaderRemoveReq{author: c})

c.stateMutex.Lock()
c.state = rtmpConnStateRead
Expand Down Expand Up @@ -417,17 +413,17 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
defer res.stream.readerRemove(c)

c.Log(logger.Info, "is reading from path '%s', %s",
path.name, sourceMediaInfo(medias))
res.path.name, sourceMediaInfo(medias))

pathConf := path.safeConf()
pathConf := res.path.safeConf()

if pathConf.RunOnRead != "" {
c.Log(logger.Info, "runOnRead command started")
onReadCmd := externalcmd.NewCmd(
c.externalCmdPool,
pathConf.RunOnRead,
pathConf.RunOnReadRestart,
path.externalCmdEnv(),
res.path.externalCmdEnv(),
func(co int) {
c.Log(logger.Info, "runOnRead command exited with code %d", co)
})
Expand Down Expand Up @@ -733,11 +729,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
return res.err
}

path := res.path

defer func() {
path.publisherRemove(pathPublisherRemoveReq{author: c})
}()
defer res.path.publisherRemove(pathPublisherRemoveReq{author: c})

c.stateMutex.Lock()
c.state = rtmpConnStatePublish
Expand Down Expand Up @@ -768,7 +760,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
medias = append(medias, audioMedia)
}

rres := path.publisherStart(pathPublisherStartReq{
rres := res.path.publisherStart(pathPublisherStartReq{
author: c,
medias: medias,
generateRTPPackets: true,
Expand All @@ -778,7 +770,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
}

c.Log(logger.Info, "is publishing to path '%s', %s",
path.name,
res.path.name,
sourceMediaInfo(medias))

// disable write deadline to allow outgoing acknowledges
Expand Down
4 changes: 1 addition & 3 deletions internal/core/rtmp_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,7 @@ func (s *rtmpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha

s.Log(logger.Info, "ready: %s", sourceMediaInfo(medias))

defer func() {
s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{})
}()
defer s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{})

videoWriteFunc := getRTMPWriteFunc(videoMedia, videoFormat, res.stream)
audioWriteFunc := getRTMPWriteFunc(audioMedia, audioFormat, res.stream)
Expand Down
4 changes: 2 additions & 2 deletions internal/core/rtsp_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

type rtspWriteFunc func(*rtp.Packet)

func getRTSPWriteFunc(medi *media.Media, forma formats.Format, stream *stream) rtspWriteFunc {
func getRTPWriteFunc(medi *media.Media, forma formats.Format, stream *stream) rtspWriteFunc {
switch forma.(type) {
case *formats.H264:
return func(pkt *rtp.Packet) {
Expand Down Expand Up @@ -387,7 +387,7 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R

for _, medi := range s.session.AnnouncedMedias() {
for _, forma := range medi.Formats {
writeFunc := getRTSPWriteFunc(medi, forma, s.stream)
writeFunc := getRTPWriteFunc(medi, forma, s.stream)

ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
writeFunc(pkt)
Expand Down
6 changes: 2 additions & 4 deletions internal/core/rtsp_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,11 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha

s.Log(logger.Info, "ready: %s", sourceMediaInfo(medias))

defer func() {
s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{})
}()
defer s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{})

for _, medi := range medias {
for _, forma := range medi.Formats {
writeFunc := getRTSPWriteFunc(medi, forma, res.stream)
writeFunc := getRTPWriteFunc(medi, forma, res.stream)

c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
writeFunc(pkt)
Expand Down
4 changes: 1 addition & 3 deletions internal/core/udp_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,7 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
return res.err
}

defer func() {
s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{})
}()
defer s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{})

s.Log(logger.Info, "ready: %s", sourceMediaInfo(medias))

Expand Down
Loading

0 comments on commit 1477478

Please sign in to comment.