diff --git a/README.md b/README.md index b20dcf90b90..6b18d70df68 100644 --- a/README.md +++ b/README.md @@ -3,20 +3,33 @@ rtsp-simple-server

-_rtsp-simple-server_ is a ready-to-use and zero-dependency server and proxy that allows users to publish, read and proxy live video and audio streams through various protocols: +_rtsp-simple-server_ is a ready-to-use and zero-dependency server and proxy that allows users to publish, read and proxy live video and audio streams. -|protocol|description|variants|publish|read|proxy| -|--------|-----------|--------|-------|----|-----| -|RTSP|fastest way to publish and read streams|RTSP, RTSPS|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:| -|RTMP|allows to interact with legacy software|RTMP, RTMPS|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:| -|HLS|allows to embed streams into a web page|Low-Latency HLS, standard HLS|:x:|:heavy_check_mark:|:heavy_check_mark:| +Live streams can be published to the server with: + +|protocol|variants|codecs| +|--------|--------|------| +|RTSP clients (FFmpeg, GStreamer, etc)|UDP, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG2, JPEG, MP3, MPEG4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec| +|RTSP servers and cameras|UDP, UDP-Multicast, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG2, JPEG, MP3, MPEG4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec| +|RTMP clients (OBS Studio)|RTMP, RTMPS|H264, MPEG4 Audio (AAC)| +|RTMP servers and cameras|RTMP, RTMPS|H264, MPEG4 Audio (AAC)| +|HLS servers and cameras|Low-Latency HLS, MP4-based HLS, legacy HLS|H264, MPEG4 Audio (AAC)| +|Raspberry Pi Cameras||H264| + +And can be read from the server with: + +|protocol|variants|codecs| +|--------|--------|------| +|RTSP|UDP, UDP-Multicast, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG2, JPEG, MP3, MPEG4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec| +|RTMP|RTMP, RTMPS|H264, MPEG4 Audio (AAC)| +|HLS|Low-Latency HLS, MP4-based HLS, legacy HLS|H264, MPEG4 Audio (AAC)| +|WebRTC||H264, VP8, VP9, Opus, G711, G722| Features: * Publish live streams to the server * Read live streams from the server * Proxy streams from other servers or cameras, always or on-demand -* Each stream can have multiple video and audio tracks, encoded with any RTP-compatible codec, including H264, H265, VP8, VP9, MPEG2, MP3, AAC, Opus, PCM, JPEG * Streams are automatically converted from a protocol to another. For instance, it's possible to publish a stream with RTSP and read it with HLS * Serve multiple streams at once in separate paths * Authenticate users; use internal or external authentication @@ -446,6 +459,10 @@ Obtaining: paths{name="[path_name]",state="[state]"} 1 paths_bytes_received{name="[path_name]",state="[state]"} 1234 +# metrics of every HLS muxer +hls_muxers{name="[name]"} 1 +hls_muxers_bytes_sent{name="[name]"} 187 + # metrics of every RTSP connection rtsp_conns{id="[id]"} 1 rtsp_conns_bytes_received{id="[id]"} 1234 @@ -471,9 +488,10 @@ rtmp_conns{id="[id]",state="[state]"} 1 rtmp_conns_bytes_received{id="[id]",state="[state]"} 1234 rtmp_conns_bytes_sent{id="[id]",state="[state]"} 187 -# metrics of every HLS muxer -hls_muxers{name="[name]"} 1 -hls_muxers_bytes_sent{name="[name]"} 187 +# metrics of every WebRTC connection +webrtc_conns{id="[id]"} 1 +webrtc_conns_bytes_received{id="[id]",state="[state]"} 1234 +webrtc_conns_bytes_sent{id="[id]",state="[state]"} 187 ``` ### pprof diff --git a/apidocs/openapi.yaml b/apidocs/openapi.yaml index 869fe689927..7e364581a82 100644 --- a/apidocs/openapi.yaml +++ b/apidocs/openapi.yaml @@ -429,6 +429,14 @@ components: bytesSent: type: number + HLSMuxersList: + type: object + properties: + items: + type: object + additionalProperties: + $ref: '#/components/schemas/HLSMuxer' + PathsList: type: object properties: @@ -437,6 +445,14 @@ components: additionalProperties: $ref: '#/components/schemas/Path' + RTMPConnsList: + type: object + properties: + items: + type: object + additionalProperties: + $ref: '#/components/schemas/RTMPConn' + RTSPConnsList: type: object properties: @@ -461,21 +477,25 @@ components: additionalProperties: $ref: '#/components/schemas/RTSPSession' - RTMPConnsList: + WebRTCConn: type: object properties: - items: - type: object - additionalProperties: - $ref: '#/components/schemas/RTMPConn' + created: + type: string + remoteAddr: + type: string + bytesReceived: + type: number + bytesSent: + type: number - HLSMuxersList: + WebRTCConnsList: type: object properties: items: type: object additionalProperties: - $ref: '#/components/schemas/HLSMuxer' + $ref: '#/components/schemas/WebRTCConn' paths: /v1/config/get: @@ -586,10 +606,10 @@ paths: '500': description: internal server error. - /v1/paths/list: + /v1/hlsmuxers/list: get: - operationId: pathsList - summary: returns all paths. + operationId: hlsMuxersList + summary: returns all HLS muxers. description: '' responses: '200': @@ -597,16 +617,16 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/PathsList' + $ref: '#/components/schemas/HLSMuxersList' '400': description: invalid request. '500': description: internal server error. - /v1/rtspconns/list: + /v1/paths/list: get: - operationId: rtspConnsList - summary: returns all RTSP connections. + operationId: pathsList + summary: returns all paths. description: '' responses: '200': @@ -614,16 +634,16 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/RTSPConnsList' + $ref: '#/components/schemas/PathsList' '400': description: invalid request. '500': description: internal server error. - /v1/rtspsessions/list: + /v1/rtspconns/list: get: - operationId: rtspSessionsList - summary: returns all RTSP sessions. + operationId: rtspConnsList + summary: returns all RTSP connections. description: '' responses: '200': @@ -631,16 +651,16 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/RTSPSessionsList' + $ref: '#/components/schemas/RTSPConnsList' '400': description: invalid request. '500': description: internal server error. - /v1/rtspsconns/list: + /v1/rtspsessions/list: get: - operationId: rtspsConnsList - summary: returns all RTSPS connections. + operationId: rtspSessionsList + summary: returns all RTSP sessions. description: '' responses: '200': @@ -648,7 +668,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/RTSPConnsList' + $ref: '#/components/schemas/RTSPSessionsList' '400': description: invalid request. '500': @@ -674,6 +694,23 @@ paths: '500': description: internal server error. + /v1/rtspsconns/list: + get: + operationId: rtspsConnsList + summary: returns all RTSPS connections. + description: '' + responses: + '200': + description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/RTSPConnsList' + '400': + description: invalid request. + '500': + description: internal server error. + /v1/rtspssessions/list: get: operationId: rtspsSessionsList @@ -785,10 +822,10 @@ paths: '500': description: internal server error. - /v1/hlsmuxers/list: + /v1/webrtcconns/list: get: - operationId: hlsMuxersList - summary: returns all HLS muxers. + operationId: webrtcConnsList + summary: returns all WebRTC connections. description: '' responses: '200': @@ -796,7 +833,27 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/HLSMuxersList' + $ref: '#/components/schemas/WebRTCConnsList' + '400': + description: invalid request. + '500': + description: internal server error. + + /v1/webrtcconns/kick/{id}: + post: + operationId: webrtcConnsKick + summary: kicks out a WebRTC connection from the server. + description: '' + parameters: + - name: id + in: path + required: true + description: the ID of the session. + schema: + type: string + responses: + '200': + description: the request was successful. '400': description: invalid request. '500': diff --git a/go.mod b/go.mod index f7010a7b23a..6451c90540b 100644 --- a/go.mod +++ b/go.mod @@ -5,19 +5,21 @@ go 1.18 require ( code.cloudfoundry.org/bytefmt v0.0.0-20211005130812-5bb3c17173e5 github.com/abema/go-mp4 v0.8.0 - github.com/aler9/gortsplib/v2 v2.0.0-20221214165733-d43cb0455e33 + github.com/aler9/gortsplib/v2 v2.0.0-20221214210611-e1c07a1c8d71 github.com/asticode/go-astits v1.10.1-0.20220319093903-4abe66a9b757 github.com/fsnotify/fsnotify v1.4.9 github.com/gin-gonic/gin v1.8.1 - github.com/google/uuid v1.1.2 + github.com/google/uuid v1.3.0 github.com/gookit/color v1.4.2 + github.com/gorilla/websocket v1.5.0 github.com/grafov/m3u8 v0.11.1 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/notedit/rtmp v0.0.2 github.com/orcaman/writerseeker v0.0.0 github.com/pion/rtp v1.7.13 + github.com/pion/webrtc/v3 v3.1.47 github.com/stretchr/testify v1.7.1 - golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 + golang.org/x/crypto v0.0.0-20221010152910-d6f0a8c073c2 gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/yaml.v2 v2.4.0 ) @@ -38,14 +40,26 @@ require ( github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml/v2 v2.0.1 // indirect + github.com/pion/datachannel v1.5.2 // indirect + github.com/pion/dtls/v2 v2.1.5 // indirect + github.com/pion/ice/v2 v2.2.11 // indirect + github.com/pion/interceptor v0.1.11 // indirect + github.com/pion/logging v0.2.2 // indirect + github.com/pion/mdns v0.0.5 // indirect github.com/pion/randutil v0.1.0 // indirect - github.com/pion/rtcp v1.2.9 // indirect - github.com/pion/sdp/v3 v3.0.5 // indirect + github.com/pion/rtcp v1.2.10 // indirect + github.com/pion/sctp v1.8.2 // indirect + github.com/pion/sdp/v3 v3.0.6 // indirect + github.com/pion/srtp/v2 v2.0.10 // indirect + github.com/pion/stun v0.3.5 // indirect + github.com/pion/transport v0.13.1 // indirect + github.com/pion/turn/v2 v2.0.8 // indirect + github.com/pion/udp v0.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/ugorji/go/codec v1.2.7 // indirect github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect - golang.org/x/net v0.0.0-20220526153639-5463443f8c37 // indirect - golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect + golang.org/x/net v0.0.0-20221004154528-8021a29435af // indirect + golang.org/x/sys v0.0.0-20221010170243-090e33056c14 // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/protobuf v1.28.0 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect diff --git a/go.sum b/go.sum index 910fd80ab33..44eafb7c331 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib/v2 v2.0.0-20221214165733-d43cb0455e33 h1:7r2VpQoRSYOCU9qSXit9A4RKI7ufdI5UAxDHHjZ1Occ= -github.com/aler9/gortsplib/v2 v2.0.0-20221214165733-d43cb0455e33/go.mod h1:zJ+fWtakOMN6cKV169EMNVBLPTITArrJKu/fyM+dov8= +github.com/aler9/gortsplib/v2 v2.0.0-20221214210611-e1c07a1c8d71 h1:dgKa+8HxFRliWSRFHyYg1Fz2F6OlDapT81oDGS6kits= +github.com/aler9/gortsplib/v2 v2.0.0-20221214210611-e1c07a1c8d71/go.mod h1:zJ+fWtakOMN6cKV169EMNVBLPTITArrJKu/fyM+dov8= github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82 h1:9WgSzBLo3a9ToSVV7sRTBYZ1GGOZUpq4+5H3SN0UZq4= github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82/go.mod h1:qsMrZCbeBf/mCLOeF16KDkPu4gktn/pOWyaq1aYQE7U= github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8= @@ -52,10 +52,13 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gookit/color v1.4.2 h1:tXy44JFSFkKnELV6WaMo/lLfu/meqITX3iAV52do7lk= github.com/gookit/color v1.4.2/go.mod h1:fqRyamkC1W8uxl+lxCQxOT09l/vYfZ+QeiX3rKQHCoQ= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grafov/m3u8 v0.11.1 h1:igZ7EBIB2IAsPPazKwRKdbhxcoBKO3lO1UY57PZDeNA= github.com/grafov/m3u8 v0.11.1/go.mod h1:nqzOkfBiZJENr52zTVd/Dcl03yzphIMbJqkXGu+u080= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= @@ -87,22 +90,55 @@ github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc= github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c= github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= +github.com/onsi/gomega v1.17.0 h1:9Luw4uT5HTjHTN8+aNcSThgH1vdXnmdJ8xIfZ4wyTRE= +github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/pelletier/go-toml/v2 v2.0.1 h1:8e3L2cCQzLFi2CR4g7vGFuFxX7Jl1kKX8gW+iV0GUKU= github.com/pelletier/go-toml/v2 v2.0.1/go.mod h1:r9LEWfGN8R5k0VXJ+0BkIe7MYkRdwZOjgMj2KwnJFUo= +github.com/pion/datachannel v1.5.2 h1:piB93s8LGmbECrpO84DnkIVWasRMk3IimbcXkTQLE6E= +github.com/pion/datachannel v1.5.2/go.mod h1:FTGQWaHrdCwIJ1rw6xBIfZVkslikjShim5yr05XFuCQ= +github.com/pion/dtls/v2 v2.1.5 h1:jlh2vtIyUBShchoTDqpCCqiYCyRFJ/lvf/gQ8TALs+c= +github.com/pion/dtls/v2 v2.1.5/go.mod h1:BqCE7xPZbPSubGasRoDFJeTsyJtdD1FanJYL0JGheqY= +github.com/pion/ice/v2 v2.2.11 h1:wiAy7TSrVZ4KdyjC0CcNTkwltz9ywetbe4wbHLKUbIg= +github.com/pion/ice/v2 v2.2.11/go.mod h1:NqUDUao6SjSs1+4jrqpexDmFlptlVhGxQjcymXLaVvE= +github.com/pion/interceptor v0.1.11 h1:00U6OlqxA3FFB50HSg25J/8cWi7P6FbSzw4eFn24Bvs= +github.com/pion/interceptor v0.1.11/go.mod h1:tbtKjZY14awXd7Bq0mmWvgtHB5MDaRN7HV3OZ/uy7s8= +github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= +github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/mdns v0.0.5 h1:Q2oj/JB3NqfzY9xGZ1fPzZzK7sDSD8rZPOvcIQ10BCw= +github.com/pion/mdns v0.0.5/go.mod h1:UgssrvdD3mxpi8tMxAXbsppL3vJ4Jipw1mTCW+al01g= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= -github.com/pion/rtcp v1.2.9 h1:1ujStwg++IOLIEoOiIQ2s+qBuJ1VN81KW+9pMPsif+U= github.com/pion/rtcp v1.2.9/go.mod h1:qVPhiCzAm4D/rxb6XzKeyZiQK69yJpbUDJSF7TgrqNo= +github.com/pion/rtcp v1.2.10 h1:nkr3uj+8Sp97zyItdN60tE/S6vk4al5CPRR6Gejsdjc= +github.com/pion/rtcp v1.2.10/go.mod h1:ztfEwXZNLGyF1oQDttz/ZKIBaeeg/oWbRYqzBM9TL1I= github.com/pion/rtp v1.7.13 h1:qcHwlmtiI50t1XivvoawdCGTP4Uiypzfrsap+bijcoA= github.com/pion/rtp v1.7.13/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= -github.com/pion/sdp/v3 v3.0.5 h1:ouvI7IgGl+V4CrqskVtr3AaTrPvPisEOxwgpdktctkU= -github.com/pion/sdp/v3 v3.0.5/go.mod h1:iiFWFpQO8Fy3S5ldclBkpXqmWy02ns78NOKoLLL0YQw= +github.com/pion/sctp v1.8.0/go.mod h1:xFe9cLMZ5Vj6eOzpyiKjT9SwGM4KpK/8Jbw5//jc+0s= +github.com/pion/sctp v1.8.2 h1:yBBCIrUMJ4yFICL3RIvR4eh/H2BTTvlligmSTy+3kiA= +github.com/pion/sctp v1.8.2/go.mod h1:xFe9cLMZ5Vj6eOzpyiKjT9SwGM4KpK/8Jbw5//jc+0s= +github.com/pion/sdp/v3 v3.0.6 h1:WuDLhtuFUUVpTfus9ILC4HRyHsW6TdugjEX/QY9OiUw= +github.com/pion/sdp/v3 v3.0.6/go.mod h1:iiFWFpQO8Fy3S5ldclBkpXqmWy02ns78NOKoLLL0YQw= +github.com/pion/srtp/v2 v2.0.10 h1:b8ZvEuI+mrL8hbr/f1YiJFB34UMrOac3R3N1yq2UN0w= +github.com/pion/srtp/v2 v2.0.10/go.mod h1:XEeSWaK9PfuMs7zxXyiN252AHPbH12NX5q/CFDWtUuA= +github.com/pion/stun v0.3.5 h1:uLUCBCkQby4S1cf6CGuR9QrVOKcvUwFeemaC865QHDg= +github.com/pion/stun v0.3.5/go.mod h1:gDMim+47EeEtfWogA37n6qXZS88L5V6LqFcf+DZA2UA= +github.com/pion/transport v0.12.2/go.mod h1:N3+vZQD9HlDP5GWkZ85LohxNsDcNgofQmyL6ojX5d8Q= +github.com/pion/transport v0.12.3/go.mod h1:OViWW9SP2peE/HbwBvARicmAVnesphkNkCVZIWJ6q9A= +github.com/pion/transport v0.13.0/go.mod h1:yxm9uXpK9bpBBWkITk13cLo1y5/ur5VQpG22ny6EP7g= +github.com/pion/transport v0.13.1 h1:/UH5yLeQtwm2VZIPjxwnNFxjS4DFhyLfS4GlfuKUzfA= +github.com/pion/transport v0.13.1/go.mod h1:EBxbqzyv+ZrmDb82XswEE0BjfQFtuw1Nu6sjnjWCsGg= +github.com/pion/turn/v2 v2.0.8 h1:KEstL92OUN3k5k8qxsXHpr7WWfrdp7iJZHx99ud8muw= +github.com/pion/turn/v2 v2.0.8/go.mod h1:+y7xl719J8bAEVpSXBXvTxStjJv3hbz9YFflvkpcGPw= +github.com/pion/udp v0.1.1 h1:8UAPvyqmsxK8oOjloDk4wUt63TzFe9WEJkg5lChlj7o= +github.com/pion/udp v0.1.1/go.mod h1:6AFo+CMdKQm7UiA0eUPA8/eVCTx8jBIITLZHc9DWX5M= +github.com/pion/webrtc/v3 v3.1.47 h1:2dFEKRI1rzFvehXDq43hK9OGGyTGJSusUi3j6QKHC5s= +github.com/pion/webrtc/v3 v3.1.47/go.mod h1:8U39MYZCLVV4sIBn01htASVNkWQN2zDa/rx5xisEXWs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKqjFE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -110,6 +146,7 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= +github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -128,18 +165,27 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 h1:/UOmuWzQfxxo9UtlXMwuQU8CMgg1eZXqTRwkSQJWKOI= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20221010152910-d6f0a8c073c2 h1:x8vtB3zMecnlqZIwJNUUpwYKYSqCz5jXbiyv0ZJJZeI= +golang.org/x/crypto v0.0.0-20221010152910-d6f0a8c073c2/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201201195509-5d6afe98e0b7/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= -golang.org/x/net v0.0.0-20220526153639-5463443f8c37 h1:lUkvobShwKsOesNfWWlCS5q7fnbG1MEliIzwu886fn8= -golang.org/x/net v0.0.0-20220526153639-5463443f8c37/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211201190559-0a0e4e1bb54c/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220531201128-c960675eff93/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.0.0-20221002022538-bcab6841153b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/net v0.0.0-20221004154528-8021a29435af h1:wv66FM3rLZGPdxpYL+ApnDe2HzHcTFta3z5nsc13wI4= +golang.org/x/net v0.0.0-20221004154528-8021a29435af/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -159,8 +205,13 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220608164250-635b8c9b7f68/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220622161953-175b2fd9d664/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20221010170243-090e33056c14 h1:k5II8e6QD8mITdi+okbbmR/cIyEbeXLBhy5Ha4nevyc= +golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -190,6 +241,7 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= diff --git a/internal/conf/authmethod.go b/internal/conf/authmethod.go index 05a5d4c4437..3c9c6e17e25 100644 --- a/internal/conf/authmethod.go +++ b/internal/conf/authmethod.go @@ -54,6 +54,7 @@ func (d *AuthMethods) UnmarshalJSON(b []byte) error { return nil } +// unmarshalEnv implements envUnmarshaler. func (d *AuthMethods) unmarshalEnv(s string) error { byts, _ := json.Marshal(strings.Split(s, ",")) return d.UnmarshalJSON(byts) diff --git a/internal/conf/conf.go b/internal/conf/conf.go index 72cd7d0241b..830dc0210dc 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -211,6 +211,9 @@ type Conf struct { // HLS HLSDisable bool `json:"hlsDisable"` HLSAddress string `json:"hlsAddress"` + HLSEncryption bool `json:"hlsEncryption"` + HLSServerKey string `json:"hlsServerKey"` + HLSServerCert string `json:"hlsServerCert"` HLSAlwaysRemux bool `json:"hlsAlwaysRemux"` HLSVariant HLSVariant `json:"hlsVariant"` HLSSegmentCount int `json:"hlsSegmentCount"` @@ -218,11 +221,17 @@ type Conf struct { HLSPartDuration StringDuration `json:"hlsPartDuration"` HLSSegmentMaxSize StringSize `json:"hlsSegmentMaxSize"` HLSAllowOrigin string `json:"hlsAllowOrigin"` - HLSEncryption bool `json:"hlsEncryption"` - HLSServerKey string `json:"hlsServerKey"` - HLSServerCert string `json:"hlsServerCert"` HLSTrustedProxies IPsOrCIDRs `json:"hlsTrustedProxies"` + // WebRTC + WebRTC bool `json:"webrtc"` + WebRTCAddress string `json:"webrtcAddress"` + WebRTCServerKey string `json:"webrtcServerKey"` + WebRTCServerCert string `json:"webrtcServerCert"` + WebRTCAllowOrigin string `json:"webrtcAllowOrigin"` + WebRTCTrustedProxies IPsOrCIDRs `json:"webrtcTrustedProxies"` + WebRTCICEServers []string `json:"webrtcICEServers"` + // paths Paths map[string]*PathConf `json:"paths"` } @@ -251,52 +260,45 @@ func Load(fpath string) (*Conf, bool, error) { // CheckAndFillMissing checks the configuration for errors and fills missing parameters. func (conf *Conf) CheckAndFillMissing() error { + // general if conf.LogLevel == 0 { conf.LogLevel = LogLevel(logger.Info) } - if len(conf.LogDestinations) == 0 { conf.LogDestinations = LogDestinations{logger.DestinationStdout: {}} } - if conf.LogFile == "" { conf.LogFile = "rtsp-simple-server.log" } - if conf.ReadTimeout == 0 { conf.ReadTimeout = 10 * StringDuration(time.Second) } - if conf.WriteTimeout == 0 { conf.WriteTimeout = 10 * StringDuration(time.Second) } - if conf.ReadBufferCount == 0 { conf.ReadBufferCount = 512 } if (conf.ReadBufferCount & (conf.ReadBufferCount - 1)) != 0 { return fmt.Errorf("'ReadBufferCount' must be a power of two") } - if conf.ExternalAuthenticationURL != "" { if !strings.HasPrefix(conf.ExternalAuthenticationURL, "http://") && !strings.HasPrefix(conf.ExternalAuthenticationURL, "https://") { return fmt.Errorf("'externalAuthenticationURL' must be a HTTP URL") } } - if conf.APIAddress == "" { conf.APIAddress = "127.0.0.1:9997" } - if conf.MetricsAddress == "" { conf.MetricsAddress = "127.0.0.1:9998" } - if conf.PPROFAddress == "" { conf.PPROFAddress = "127.0.0.1:9999" } + // RTSP if len(conf.Protocols) == 0 { conf.Protocols = Protocols{ Protocol(gortsplib.TransportUDP): {}, @@ -304,7 +306,6 @@ func (conf *Conf) CheckAndFillMissing() error { Protocol(gortsplib.TransportTCP): {}, } } - if conf.Encryption == EncryptionStrict { if _, ok := conf.Protocols[Protocol(gortsplib.TransportUDP)]; ok { return fmt.Errorf("strict encryption can't be used with the UDP transport protocol") @@ -314,87 +315,70 @@ func (conf *Conf) CheckAndFillMissing() error { return fmt.Errorf("strict encryption can't be used with the UDP-multicast transport protocol") } } - if conf.RTSPAddress == "" { conf.RTSPAddress = ":8554" } - if conf.RTSPSAddress == "" { conf.RTSPSAddress = ":8322" } - if conf.RTPAddress == "" { conf.RTPAddress = ":8000" } - if conf.RTCPAddress == "" { conf.RTCPAddress = ":8001" } - if conf.MulticastIPRange == "" { conf.MulticastIPRange = "224.1.0.0/16" } - if conf.MulticastRTPPort == 0 { conf.MulticastRTPPort = 8002 } - if conf.MulticastRTCPPort == 0 { conf.MulticastRTCPPort = 8003 } - if conf.ServerKey == "" { conf.ServerKey = "server.key" } - if conf.ServerCert == "" { conf.ServerCert = "server.crt" } - if len(conf.AuthMethods) == 0 { conf.AuthMethods = AuthMethods{headers.AuthBasic, headers.AuthDigest} } + // RTMP if conf.RTMPAddress == "" { conf.RTMPAddress = ":1935" } - if conf.RTMPSAddress == "" { conf.RTMPSAddress = ":1936" } + // HLS if conf.HLSAddress == "" { conf.HLSAddress = ":8888" } - + if conf.HLSServerKey == "" { + conf.HLSServerKey = "server.key" + } + if conf.HLSServerCert == "" { + conf.HLSServerCert = "server.crt" + } if conf.HLSSegmentCount == 0 { conf.HLSSegmentCount = 7 } - if conf.HLSSegmentDuration == 0 { conf.HLSSegmentDuration = 1 * StringDuration(time.Second) } - if conf.HLSPartDuration == 0 { conf.HLSPartDuration = 200 * StringDuration(time.Millisecond) } - if conf.HLSSegmentMaxSize == 0 { conf.HLSSegmentMaxSize = 50 * 1024 * 1024 } - if conf.HLSAllowOrigin == "" { conf.HLSAllowOrigin = "*" } - - if conf.HLSServerKey == "" { - conf.HLSServerKey = "server.key" - } - - if conf.HLSServerCert == "" { - conf.HLSServerCert = "server.crt" - } - switch conf.HLSVariant { case HLSVariantLowLatency: if conf.HLSSegmentCount < 7 { @@ -411,6 +395,23 @@ func (conf *Conf) CheckAndFillMissing() error { } } + // WebRTC + if conf.WebRTCAddress == "" { + conf.WebRTCAddress = ":8889" + } + if conf.WebRTCServerKey == "" { + conf.WebRTCServerKey = "server.key" + } + if conf.WebRTCServerCert == "" { + conf.WebRTCServerCert = "server.crt" + } + if conf.WebRTCAllowOrigin == "" { + conf.WebRTCAllowOrigin = "*" + } + if conf.WebRTCICEServers == nil { + conf.WebRTCICEServers = []string{"stun:stun.l.google.com:19302"} + } + // do not add automatically "all", since user may want to // initialize all paths through API or hot reloading. if conf.Paths == nil { diff --git a/internal/conf/credential.go b/internal/conf/credential.go index cf5de2d5079..85d1997808b 100644 --- a/internal/conf/credential.go +++ b/internal/conf/credential.go @@ -36,6 +36,7 @@ func (d *Credential) UnmarshalJSON(b []byte) error { return nil } +// unmarshalEnv implements envUnmarshaler. func (d *Credential) unmarshalEnv(s string) error { return d.UnmarshalJSON([]byte(`"` + s + `"`)) } diff --git a/internal/conf/encryption.go b/internal/conf/encryption.go index 007f43876ff..dc7fe5f7653 100644 --- a/internal/conf/encryption.go +++ b/internal/conf/encryption.go @@ -57,6 +57,7 @@ func (d *Encryption) UnmarshalJSON(b []byte) error { return nil } +// unmarshalEnv implements envUnmarshaler. func (d *Encryption) unmarshalEnv(s string) error { return d.UnmarshalJSON([]byte(`"` + s + `"`)) } diff --git a/internal/conf/env.go b/internal/conf/env.go index 599ca43ce49..f6c8aab3d0f 100644 --- a/internal/conf/env.go +++ b/internal/conf/env.go @@ -131,6 +131,14 @@ func loadEnvInternal(env map[string]string, prefix string, rv reflect.Value) err } } return nil + + case reflect.Slice: + if rt.Elem() == reflect.TypeOf("") { + if ev, ok := env[prefix]; ok { + rv.Set(reflect.ValueOf(strings.Split(ev, ","))) + } + return nil + } } return fmt.Errorf("unsupported type: %v", rt) diff --git a/internal/conf/env_test.go b/internal/conf/env_test.go index 92915920ee9..384e5ab0ebe 100644 --- a/internal/conf/env_test.go +++ b/internal/conf/env_test.go @@ -24,6 +24,7 @@ type testStruct struct { MyBool bool MyDuration StringDuration MyMap map[string]*mapEntry + MySlice []string } func TestEnvironment(t *testing.T) { @@ -51,6 +52,9 @@ func TestEnvironment(t *testing.T) { os.Setenv("MYPREFIX_MYMAP_MYKEY2_MYSTRUCT_MYPARAM", "456") defer os.Unsetenv("MYPREFIX_MYMAP_MYKEY2_MYSTRUCT_MYPARAM") + os.Setenv("MYPREFIX_MYSLICE", "val1,val2") + defer os.Unsetenv("MYPREFIX_MYSLICE") + var s testStruct err := loadFromEnvironment("MYPREFIX", &s) require.NoError(t, err) @@ -68,4 +72,6 @@ func TestEnvironment(t *testing.T) { require.Equal(t, true, ok) require.Equal(t, "asd", v.MyValue) require.Equal(t, 456, v.MyStruct.MyParam) + + require.Equal(t, []string{"val1", "val2"}, s.MySlice) } diff --git a/internal/conf/hlsvariant.go b/internal/conf/hlsvariant.go index 80aaee0c58d..133e0f63c5e 100644 --- a/internal/conf/hlsvariant.go +++ b/internal/conf/hlsvariant.go @@ -59,6 +59,7 @@ func (d *HLSVariant) UnmarshalJSON(b []byte) error { return nil } +// unmarshalEnv implements envUnmarshaler. func (d *HLSVariant) unmarshalEnv(s string) error { return d.UnmarshalJSON([]byte(`"` + s + `"`)) } diff --git a/internal/conf/ipsorcidrs.go b/internal/conf/ipsorcidrs.go index 42efe7e34bd..ac18bd66992 100644 --- a/internal/conf/ipsorcidrs.go +++ b/internal/conf/ipsorcidrs.go @@ -48,6 +48,7 @@ func (d *IPsOrCIDRs) UnmarshalJSON(b []byte) error { return nil } +// unmarshalEnv implements envUnmarshaler. func (d *IPsOrCIDRs) unmarshalEnv(s string) error { byts, _ := json.Marshal(strings.Split(s, ",")) return d.UnmarshalJSON(byts) diff --git a/internal/conf/logdestination.go b/internal/conf/logdestination.go index fc2ac3d55c4..24d37ce1d1e 100644 --- a/internal/conf/logdestination.go +++ b/internal/conf/logdestination.go @@ -68,6 +68,7 @@ func (d *LogDestinations) UnmarshalJSON(b []byte) error { return nil } +// unmarshalEnv implements envUnmarshaler. func (d *LogDestinations) unmarshalEnv(s string) error { byts, _ := json.Marshal(strings.Split(s, ",")) return d.UnmarshalJSON(byts) diff --git a/internal/conf/loglevel.go b/internal/conf/loglevel.go index cb31e0821b7..dce4c097c80 100644 --- a/internal/conf/loglevel.go +++ b/internal/conf/loglevel.go @@ -58,6 +58,7 @@ func (d *LogLevel) UnmarshalJSON(b []byte) error { return nil } +// unmarshalEnv implements envUnmarshaler. func (d *LogLevel) unmarshalEnv(s string) error { return d.UnmarshalJSON([]byte(`"` + s + `"`)) } diff --git a/internal/conf/protocol.go b/internal/conf/protocol.go index 6ec0fb41d5d..83c97928be9 100644 --- a/internal/conf/protocol.go +++ b/internal/conf/protocol.go @@ -71,6 +71,7 @@ func (d *Protocols) UnmarshalJSON(b []byte) error { return nil } +// unmarshalEnv implements envUnmarshaler. func (d *Protocols) unmarshalEnv(s string) error { byts, _ := json.Marshal(strings.Split(s, ",")) return d.UnmarshalJSON(byts) diff --git a/internal/conf/sourceprotocol.go b/internal/conf/sourceprotocol.go index aad58ec878f..db334927ca4 100644 --- a/internal/conf/sourceprotocol.go +++ b/internal/conf/sourceprotocol.go @@ -63,6 +63,7 @@ func (d *SourceProtocol) UnmarshalJSON(b []byte) error { return nil } +// unmarshalEnv implements envUnmarshaler. func (d *SourceProtocol) unmarshalEnv(s string) error { return d.UnmarshalJSON([]byte(`"` + s + `"`)) } diff --git a/internal/conf/stringduration.go b/internal/conf/stringduration.go index 825f098b06b..51ebcb256a8 100644 --- a/internal/conf/stringduration.go +++ b/internal/conf/stringduration.go @@ -30,6 +30,7 @@ func (d *StringDuration) UnmarshalJSON(b []byte) error { return nil } +// unmarshalEnv implements envUnmarshaler. func (d *StringDuration) unmarshalEnv(s string) error { return d.UnmarshalJSON([]byte(`"` + s + `"`)) } diff --git a/internal/conf/stringsize.go b/internal/conf/stringsize.go index 65829b273f2..185b3b85a0a 100644 --- a/internal/conf/stringsize.go +++ b/internal/conf/stringsize.go @@ -30,6 +30,7 @@ func (s *StringSize) UnmarshalJSON(b []byte) error { return nil } +// unmarshalEnv implements envUnmarshaler. func (s *StringSize) unmarshalEnv(v string) error { return s.UnmarshalJSON([]byte(`"` + v + `"`)) } diff --git a/internal/core/api.go b/internal/core/api.go index 02e5fa0ffc2..448bb63fb77 100644 --- a/internal/core/api.go +++ b/internal/core/api.go @@ -5,7 +5,6 @@ import ( "encoding/json" "net" "net/http" - "net/http/httputil" "reflect" "sync" @@ -86,6 +85,10 @@ type apiPathManager interface { apiPathsList() pathAPIPathsListRes } +type apiHLSServer interface { + apiMuxersList() hlsServerAPIMuxersListRes +} + type apiRTSPServer interface { apiConnsList() rtspServerAPIConnsListRes apiSessionsList() rtspServerAPISessionsListRes @@ -97,24 +100,26 @@ type apiRTMPServer interface { apiConnsKick(id string) rtmpServerAPIConnsKickRes } -type apiHLSServer interface { - apiHLSMuxersList() hlsServerAPIMuxersListRes -} - type apiParent interface { Log(logger.Level, string, ...interface{}) apiConfigSet(conf *conf.Conf) } +type apiWebRTCServer interface { + apiConnsList() webRTCServerAPIConnsListRes + apiConnsKick(id string) webRTCServerAPIConnsKickRes +} + type api struct { - conf *conf.Conf - pathManager apiPathManager - rtspServer apiRTSPServer - rtspsServer apiRTSPServer - rtmpServer apiRTMPServer - rtmpsServer apiRTMPServer - hlsServer apiHLSServer - parent apiParent + conf *conf.Conf + pathManager apiPathManager + rtspServer apiRTSPServer + rtspsServer apiRTSPServer + rtmpServer apiRTMPServer + rtmpsServer apiRTMPServer + hlsServer apiHLSServer + webRTCServer apiWebRTCServer + parent apiParent ln net.Listener mutex sync.Mutex @@ -130,6 +135,7 @@ func newAPI( rtmpServer apiRTMPServer, rtmpsServer apiRTMPServer, hlsServer apiHLSServer, + webRTCServer apiWebRTCServer, parent apiParent, ) (*api, error) { ln, err := net.Listen("tcp", address) @@ -138,21 +144,23 @@ func newAPI( } a := &api{ - conf: conf, - pathManager: pathManager, - rtspServer: rtspServer, - rtspsServer: rtspsServer, - rtmpServer: rtmpServer, - rtmpsServer: rtmpsServer, - hlsServer: hlsServer, - parent: parent, - ln: ln, + conf: conf, + pathManager: pathManager, + rtspServer: rtspServer, + rtspsServer: rtspsServer, + rtmpServer: rtmpServer, + rtmpsServer: rtmpsServer, + hlsServer: hlsServer, + webRTCServer: webRTCServer, + parent: parent, + ln: ln, } router := gin.New() router.SetTrustedProxies(nil) - router.NoRoute(a.mwLog) - group := router.Group("/", a.mwLog) + mwLog := httpLoggerMiddleware(a) + router.NoRoute(mwLog) + group := router.Group("/", mwLog) group.GET("/v1/config/get", a.onConfigGet) group.POST("/v1/config/set", a.onConfigSet) @@ -160,6 +168,10 @@ func newAPI( group.POST("/v1/config/paths/edit/*name", a.onConfigPathsEdit) group.POST("/v1/config/paths/remove/*name", a.onConfigPathsDelete) + if !interfaceIsEmpty(a.hlsServer) { + group.GET("/v1/hlsmuxers/list", a.onHLSMuxersList) + } + group.GET("/v1/paths/list", a.onPathsList) if !interfaceIsEmpty(a.rtspServer) { @@ -184,8 +196,9 @@ func newAPI( group.POST("/v1/rtmpsconns/kick/:id", a.onRTMPSConnsKick) } - if !interfaceIsEmpty(a.hlsServer) { - group.GET("/v1/hlsmuxers/list", a.onHLSMuxersList) + if !interfaceIsEmpty(a.webRTCServer) { + group.GET("/v1/webrtcconns/list", a.onWebRTCConnsList) + group.POST("/v1/webrtcconns/kick/:id", a.onWebRTCConnsKick) } a.s = &http.Server{Handler: router} @@ -207,22 +220,6 @@ func (a *api) log(level logger.Level, format string, args ...interface{}) { a.parent.Log(level, "[API] "+format, args...) } -func (a *api) mwLog(ctx *gin.Context) { - a.log(logger.Info, "[conn %v] %s %s", ctx.Request.RemoteAddr, ctx.Request.Method, ctx.Request.URL.Path) - - byts, _ := httputil.DumpRequest(ctx.Request, true) - a.log(logger.Debug, "[conn %v] [c->s] %s", ctx.Request.RemoteAddr, string(byts)) - - logw := &httpLogWriter{ResponseWriter: ctx.Writer} - ctx.Writer = logw - - ctx.Writer.Header().Set("Server", "rtsp-simple-server") - - ctx.Next() - - a.log(logger.Debug, "[conn %v] [s->c] %s", ctx.Request.RemoteAddr, logw.dump()) -} - func (a *api) onConfigGet(ctx *gin.Context) { a.mutex.Lock() c := a.conf @@ -419,7 +416,6 @@ func (a *api) onRTSPSessionsKick(ctx *gin.Context) { res := a.rtspServer.apiSessionsKick(id) if res.err != nil { - ctx.AbortWithStatus(http.StatusNotFound) return } @@ -451,7 +447,6 @@ func (a *api) onRTSPSSessionsKick(ctx *gin.Context) { res := a.rtspsServer.apiSessionsKick(id) if res.err != nil { - ctx.AbortWithStatus(http.StatusNotFound) return } @@ -473,7 +468,6 @@ func (a *api) onRTMPConnsKick(ctx *gin.Context) { res := a.rtmpServer.apiConnsKick(id) if res.err != nil { - ctx.AbortWithStatus(http.StatusNotFound) return } @@ -495,7 +489,6 @@ func (a *api) onRTMPSConnsKick(ctx *gin.Context) { res := a.rtmpsServer.apiConnsKick(id) if res.err != nil { - ctx.AbortWithStatus(http.StatusNotFound) return } @@ -503,7 +496,17 @@ func (a *api) onRTMPSConnsKick(ctx *gin.Context) { } func (a *api) onHLSMuxersList(ctx *gin.Context) { - res := a.hlsServer.apiHLSMuxersList() + res := a.hlsServer.apiMuxersList() + if res.err != nil { + ctx.AbortWithStatus(http.StatusInternalServerError) + return + } + + ctx.JSON(http.StatusOK, res.data) +} + +func (a *api) onWebRTCConnsList(ctx *gin.Context) { + res := a.webRTCServer.apiConnsList() if res.err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return @@ -512,6 +515,17 @@ func (a *api) onHLSMuxersList(ctx *gin.Context) { ctx.JSON(http.StatusOK, res.data) } +func (a *api) onWebRTCConnsKick(ctx *gin.Context) { + id := ctx.Param("id") + + res := a.webRTCServer.apiConnsKick(id) + if res.err != nil { + return + } + + ctx.Status(http.StatusOK) +} + // confReload is called by core. func (a *api) confReload(conf *conf.Conf) { a.mutex.Lock() diff --git a/internal/core/core.go b/internal/core/core.go index bc549f98d27..0ce3b75134c 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -38,6 +38,7 @@ type Core struct { rtmpServer *rtmpServer rtmpsServer *rtmpServer hlsServer *hlsServer + webRTCServer *webRTCServer api *api confWatcher *confwatcher.ConfWatcher @@ -180,7 +181,8 @@ func (p *Core) createResources(initial bool) error { p.logger, err = logger.New( logger.Level(p.conf.LogLevel), p.conf.LogDestinations, - p.conf.LogFile) + p.conf.LogFile, + ) if err != nil { return err } @@ -201,7 +203,8 @@ func (p *Core) createResources(initial bool) error { if p.metrics == nil { p.metrics, err = newMetrics( p.conf.MetricsAddress, - p) + p, + ) if err != nil { return err } @@ -212,7 +215,8 @@ func (p *Core) createResources(initial bool) error { if p.pprof == nil { p.pprof, err = newPPROF( p.conf.PPROFAddress, - p) + p, + ) if err != nil { return err } @@ -229,7 +233,8 @@ func (p *Core) createResources(initial bool) error { p.conf.Paths, p.externalCmdPool, p.metrics, - p) + p, + ) } if !p.conf.RTSPDisable && @@ -263,7 +268,8 @@ func (p *Core) createResources(initial bool) error { p.externalCmdPool, p.metrics, p.pathManager, - p) + p, + ) if err != nil { return err } @@ -299,7 +305,8 @@ func (p *Core) createResources(initial bool) error { p.externalCmdPool, p.metrics, p.pathManager, - p) + p, + ) if err != nil { return err } @@ -326,7 +333,8 @@ func (p *Core) createResources(initial bool) error { p.externalCmdPool, p.metrics, p.pathManager, - p) + p, + ) if err != nil { return err } @@ -353,7 +361,8 @@ func (p *Core) createResources(initial bool) error { p.externalCmdPool, p.metrics, p.pathManager, - p) + p, + ) if err != nil { return err } @@ -365,6 +374,9 @@ func (p *Core) createResources(initial bool) error { p.hlsServer, err = newHLSServer( p.ctx, p.conf.HLSAddress, + p.conf.HLSEncryption, + p.conf.HLSServerKey, + p.conf.HLSServerCert, p.conf.ExternalAuthenticationURL, p.conf.HLSAlwaysRemux, p.conf.HLSVariant, @@ -373,14 +385,34 @@ func (p *Core) createResources(initial bool) error { p.conf.HLSPartDuration, p.conf.HLSSegmentMaxSize, p.conf.HLSAllowOrigin, - p.conf.HLSEncryption, - p.conf.HLSServerKey, - p.conf.HLSServerCert, p.conf.HLSTrustedProxies, p.conf.ReadBufferCount, p.pathManager, p.metrics, - p) + p, + ) + if err != nil { + return err + } + } + } + + if p.conf.WebRTC { + if p.webRTCServer == nil { + p.webRTCServer, err = newWebRTCServer( + p.ctx, + p.conf.ExternalAuthenticationURL, + p.conf.WebRTCAddress, + p.conf.WebRTCServerKey, + p.conf.WebRTCServerCert, + p.conf.WebRTCAllowOrigin, + p.conf.WebRTCTrustedProxies, + p.conf.WebRTCICEServers, + p.conf.ReadBufferCount, + p.pathManager, + p.metrics, + p, + ) if err != nil { return err } @@ -398,7 +430,9 @@ func (p *Core) createResources(initial bool) error { p.rtmpServer, p.rtmpsServer, p.hlsServer, - p) + p.webRTCServer, + p, + ) if err != nil { return err } @@ -416,41 +450,29 @@ func (p *Core) createResources(initial bool) error { } func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { - closeLogger := false - if newConf == nil || + closeLogger := newConf == nil || !reflect.DeepEqual(newConf.LogDestinations, p.conf.LogDestinations) || - newConf.LogFile != p.conf.LogFile { - closeLogger = true - } + newConf.LogFile != p.conf.LogFile - closeMetrics := false - if newConf == nil || + closeMetrics := newConf == nil || newConf.Metrics != p.conf.Metrics || - newConf.MetricsAddress != p.conf.MetricsAddress { - closeMetrics = true - } + newConf.MetricsAddress != p.conf.MetricsAddress - closePPROF := false - if newConf == nil || + closePPROF := newConf == nil || newConf.PPROF != p.conf.PPROF || - newConf.PPROFAddress != p.conf.PPROFAddress { - closePPROF = true - } + newConf.PPROFAddress != p.conf.PPROFAddress - closePathManager := false - if newConf == nil || + closePathManager := newConf == nil || newConf.RTSPAddress != p.conf.RTSPAddress || newConf.ReadTimeout != p.conf.ReadTimeout || newConf.WriteTimeout != p.conf.WriteTimeout || newConf.ReadBufferCount != p.conf.ReadBufferCount || - closeMetrics { - closePathManager = true - } else if !reflect.DeepEqual(newConf.Paths, p.conf.Paths) { + closeMetrics + if !closePathManager && !reflect.DeepEqual(newConf.Paths, p.conf.Paths) { p.pathManager.confReload(newConf.Paths) } - closeRTSPServer := false - if newConf == nil || + closeRTSPServer := newConf == nil || newConf.RTSPDisable != p.conf.RTSPDisable || newConf.Encryption != p.conf.Encryption || newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL || @@ -470,12 +492,9 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { newConf.RunOnConnect != p.conf.RunOnConnect || newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart || closeMetrics || - closePathManager { - closeRTSPServer = true - } + closePathManager - closeRTSPSServer := false - if newConf == nil || + closeRTSPSServer := newConf == nil || newConf.RTSPDisable != p.conf.RTSPDisable || newConf.Encryption != p.conf.Encryption || newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL || @@ -491,12 +510,9 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { newConf.RunOnConnect != p.conf.RunOnConnect || newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart || closeMetrics || - closePathManager { - closeRTSPSServer = true - } + closePathManager - closeRTMPServer := false - if newConf == nil || + closeRTMPServer := newConf == nil || newConf.RTMPDisable != p.conf.RTMPDisable || newConf.RTMPEncryption != p.conf.RTMPEncryption || newConf.RTMPAddress != p.conf.RTMPAddress || @@ -508,12 +524,9 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { newConf.RunOnConnect != p.conf.RunOnConnect || newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart || closeMetrics || - closePathManager { - closeRTMPServer = true - } + closePathManager - closeRTMPSServer := false - if newConf == nil || + closeRTMPSServer := newConf == nil || newConf.RTMPDisable != p.conf.RTMPDisable || newConf.RTMPEncryption != p.conf.RTMPEncryption || newConf.RTMPSAddress != p.conf.RTMPSAddress || @@ -527,14 +540,14 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { newConf.RunOnConnect != p.conf.RunOnConnect || newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart || closeMetrics || - closePathManager { - closeRTMPSServer = true - } + closePathManager - closeHLSServer := false - if newConf == nil || + closeHLSServer := newConf == nil || newConf.HLSDisable != p.conf.HLSDisable || newConf.HLSAddress != p.conf.HLSAddress || + newConf.HLSEncryption != p.conf.HLSEncryption || + newConf.HLSServerKey != p.conf.HLSServerKey || + newConf.HLSServerCert != p.conf.HLSServerCert || newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL || newConf.HLSAlwaysRemux != p.conf.HLSAlwaysRemux || newConf.HLSVariant != p.conf.HLSVariant || @@ -543,27 +556,33 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { newConf.HLSPartDuration != p.conf.HLSPartDuration || newConf.HLSSegmentMaxSize != p.conf.HLSSegmentMaxSize || newConf.HLSAllowOrigin != p.conf.HLSAllowOrigin || - newConf.HLSEncryption != p.conf.HLSEncryption || - newConf.HLSServerKey != p.conf.HLSServerKey || - newConf.HLSServerCert != p.conf.HLSServerCert || !reflect.DeepEqual(newConf.HLSTrustedProxies, p.conf.HLSTrustedProxies) || newConf.ReadBufferCount != p.conf.ReadBufferCount || closePathManager || - closeMetrics { - closeHLSServer = true - } + closeMetrics + + closeWebrtcServer := newConf == nil || + newConf.WebRTC != p.conf.WebRTC || + newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL || + newConf.WebRTCAddress != p.conf.WebRTCAddress || + newConf.WebRTCServerKey != p.conf.WebRTCServerKey || + newConf.WebRTCServerCert != p.conf.WebRTCServerCert || + newConf.WebRTCAllowOrigin != p.conf.WebRTCAllowOrigin || + !reflect.DeepEqual(newConf.WebRTCTrustedProxies, p.conf.WebRTCTrustedProxies) || + !reflect.DeepEqual(newConf.WebRTCICEServers, p.conf.WebRTCICEServers) || + newConf.ReadBufferCount != p.conf.ReadBufferCount || + closeMetrics || + closePathManager - closeAPI := false - if newConf == nil || + closeAPI := newConf == nil || newConf.API != p.conf.API || newConf.APIAddress != p.conf.APIAddress || closePathManager || closeRTSPServer || closeRTSPSServer || closeRTMPServer || - closeHLSServer { - closeAPI = true - } + closeHLSServer || + closeWebrtcServer if newConf == nil && p.confWatcher != nil { p.confWatcher.Close() @@ -594,6 +613,11 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { p.pathManager = nil } + if closeWebrtcServer && p.webRTCServer != nil { + p.webRTCServer.close() + p.webRTCServer = nil + } + if closeHLSServer && p.hlsServer != nil { p.hlsServer.close() p.hlsServer = nil @@ -632,7 +656,6 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { func (p *Core) reloadConf(newConf *conf.Conf, calledByAPI bool) error { p.closeResources(newConf, calledByAPI) - p.conf = newConf return p.createResources(false) } diff --git a/internal/core/formatprocessor.go b/internal/core/formatprocessor.go index 92386ed059a..ca714694cf8 100644 --- a/internal/core/formatprocessor.go +++ b/internal/core/formatprocessor.go @@ -16,12 +16,15 @@ func newFormatProcessor(forma format.Format, generateRTPPackets bool) (formatPro case *format.H265: return newFormatProcessorH265(forma, generateRTPPackets) + case *format.VP8: + return newFormatProcessorVP8(forma, generateRTPPackets) + + case *format.VP9: + return newFormatProcessorVP9(forma, generateRTPPackets) + case *format.MPEG4Audio: return newFormatProcessorMPEG4Audio(forma, generateRTPPackets) - case *format.Opus: - return newFormatProcessorOpus(forma, generateRTPPackets) - default: return newFormatProcessorGeneric(forma, generateRTPPackets) } diff --git a/internal/core/formatprocessor_mpeg4audio.go b/internal/core/formatprocessor_mpeg4audio.go index 58f89c9ea4c..f82fc94310e 100644 --- a/internal/core/formatprocessor_mpeg4audio.go +++ b/internal/core/formatprocessor_mpeg4audio.go @@ -45,7 +45,7 @@ func newFormatProcessorMPEG4Audio( return t, nil } -func (t *formatProcessorMPEG4Audio) process(dat data, hasNonRTSPReaders bool) error { +func (t *formatProcessorMPEG4Audio) process(dat data, hasNonRTSPReaders bool) error { //nolint:dupl tdata := dat.(*dataMPEG4Audio) if tdata.rtpPackets != nil { diff --git a/internal/core/formatprocessor_opus.go b/internal/core/formatprocessor_vp8.go similarity index 53% rename from internal/core/formatprocessor_opus.go rename to internal/core/formatprocessor_vp8.go index 9920924641d..cd771a27709 100644 --- a/internal/core/formatprocessor_opus.go +++ b/internal/core/formatprocessor_vp8.go @@ -5,36 +5,36 @@ import ( "time" "github.com/aler9/gortsplib/v2/pkg/format" - "github.com/aler9/gortsplib/v2/pkg/formatdecenc/rtpsimpleaudio" + "github.com/aler9/gortsplib/v2/pkg/formatdecenc/rtpvp8" "github.com/pion/rtp" ) -type dataOpus struct { +type dataVP8 struct { rtpPackets []*rtp.Packet ntp time.Time pts time.Duration - au []byte + frame []byte } -func (d *dataOpus) getRTPPackets() []*rtp.Packet { +func (d *dataVP8) getRTPPackets() []*rtp.Packet { return d.rtpPackets } -func (d *dataOpus) getNTP() time.Time { +func (d *dataVP8) getNTP() time.Time { return d.ntp } -type formatProcessorOpus struct { - format *format.Opus - encoder *rtpsimpleaudio.Encoder - decoder *rtpsimpleaudio.Decoder +type formatProcessorVP8 struct { + format *format.VP8 + encoder *rtpvp8.Encoder + decoder *rtpvp8.Decoder } -func newFormatProcessorOpus( - forma *format.Opus, +func newFormatProcessorVP8( + forma *format.VP8, allocateEncoder bool, -) (*formatProcessorOpus, error) { - t := &formatProcessorOpus{ +) (*formatProcessorVP8, error) { + t := &formatProcessorVP8{ format: forma, } @@ -45,18 +45,8 @@ func newFormatProcessorOpus( return t, nil } -func (t *formatProcessorOpus) generateRTPPackets(tdata *dataOpus) error { - pkt, err := t.encoder.Encode(tdata.au, tdata.pts) - if err != nil { - return err - } - - tdata.rtpPackets = []*rtp.Packet{pkt} - return nil -} - -func (t *formatProcessorOpus) process(dat data, hasNonRTSPReaders bool) error { - tdata := dat.(*dataOpus) +func (t *formatProcessorVP8) process(dat data, hasNonRTSPReaders bool) error { //nolint:dupl + tdata := dat.(*dataVP8) if tdata.rtpPackets != nil { pkt := tdata.rtpPackets[0] @@ -76,12 +66,15 @@ func (t *formatProcessorOpus) process(dat data, hasNonRTSPReaders bool) error { t.decoder = t.format.CreateDecoder() } - au, pts, err := t.decoder.Decode(pkt) + frame, pts, err := t.decoder.Decode(pkt) if err != nil { + if err == rtpvp8.ErrMorePacketsNeeded { + return nil + } return err } - tdata.au = au + tdata.frame = frame tdata.pts = pts } @@ -89,5 +82,11 @@ func (t *formatProcessorOpus) process(dat data, hasNonRTSPReaders bool) error { return nil } - return t.generateRTPPackets(tdata) + pkts, err := t.encoder.Encode(tdata.frame, tdata.pts) + if err != nil { + return err + } + + tdata.rtpPackets = pkts + return nil } diff --git a/internal/core/formatprocessor_vp9.go b/internal/core/formatprocessor_vp9.go new file mode 100644 index 00000000000..85e7ea84bdf --- /dev/null +++ b/internal/core/formatprocessor_vp9.go @@ -0,0 +1,92 @@ +package core + +import ( + "fmt" + "time" + + "github.com/aler9/gortsplib/v2/pkg/format" + "github.com/aler9/gortsplib/v2/pkg/formatdecenc/rtpvp9" + "github.com/pion/rtp" +) + +type dataVP9 struct { + rtpPackets []*rtp.Packet + ntp time.Time + pts time.Duration + frame []byte +} + +func (d *dataVP9) getRTPPackets() []*rtp.Packet { + return d.rtpPackets +} + +func (d *dataVP9) getNTP() time.Time { + return d.ntp +} + +type formatProcessorVP9 struct { + format *format.VP9 + encoder *rtpvp9.Encoder + decoder *rtpvp9.Decoder +} + +func newFormatProcessorVP9( + forma *format.VP9, + allocateEncoder bool, +) (*formatProcessorVP9, error) { + t := &formatProcessorVP9{ + format: forma, + } + + if allocateEncoder { + t.encoder = forma.CreateEncoder() + } + + return t, nil +} + +func (t *formatProcessorVP9) process(dat data, hasNonRTSPReaders bool) error { //nolint:dupl + tdata := dat.(*dataVP9) + + if tdata.rtpPackets != nil { + pkt := tdata.rtpPackets[0] + + // remove padding + pkt.Header.Padding = false + pkt.PaddingSize = 0 + + if pkt.MarshalSize() > maxPacketSize { + return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", + pkt.MarshalSize(), maxPacketSize) + } + + // decode from RTP + if hasNonRTSPReaders { + if t.decoder == nil { + t.decoder = t.format.CreateDecoder() + } + + frame, pts, err := t.decoder.Decode(pkt) + if err != nil { + if err == rtpvp9.ErrMorePacketsNeeded { + return nil + } + return err + } + + tdata.frame = frame + tdata.pts = pts + } + + // route packet as is + return nil + } + + pkts, err := t.encoder.Encode(tdata.frame, tdata.pts) + if err != nil { + return err + } + + tdata.rtpPackets = pkts + return nil +} diff --git a/internal/core/hls_index.html b/internal/core/hls_index.html new file mode 100644 index 00000000000..3c4be2c94d7 --- /dev/null +++ b/internal/core/hls_index.html @@ -0,0 +1,67 @@ + + + + + + + + + + + + + + + + diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index ee14596b1b2..53e9d86c442 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -3,6 +3,7 @@ package core import ( "bytes" "context" + _ "embed" "errors" "fmt" "net" @@ -27,73 +28,8 @@ const ( closeAfterInactivity = 60 * time.Second ) -const index = ` - - - - - - - - - - - - - - - -` +//go:embed hls_index.html +var hlsIndex []byte type hlsMuxerResponse struct { muxer *hlsMuxer @@ -294,9 +230,8 @@ func (m *hlsMuxer) run() { func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error { res := m.pathManager.readerAdd(pathReaderAddReq{ - author: m, - pathName: m.pathName, - authenticate: nil, + author: m, + pathName: m.pathName, }) if res.err != nil { return res.err @@ -487,7 +422,7 @@ func (m *hlsMuxer) handleRequest(req *hlsMuxerRequest) func() *hls.MuxerFileResp Header: map[string]string{ "Content-Type": `text/html`, }, - Body: bytes.NewReader([]byte(index)), + Body: bytes.NewReader(hlsIndex), } } } @@ -574,8 +509,8 @@ func (m *hlsMuxer) request(req *hlsMuxerRequest) { } } -// apiHLSMuxersList is called by api. -func (m *hlsMuxer) apiHLSMuxersList(req hlsServerAPIMuxersListSubReq) { +// apiMuxersList is called by api. +func (m *hlsMuxer) apiMuxersList(req hlsServerAPIMuxersListSubReq) { req.res = make(chan struct{}) select { case m.chAPIHLSMuxersList <- req: diff --git a/internal/core/hls_server.go b/internal/core/hls_server.go index 3bd4b6fbb9a..7aec08c8259 100644 --- a/internal/core/hls_server.go +++ b/internal/core/hls_server.go @@ -8,7 +8,6 @@ import ( "log" "net" "net/http" - "net/http/httputil" gopath "path" "strings" "sync" @@ -57,14 +56,14 @@ type hlsServerParent interface { type hlsServer struct { externalAuthenticationURL string - hlsAlwaysRemux bool - hlsVariant conf.HLSVariant - hlsSegmentCount int - hlsSegmentDuration conf.StringDuration - hlsPartDuration conf.StringDuration - hlsSegmentMaxSize conf.StringSize - hlsAllowOrigin string - hlsTrustedProxies conf.IPsOrCIDRs + alwaysRemux bool + variant conf.HLSVariant + segmentCount int + segmentDuration conf.StringDuration + partDuration conf.StringDuration + segmentMaxSize conf.StringSize + allowOrigin string + trustedProxies conf.IPsOrCIDRs readBufferCount int pathManager *pathManager metrics *metrics @@ -88,18 +87,18 @@ type hlsServer struct { func newHLSServer( parentCtx context.Context, address string, + encryption bool, + serverKey string, + serverCert string, externalAuthenticationURL string, - hlsAlwaysRemux bool, - hlsVariant conf.HLSVariant, - hlsSegmentCount int, - hlsSegmentDuration conf.StringDuration, - hlsPartDuration conf.StringDuration, - hlsSegmentMaxSize conf.StringSize, - hlsAllowOrigin string, - hlsEncryption bool, - hlsServerKey string, - hlsServerCert string, - hlsTrustedProxies conf.IPsOrCIDRs, + alwaysRemux bool, + variant conf.HLSVariant, + segmentCount int, + segmentDuration conf.StringDuration, + partDuration conf.StringDuration, + segmentMaxSize conf.StringSize, + allowOrigin string, + trustedProxies conf.IPsOrCIDRs, readBufferCount int, pathManager *pathManager, metrics *metrics, @@ -111,8 +110,8 @@ func newHLSServer( } var tlsConfig *tls.Config - if hlsEncryption { - crt, err := tls.LoadX509KeyPair(hlsServerCert, hlsServerKey) + if encryption { + crt, err := tls.LoadX509KeyPair(serverCert, serverKey) if err != nil { ln.Close() return nil, err @@ -127,14 +126,14 @@ func newHLSServer( s := &hlsServer{ externalAuthenticationURL: externalAuthenticationURL, - hlsAlwaysRemux: hlsAlwaysRemux, - hlsVariant: hlsVariant, - hlsSegmentCount: hlsSegmentCount, - hlsSegmentDuration: hlsSegmentDuration, - hlsPartDuration: hlsPartDuration, - hlsSegmentMaxSize: hlsSegmentMaxSize, - hlsAllowOrigin: hlsAllowOrigin, - hlsTrustedProxies: hlsTrustedProxies, + alwaysRemux: alwaysRemux, + variant: variant, + segmentCount: segmentCount, + segmentDuration: segmentDuration, + partDuration: partDuration, + segmentMaxSize: segmentMaxSize, + allowOrigin: allowOrigin, + trustedProxies: trustedProxies, readBufferCount: readBufferCount, pathManager: pathManager, parent: parent, @@ -180,10 +179,10 @@ func (s *hlsServer) run() { defer s.wg.Done() router := gin.New() - router.NoRoute(s.onRequest) + router.NoRoute(httpLoggerMiddleware(s), s.onRequest) - tmp := make([]string, len(s.hlsTrustedProxies)) - for i, entry := range s.hlsTrustedProxies { + tmp := make([]string, len(s.trustedProxies)) + for i, entry := range s.trustedProxies { tmp[i] = entry.String() } router.SetTrustedProxies(tmp) @@ -204,12 +203,12 @@ outer: for { select { case pa := <-s.chPathSourceReady: - if s.hlsAlwaysRemux { + if s.alwaysRemux { s.findOrCreateMuxer(pa.Name(), "", nil) } case pa := <-s.chPathSourceNotReady: - if s.hlsAlwaysRemux { + if s.alwaysRemux { c, ok := s.muxers[pa.Name()] if ok { c.close() @@ -226,7 +225,7 @@ outer: } delete(s.muxers, c.PathName()) - if s.hlsAlwaysRemux && c.remoteAddr == "" { + if s.alwaysRemux && c.remoteAddr == "" { s.findOrCreateMuxer(c.PathName(), "", nil) } @@ -259,16 +258,7 @@ outer: } func (s *hlsServer) onRequest(ctx *gin.Context) { - s.log(logger.Debug, "[conn %v] %s %s", ctx.ClientIP(), ctx.Request.Method, ctx.Request.URL.Path) - - byts, _ := httputil.DumpRequest(ctx.Request, true) - s.log(logger.Debug, "[conn %v] [c->s] %s", ctx.ClientIP(), string(byts)) - - logw := &httpLogWriter{ResponseWriter: ctx.Writer} - ctx.Writer = logw - - ctx.Writer.Header().Set("Server", "rtsp-simple-server") - ctx.Writer.Header().Set("Access-Control-Allow-Origin", s.hlsAllowOrigin) + ctx.Writer.Header().Set("Access-Control-Allow-Origin", s.allowOrigin) ctx.Writer.Header().Set("Access-Control-Allow-Credentials", "true") switch ctx.Request.Method { @@ -281,7 +271,6 @@ func (s *hlsServer) onRequest(ctx *gin.Context) { return default: - ctx.Writer.WriteHeader(http.StatusNotFound) return } @@ -290,7 +279,6 @@ func (s *hlsServer) onRequest(ctx *gin.Context) { switch pa { case "", "favicon.ico": - ctx.Writer.WriteHeader(http.StatusNotFound) return } @@ -336,8 +324,6 @@ func (s *hlsServer) onRequest(ctx *gin.Context) { case <-s.ctx.Done(): } - - s.log(logger.Debug, "[conn %v] [s->c] %s", ctx.ClientIP(), logw.dump()) } func (s *hlsServer) findOrCreateMuxer(pathName string, remoteAddr string, req *hlsMuxerRequest) *hlsMuxer { @@ -348,11 +334,11 @@ func (s *hlsServer) findOrCreateMuxer(pathName string, remoteAddr string, req *h pathName, remoteAddr, s.externalAuthenticationURL, - s.hlsVariant, - s.hlsSegmentCount, - s.hlsSegmentDuration, - s.hlsPartDuration, - s.hlsSegmentMaxSize, + s.variant, + s.segmentCount, + s.segmentDuration, + s.partDuration, + s.segmentMaxSize, s.readBufferCount, req, &s.wg, @@ -390,8 +376,8 @@ func (s *hlsServer) pathSourceNotReady(pa *path) { } } -// apiHLSMuxersList is called by api. -func (s *hlsServer) apiHLSMuxersList() hlsServerAPIMuxersListRes { +// apiMuxersList is called by api. +func (s *hlsServer) apiMuxersList() hlsServerAPIMuxersListRes { req := hlsServerAPIMuxersListReq{ res: make(chan hlsServerAPIMuxersListRes), } @@ -405,7 +391,7 @@ func (s *hlsServer) apiHLSMuxersList() hlsServerAPIMuxersListRes { } for _, pa := range res.muxers { - pa.apiHLSMuxersList(hlsServerAPIMuxersListSubReq{data: res.data}) + pa.apiMuxersList(hlsServerAPIMuxersListSubReq{data: res.data}) } return res diff --git a/internal/core/http_logger.go b/internal/core/http_logger.go new file mode 100644 index 00000000000..ab67223a6fe --- /dev/null +++ b/internal/core/http_logger.go @@ -0,0 +1,60 @@ +package core + +import ( + "bytes" + "fmt" + "net/http" + "net/http/httputil" + + "github.com/gin-gonic/gin" + + "github.com/aler9/rtsp-simple-server/internal/logger" +) + +type httpLoggerWriter struct { + gin.ResponseWriter + buf bytes.Buffer +} + +func (w *httpLoggerWriter) Write(b []byte) (int, error) { + w.buf.Write(b) + return w.ResponseWriter.Write(b) +} + +func (w *httpLoggerWriter) WriteString(s string) (int, error) { + w.buf.WriteString(s) + return w.ResponseWriter.WriteString(s) +} + +func (w *httpLoggerWriter) dump() string { + var buf bytes.Buffer + fmt.Fprintf(&buf, "%s %d %s\n", "HTTP/1.1", w.ResponseWriter.Status(), http.StatusText(w.ResponseWriter.Status())) + w.ResponseWriter.Header().Write(&buf) + buf.Write([]byte("\n")) + if w.buf.Len() > 0 { + fmt.Fprintf(&buf, "(body of %d bytes)", w.buf.Len()) + } + return buf.String() +} + +type httpLoggerParent interface { + log(logger.Level, string, ...interface{}) +} + +func httpLoggerMiddleware(p httpLoggerParent) func(*gin.Context) { + return func(ctx *gin.Context) { + p.log(logger.Debug, "[conn %v] %s %s", ctx.ClientIP(), ctx.Request.Method, ctx.Request.URL.Path) + + byts, _ := httputil.DumpRequest(ctx.Request, true) + p.log(logger.Debug, "[conn %v] [c->s] %s", ctx.ClientIP(), string(byts)) + + logw := &httpLoggerWriter{ResponseWriter: ctx.Writer} + ctx.Writer = logw + + ctx.Writer.Header().Set("Server", "rtsp-simple-server") + + ctx.Next() + + p.log(logger.Debug, "[conn %v] [s->c] %s", ctx.ClientIP(), logw.dump()) + } +} diff --git a/internal/core/httplogwriter.go b/internal/core/httplogwriter.go deleted file mode 100644 index 44b064d6581..00000000000 --- a/internal/core/httplogwriter.go +++ /dev/null @@ -1,35 +0,0 @@ -package core - -import ( - "bytes" - "fmt" - "net/http" - - "github.com/gin-gonic/gin" -) - -type httpLogWriter struct { - gin.ResponseWriter - buf bytes.Buffer -} - -func (w *httpLogWriter) Write(b []byte) (int, error) { - w.buf.Write(b) - return w.ResponseWriter.Write(b) -} - -func (w *httpLogWriter) WriteString(s string) (int, error) { - w.buf.WriteString(s) - return w.ResponseWriter.WriteString(s) -} - -func (w *httpLogWriter) dump() string { - var buf bytes.Buffer - fmt.Fprintf(&buf, "%s %d %s\n", "HTTP/1.1", w.ResponseWriter.Status(), http.StatusText(w.ResponseWriter.Status())) - w.ResponseWriter.Header().Write(&buf) - buf.Write([]byte("\n")) - if w.buf.Len() > 0 { - fmt.Fprintf(&buf, "(body of %d bytes)", w.buf.Len()) - } - return buf.String() -} diff --git a/internal/core/metrics.go b/internal/core/metrics.go index 013d4d88781..a34f71bccfb 100644 --- a/internal/core/metrics.go +++ b/internal/core/metrics.go @@ -17,23 +17,6 @@ func metric(key string, value int64) string { return key + " " + strconv.FormatInt(value, 10) + "\n" } -type metricsPathManager interface { - apiPathsList() pathAPIPathsListRes -} - -type metricsRTSPServer interface { - apiConnsList() rtspServerAPIConnsListRes - apiSessionsList() rtspServerAPISessionsListRes -} - -type metricsRTMPServer interface { - apiConnsList() rtmpServerAPIConnsListRes -} - -type metricsHLSServer interface { - apiHLSMuxersList() hlsServerAPIMuxersListRes -} - type metricsParent interface { Log(logger.Level, string, ...interface{}) } @@ -41,14 +24,15 @@ type metricsParent interface { type metrics struct { parent metricsParent - ln net.Listener - server *http.Server - mutex sync.Mutex - pathManager metricsPathManager - rtspServer metricsRTSPServer - rtspsServer metricsRTSPServer - rtmpServer metricsRTMPServer - hlsServer metricsHLSServer + ln net.Listener + server *http.Server + mutex sync.Mutex + pathManager apiPathManager + rtspServer apiRTSPServer + rtspsServer apiRTSPServer + rtmpServer apiRTMPServer + hlsServer apiHLSServer + webRTCServer apiWebRTCServer } func newMetrics( @@ -107,6 +91,17 @@ func (m *metrics) onMetrics(ctx *gin.Context) { } } + if !interfaceIsEmpty(m.hlsServer) { + res := m.hlsServer.apiMuxersList() + if res.err == nil { + for name, i := range res.data.Items { + tags := "{name=\"" + name + "\"}" + out += metric("hls_muxers"+tags, 1) + out += metric("hls_muxers_bytes_sent"+tags, int64(i.BytesSent)) + } + } + } + if !interfaceIsEmpty(m.rtspServer) { //nolint:dupl func() { res := m.rtspServer.apiConnsList() @@ -171,13 +166,14 @@ func (m *metrics) onMetrics(ctx *gin.Context) { } } - if !interfaceIsEmpty(m.hlsServer) { - res := m.hlsServer.apiHLSMuxersList() + if !interfaceIsEmpty(m.webRTCServer) { + res := m.webRTCServer.apiConnsList() if res.err == nil { - for name, i := range res.data.Items { - tags := "{name=\"" + name + "\"}" - out += metric("hls_muxers"+tags, 1) - out += metric("hls_muxers_bytes_sent"+tags, int64(i.BytesSent)) + for id, i := range res.data.Items { + tags := "{id=\"" + id + "\"}" + out += metric("webrtc_conns"+tags, 1) + out += metric("webrtc_conns_bytes_received"+tags, int64(i.BytesReceived)) + out += metric("webrtc_conns_bytes_sent"+tags, int64(i.BytesSent)) } } } @@ -187,36 +183,43 @@ func (m *metrics) onMetrics(ctx *gin.Context) { } // pathManagerSet is called by pathManager. -func (m *metrics) pathManagerSet(s metricsPathManager) { +func (m *metrics) pathManagerSet(s apiPathManager) { m.mutex.Lock() defer m.mutex.Unlock() m.pathManager = s } +// hlsServerSet is called by hlsServer. +func (m *metrics) hlsServerSet(s apiHLSServer) { + m.mutex.Lock() + defer m.mutex.Unlock() + m.hlsServer = s +} + // rtspServerSet is called by rtspServer (plain). -func (m *metrics) rtspServerSet(s metricsRTSPServer) { +func (m *metrics) rtspServerSet(s apiRTSPServer) { m.mutex.Lock() defer m.mutex.Unlock() m.rtspServer = s } // rtspsServerSet is called by rtspServer (tls). -func (m *metrics) rtspsServerSet(s metricsRTSPServer) { +func (m *metrics) rtspsServerSet(s apiRTSPServer) { m.mutex.Lock() defer m.mutex.Unlock() m.rtspsServer = s } // rtmpServerSet is called by rtmpServer. -func (m *metrics) rtmpServerSet(s metricsRTMPServer) { +func (m *metrics) rtmpServerSet(s apiRTMPServer) { m.mutex.Lock() defer m.mutex.Unlock() m.rtmpServer = s } -// hlsServerSet is called by hlsServer. -func (m *metrics) hlsServerSet(s metricsHLSServer) { +// webRTCServerSet is called by webRTCServer. +func (m *metrics) webRTCServerSet(s apiWebRTCServer) { m.mutex.Lock() defer m.mutex.Unlock() - m.hlsServer = s + m.webRTCServer = s } diff --git a/internal/core/metrics_test.go b/internal/core/metrics_test.go index cdeb49f9686..7ee1eb50ecc 100644 --- a/internal/core/metrics_test.go +++ b/internal/core/metrics_test.go @@ -27,6 +27,9 @@ func TestMetrics(t *testing.T) { defer os.Remove(serverKeyFpath) p, ok := newInstance("metrics: yes\n" + + "webrtc: yes\n" + + "webrtcServerCert: " + serverCertFpath + "\n" + + "webrtcServerKey: " + serverKeyFpath + "\n" + "encryption: optional\n" + "serverCert: " + serverCertFpath + "\n" + "serverKey: " + serverKeyFpath + "\n" + @@ -99,6 +102,8 @@ func TestMetrics(t *testing.T) { `paths_bytes_received\{name=".*?",state="ready"\} 0`+"\n"+ `paths\{name=".*?",state="ready"\} 1`+"\n"+ `paths_bytes_received\{name=".*?",state="ready"\} 0`+"\n"+ + `hls_muxers\{name="rtsp_path"\} 1`+"\n"+ + `hls_muxers_bytes_sent\{name="rtsp_path"\} [0-9]+`+"\n"+ `rtsp_conns\{id=".*?"\} 1`+"\n"+ `rtsp_conns_bytes_received\{id=".*?"\} [0-9]+`+"\n"+ `rtsp_conns_bytes_sent\{id=".*?"\} [0-9]+`+"\n"+ @@ -114,7 +119,6 @@ func TestMetrics(t *testing.T) { `rtmp_conns\{id=".*?",state="publish"\} 1`+"\n"+ `rtmp_conns_bytes_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+ `rtmp_conns_bytes_sent\{id=".*?",state="publish"\} [0-9]+`+"\n"+ - `hls_muxers\{name="rtsp_path"\} 1`+"\n"+ - `hls_muxers_bytes_sent\{name="rtsp_path"\} [0-9]+`+"\n"+"$", + "$", string(bo)) } diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index 7cff9b2f0ec..3b79113b056 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -180,13 +180,15 @@ outer: continue } - err = req.authenticate( - pathConf.ReadIPs, - pathConf.ReadUser, - pathConf.ReadPass) - if err != nil { - req.res <- pathDescribeRes{err: err} - continue + if req.authenticate != nil { + err = req.authenticate( + pathConf.ReadIPs, + pathConf.ReadUser, + pathConf.ReadPass) + if err != nil { + req.res <- pathDescribeRes{err: err} + continue + } } // create path if it doesn't exist @@ -352,12 +354,18 @@ func (pm *pathManager) describe(req pathDescribeReq) pathDescribeRes { req.res = make(chan pathDescribeRes) select { case pm.chDescribe <- req: - res := <-req.res - if res.err != nil { - return res + res1 := <-req.res + if res1.err != nil { + return res1 + } + + res2 := res1.path.describe(req) + if res2.err != nil { + return res2 } - return res.path.describe(req) + res2.path = res1.path + return res2 case <-pm.ctx.Done(): return pathDescribeRes{err: fmt.Errorf("terminated")} diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index 1bfc2c5913c..9d154f74971 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -72,11 +72,11 @@ type rtmpConn struct { pathManager rtmpConnPathManager parent rtmpConnParent - ctx context.Context - ctxCancel func() - uuid uuid.UUID - created time.Time - path *path + ctx context.Context + ctxCancel func() + uuid uuid.UUID + created time.Time + // path *path state rtmpConnState stateMutex sync.Mutex } @@ -153,46 +153,44 @@ func (c *rtmpConn) safeState() rtmpConnState { func (c *rtmpConn) run() { defer c.wg.Done() - err := func() error { - if c.runOnConnect != "" { - c.log(logger.Info, "runOnConnect command started") - _, port, _ := net.SplitHostPort(c.rtspAddress) - onConnectCmd := externalcmd.NewCmd( - c.externalCmdPool, - c.runOnConnect, - c.runOnConnectRestart, - externalcmd.Environment{ - "RTSP_PATH": "", - "RTSP_PORT": port, - }, - func(co int) { - c.log(logger.Info, "runOnConnect command exited with code %d", co) - }) - - defer func() { - onConnectCmd.Close() - c.log(logger.Info, "runOnConnect command stopped") - }() - } + if c.runOnConnect != "" { + c.log(logger.Info, "runOnConnect command started") + _, port, _ := net.SplitHostPort(c.rtspAddress) + onConnectCmd := externalcmd.NewCmd( + c.externalCmdPool, + c.runOnConnect, + c.runOnConnectRestart, + externalcmd.Environment{ + "RTSP_PATH": "", + "RTSP_PORT": port, + }, + func(co int) { + c.log(logger.Info, "runOnConnect command exited with code %d", co) + }) - ctx, cancel := context.WithCancel(c.ctx) - runErr := make(chan error) - go func() { - runErr <- c.runInner(ctx) + defer func() { + onConnectCmd.Close() + c.log(logger.Info, "runOnConnect command stopped") }() + } - select { - case err := <-runErr: - cancel() - return err - - case <-c.ctx.Done(): - cancel() - <-runErr - return errors.New("terminated") - } + ctx, cancel := context.WithCancel(c.ctx) + runErr := make(chan error) + go func() { + runErr <- c.runInner(ctx) }() + var err error + select { + case err = <-runErr: + cancel() + + case <-c.ctx.Done(): + cancel() + <-runErr + err = errors.New("terminated") + } + c.ctxCancel() c.parent.connClose(c) @@ -243,10 +241,10 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { return res.err } - c.path = res.path + path := res.path defer func() { - c.path.readerRemove(pathReaderRemoveReq{author: c}) + path.readerRemove(pathReaderRemoveReq{author: c}) }() c.stateMutex.Lock() @@ -288,15 +286,15 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { defer res.stream.readerRemove(c) c.log(logger.Info, "is reading from path '%s', %s", - c.path.Name(), sourceMediaInfo(medias)) + path.Name(), sourceMediaInfo(medias)) - if c.path.Conf().RunOnRead != "" { + if path.Conf().RunOnRead != "" { c.log(logger.Info, "runOnRead command started") onReadCmd := externalcmd.NewCmd( c.externalCmdPool, - c.path.Conf().RunOnRead, - c.path.Conf().RunOnReadRestart, - c.path.externalCmdEnv(), + path.Conf().RunOnRead, + path.Conf().RunOnReadRestart, + path.externalCmdEnv(), func(co int) { c.log(logger.Info, "runOnRead command exited with code %d", co) }) @@ -477,10 +475,10 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { return res.err } - c.path = res.path + path := res.path defer func() { - c.path.publisherRemove(pathPublisherRemoveReq{author: c}) + path.publisherRemove(pathPublisherRemoveReq{author: c}) }() c.stateMutex.Lock() @@ -512,7 +510,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { medias = append(medias, audioMedia) } - rres := c.path.publisherStart(pathPublisherStartReq{ + rres := path.publisherStart(pathPublisherStartReq{ author: c, medias: medias, generateRTPPackets: true, @@ -522,7 +520,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { } c.log(logger.Info, "is publishing to path '%s', %s", - c.path.Name(), + path.Name(), sourceMediaInfo(medias)) // disable write deadline to allow outgoing acknowledges diff --git a/internal/core/rtmp_server.go b/internal/core/rtmp_server.go index 1b14d3558e9..00e72fb714a 100644 --- a/internal/core/rtmp_server.go +++ b/internal/core/rtmp_server.go @@ -214,9 +214,6 @@ outer: s.conns[c] = struct{}{} case c := <-s.chConnClose: - if _, ok := s.conns[c]; !ok { - continue - } delete(s.conns, c) case req := <-s.chAPIConnsList: diff --git a/internal/core/rtsp_server.go b/internal/core/rtsp_server.go index 2a391e247c0..dfaac411c0d 100644 --- a/internal/core/rtsp_server.go +++ b/internal/core/rtsp_server.go @@ -180,11 +180,11 @@ func newRTSPServer( s.log(logger.Info, "listener opened on %s", printAddresses(s.srv)) - if s.metrics != nil { + if metrics != nil { if !isTLS { - s.metrics.rtspServerSet(s) + metrics.rtspServerSet(s) } else { - s.metrics.rtspsServerSet(s) + metrics.rtspsServerSet(s) } } diff --git a/internal/core/rtsp_session.go b/internal/core/rtsp_session.go index 61e5c11510a..9734fc7a3ac 100644 --- a/internal/core/rtsp_session.go +++ b/internal/core/rtsp_session.go @@ -330,9 +330,9 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R } }) - case *format.MPEG4Audio: + case *format.VP8: ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := s.stream.writeData(cmedia, cformat, &dataMPEG4Audio{ + err := s.stream.writeData(cmedia, cformat, &dataVP8{ rtpPackets: []*rtp.Packet{pkt}, ntp: time.Now(), }) @@ -341,9 +341,20 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R } }) - case *format.Opus: + case *format.VP9: ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := s.stream.writeData(cmedia, cformat, &dataOpus{ + err := s.stream.writeData(cmedia, cformat, &dataVP9{ + rtpPackets: []*rtp.Packet{pkt}, + ntp: time.Now(), + }) + if err != nil { + s.log(logger.Warn, "%v", err) + } + }) + + case *format.MPEG4Audio: + ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { + err := s.stream.writeData(cmedia, cformat, &dataMPEG4Audio{ rtpPackets: []*rtp.Packet{pkt}, ntp: time.Now(), }) diff --git a/internal/core/rtsp_source.go b/internal/core/rtsp_source.go index 1ba14d51f20..1c107ae6b4a 100644 --- a/internal/core/rtsp_source.go +++ b/internal/core/rtsp_source.go @@ -170,9 +170,9 @@ func (s *rtspSource) run(ctx context.Context) error { } }) - case *format.MPEG4Audio: + case *format.VP8: c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := res.stream.writeData(cmedia, cformat, &dataMPEG4Audio{ + err := res.stream.writeData(cmedia, cformat, &dataVP8{ rtpPackets: []*rtp.Packet{pkt}, ntp: time.Now(), }) @@ -181,9 +181,20 @@ func (s *rtspSource) run(ctx context.Context) error { } }) - case *format.Opus: + case *format.VP9: c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - err := res.stream.writeData(cmedia, cformat, &dataOpus{ + err := res.stream.writeData(cmedia, cformat, &dataVP9{ + rtpPackets: []*rtp.Packet{pkt}, + ntp: time.Now(), + }) + if err != nil { + s.Log(logger.Warn, "%v", err) + } + }) + + case *format.MPEG4Audio: + c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { + err := res.stream.writeData(cmedia, cformat, &dataMPEG4Audio{ rtpPackets: []*rtp.Packet{pkt}, ntp: time.Now(), }) diff --git a/internal/core/webrtc_conn.go b/internal/core/webrtc_conn.go new file mode 100644 index 00000000000..d5435c99bff --- /dev/null +++ b/internal/core/webrtc_conn.go @@ -0,0 +1,740 @@ +package core + +import ( + "context" + "crypto/hmac" + "crypto/sha1" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "math/rand" + "net" + "strconv" + "strings" + "sync" + "time" + + "github.com/aler9/gortsplib/v2/pkg/format" + "github.com/aler9/gortsplib/v2/pkg/formatdecenc/rtph264" + "github.com/aler9/gortsplib/v2/pkg/formatdecenc/rtpvp8" + "github.com/aler9/gortsplib/v2/pkg/formatdecenc/rtpvp9" + "github.com/aler9/gortsplib/v2/pkg/h264" + "github.com/aler9/gortsplib/v2/pkg/media" + "github.com/aler9/gortsplib/v2/pkg/ringbuffer" + "github.com/google/uuid" + "github.com/gorilla/websocket" + "github.com/pion/webrtc/v3" + + "github.com/aler9/rtsp-simple-server/internal/conf" + "github.com/aler9/rtsp-simple-server/internal/logger" +) + +type webRTCTrack struct { + media *media.Media + format format.Format + webRTCTrack *webrtc.TrackLocalStaticRTP + cb func(data, context.Context, chan error) +} + +func gatherMedias(tracks []*webRTCTrack) media.Medias { + var ret media.Medias + + for _, track := range tracks { + ret = append(ret, track.media) + } + + return ret +} + +type webRTCConnPathManager interface { + readerAdd(req pathReaderAddReq) pathReaderSetupPlayRes +} + +type webRTCConnParent interface { + log(logger.Level, string, ...interface{}) + connClose(*webRTCConn) +} + +type webRTCConn struct { + readBufferCount int + pathName string + wsconn *websocket.Conn + iceServers []string + wg *sync.WaitGroup + pathManager webRTCConnPathManager + parent webRTCConnParent + + ctx context.Context + ctxCancel func() + uuid uuid.UUID + created time.Time + curPC *webrtc.PeerConnection + mutex sync.RWMutex +} + +func newWebRTCConn( + parentCtx context.Context, + readBufferCount int, + pathName string, + wsconn *websocket.Conn, + iceServers []string, + wg *sync.WaitGroup, + pathManager webRTCConnPathManager, + parent webRTCConnParent, +) *webRTCConn { + ctx, ctxCancel := context.WithCancel(parentCtx) + + c := &webRTCConn{ + readBufferCount: readBufferCount, + pathName: pathName, + wsconn: wsconn, + iceServers: iceServers, + wg: wg, + pathManager: pathManager, + parent: parent, + ctx: ctx, + ctxCancel: ctxCancel, + uuid: uuid.New(), + created: time.Now(), + } + + c.log(logger.Info, "opened") + + wg.Add(1) + go c.run() + + return c +} + +func (c *webRTCConn) close() { + c.ctxCancel() +} + +func (c *webRTCConn) remoteAddr() net.Addr { + return c.wsconn.RemoteAddr() +} + +func (c *webRTCConn) bytesReceived() uint64 { + c.mutex.RLock() + defer c.mutex.RUnlock() + for _, stats := range c.curPC.GetStats() { + if tstats, ok := stats.(webrtc.TransportStats); ok { + if tstats.ID == "iceTransport" { + return tstats.BytesReceived + } + } + } + return 0 +} + +func (c *webRTCConn) bytesSent() uint64 { + c.mutex.RLock() + defer c.mutex.RUnlock() + for _, stats := range c.curPC.GetStats() { + if tstats, ok := stats.(webrtc.TransportStats); ok { + if tstats.ID == "iceTransport" { + return tstats.BytesSent + } + } + } + return 0 +} + +func (c *webRTCConn) log(level logger.Level, format string, args ...interface{}) { + c.parent.log(level, "[conn %v] "+format, append([]interface{}{c.wsconn.RemoteAddr()}, args...)...) +} + +func (c *webRTCConn) run() { + defer c.wg.Done() + + innerCtx, innerCtxCancel := context.WithCancel(c.ctx) + runErr := make(chan error) + go func() { + runErr <- c.runInner(innerCtx) + }() + + var err error + select { + case err = <-runErr: + innerCtxCancel() + + case <-c.ctx.Done(): + innerCtxCancel() + <-runErr + err = errors.New("terminated") + } + + c.ctxCancel() + + c.parent.connClose(c) + + c.log(logger.Info, "closed (%v)", err) +} + +func (c *webRTCConn) runInner(ctx context.Context) error { + go func() { + <-ctx.Done() + c.wsconn.Close() + }() + + res := c.pathManager.readerAdd(pathReaderAddReq{ + author: c, + pathName: c.pathName, + authenticate: func( + pathIPs []fmt.Stringer, + pathUser conf.Credential, + pathPass conf.Credential, + ) error { + return nil + }, + }) + if res.err != nil { + return res.err + } + + path := res.path + + defer func() { + path.readerRemove(pathReaderRemoveReq{author: c}) + }() + + tracks, err := c.allocateTracks(res.stream.medias()) + if err != nil { + return err + } + + // maximum deadline to complete the handshake + c.wsconn.SetReadDeadline(time.Now().Add(10 * time.Second)) + c.wsconn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + + err = c.writeICEServers(c.genICEServers()) + if err != nil { + return err + } + + offer, err := c.readOffer() + if err != nil { + return err + } + + pc, err := webrtc.NewPeerConnection(webrtc.Configuration{ + ICEServers: c.genICEServers(), + }) + if err != nil { + return err + } + defer pc.Close() + + c.mutex.Lock() + c.curPC = pc + c.mutex.Unlock() + + for _, track := range tracks { + _, err = pc.AddTrack(track.webRTCTrack) + if err != nil { + return err + } + } + + outgoingCandidate := make(chan *webrtc.ICECandidate) + pcConnected := make(chan struct{}) + pcDisconnected := make(chan struct{}) + + pc.OnICECandidate(func(i *webrtc.ICECandidate) { + if i != nil { + select { + case outgoingCandidate <- i: + case <-pcConnected: + case <-ctx.Done(): + } + } + }) + + pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + c.log(logger.Debug, "peer connection state: "+state.String()) + + switch state { + case webrtc.PeerConnectionStateConnected: + close(pcConnected) + + case webrtc.PeerConnectionStateDisconnected: + close(pcDisconnected) + } + }) + + err = pc.SetRemoteDescription(*offer) + if err != nil { + return err + } + + answer, err := pc.CreateAnswer(nil) + if err != nil { + return err + } + + err = pc.SetLocalDescription(answer) + if err != nil { + return err + } + + err = c.writeAnswer(&answer) + if err != nil { + return err + } + + readError := make(chan error) + incomingCandidate := make(chan *webrtc.ICECandidateInit) + + go func() { + for { + candidate, err := c.readCandidate() + if err != nil { + select { + case readError <- err: + case <-pcConnected: + case <-ctx.Done(): + } + return + } + + select { + case incomingCandidate <- candidate: + case <-pcConnected: + case <-ctx.Done(): + } + } + }() + +outer: + for { + select { + case candidate := <-outgoingCandidate: + c.writeCandidate(candidate) + + case candidate := <-incomingCandidate: + err = pc.AddICECandidate(*candidate) + if err != nil { + return err + } + + case err := <-readError: + return err + + case <-pcConnected: + break outer + + case <-ctx.Done(): + return fmt.Errorf("terminated") + } + } + + c.log(logger.Info, "peer connection established") + c.wsconn.Close() + + ringBuffer, _ := ringbuffer.New(uint64(c.readBufferCount)) + defer ringBuffer.Close() + + writeError := make(chan error) + + for _, track := range tracks { + res.stream.readerAdd(c, track.media, track.format, func(dat data) { + ringBuffer.Push(func() { + track.cb(dat, ctx, writeError) + }) + }) + } + defer res.stream.readerRemove(c) + + c.log(logger.Info, "is reading from path '%s', %s", + path.Name(), sourceMediaInfo(gatherMedias(tracks))) + + go func() { + for { + item, ok := ringBuffer.Pull() + if !ok { + return + } + item.(func())() + } + }() + + select { + case <-pcDisconnected: + return fmt.Errorf("peer connection closed") + + case err := <-writeError: + return err + + case <-ctx.Done(): + return fmt.Errorf("terminated") + } +} + +func (c *webRTCConn) allocateTracks(medias media.Medias) ([]*webRTCTrack, error) { + var ret []*webRTCTrack + + var vp9Format *format.VP9 + vp9Media := medias.FindFormat(&vp9Format) + + if vp9Format != nil { + webRTCTrak, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeVP9, + ClockRate: uint32(vp9Format.ClockRate()), + }, + "vp9", + "rtspss", + ) + if err != nil { + return nil, err + } + + encoder := &rtpvp9.Encoder{ + PayloadType: 96, + PayloadMaxSize: 1200, + } + encoder.Init() + + ret = append(ret, &webRTCTrack{ + media: vp9Media, + format: vp9Format, + webRTCTrack: webRTCTrak, + cb: func(dat data, ctx context.Context, writeError chan error) { + tdata := dat.(*dataVP9) + + if tdata.frame == nil { + return + } + + packets, err := encoder.Encode(tdata.frame, tdata.pts) + if err != nil { + return + } + + for _, pkt := range packets { + webRTCTrak.WriteRTP(pkt) + } + }, + }) + } + + var vp8Format *format.VP8 + + if vp9Format == nil { + vp8Media := medias.FindFormat(&vp8Format) + + if vp8Format != nil { + webRTCTrak, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeVP8, + ClockRate: uint32(vp8Format.ClockRate()), + }, + "vp8", + "rtspss", + ) + if err != nil { + return nil, err + } + + encoder := &rtpvp8.Encoder{ + PayloadType: 96, + PayloadMaxSize: 1200, + } + encoder.Init() + + ret = append(ret, &webRTCTrack{ + media: vp8Media, + format: vp8Format, + webRTCTrack: webRTCTrak, + cb: func(dat data, ctx context.Context, writeError chan error) { + tdata := dat.(*dataVP8) + + if tdata.frame == nil { + return + } + + packets, err := encoder.Encode(tdata.frame, tdata.pts) + if err != nil { + return + } + + for _, pkt := range packets { + webRTCTrak.WriteRTP(pkt) + } + }, + }) + } + } + + if vp9Format == nil && vp8Format == nil { + var h264Format *format.H264 + h264Media := medias.FindFormat(&h264Format) + + if h264Format != nil { + webRTCTrak, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeH264, + ClockRate: uint32(h264Format.ClockRate()), + }, + "h264", + "rtspss", + ) + if err != nil { + return nil, err + } + + encoder := &rtph264.Encoder{ + PayloadType: 96, + PayloadMaxSize: 1200, + } + encoder.Init() + + var lastPTS time.Duration + firstNALUReceived := false + + ret = append(ret, &webRTCTrack{ + media: h264Media, + format: h264Format, + webRTCTrack: webRTCTrak, + cb: func(dat data, ctx context.Context, writeError chan error) { + tdata := dat.(*dataH264) + + if tdata.nalus == nil { + return + } + + if !firstNALUReceived { + if !h264.IDRPresent(tdata.nalus) { + return + } + + firstNALUReceived = true + lastPTS = tdata.pts + } else { + if tdata.pts < lastPTS { + select { + case writeError <- fmt.Errorf("WebRTC doesn't support H264 streams with B-frames"): + case <-ctx.Done(): + } + return + } + lastPTS = tdata.pts + } + + packets, err := encoder.Encode(tdata.nalus, tdata.pts) + if err != nil { + return + } + + for _, pkt := range packets { + webRTCTrak.WriteRTP(pkt) + } + }, + }) + } + } + + var opusFormat *format.Opus + opusMedia := medias.FindFormat(&opusFormat) + + if opusFormat != nil { + webRTCTrak, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeOpus, + ClockRate: uint32(opusFormat.ClockRate()), + }, + "opus", + "rtspss", + ) + if err != nil { + return nil, err + } + + ret = append(ret, &webRTCTrack{ + media: opusMedia, + format: opusFormat, + webRTCTrack: webRTCTrak, + cb: func(dat data, ctx context.Context, writeError chan error) { + for _, pkt := range dat.getRTPPackets() { + webRTCTrak.WriteRTP(pkt) + } + }, + }) + } + + var g722Format *format.G722 + + if opusFormat == nil { + g722Media := medias.FindFormat(&g722Format) + + if g722Format != nil { + webRTCTrak, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeG722, + ClockRate: uint32(g722Format.ClockRate()), + }, + "g722", + "rtspss", + ) + if err != nil { + return nil, err + } + + ret = append(ret, &webRTCTrack{ + media: g722Media, + format: g722Format, + webRTCTrack: webRTCTrak, + cb: func(dat data, ctx context.Context, writeError chan error) { + for _, pkt := range dat.getRTPPackets() { + webRTCTrak.WriteRTP(pkt) + } + }, + }) + } + } + + var g711Format *format.G711 + + if opusFormat == nil && g722Format == nil { + g711Media := medias.FindFormat(&g711Format) + + if g711Format != nil { + var mtyp string + if g711Format.MULaw { + mtyp = webrtc.MimeTypePCMU + } else { + mtyp = webrtc.MimeTypePCMA + } + + webRTCTrak, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{ + MimeType: mtyp, + ClockRate: uint32(g711Format.ClockRate()), + }, + "g711", + "rtspss", + ) + if err != nil { + return nil, err + } + + ret = append(ret, &webRTCTrack{ + media: g711Media, + format: g711Format, + webRTCTrack: webRTCTrak, + cb: func(dat data, ctx context.Context, writeError chan error) { + for _, pkt := range dat.getRTPPackets() { + webRTCTrak.WriteRTP(pkt) + } + }, + }) + } + } + + if ret == nil { + return nil, fmt.Errorf( + "the stream doesn't contain any supported codec (which are currently VP9, VP8, H264, Opus, G722, G711)") + } + + return ret, nil +} + +func (c *webRTCConn) genICEServers() []webrtc.ICEServer { + ret := make([]webrtc.ICEServer, len(c.iceServers)) + for i, s := range c.iceServers { + parts := strings.Split(s, ":") + if len(parts) == 5 { + if parts[1] == "AUTH_SECRET" { + s := webrtc.ICEServer{ + URLs: []string{parts[0] + ":" + parts[3] + ":" + parts[4]}, + } + + randomUser := func() string { + const charset = "abcdefghijklmnopqrstuvwxyz1234567890" + b := make([]byte, 20) + for i := range b { + b[i] = charset[rand.Intn(len(charset))] + } + return string(b) + }() + + expireDate := time.Now().Add(24 * 3600 * time.Second).Unix() + s.Username = strconv.FormatInt(expireDate, 10) + ":" + randomUser + + h := hmac.New(sha1.New, []byte(parts[2])) + h.Write([]byte(s.Username)) + s.Credential = base64.StdEncoding.EncodeToString(h.Sum(nil)) + + ret[i] = s + } else { + ret[i] = webrtc.ICEServer{ + URLs: []string{parts[0] + ":" + parts[3] + ":" + parts[4]}, + Username: parts[1], + Credential: parts[2], + } + } + } else { + ret[i] = webrtc.ICEServer{ + URLs: []string{s}, + } + } + } + return ret +} + +func (c *webRTCConn) writeICEServers(iceServers []webrtc.ICEServer) error { + enc, _ := json.Marshal(iceServers) + return c.wsconn.WriteMessage(websocket.TextMessage, enc) +} + +func (c *webRTCConn) readOffer() (*webrtc.SessionDescription, error) { + _, enc, err := c.wsconn.ReadMessage() + if err != nil { + return nil, err + } + + var offer webrtc.SessionDescription + err = json.Unmarshal(enc, &offer) + if err != nil { + return nil, err + } + + if offer.Type != webrtc.SDPTypeOffer { + return nil, fmt.Errorf("received SDP is not an offer") + } + + return &offer, nil +} + +func (c *webRTCConn) writeAnswer(answer *webrtc.SessionDescription) error { + enc, _ := json.Marshal(answer) + return c.wsconn.WriteMessage(websocket.TextMessage, enc) +} + +func (c *webRTCConn) writeCandidate(candidate *webrtc.ICECandidate) error { + enc, _ := json.Marshal(candidate.ToJSON()) + return c.wsconn.WriteMessage(websocket.TextMessage, enc) +} + +func (c *webRTCConn) readCandidate() (*webrtc.ICECandidateInit, error) { + _, enc, err := c.wsconn.ReadMessage() + if err != nil { + return nil, err + } + + var candidate webrtc.ICECandidateInit + err = json.Unmarshal(enc, &candidate) + if err != nil { + return nil, err + } + + return &candidate, err +} + +// apiReaderDescribe implements reader. +func (c *webRTCConn) apiReaderDescribe() interface{} { + return struct { + Type string `json:"type"` + }{"webRTCConn"} +} diff --git a/internal/core/webrtc_index.html b/internal/core/webrtc_index.html new file mode 100644 index 00000000000..cdcf6ac2d5b --- /dev/null +++ b/internal/core/webrtc_index.html @@ -0,0 +1,175 @@ + + + + + + + + + + + + + + diff --git a/internal/core/webrtc_server.go b/internal/core/webrtc_server.go new file mode 100644 index 00000000000..c13d6e248e7 --- /dev/null +++ b/internal/core/webrtc_server.go @@ -0,0 +1,431 @@ +package core + +import ( + "context" + "crypto/tls" + _ "embed" + "fmt" + "log" + "net" + "net/http" + gopath "path" + "strings" + "sync" + "time" + + "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" + + "github.com/aler9/rtsp-simple-server/internal/conf" + "github.com/aler9/rtsp-simple-server/internal/logger" +) + +//go:embed webrtc_index.html +var webrtcIndex []byte + +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +type webRTCServerAPIConnsListItem struct { + Created time.Time `json:"created"` + RemoteAddr string `json:"remoteAddr"` + BytesReceived uint64 `json:"bytesReceived"` + BytesSent uint64 `json:"bytesSent"` +} + +type webRTCServerAPIConnsListData struct { + Items map[string]webRTCServerAPIConnsListItem `json:"items"` +} + +type webRTCServerAPIConnsListRes struct { + data *webRTCServerAPIConnsListData + err error +} + +type webRTCServerAPIConnsListReq struct { + res chan webRTCServerAPIConnsListRes +} + +type webRTCServerAPIConnsKickRes struct { + err error +} + +type webRTCServerAPIConnsKickReq struct { + id string + res chan webRTCServerAPIConnsKickRes +} + +type webRTCConnNewReq struct { + pathName string + wsconn *websocket.Conn +} + +type webRTCServerParent interface { + Log(logger.Level, string, ...interface{}) +} + +type webRTCServer struct { + externalAuthenticationURL string + allowOrigin string + trustedProxies conf.IPsOrCIDRs + stunServers []string + readBufferCount int + pathManager *pathManager + metrics *metrics + parent webRTCServerParent + + ctx context.Context + ctxCancel func() + wg sync.WaitGroup + ln net.Listener + tlsConfig *tls.Config + conns map[*webRTCConn]struct{} + + // in + connNew chan webRTCConnNewReq + chConnClose chan *webRTCConn + chAPIConnsList chan webRTCServerAPIConnsListReq + chAPIConnsKick chan webRTCServerAPIConnsKickReq +} + +func newWebRTCServer( + parentCtx context.Context, + externalAuthenticationURL string, + address string, + serverKey string, + serverCert string, + allowOrigin string, + trustedProxies conf.IPsOrCIDRs, + stunServers []string, + readBufferCount int, + pathManager *pathManager, + metrics *metrics, + parent webRTCServerParent, +) (*webRTCServer, error) { + ln, err := net.Listen("tcp", address) + if err != nil { + return nil, err + } + + crt, err := tls.LoadX509KeyPair(serverCert, serverKey) + if err != nil { + ln.Close() + return nil, err + } + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{crt}, + } + + ctx, ctxCancel := context.WithCancel(parentCtx) + + s := &webRTCServer{ + externalAuthenticationURL: externalAuthenticationURL, + allowOrigin: allowOrigin, + trustedProxies: trustedProxies, + stunServers: stunServers, + readBufferCount: readBufferCount, + pathManager: pathManager, + metrics: metrics, + parent: parent, + ctx: ctx, + ctxCancel: ctxCancel, + ln: ln, + tlsConfig: tlsConfig, + conns: make(map[*webRTCConn]struct{}), + connNew: make(chan webRTCConnNewReq), + chConnClose: make(chan *webRTCConn), + chAPIConnsList: make(chan webRTCServerAPIConnsListReq), + chAPIConnsKick: make(chan webRTCServerAPIConnsKickReq), + } + + s.log(logger.Info, "listener opened on "+address) + + if s.metrics != nil { + s.metrics.webRTCServerSet(s) + } + + s.wg.Add(1) + go s.run() + + return s, nil +} + +// Log is the main logging function. +func (s *webRTCServer) log(level logger.Level, format string, args ...interface{}) { + s.parent.Log(level, "[WebRTC] "+format, append([]interface{}{}, args...)...) +} + +func (s *webRTCServer) close() { + s.log(logger.Info, "listener is closing") + s.ctxCancel() + s.wg.Wait() +} + +func (s *webRTCServer) run() { + defer s.wg.Done() + + router := gin.New() + router.NoRoute(httpLoggerMiddleware(s), s.onRequest) + + tmp := make([]string, len(s.trustedProxies)) + for i, entry := range s.trustedProxies { + tmp[i] = entry.String() + } + router.SetTrustedProxies(tmp) + + hs := &http.Server{ + Handler: router, + TLSConfig: s.tlsConfig, + ErrorLog: log.New(&nilWriter{}, "", 0), + } + + if s.tlsConfig != nil { + go hs.ServeTLS(s.ln, "", "") + } else { + go hs.Serve(s.ln) + } + +outer: + for { + select { + case req := <-s.connNew: + c := newWebRTCConn( + s.ctx, + s.readBufferCount, + req.pathName, + req.wsconn, + s.stunServers, + &s.wg, + s.pathManager, + s, + ) + s.conns[c] = struct{}{} + + case conn := <-s.chConnClose: + delete(s.conns, conn) + + case req := <-s.chAPIConnsList: + data := &webRTCServerAPIConnsListData{ + Items: make(map[string]webRTCServerAPIConnsListItem), + } + + for c := range s.conns { + data.Items[c.uuid.String()] = webRTCServerAPIConnsListItem{ + Created: c.created, + RemoteAddr: c.remoteAddr().String(), + BytesReceived: c.bytesReceived(), + BytesSent: c.bytesSent(), + } + } + + req.res <- webRTCServerAPIConnsListRes{data: data} + + case req := <-s.chAPIConnsKick: + res := func() bool { + for c := range s.conns { + if c.uuid.String() == req.id { + delete(s.conns, c) + c.close() + return true + } + } + return false + }() + if res { + req.res <- webRTCServerAPIConnsKickRes{} + } else { + req.res <- webRTCServerAPIConnsKickRes{fmt.Errorf("not found")} + } + + case <-s.ctx.Done(): + break outer + } + } + + s.ctxCancel() + + hs.Shutdown(context.Background()) + s.ln.Close() // in case Shutdown() is called before Serve() +} + +func (s *webRTCServer) onRequest(ctx *gin.Context) { + ctx.Writer.Header().Set("Access-Control-Allow-Origin", s.allowOrigin) + ctx.Writer.Header().Set("Access-Control-Allow-Credentials", "true") + + switch ctx.Request.Method { + case http.MethodGet: + + case http.MethodOptions: + ctx.Writer.Header().Set("Access-Control-Allow-Methods", "GET, OPTIONS") + ctx.Writer.Header().Set("Access-Control-Allow-Headers", ctx.Request.Header.Get("Access-Control-Request-Headers")) + ctx.Writer.WriteHeader(http.StatusOK) + return + + default: + return + } + + // remove leading prefix + pa := ctx.Request.URL.Path[1:] + + switch pa { + case "", "favicon.ico": + return + } + + dir, fname := func() (string, string) { + if strings.HasSuffix(pa, "/ws") { + return gopath.Dir(pa), gopath.Base(pa) + } + return pa, "" + }() + + if fname == "" && !strings.HasSuffix(dir, "/") { + ctx.Writer.Header().Set("Location", "/"+dir+"/") + ctx.Writer.WriteHeader(http.StatusMovedPermanently) + return + } + + dir = strings.TrimSuffix(dir, "/") + + res := s.pathManager.describe(pathDescribeReq{ + pathName: dir, + }) + if res.err != nil { + ctx.Writer.WriteHeader(http.StatusNotFound) + return + } + + err := s.authenticate(res.path, ctx) + if err != nil { + if terr, ok := err.(pathErrAuthCritical); ok { + s.log(logger.Info, "authentication error: %s", terr.message) + ctx.Writer.Header().Set("WWW-Authenticate", `Basic realm="rtsp-simple-server"`) + ctx.Writer.WriteHeader(http.StatusUnauthorized) + return + } + + ctx.Writer.Header().Set("WWW-Authenticate", `Basic realm="rtsp-simple-server"`) + ctx.Writer.WriteHeader(http.StatusUnauthorized) + return + } + + switch fname { + case "": + ctx.Writer.Header().Set("Content-Type", "text/html") + ctx.Writer.Write(webrtcIndex) + return + + case "ws": + wsconn, err := upgrader.Upgrade(ctx.Writer, ctx.Request, nil) + if err != nil { + return + } + + select { + case s.connNew <- webRTCConnNewReq{ + pathName: dir, + wsconn: wsconn, + }: + case <-s.ctx.Done(): + } + } +} + +func (s *webRTCServer) authenticate(pa *path, ctx *gin.Context) error { + pathConf := pa.Conf() + pathIPs := pathConf.ReadIPs + pathUser := pathConf.ReadUser + pathPass := pathConf.ReadPass + + if s.externalAuthenticationURL != "" { + ip := net.ParseIP(ctx.ClientIP()) + user, pass, ok := ctx.Request.BasicAuth() + + err := externalAuth( + s.externalAuthenticationURL, + ip.String(), + user, + pass, + pa.name, + false, + ctx.Request.URL.RawQuery) + if err != nil { + if !ok { + return pathErrAuthNotCritical{} + } + + return pathErrAuthCritical{ + message: fmt.Sprintf("external authentication failed: %s", err), + } + } + } + + if pathIPs != nil { + ip := net.ParseIP(ctx.ClientIP()) + + if !ipEqualOrInRange(ip, pathIPs) { + return pathErrAuthCritical{ + message: fmt.Sprintf("IP '%s' not allowed", ip), + } + } + } + + if pathUser != "" { + user, pass, ok := ctx.Request.BasicAuth() + if !ok { + return pathErrAuthNotCritical{} + } + + if user != string(pathUser) || pass != string(pathPass) { + return pathErrAuthCritical{ + message: "invalid credentials", + } + } + } + + return nil +} + +// connClose is called by webRTCConn. +func (s *webRTCServer) connClose(c *webRTCConn) { + select { + case s.chConnClose <- c: + case <-s.ctx.Done(): + } +} + +// apiConnsList is called by api. +func (s *webRTCServer) apiConnsList() webRTCServerAPIConnsListRes { + req := webRTCServerAPIConnsListReq{ + res: make(chan webRTCServerAPIConnsListRes), + } + + select { + case s.chAPIConnsList <- req: + return <-req.res + + case <-s.ctx.Done(): + return webRTCServerAPIConnsListRes{err: fmt.Errorf("terminated")} + } +} + +// apiConnsKick is called by api. +func (s *webRTCServer) apiConnsKick(id string) webRTCServerAPIConnsKickRes { + req := webRTCServerAPIConnsKickReq{ + id: id, + res: make(chan webRTCServerAPIConnsKickRes), + } + + select { + case s.chAPIConnsKick <- req: + return <-req.res + + case <-s.ctx.Done(): + return webRTCServerAPIConnsKickRes{err: fmt.Errorf("terminated")} + } +} diff --git a/rtsp-simple-server.yml b/rtsp-simple-server.yml index 95500395f73..66c18096e9e 100644 --- a/rtsp-simple-server.yml +++ b/rtsp-simple-server.yml @@ -121,6 +121,16 @@ rtmpServerCert: server.crt hlsDisable: no # Address of the HLS listener. hlsAddress: :8888 +# Enable TLS/HTTPS on the HLS server. +# This is required for Low-Latency HLS. +hlsEncryption: no +# Path to the server key. This is needed only when encryption is yes. +# This can be generated with: +# openssl genrsa -out server.key 2048 +# openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650 +hlsServerKey: server.key +# Path to the server certificate. +hlsServerCert: server.crt # By default, HLS is generated only when requested by a user. # This option allows to generate it always, avoiding the delay between request and generation. hlsAlwaysRemux: no @@ -151,20 +161,39 @@ hlsSegmentMaxSize: 50M # Value of the Access-Control-Allow-Origin header provided in every HTTP response. # This allows to play the HLS stream from an external website. hlsAllowOrigin: '*' -# Enable TLS/HTTPS on the HLS server. -# This is required for Low-Latency HLS. -hlsEncryption: no -# Path to the server key. This is needed only when encryption is yes. +# List of IPs or CIDRs of proxies placed before the HLS server. +# If the server receives a request from one of these entries, IP in logs +# will be taken from the X-Forwarded-For header. +hlsTrustedProxies: [] + +############################################### +# WebRTC parameters + +# Enable support for the WebRTC protocol. +webrtc: no +# Address of the WebRTC listener. +webrtcAddress: :8889 +# Path to the server key. This is mandatory since HTTPS is mandatory in order to use WebRTC. # This can be generated with: # openssl genrsa -out server.key 2048 # openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650 -hlsServerKey: server.key +webrtcServerKey: server.key # Path to the server certificate. -hlsServerCert: server.crt -# List of IPs or CIDRs of proxies behind the HLS server. +webrtcServerCert: server.crt +# Value of the Access-Control-Allow-Origin header provided in every HTTP response. +# This allows to play the WebRTC stream from an external website. +webrtcAllowOrigin: '*' +# List of IPs or CIDRs of proxies placed before the WebRTC server. # If the server receives a request from one of these entries, IP in logs # will be taken from the X-Forwarded-For header. -hlsTrustedProxies: [] +webrtcTrustedProxies: [] +# List of ICE servers, in format type:user:pass:host:port or type:host:port. +# type can be "stun", "turn" or "turns". +# STUN servers are used to get the public IP of both server and clients. +# TURN/TURNS servers are used as relay when a direct connection between server and clients is not possible. +# if user is "AUTH_SECRET", then authentication is secret based. +# the secret must be inserted into the pass field. +webrtcICEServers: [stun:stun.l.google.com:19302] ############################################### # Path parameters