From b94cbae246cc22d7d5b8917397679d64f70b279e Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 29 Dec 2019 00:44:26 +0100 Subject: [PATCH] implement reading via TCP --- main.go | 35 ++++++-- rtsp/conn.go | 24 ++---- rtsp_client.go | 210 +++++++++++++++++++++++++++++------------------ rtsp_listener.go | 7 +- 4 files changed, 162 insertions(+), 114 deletions(-) diff --git a/main.go b/main.go index 05819743..64f37a17 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "encoding/binary" "fmt" "log" "net" @@ -38,12 +39,21 @@ func newProgram(rtspPort int, rtpPort int, rtcpPort int) (*program, error) { p.rtpl, err = newUdpListener(rtpPort, "RTP", func(l *udpListener, buf []byte) { p.mutex.RLock() defer p.mutex.RUnlock() + + tcpHeader := [4]byte{0x24, 0x00, 0x00, 0x00} + binary.BigEndian.PutUint16(tcpHeader[2:], uint16(len(buf))) + for c := range p.clients { if c.state == "PLAY" { - l.nconn.WriteTo(buf, &net.UDPAddr{ - IP: c.IP, - Port: c.rtpPort, - }) + if c.rtpProto == "udp" { + l.nconn.WriteTo(buf, &net.UDPAddr{ + IP: c.IP, + Port: c.rtpPort, + }) + } else { + c.nconn.Write(tcpHeader[:]) + c.nconn.Write(buf) + } } } }) @@ -54,12 +64,21 @@ func newProgram(rtspPort int, rtpPort int, rtcpPort int) (*program, error) { p.rtcpl, err = newUdpListener(rtcpPort, "RTCP", func(l *udpListener, buf []byte) { p.mutex.RLock() defer p.mutex.RUnlock() + + tcpHeader := [4]byte{0x24, 0x00, 0x00, 0x00} + binary.BigEndian.PutUint16(tcpHeader[2:], uint16(len(buf))) + for c := range p.clients { if c.state == "PLAY" { - l.nconn.WriteTo(buf, &net.UDPAddr{ - IP: c.IP, - Port: c.rtcpPort, - }) + if c.rtpProto == "udp" { + l.nconn.WriteTo(buf, &net.UDPAddr{ + IP: c.IP, + Port: c.rtcpPort, + }) + } else { + c.nconn.Write(tcpHeader[:]) + c.nconn.Write(buf) + } } } }) diff --git a/rtsp/conn.go b/rtsp/conn.go index 3a5aca7b..31fecc76 100644 --- a/rtsp/conn.go +++ b/rtsp/conn.go @@ -5,35 +5,21 @@ import ( ) type Conn struct { - c net.Conn -} - -func NewConn(c net.Conn) *Conn { - return &Conn{ - c: c, - } -} - -func (c *Conn) Close() error { - return c.c.Close() -} - -func (c *Conn) RemoteAddr() net.Addr { - return c.c.RemoteAddr() + net.Conn } func (c *Conn) ReadRequest() (*Request, error) { - return requestDecode(c.c) + return requestDecode(c) } func (c *Conn) WriteRequest(req *Request) error { - return requestEncode(c.c, req) + return requestEncode(c, req) } func (c *Conn) ReadResponse() (*Response, error) { - return responseDecode(c.c) + return responseDecode(c) } func (c *Conn) WriteResponse(res *Response) error { - return responseEncode(c.c, res) + return responseEncode(c, res) } diff --git a/rtsp_client.go b/rtsp_client.go index 5ab7c886..d2843e1b 100644 --- a/rtsp_client.go +++ b/rtsp_client.go @@ -15,17 +15,18 @@ import ( type rtspClient struct { p *program - rconn *rtsp.Conn + nconn net.Conn state string IP net.IP + rtpProto string rtpPort int rtcpPort int } -func newRtspClient(p *program, rconn *rtsp.Conn) *rtspClient { +func newRtspClient(p *program, nconn net.Conn) *rtspClient { c := &rtspClient{ p: p, - rconn: rconn, + nconn: nconn, state: "STARTING", } @@ -50,11 +51,11 @@ func (c *rtspClient) close() error { } } - return c.rconn.Close() + return c.nconn.Close() } func (c *rtspClient) log(format string, args ...interface{}) { - format = "[RTSP client " + c.rconn.RemoteAddr().String() + "] " + format + format = "[RTSP client " + c.nconn.RemoteAddr().String() + "] " + format log.Printf(format, args...) } @@ -67,10 +68,15 @@ func (c *rtspClient) run(wg sync.WaitGroup) { c.close() }() + ipstr, _, _ := net.SplitHostPort(c.nconn.RemoteAddr().String()) + c.IP = net.ParseIP(ipstr) + + rconn := &rtsp.Conn{c.nconn} + c.log("connected") for { - req, err := c.rconn.ReadRequest() + req, err := rconn.ReadRequest() if err != nil { if err != io.EOF { c.log("ERR: %s", err) @@ -97,7 +103,7 @@ func (c *rtspClient) run(wg sync.WaitGroup) { // do not check state, since OPTIONS can be requested // in any state - err = c.rconn.WriteResponse(&rtsp.Response{ + err = rconn.WriteResponse(&rtsp.Response{ StatusCode: 200, Status: "OK", Headers: map[string]string{ @@ -139,7 +145,7 @@ func (c *rtspClient) run(wg sync.WaitGroup) { return } - err = c.rconn.WriteResponse(&rtsp.Response{ + err = rconn.WriteResponse(&rtsp.Response{ StatusCode: 200, Status: "OK", Headers: map[string]string{ @@ -188,7 +194,7 @@ func (c *rtspClient) run(wg sync.WaitGroup) { return } - err = c.rconn.WriteResponse(&rtsp.Response{ + err = rconn.WriteResponse(&rtsp.Response{ StatusCode: 200, Status: "OK", Headers: map[string]string{ @@ -211,23 +217,18 @@ func (c *rtspClient) run(wg sync.WaitGroup) { return } - transports := strings.Split(transport, ";") + transports := make(map[string]struct{}) + for _, t := range strings.Split(transport, ";") { + transports[t] = struct{}{} + } - ok = func() bool { - for _, t := range transports { - if t == "unicast" { - return true - } - } - return false - }() - if !ok { + if _, ok := transports["unicast"]; !ok { c.log("ERR: transport header does not contain unicast") return } - clientPort1, clientPort2 := func() (int, int) { - for _, t := range transports { + getPorts := func() (int, int) { + for t := range transports { if !strings.HasPrefix(t, "client_port=") { continue } @@ -251,83 +252,97 @@ func (c *rtspClient) run(wg sync.WaitGroup) { return int(port1), int(port2) } return 0, 0 - }() - if clientPort1 == 0 || clientPort2 == 0 { - c.log("ERR: transport header does not have valid client ports (%s)", transport) - return } switch c.state { // play case "STARTING": - ok = func() bool { - for _, t := range transports { - if t == "RTP/AVP" { - return true - } + // UDP + if _, ok := transports["RTP/AVP"]; ok { + clientPort1, clientPort2 := getPorts() + if clientPort1 == 0 || clientPort2 == 0 { + c.log("ERR: transport header does not have valid client ports (%s)", transport) + return } - return false - }() - if !ok { - c.log("ERR: transport header does not contain RTP/AVP") - return - } - err = c.rconn.WriteResponse(&rtsp.Response{ - StatusCode: 200, - Status: "OK", - Headers: map[string]string{ - "CSeq": cseq, - "Transport": strings.Join([]string{ - "RTP/AVP", - "unicast", - fmt.Sprintf("client_port=%d-%d", clientPort1, clientPort2), - fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort), - "ssrc=1234ABCD", - }, ";"), - "Session": "12345678", - }, - }) - if err != nil { - c.log("ERR: %s", err) + err = rconn.WriteResponse(&rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Transport": strings.Join([]string{ + "RTP/AVP", + "unicast", + fmt.Sprintf("client_port=%d-%d", clientPort1, clientPort2), + fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort), + "ssrc=1234ABCD", + }, ";"), + "Session": "12345678", + }, + }) + if err != nil { + c.log("ERR: %s", err) + return + } + + c.p.mutex.Lock() + c.rtpProto = "udp" + c.rtpPort = clientPort1 + c.rtcpPort = clientPort2 + c.state = "PRE_PLAY" + c.p.mutex.Unlock() + + // TCP + } else if _, ok := transports["RTP/AVP/TCP"]; ok { + err = rconn.WriteResponse(&rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Transport": strings.Join([]string{ + "RTP/AVP/TCP", + "unicast", + "destionation=127.0.0.1", + "source=127.0.0.1", + "interleaved=0-1", + }, ";"), + "Session": "12345678", + }, + }) + if err != nil { + c.log("ERR: %s", err) + return + } + + c.p.mutex.Lock() + c.rtpProto = "tcp" + c.state = "PRE_PLAY" + c.p.mutex.Unlock() + + } else { + c.log("ERR: transport header does not contain a valid protocol (RTP/AVP or RTP/AVP/TCP) (%s)", transport) return } - c.p.mutex.Lock() - c.rtpPort = clientPort1 - c.rtcpPort = clientPort2 - c.state = "PRE_PLAY" - c.p.mutex.Unlock() - // record case "ANNOUNCE": - ok = func() bool { - for _, t := range transports { - if t == "RTP/AVP/UDP" { - return true - } - } - return false - }() - if !ok { + if _, ok := transports["RTP/AVP/UDP"]; !ok { c.log("ERR: transport header does not contain RTP/AVP/UDP") return } - ok = func() bool { - for _, t := range transports { - if t == "mode=record" { - return true - } - } - return false - }() - if !ok { + if _, ok := transports["mode=record"]; !ok { c.log("ERR: transport header does not contain mode=record") return } - err = c.rconn.WriteResponse(&rtsp.Response{ + clientPort1, clientPort2 := getPorts() + if clientPort1 == 0 || clientPort2 == 0 { + c.log("ERR: transport header does not have valid client ports (%s)", transport) + return + } + + err = rconn.WriteResponse(&rtsp.Response{ StatusCode: 200, Status: "OK", Headers: map[string]string{ @@ -348,8 +363,6 @@ func (c *rtspClient) run(wg sync.WaitGroup) { } c.p.mutex.Lock() - ipstr, _, _ := net.SplitHostPort(c.rconn.RemoteAddr().String()) - c.IP = net.ParseIP(ipstr) c.rtpPort = clientPort1 c.rtcpPort = clientPort2 c.state = "PRE_RECORD" @@ -366,7 +379,7 @@ func (c *rtspClient) run(wg sync.WaitGroup) { return } - err = c.rconn.WriteResponse(&rtsp.Response{ + err = rconn.WriteResponse(&rtsp.Response{ StatusCode: 200, Status: "OK", Headers: map[string]string{ @@ -379,27 +392,58 @@ func (c *rtspClient) run(wg sync.WaitGroup) { return } + c.log("is receiving (via %s)", c.rtpProto) + c.p.mutex.Lock() c.state = "PLAY" c.p.mutex.Unlock() + // when rtp protocol is TCP, the RTSP connection + // becomes a RTP connection. + // receive RTP feedback, do not parse it, wait until + // connection closes. + if c.rtpProto == "tcp" { + buf := make([]byte, 10249) + for { + _, err := c.nconn.Read(buf) + if err != nil { + return + } + } + } + case "PAUSE": if c.state != "PLAY" { c.log("ERR: client is in state '%s'", c.state) return } + c.log("paused receiving") + c.p.mutex.Lock() c.state = "PRE_PLAY" c.p.mutex.Unlock() + err = rconn.WriteResponse(&rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Session": "12345678", + }, + }) + if err != nil { + c.log("ERR: %s", err) + return + } + case "RECORD": if c.state != "PRE_RECORD" { c.log("ERR: client is in state '%s'", c.state) return } - err = c.rconn.WriteResponse(&rtsp.Response{ + err = rconn.WriteResponse(&rtsp.Response{ StatusCode: 200, Status: "OK", Headers: map[string]string{ @@ -412,6 +456,8 @@ func (c *rtspClient) run(wg sync.WaitGroup) { return } + c.log("is publishing (via udp)") + c.p.mutex.Lock() c.state = "RECORD" c.p.mutex.Unlock() diff --git a/rtsp_listener.go b/rtsp_listener.go index a677296b..2fb15af2 100644 --- a/rtsp_listener.go +++ b/rtsp_listener.go @@ -4,8 +4,6 @@ import ( "log" "net" "sync" - - "rtsp-server/rtsp" ) type rtspListener struct { @@ -38,13 +36,12 @@ func (l *rtspListener) run(wg sync.WaitGroup) { defer wg.Done() for { - conn, err := l.netl.AcceptTCP() + nconn, err := l.netl.AcceptTCP() if err != nil { break } - rconn := rtsp.NewConn(conn) - rsc := newRtspClient(l.p, rconn) + rsc := newRtspClient(l.p, nconn) wg.Add(1) go rsc.run(wg) }