Browse Source

implement publishing via tcp

pull/2/head v0.1.1
aler9 5 years ago
parent
commit
ed5e1c7cfc
  1. 1
      README.md
  2. 88
      main.go
  3. 142
      rtsp_client.go
  4. 8
      udp_listener.go

1
README.md

@ -12,6 +12,7 @@ Features: @@ -12,6 +12,7 @@ Features:
* Supports reading and publishing streams
* Supports one publisher at once, while readers can be more than one.
* Supports reading via UDP and TCP
* Supports publishing via UDP and TCP
<br />

88
main.go

@ -36,52 +36,12 @@ func newProgram(rtspPort int, rtpPort int, rtcpPort int) (*program, error) { @@ -36,52 +36,12 @@ func newProgram(rtspPort int, rtpPort int, rtcpPort int) (*program, error) {
var err error
p.rtpl, err = newUdpListener(rtpPort, "RTP", func(l *udpListener, buf []byte) {
p.mutex.RLock()
defer p.mutex.RUnlock()
tcpHeader := [4]byte{0x24, 0x00, 0x00, 0x00}
binary.BigEndian.PutUint16(tcpHeader[2:], uint16(len(buf)))
for c := range p.clients {
if c.state == "PLAY" {
if c.rtpProto == "udp" {
l.nconn.WriteTo(buf, &net.UDPAddr{
IP: c.IP,
Port: c.rtpPort,
})
} else {
c.nconn.Write(tcpHeader[:])
c.nconn.Write(buf)
}
}
}
})
p.rtpl, err = newUdpListener(rtpPort, "RTP", p.handleRtp)
if err != nil {
return nil, err
}
p.rtcpl, err = newUdpListener(rtcpPort, "RTCP", func(l *udpListener, buf []byte) {
p.mutex.RLock()
defer p.mutex.RUnlock()
tcpHeader := [4]byte{0x24, 0x00, 0x00, 0x00}
binary.BigEndian.PutUint16(tcpHeader[2:], uint16(len(buf)))
for c := range p.clients {
if c.state == "PLAY" {
if c.rtpProto == "udp" {
l.nconn.WriteTo(buf, &net.UDPAddr{
IP: c.IP,
Port: c.rtcpPort,
})
} else {
c.nconn.Write(tcpHeader[:])
c.nconn.Write(buf)
}
}
}
})
p.rtcpl, err = newUdpListener(rtcpPort, "RTCP", p.handleRtcp)
if err != nil {
return nil, err
}
@ -109,6 +69,50 @@ func (p *program) run() { @@ -109,6 +69,50 @@ func (p *program) run() {
wg.Wait()
}
func (p *program) handleRtp(buf []byte) {
p.mutex.RLock()
defer p.mutex.RUnlock()
tcpHeader := [4]byte{0x24, 0x00, 0x00, 0x00}
binary.BigEndian.PutUint16(tcpHeader[2:], uint16(len(buf)))
for c := range p.clients {
if c.state == "PLAY" {
if c.rtpProto == "udp" {
p.rtpl.nconn.WriteTo(buf, &net.UDPAddr{
IP: c.IP,
Port: c.rtpPort,
})
} else {
c.nconn.Write(tcpHeader[:])
c.nconn.Write(buf)
}
}
}
}
func (p *program) handleRtcp(buf []byte) {
p.mutex.RLock()
defer p.mutex.RUnlock()
tcpHeader := [4]byte{0x24, 0x00, 0x00, 0x00}
binary.BigEndian.PutUint16(tcpHeader[2:], uint16(len(buf)))
for c := range p.clients {
if c.state == "PLAY" {
if c.rtpProto == "udp" {
p.rtcpl.nconn.WriteTo(buf, &net.UDPAddr{
IP: c.IP,
Port: c.rtcpPort,
})
} else {
c.nconn.Write(tcpHeader[:])
c.nconn.Write(buf)
}
}
}
}
func main() {
kingpin.CommandLine.Help = "rtsp-simple-server " + Version + "\n\n" +
"RTSP server."

142
rtsp_client.go

@ -1,6 +1,8 @@ @@ -1,6 +1,8 @@
package main
import (
"bufio"
"encoding/binary"
"fmt"
"io"
"log"
@ -326,48 +328,75 @@ func (c *rtspClient) run(wg sync.WaitGroup) { @@ -326,48 +328,75 @@ func (c *rtspClient) run(wg sync.WaitGroup) {
// record
case "ANNOUNCE":
if _, ok := transports["RTP/AVP/UDP"]; !ok {
c.log("ERR: transport header does not contain RTP/AVP/UDP")
return
}
if _, ok := transports["mode=record"]; !ok {
c.log("ERR: transport header does not contain mode=record")
return
}
clientPort1, clientPort2 := getPorts()
if clientPort1 == 0 || clientPort2 == 0 {
c.log("ERR: transport header does not have valid client ports (%s)", transport)
return
}
if _, ok := transports["RTP/AVP/UDP"]; ok {
clientPort1, clientPort2 := getPorts()
if clientPort1 == 0 || clientPort2 == 0 {
c.log("ERR: transport header does not have valid client ports (%s)", transport)
return
}
err = rconn.WriteResponse(&rtsp.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Transport": strings.Join([]string{
"RTP/AVP",
"unicast",
fmt.Sprintf("client_port=%d-%d", clientPort1, clientPort2),
fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort),
"ssrc=1234ABCD",
}, ";"),
"Session": "12345678",
},
})
if err != nil {
c.log("ERR: %s", err)
return
}
c.p.mutex.Lock()
c.rtpProto = "udp"
c.rtpPort = clientPort1
c.rtcpPort = clientPort2
c.state = "PRE_RECORD"
c.p.mutex.Unlock()
} else if _, ok := transports["RTP/AVP/TCP"]; ok {
err = rconn.WriteResponse(&rtsp.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Transport": strings.Join([]string{
"RTP/AVP/TCP",
"unicast",
"destionation=127.0.0.1",
"source=127.0.0.1",
}, ";"),
"Session": "12345678",
},
})
if err != nil {
c.log("ERR: %s", err)
return
}
c.p.mutex.Lock()
c.rtpProto = "tcp"
c.state = "PRE_RECORD"
c.p.mutex.Unlock()
err = rconn.WriteResponse(&rtsp.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Transport": strings.Join([]string{
"RTP/AVP",
"unicast",
fmt.Sprintf("client_port=%d-%d", clientPort1, clientPort2),
fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort),
"ssrc=1234ABCD",
}, ";"),
"Session": "12345678",
},
})
if err != nil {
c.log("ERR: %s", err)
} else {
c.log("ERR: transport header does not contain a valid protocol (RTP/AVP or RTP/AVP/TCP) (%s)", transport)
return
}
c.p.mutex.Lock()
c.rtpPort = clientPort1
c.rtcpPort = clientPort2
c.state = "PRE_RECORD"
c.p.mutex.Unlock()
default:
c.log("ERR: client is in state '%s'", c.state)
return
@ -398,12 +427,10 @@ func (c *rtspClient) run(wg sync.WaitGroup) { @@ -398,12 +427,10 @@ func (c *rtspClient) run(wg sync.WaitGroup) {
c.state = "PLAY"
c.p.mutex.Unlock()
// when rtp protocol is TCP, the RTSP connection
// becomes a RTP connection.
// receive RTP feedback, do not parse it, wait until
// connection closes.
// when rtp protocol is TCP, the RTSP connection becomes a RTP connection.
// receive RTP feedback, do not parse it, wait until connection closes.
if c.rtpProto == "tcp" {
buf := make([]byte, 10249)
buf := make([]byte, 1024)
for {
_, err := c.nconn.Read(buf)
if err != nil {
@ -456,12 +483,49 @@ func (c *rtspClient) run(wg sync.WaitGroup) { @@ -456,12 +483,49 @@ func (c *rtspClient) run(wg sync.WaitGroup) {
return
}
c.log("is publishing (via udp)")
c.log("is publishing (via %s)", c.rtpProto)
c.p.mutex.Lock()
c.state = "RECORD"
c.p.mutex.Unlock()
// when rtp protocol is TCP, the RTSP connection becomes a RTP connection.
// receive RTP feedback, do not parse it, wait until connection closes.
if c.rtpProto == "tcp" {
packet := make([]byte, 2048)
bconn := bufio.NewReader(c.nconn)
for {
byts, err := bconn.Peek(4)
if err != nil {
return
}
bconn.Discard(4)
if byts[0] != 0x24 {
c.log("ERR: wrong magic byte")
return
}
if byts[1] != 0x00 {
c.log("ERR: wrong channel")
return
}
plen := binary.BigEndian.Uint16(byts[2:])
if plen > 2048 {
c.log("ERR: packet len > 2048")
return
}
_, err = io.ReadFull(bconn, packet[:plen])
if err != nil {
return
}
c.p.handleRtp(packet[:plen])
}
}
case "TEARDOWN":
return

8
udp_listener.go

@ -6,15 +6,13 @@ import ( @@ -6,15 +6,13 @@ import (
"sync"
)
type udpListenerCb func(*udpListener, []byte)
type udpListener struct {
nconn *net.UDPConn
logPrefix string
cb udpListenerCb
cb func([]byte)
}
func newUdpListener(port int, logPrefix string, cb udpListenerCb) (*udpListener, error) {
func newUdpListener(port int, logPrefix string, cb func([]byte)) (*udpListener, error) {
nconn, err := net.ListenUDP("udp", &net.UDPAddr{
Port: port,
})
@ -48,6 +46,6 @@ func (l *udpListener) run(wg sync.WaitGroup) { @@ -48,6 +46,6 @@ func (l *udpListener) run(wg sync.WaitGroup) {
break
}
l.cb(l, buf[:n])
l.cb(buf[:n])
}
}

Loading…
Cancel
Save