Skip to content

Commit

Permalink
implement publishing via tcp
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Dec 29, 2019
1 parent b94cbae commit ed5e1c7
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 86 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Features:
* Supports reading and publishing streams
* Supports one publisher at once, while readers can be more than one.
* Supports reading via UDP and TCP
* Supports publishing via UDP and TCP


<br />
Expand Down
88 changes: 46 additions & 42 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,52 +36,12 @@ func newProgram(rtspPort int, rtpPort int, rtcpPort int) (*program, error) {

var err error

p.rtpl, err = newUdpListener(rtpPort, "RTP", func(l *udpListener, buf []byte) {
p.mutex.RLock()
defer p.mutex.RUnlock()

tcpHeader := [4]byte{0x24, 0x00, 0x00, 0x00}
binary.BigEndian.PutUint16(tcpHeader[2:], uint16(len(buf)))

for c := range p.clients {
if c.state == "PLAY" {
if c.rtpProto == "udp" {
l.nconn.WriteTo(buf, &net.UDPAddr{
IP: c.IP,
Port: c.rtpPort,
})
} else {
c.nconn.Write(tcpHeader[:])
c.nconn.Write(buf)
}
}
}
})
p.rtpl, err = newUdpListener(rtpPort, "RTP", p.handleRtp)
if err != nil {
return nil, err
}

p.rtcpl, err = newUdpListener(rtcpPort, "RTCP", func(l *udpListener, buf []byte) {
p.mutex.RLock()
defer p.mutex.RUnlock()

tcpHeader := [4]byte{0x24, 0x00, 0x00, 0x00}
binary.BigEndian.PutUint16(tcpHeader[2:], uint16(len(buf)))

for c := range p.clients {
if c.state == "PLAY" {
if c.rtpProto == "udp" {
l.nconn.WriteTo(buf, &net.UDPAddr{
IP: c.IP,
Port: c.rtcpPort,
})
} else {
c.nconn.Write(tcpHeader[:])
c.nconn.Write(buf)
}
}
}
})
p.rtcpl, err = newUdpListener(rtcpPort, "RTCP", p.handleRtcp)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -109,6 +69,50 @@ func (p *program) run() {
wg.Wait()
}

func (p *program) handleRtp(buf []byte) {
p.mutex.RLock()
defer p.mutex.RUnlock()

tcpHeader := [4]byte{0x24, 0x00, 0x00, 0x00}
binary.BigEndian.PutUint16(tcpHeader[2:], uint16(len(buf)))

for c := range p.clients {
if c.state == "PLAY" {
if c.rtpProto == "udp" {
p.rtpl.nconn.WriteTo(buf, &net.UDPAddr{
IP: c.IP,
Port: c.rtpPort,
})
} else {
c.nconn.Write(tcpHeader[:])
c.nconn.Write(buf)
}
}
}
}

func (p *program) handleRtcp(buf []byte) {
p.mutex.RLock()
defer p.mutex.RUnlock()

tcpHeader := [4]byte{0x24, 0x00, 0x00, 0x00}
binary.BigEndian.PutUint16(tcpHeader[2:], uint16(len(buf)))

for c := range p.clients {
if c.state == "PLAY" {
if c.rtpProto == "udp" {
p.rtcpl.nconn.WriteTo(buf, &net.UDPAddr{
IP: c.IP,
Port: c.rtcpPort,
})
} else {
c.nconn.Write(tcpHeader[:])
c.nconn.Write(buf)
}
}
}
}

func main() {
kingpin.CommandLine.Help = "rtsp-simple-server " + Version + "\n\n" +
"RTSP server."
Expand Down
142 changes: 103 additions & 39 deletions rtsp_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"bufio"
"encoding/binary"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -326,48 +328,75 @@ func (c *rtspClient) run(wg sync.WaitGroup) {

// record
case "ANNOUNCE":
if _, ok := transports["RTP/AVP/UDP"]; !ok {
c.log("ERR: transport header does not contain RTP/AVP/UDP")
return
}

if _, ok := transports["mode=record"]; !ok {
c.log("ERR: transport header does not contain mode=record")
return
}

clientPort1, clientPort2 := getPorts()
if clientPort1 == 0 || clientPort2 == 0 {
c.log("ERR: transport header does not have valid client ports (%s)", transport)
return
}
if _, ok := transports["RTP/AVP/UDP"]; ok {
clientPort1, clientPort2 := getPorts()
if clientPort1 == 0 || clientPort2 == 0 {
c.log("ERR: transport header does not have valid client ports (%s)", transport)
return
}

err = rconn.WriteResponse(&rtsp.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Transport": strings.Join([]string{
"RTP/AVP",
"unicast",
fmt.Sprintf("client_port=%d-%d", clientPort1, clientPort2),
fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort),
"ssrc=1234ABCD",
}, ";"),
"Session": "12345678",
},
})
if err != nil {
c.log("ERR: %s", err)
return
}

c.p.mutex.Lock()
c.rtpProto = "udp"
c.rtpPort = clientPort1
c.rtcpPort = clientPort2
c.state = "PRE_RECORD"
c.p.mutex.Unlock()

} else if _, ok := transports["RTP/AVP/TCP"]; ok {
err = rconn.WriteResponse(&rtsp.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Transport": strings.Join([]string{
"RTP/AVP/TCP",
"unicast",
"destionation=127.0.0.1",
"source=127.0.0.1",
}, ";"),
"Session": "12345678",
},
})
if err != nil {
c.log("ERR: %s", err)
return
}

c.p.mutex.Lock()
c.rtpProto = "tcp"
c.state = "PRE_RECORD"
c.p.mutex.Unlock()

err = rconn.WriteResponse(&rtsp.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Transport": strings.Join([]string{
"RTP/AVP",
"unicast",
fmt.Sprintf("client_port=%d-%d", clientPort1, clientPort2),
fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort),
"ssrc=1234ABCD",
}, ";"),
"Session": "12345678",
},
})
if err != nil {
c.log("ERR: %s", err)
} else {
c.log("ERR: transport header does not contain a valid protocol (RTP/AVP or RTP/AVP/TCP) (%s)", transport)
return
}

c.p.mutex.Lock()
c.rtpPort = clientPort1
c.rtcpPort = clientPort2
c.state = "PRE_RECORD"
c.p.mutex.Unlock()

default:
c.log("ERR: client is in state '%s'", c.state)
return
Expand Down Expand Up @@ -398,12 +427,10 @@ func (c *rtspClient) run(wg sync.WaitGroup) {
c.state = "PLAY"
c.p.mutex.Unlock()

// when rtp protocol is TCP, the RTSP connection
// becomes a RTP connection.
// receive RTP feedback, do not parse it, wait until
// connection closes.
// when rtp protocol is TCP, the RTSP connection becomes a RTP connection.
// receive RTP feedback, do not parse it, wait until connection closes.
if c.rtpProto == "tcp" {
buf := make([]byte, 10249)
buf := make([]byte, 1024)
for {
_, err := c.nconn.Read(buf)
if err != nil {
Expand Down Expand Up @@ -456,12 +483,49 @@ func (c *rtspClient) run(wg sync.WaitGroup) {
return
}

c.log("is publishing (via udp)")
c.log("is publishing (via %s)", c.rtpProto)

c.p.mutex.Lock()
c.state = "RECORD"
c.p.mutex.Unlock()

// when rtp protocol is TCP, the RTSP connection becomes a RTP connection.
// receive RTP feedback, do not parse it, wait until connection closes.
if c.rtpProto == "tcp" {
packet := make([]byte, 2048)
bconn := bufio.NewReader(c.nconn)
for {
byts, err := bconn.Peek(4)
if err != nil {
return
}
bconn.Discard(4)

if byts[0] != 0x24 {
c.log("ERR: wrong magic byte")
return
}

if byts[1] != 0x00 {
c.log("ERR: wrong channel")
return
}

plen := binary.BigEndian.Uint16(byts[2:])
if plen > 2048 {
c.log("ERR: packet len > 2048")
return
}

_, err = io.ReadFull(bconn, packet[:plen])
if err != nil {
return
}

c.p.handleRtp(packet[:plen])
}
}

case "TEARDOWN":
return

Expand Down
8 changes: 3 additions & 5 deletions udp_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ import (
"sync"
)

type udpListenerCb func(*udpListener, []byte)

type udpListener struct {
nconn *net.UDPConn
logPrefix string
cb udpListenerCb
cb func([]byte)
}

func newUdpListener(port int, logPrefix string, cb udpListenerCb) (*udpListener, error) {
func newUdpListener(port int, logPrefix string, cb func([]byte)) (*udpListener, error) {
nconn, err := net.ListenUDP("udp", &net.UDPAddr{
Port: port,
})
Expand Down Expand Up @@ -48,6 +46,6 @@ func (l *udpListener) run(wg sync.WaitGroup) {
break
}

l.cb(l, buf[:n])
l.cb(buf[:n])
}
}

0 comments on commit ed5e1c7

Please sign in to comment.