From ed5e1c7cfc729c9d71cdb5a5c3436b8cd2efc8b9 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 29 Dec 2019 01:05:25 +0100 Subject: [PATCH] implement publishing via tcp --- README.md | 1 + main.go | 88 ++++++++++++++++-------------- rtsp_client.go | 142 +++++++++++++++++++++++++++++++++++------------- udp_listener.go | 8 +-- 4 files changed, 153 insertions(+), 86 deletions(-) diff --git a/README.md b/README.md index 277f22c56d6..971df863ac1 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ Features: * Supports reading and publishing streams * Supports one publisher at once, while readers can be more than one. * Supports reading via UDP and TCP +* Supports publishing via UDP and TCP
diff --git a/main.go b/main.go index 64f37a17f19..ad33157940f 100644 --- a/main.go +++ b/main.go @@ -36,52 +36,12 @@ func newProgram(rtspPort int, rtpPort int, rtcpPort int) (*program, error) { var err error - p.rtpl, err = newUdpListener(rtpPort, "RTP", func(l *udpListener, buf []byte) { - p.mutex.RLock() - defer p.mutex.RUnlock() - - tcpHeader := [4]byte{0x24, 0x00, 0x00, 0x00} - binary.BigEndian.PutUint16(tcpHeader[2:], uint16(len(buf))) - - for c := range p.clients { - if c.state == "PLAY" { - if c.rtpProto == "udp" { - l.nconn.WriteTo(buf, &net.UDPAddr{ - IP: c.IP, - Port: c.rtpPort, - }) - } else { - c.nconn.Write(tcpHeader[:]) - c.nconn.Write(buf) - } - } - } - }) + p.rtpl, err = newUdpListener(rtpPort, "RTP", p.handleRtp) if err != nil { return nil, err } - p.rtcpl, err = newUdpListener(rtcpPort, "RTCP", func(l *udpListener, buf []byte) { - p.mutex.RLock() - defer p.mutex.RUnlock() - - tcpHeader := [4]byte{0x24, 0x00, 0x00, 0x00} - binary.BigEndian.PutUint16(tcpHeader[2:], uint16(len(buf))) - - for c := range p.clients { - if c.state == "PLAY" { - if c.rtpProto == "udp" { - l.nconn.WriteTo(buf, &net.UDPAddr{ - IP: c.IP, - Port: c.rtcpPort, - }) - } else { - c.nconn.Write(tcpHeader[:]) - c.nconn.Write(buf) - } - } - } - }) + p.rtcpl, err = newUdpListener(rtcpPort, "RTCP", p.handleRtcp) if err != nil { return nil, err } @@ -109,6 +69,50 @@ func (p *program) run() { wg.Wait() } +func (p *program) handleRtp(buf []byte) { + p.mutex.RLock() + defer p.mutex.RUnlock() + + tcpHeader := [4]byte{0x24, 0x00, 0x00, 0x00} + binary.BigEndian.PutUint16(tcpHeader[2:], uint16(len(buf))) + + for c := range p.clients { + if c.state == "PLAY" { + if c.rtpProto == "udp" { + p.rtpl.nconn.WriteTo(buf, &net.UDPAddr{ + IP: c.IP, + Port: c.rtpPort, + }) + } else { + c.nconn.Write(tcpHeader[:]) + c.nconn.Write(buf) + } + } + } +} + +func (p *program) handleRtcp(buf []byte) { + p.mutex.RLock() + defer p.mutex.RUnlock() + + tcpHeader := [4]byte{0x24, 0x00, 0x00, 0x00} + binary.BigEndian.PutUint16(tcpHeader[2:], uint16(len(buf))) + + for c := range p.clients { + if c.state == "PLAY" { + if c.rtpProto == "udp" { + p.rtcpl.nconn.WriteTo(buf, &net.UDPAddr{ + IP: c.IP, + Port: c.rtcpPort, + }) + } else { + c.nconn.Write(tcpHeader[:]) + c.nconn.Write(buf) + } + } + } +} + func main() { kingpin.CommandLine.Help = "rtsp-simple-server " + Version + "\n\n" + "RTSP server." diff --git a/rtsp_client.go b/rtsp_client.go index d2843e1b3e5..71fffaec3ea 100644 --- a/rtsp_client.go +++ b/rtsp_client.go @@ -1,6 +1,8 @@ package main import ( + "bufio" + "encoding/binary" "fmt" "io" "log" @@ -326,48 +328,75 @@ func (c *rtspClient) run(wg sync.WaitGroup) { // record case "ANNOUNCE": - if _, ok := transports["RTP/AVP/UDP"]; !ok { - c.log("ERR: transport header does not contain RTP/AVP/UDP") - return - } - if _, ok := transports["mode=record"]; !ok { c.log("ERR: transport header does not contain mode=record") return } - clientPort1, clientPort2 := getPorts() - if clientPort1 == 0 || clientPort2 == 0 { - c.log("ERR: transport header does not have valid client ports (%s)", transport) - return - } + if _, ok := transports["RTP/AVP/UDP"]; ok { + clientPort1, clientPort2 := getPorts() + if clientPort1 == 0 || clientPort2 == 0 { + c.log("ERR: transport header does not have valid client ports (%s)", transport) + return + } + + err = rconn.WriteResponse(&rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Transport": strings.Join([]string{ + "RTP/AVP", + "unicast", + fmt.Sprintf("client_port=%d-%d", clientPort1, clientPort2), + fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort), + "ssrc=1234ABCD", + }, ";"), + "Session": "12345678", + }, + }) + if err != nil { + c.log("ERR: %s", err) + return + } + + c.p.mutex.Lock() + c.rtpProto = "udp" + c.rtpPort = clientPort1 + c.rtcpPort = clientPort2 + c.state = "PRE_RECORD" + c.p.mutex.Unlock() + + } else if _, ok := transports["RTP/AVP/TCP"]; ok { + err = rconn.WriteResponse(&rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Transport": strings.Join([]string{ + "RTP/AVP/TCP", + "unicast", + "destionation=127.0.0.1", + "source=127.0.0.1", + }, ";"), + "Session": "12345678", + }, + }) + if err != nil { + c.log("ERR: %s", err) + return + } + + c.p.mutex.Lock() + c.rtpProto = "tcp" + c.state = "PRE_RECORD" + c.p.mutex.Unlock() - err = rconn.WriteResponse(&rtsp.Response{ - StatusCode: 200, - Status: "OK", - Headers: map[string]string{ - "CSeq": cseq, - "Transport": strings.Join([]string{ - "RTP/AVP", - "unicast", - fmt.Sprintf("client_port=%d-%d", clientPort1, clientPort2), - fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort), - "ssrc=1234ABCD", - }, ";"), - "Session": "12345678", - }, - }) - if err != nil { - c.log("ERR: %s", err) + } else { + c.log("ERR: transport header does not contain a valid protocol (RTP/AVP or RTP/AVP/TCP) (%s)", transport) return } - c.p.mutex.Lock() - c.rtpPort = clientPort1 - c.rtcpPort = clientPort2 - c.state = "PRE_RECORD" - c.p.mutex.Unlock() - default: c.log("ERR: client is in state '%s'", c.state) return @@ -398,12 +427,10 @@ func (c *rtspClient) run(wg sync.WaitGroup) { c.state = "PLAY" c.p.mutex.Unlock() - // when rtp protocol is TCP, the RTSP connection - // becomes a RTP connection. - // receive RTP feedback, do not parse it, wait until - // connection closes. + // when rtp protocol is TCP, the RTSP connection becomes a RTP connection. + // receive RTP feedback, do not parse it, wait until connection closes. if c.rtpProto == "tcp" { - buf := make([]byte, 10249) + buf := make([]byte, 1024) for { _, err := c.nconn.Read(buf) if err != nil { @@ -456,12 +483,49 @@ func (c *rtspClient) run(wg sync.WaitGroup) { return } - c.log("is publishing (via udp)") + c.log("is publishing (via %s)", c.rtpProto) c.p.mutex.Lock() c.state = "RECORD" c.p.mutex.Unlock() + // when rtp protocol is TCP, the RTSP connection becomes a RTP connection. + // receive RTP feedback, do not parse it, wait until connection closes. + if c.rtpProto == "tcp" { + packet := make([]byte, 2048) + bconn := bufio.NewReader(c.nconn) + for { + byts, err := bconn.Peek(4) + if err != nil { + return + } + bconn.Discard(4) + + if byts[0] != 0x24 { + c.log("ERR: wrong magic byte") + return + } + + if byts[1] != 0x00 { + c.log("ERR: wrong channel") + return + } + + plen := binary.BigEndian.Uint16(byts[2:]) + if plen > 2048 { + c.log("ERR: packet len > 2048") + return + } + + _, err = io.ReadFull(bconn, packet[:plen]) + if err != nil { + return + } + + c.p.handleRtp(packet[:plen]) + } + } + case "TEARDOWN": return diff --git a/udp_listener.go b/udp_listener.go index 4aabbd279f7..512cb891448 100644 --- a/udp_listener.go +++ b/udp_listener.go @@ -6,15 +6,13 @@ import ( "sync" ) -type udpListenerCb func(*udpListener, []byte) - type udpListener struct { nconn *net.UDPConn logPrefix string - cb udpListenerCb + cb func([]byte) } -func newUdpListener(port int, logPrefix string, cb udpListenerCb) (*udpListener, error) { +func newUdpListener(port int, logPrefix string, cb func([]byte)) (*udpListener, error) { nconn, err := net.ListenUDP("udp", &net.UDPAddr{ Port: port, }) @@ -48,6 +46,6 @@ func (l *udpListener) run(wg sync.WaitGroup) { break } - l.cb(l, buf[:n]) + l.cb(buf[:n]) } }