From 603c43b12f50c261636884aa63a6ae39e30cc59b Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 12 Jul 2020 22:53:22 +0200 Subject: [PATCH] update gortsplib --- go.mod | 3 +- go.sum | 4 +- main.go | 46 +++---- server-client.go | 18 +-- server-udpl.go | 32 ++--- streamer-udpl.go | 28 ++-- streamer.go | 327 ++++++----------------------------------------- utils.go | 37 +----- 8 files changed, 107 insertions(+), 388 deletions(-) diff --git a/go.mod b/go.mod index 1c8c36b2..ee78d7bf 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,9 @@ go 1.13 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20200712140456-c87fdcdbff66 + github.com/aler9/gortsplib v0.0.0-20200712204813-71824f42dca0 github.com/pion/rtcp v1.2.3 github.com/pion/sdp v1.3.0 - github.com/pkg/errors v0.9.1 // indirect github.com/stretchr/testify v1.5.1 gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/yaml.v2 v2.2.2 diff --git a/go.sum b/go.sum index 9329db75..7980594a 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20200712140456-c87fdcdbff66 h1:YK6Fjwn5aEnvGjRPodjKmpkQ8VZbA58F5OvAecw0eJM= -github.com/aler9/gortsplib v0.0.0-20200712140456-c87fdcdbff66/go.mod h1:YiIgmmv0ELkWUy11Jj2h5AgfqLCpy8sIX/l9MmS8+uw= +github.com/aler9/gortsplib v0.0.0-20200712204813-71824f42dca0 h1:9Ph5Zl7JkTIEXkot6Q3Acag+9klW9cpwI9navzSE0gs= +github.com/aler9/gortsplib v0.0.0-20200712204813-71824f42dca0/go.mod h1:VsK6bzyxOh2ymYRX/U7ZfM4fEsXKXd1ylL73c2eNzUA= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/pion/rtcp v1.2.3 h1:2wrhKnqgSz91Q5nzYTO07mQXztYPtxL8a0XOss4rJqA= diff --git a/main.go b/main.go index cc133f19..c030e893 100644 --- a/main.go +++ b/main.go @@ -110,18 +110,18 @@ type programEventClientRecord struct { func (programEventClientRecord) isProgramEvent() {} type programEventClientFrameUdp struct { - addr *net.UDPAddr - trackFlowType trackFlowType - buf []byte + addr *net.UDPAddr + streamType gortsplib.StreamType + buf []byte } func (programEventClientFrameUdp) isProgramEvent() {} type programEventClientFrameTcp struct { - path string - trackId int - trackFlowType trackFlowType - buf []byte + path string + trackId int + streamType gortsplib.StreamType + buf []byte } func (programEventClientFrameTcp) isProgramEvent() {} @@ -139,10 +139,10 @@ type programEventStreamerNotReady struct { func (programEventStreamerNotReady) isProgramEvent() {} type programEventStreamerFrame struct { - streamer *streamer - trackId int - trackFlowType trackFlowType - buf []byte + streamer *streamer + trackId int + streamType gortsplib.StreamType + buf []byte } func (programEventStreamerFrame) isProgramEvent() {} @@ -226,12 +226,12 @@ func newProgram(sargs []string, stdin io.Reader) (*program, error) { http.DefaultServeMux = http.NewServeMux() } - p.rtpl, err = newServerUdpListener(p, conf.RtpPort, _TRACK_FLOW_TYPE_RTP) + p.rtpl, err = newServerUdpListener(p, conf.RtpPort, gortsplib.StreamTypeRtp) if err != nil { return nil, err } - p.rtcpl, err = newServerUdpListener(p, conf.RtcpPort, _TRACK_FLOW_TYPE_RTCP) + p.rtcpl, err = newServerUdpListener(p, conf.RtcpPort, gortsplib.StreamTypeRtcp) if err != nil { return nil, err } @@ -382,16 +382,16 @@ outer: evt.res <- nil case programEventClientFrameUdp: - client, trackId := p.findPublisher(evt.addr, evt.trackFlowType) + client, trackId := p.findPublisher(evt.addr, evt.streamType) if client == nil { continue } - client.rtcpReceivers[trackId].onFrame(evt.trackFlowType, evt.buf) - p.forwardFrame(client.path, trackId, evt.trackFlowType, evt.buf) + client.rtcpReceivers[trackId].onFrame(evt.streamType, evt.buf) + p.forwardFrame(client.path, trackId, evt.streamType, evt.buf) case programEventClientFrameTcp: - p.forwardFrame(evt.path, evt.trackId, evt.trackFlowType, evt.buf) + p.forwardFrame(evt.path, evt.trackId, evt.streamType, evt.buf) case programEventStreamerReady: evt.streamer.ready = true @@ -411,7 +411,7 @@ outer: } case programEventStreamerFrame: - p.forwardFrame(evt.streamer.path, evt.trackId, evt.trackFlowType, evt.buf) + p.forwardFrame(evt.streamer.path, evt.trackId, evt.streamType, evt.buf) case programEventTerminate: break outer @@ -469,7 +469,7 @@ func (p *program) close() { <-p.done } -func (p *program) findPublisher(addr *net.UDPAddr, trackFlowType trackFlowType) (*serverClient, int) { +func (p *program) findPublisher(addr *net.UDPAddr, streamType gortsplib.StreamType) (*serverClient, int) { for _, pub := range p.publishers { cl, ok := pub.(*serverClient) if !ok { @@ -483,7 +483,7 @@ func (p *program) findPublisher(addr *net.UDPAddr, trackFlowType trackFlowType) } for i, t := range cl.streamTracks { - if trackFlowType == _TRACK_FLOW_TYPE_RTP { + if streamType == gortsplib.StreamTypeRtp { if t.rtpPort == addr.Port { return cl, i } @@ -497,11 +497,11 @@ func (p *program) findPublisher(addr *net.UDPAddr, trackFlowType trackFlowType) return nil, -1 } -func (p *program) forwardFrame(path string, trackId int, trackFlowType trackFlowType, frame []byte) { +func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.StreamType, frame []byte) { for client := range p.clients { if client.path == path && client.state == _CLIENT_STATE_PLAY { if client.streamProtocol == _STREAM_PROTOCOL_UDP { - if trackFlowType == _TRACK_FLOW_TYPE_RTP { + if streamType == gortsplib.StreamTypeRtp { p.rtpl.write(&udpAddrBufPair{ addr: &net.UDPAddr{ IP: client.ip(), @@ -522,7 +522,7 @@ func (p *program) forwardFrame(path string, trackId int, trackFlowType trackFlow } } else { - channel := trackFlowTypeToInterleavedChannel(trackId, trackFlowType) + channel := gortsplib.ConvTrackIdAndStreamTypeToChannel(trackId, streamType) buf := client.writeBuf.swap() buf = buf[:len(frame)] diff --git a/server-client.go b/server-client.go index 606866cb..a3144cd0 100644 --- a/server-client.go +++ b/server-client.go @@ -87,7 +87,7 @@ func newServerClient(p *program, nconn net.Conn) *serverClient { c := &serverClient{ p: p, conn: gortsplib.NewConnServer(gortsplib.ConnServerConf{ - NConn: nconn, + Conn: nconn, ReadTimeout: p.conf.ReadTimeout, WriteTimeout: p.conf.WriteTimeout, }), @@ -235,7 +235,7 @@ func (c *serverClient) runPlay() bool { case rawEvt := <-c.events: switch evt := rawEvt.(type) { case serverClientEventFrameTcp: - c.conn.WriteInterleavedFrame(evt.frame) + c.conn.WriteFrame(evt.frame) } } } @@ -284,7 +284,7 @@ func (c *serverClient) runRecord() bool { for { frame.Content = c.readBuf.swap() frame.Content = frame.Content[:cap(frame.Content)] - recv, err := c.conn.ReadInterleavedFrameOrRequest(frame) + recv, err := c.conn.ReadFrameOrRequest(frame) if err != nil { readDone <- err break @@ -292,18 +292,18 @@ func (c *serverClient) runRecord() bool { switch recvt := recv.(type) { case *gortsplib.InterleavedFrame: - trackId, trackFlowType := interleavedChannelToTrackFlowType(frame.Channel) + trackId, streamType := gortsplib.ConvChannelToTrackIdAndStreamType(frame.Channel) if trackId >= len(c.streamTracks) { c.log("ERR: invalid track id '%d'", trackId) readDone <- nil break } - c.rtcpReceivers[trackId].onFrame(trackFlowType, frame.Content) + c.rtcpReceivers[trackId].onFrame(streamType, frame.Content) c.p.events <- programEventClientFrameTcp{ c.path, trackId, - trackFlowType, + streamType, frame.Content, } @@ -341,10 +341,10 @@ func (c *serverClient) runRecord() bool { case <-receiverReportTicker.C: for trackId := range c.streamTracks { - channel := trackFlowTypeToInterleavedChannel(trackId, _TRACK_FLOW_TYPE_RTCP) + channel := gortsplib.ConvTrackIdAndStreamTypeToChannel(trackId, gortsplib.StreamTypeRtcp) frame := c.rtcpReceivers[trackId].report() - c.conn.WriteInterleavedFrame(&gortsplib.InterleavedFrame{ + c.conn.WriteFrame(&gortsplib.InterleavedFrame{ Channel: channel, Content: frame, }) @@ -668,7 +668,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("invalid SDP: %s", err)) return errClientTerminate } - sdpParsed, req.Content = sdpForServer(sdpParsed, req.Content) + sdpParsed, req.Content = sdpForServer(sdpParsed) if len(sdpParsed.MediaDescriptions) == 0 { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("no tracks defined")) diff --git a/server-udpl.go b/server-udpl.go index 0e27f699..86b5b78b 100644 --- a/server-udpl.go +++ b/server-udpl.go @@ -3,6 +3,8 @@ package main import ( "net" "time" + + "github.com/aler9/gortsplib" ) type udpAddrBufPair struct { @@ -11,17 +13,17 @@ type udpAddrBufPair struct { } type serverUdpListener struct { - p *program - nconn *net.UDPConn - trackFlowType trackFlowType - readBuf *doubleBuffer - writeBuf *doubleBuffer + p *program + nconn *net.UDPConn + streamType gortsplib.StreamType + readBuf *doubleBuffer + writeBuf *doubleBuffer writeChan chan *udpAddrBufPair done chan struct{} } -func newServerUdpListener(p *program, port int, trackFlowType trackFlowType) (*serverUdpListener, error) { +func newServerUdpListener(p *program, port int, streamType gortsplib.StreamType) (*serverUdpListener, error) { nconn, err := net.ListenUDP("udp", &net.UDPAddr{ Port: port, }) @@ -30,13 +32,13 @@ func newServerUdpListener(p *program, port int, trackFlowType trackFlowType) (*s } l := &serverUdpListener{ - p: p, - nconn: nconn, - trackFlowType: trackFlowType, - readBuf: newDoubleBuffer(2048), - writeBuf: newDoubleBuffer(2048), - writeChan: make(chan *udpAddrBufPair), - done: make(chan struct{}), + p: p, + nconn: nconn, + streamType: streamType, + readBuf: newDoubleBuffer(2048), + writeBuf: newDoubleBuffer(2048), + writeChan: make(chan *udpAddrBufPair), + done: make(chan struct{}), } l.log("opened on :%d", port) @@ -45,7 +47,7 @@ func newServerUdpListener(p *program, port int, trackFlowType trackFlowType) (*s func (l *serverUdpListener) log(format string, args ...interface{}) { var label string - if l.trackFlowType == _TRACK_FLOW_TYPE_RTP { + if l.streamType == gortsplib.StreamTypeRtp { label = "RTP" } else { label = "RTCP" @@ -72,7 +74,7 @@ func (l *serverUdpListener) run() { l.p.events <- programEventClientFrameUdp{ addr, - l.trackFlowType, + l.streamType, buf[:n], } } diff --git a/streamer-udpl.go b/streamer-udpl.go index a7686454..910f9640 100644 --- a/streamer-udpl.go +++ b/streamer-udpl.go @@ -3,13 +3,15 @@ package main import ( "net" "time" + + "github.com/aler9/gortsplib" ) type streamerUdpListener struct { p *program streamer *streamer trackId int - trackFlowType trackFlowType + streamType gortsplib.StreamType publisherIp net.IP publisherPort int nconn *net.UDPConn @@ -21,7 +23,7 @@ type streamerUdpListener struct { } func newStreamerUdpListener(p *program, port int, streamer *streamer, - trackId int, trackFlowType trackFlowType, publisherIp net.IP) (*streamerUdpListener, error) { + trackId int, streamType gortsplib.StreamType, publisherIp net.IP) (*streamerUdpListener, error) { nconn, err := net.ListenUDP("udp", &net.UDPAddr{ Port: port, }) @@ -30,15 +32,15 @@ func newStreamerUdpListener(p *program, port int, streamer *streamer, } l := &streamerUdpListener{ - p: p, - streamer: streamer, - trackId: trackId, - trackFlowType: trackFlowType, - publisherIp: publisherIp, - nconn: nconn, - readBuf: newDoubleBuffer(2048), - writeChan: make(chan *udpAddrBufPair), - done: make(chan struct{}), + p: p, + streamer: streamer, + trackId: trackId, + streamType: streamType, + publisherIp: publisherIp, + nconn: nconn, + readBuf: newDoubleBuffer(2048), + writeChan: make(chan *udpAddrBufPair), + done: make(chan struct{}), } return l, nil @@ -78,8 +80,8 @@ func (l *streamerUdpListener) run() { continue } - l.streamer.rtcpReceivers[l.trackId].onFrame(l.trackFlowType, buf[:n]) - l.p.events <- programEventStreamerFrame{l.streamer, l.trackId, l.trackFlowType, buf[:n]} + l.streamer.rtcpReceivers[l.trackId].onFrame(l.streamType, buf[:n]) + l.p.events <- programEventStreamerFrame{l.streamer, l.trackId, l.streamType, buf[:n]} } close(l.writeChan) diff --git a/streamer.go b/streamer.go index a5666ef7..2fcdf381 100644 --- a/streamer.go +++ b/streamer.go @@ -5,8 +5,6 @@ import ( "math/rand" "net" "net/url" - "strconv" - "strings" "time" "github.com/aler9/gortsplib" @@ -28,7 +26,7 @@ type streamerUdpListenerPair struct { type streamer struct { p *program path string - ur *url.URL + u *url.URL proto streamProtocol ready bool clientSdpParsed *sdp.SessionDescription @@ -42,19 +40,19 @@ type streamer struct { } func newStreamer(p *program, path string, source string, sourceProtocol string) (*streamer, error) { - ur, err := url.Parse(source) + u, err := url.Parse(source) if err != nil { return nil, fmt.Errorf("'%s' is not a valid source not an RTSP url", source) } - if ur.Scheme != "rtsp" { + if u.Scheme != "rtsp" { return nil, fmt.Errorf("'%s' is not a valid RTSP url", source) } - if ur.Port() == "" { - ur.Host += ":554" + if u.Port() == "" { + u.Host += ":554" } - if ur.User != nil { - pass, _ := ur.User.Password() - user := ur.User.Username() + if u.User != nil { + pass, _ := u.User.Password() + user := u.User.Username() if user != "" && pass == "" || user == "" && pass != "" { fmt.Errorf("username and password must be both provided") @@ -78,7 +76,7 @@ func newStreamer(p *program, path string, source string, sourceProtocol string) s := &streamer{ p: p, path: path, - ur: ur, + u: u, proto: proto, readBuf: newDoubleBuffer(512 * 1024), terminate: make(chan struct{}), @@ -129,7 +127,7 @@ func (s *streamer) do() bool { var err error dialDone := make(chan struct{}) go func() { - nconn, err = net.DialTimeout("tcp", s.ur.Host, s.p.conf.ReadTimeout) + nconn, err = net.DialTimeout("tcp", s.u.Host, s.p.conf.ReadTimeout) close(dialDone) }() @@ -146,20 +144,7 @@ func (s *streamer) do() bool { defer nconn.Close() conn, err := gortsplib.NewConnClient(gortsplib.ConnClientConf{ - NConn: nconn, - Username: func() string { - if s.ur.User != nil { - return s.ur.User.Username() - } - return "" - }(), - Password: func() string { - if s.ur.User != nil { - pass, _ := s.ur.User.Password() - return pass - } - return "" - }(), + Conn: nconn, ReadTimeout: s.p.conf.ReadTimeout, WriteTimeout: s.p.conf.WriteTimeout, }) @@ -168,64 +153,20 @@ func (s *streamer) do() bool { return true } - res, err := conn.WriteRequest(&gortsplib.Request{ - Method: gortsplib.OPTIONS, - Url: &url.URL{ - Scheme: "rtsp", - Host: s.ur.Host, - Path: "/", - }, - }) + _, err = conn.Options(s.u) if err != nil { s.log("ERR: %s", err) return true } - // OPTIONS is not available in some cameras - if res.StatusCode != gortsplib.StatusOK && res.StatusCode != gortsplib.StatusNotFound { - s.log("ERR: OPTIONS returned code %d (%s)", res.StatusCode, res.StatusMessage) - return true - } - - res, err = conn.WriteRequest(&gortsplib.Request{ - Method: gortsplib.DESCRIBE, - Url: &url.URL{ - Scheme: "rtsp", - Host: s.ur.Host, - Path: s.ur.Path, - RawQuery: s.ur.RawQuery, - }, - }) + clientSdpParsed, _, err := conn.Describe(s.u) if err != nil { s.log("ERR: %s", err) return true } - if res.StatusCode != gortsplib.StatusOK { - s.log("ERR: DESCRIBE returned code %d (%s)", res.StatusCode, res.StatusMessage) - return true - } - - contentType, ok := res.Header["Content-Type"] - if !ok || len(contentType) != 1 { - s.log("ERR: Content-Type not provided") - return true - } - - if contentType[0] != "application/sdp" { - s.log("ERR: wrong Content-Type, expected application/sdp") - return true - } - - clientSdpParsed := &sdp.SessionDescription{} - err = clientSdpParsed.Unmarshal(string(res.Content)) - if err != nil { - s.log("ERR: invalid SDP: %s", err) - return true - } - // create a filtered SDP that is used by the server (not by the client) - serverSdpParsed, serverSdpText := sdpForServer(clientSdpParsed, res.Content) + serverSdpParsed, serverSdpText := sdpForServer(clientSdpParsed) s.clientSdpParsed = clientSdpParsed s.serverSdpText = serverSdpText @@ -264,13 +205,13 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool { var err error rtpl, err = newStreamerUdpListener(s.p, rtpPort, s, i, - _TRACK_FLOW_TYPE_RTP, publisherIp) + gortsplib.StreamTypeRtp, publisherIp) if err != nil { continue } rtcpl, err = newStreamerUdpListener(s.p, rtcpPort, s, i, - _TRACK_FLOW_TYPE_RTCP, publisherIp) + gortsplib.StreamTypeRtcp, publisherIp) if err != nil { rtpl.close() continue @@ -280,56 +221,7 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool { } }() - res, err := conn.WriteRequest(&gortsplib.Request{ - Method: gortsplib.SETUP, - Url: func() *url.URL { - control := sdpFindAttribute(media.Attributes, "control") - - // no control attribute - if control == "" { - return s.ur - } - - // absolute path - if strings.HasPrefix(control, "rtsp://") { - ur, err := url.Parse(control) - if err != nil { - return s.ur - } - return ur - } - - // relative path - return &url.URL{ - Scheme: "rtsp", - Host: s.ur.Host, - Path: func() string { - ret := s.ur.Path - - if len(ret) == 0 || ret[len(ret)-1] != '/' { - ret += "/" - } - - control := sdpFindAttribute(media.Attributes, "control") - if control != "" { - ret += control - } else { - ret += "trackID=" + strconv.FormatInt(int64(i+1), 10) - } - - return ret - }(), - RawQuery: s.ur.RawQuery, - } - }(), - Header: gortsplib.Header{ - "Transport": []string{strings.Join([]string{ - "RTP/AVP/UDP", - "unicast", - fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort), - }, ";")}, - }, - }) + rtpServerPort, rtcpServerPort, _, err := conn.SetupUdp(s.u, media, rtpPort, rtcpPort) if err != nil { s.log("ERR: %s", err) rtpl.close() @@ -337,30 +229,6 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool { return true } - if res.StatusCode != gortsplib.StatusOK { - s.log("ERR: SETUP returned code %d (%s)", res.StatusCode, res.StatusMessage) - rtpl.close() - rtcpl.close() - return true - } - - tsRaw, ok := res.Header["Transport"] - if !ok || len(tsRaw) != 1 { - s.log("ERR: transport header not provided") - rtpl.close() - rtcpl.close() - return true - } - - th := gortsplib.ReadHeaderTransport(tsRaw[0]) - rtpServerPort, rtcpServerPort := th.GetPorts("server_port") - if rtpServerPort == 0 { - s.log("ERR: server ports not provided") - rtpl.close() - rtcpl.close() - return true - } - rtpl.publisherPort = rtpServerPort rtcpl.publisherPort = rtcpServerPort @@ -370,25 +238,12 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool { }) } - res, err := conn.WriteRequest(&gortsplib.Request{ - Method: gortsplib.PLAY, - Url: &url.URL{ - Scheme: "rtsp", - Host: s.ur.Host, - Path: s.ur.Path, - RawQuery: s.ur.RawQuery, - }, - }) + _, err := conn.Play(s.u) if err != nil { s.log("ERR: %s", err) return true } - if res.StatusCode != gortsplib.StatusOK { - s.log("ERR: PLAY returned code %d (%s)", res.StatusCode, res.StatusMessage) - return true - } - s.rtcpReceivers = make([]*rtcpReceiver, len(s.clientSdpParsed.MediaDescriptions)) for trackId := range s.clientSdpParsed.MediaDescriptions { s.rtcpReceivers[trackId] = newRtcpReceiver() @@ -415,11 +270,11 @@ outer: break outer case <-sendKeepaliveTicker.C: - _, err = conn.WriteRequest(&gortsplib.Request{ + _, err = conn.Do(&gortsplib.Request{ Method: gortsplib.OPTIONS, Url: &url.URL{ Scheme: "rtsp", - Host: s.ur.Host, + Host: s.u.Host, Path: "/", }, }) @@ -473,123 +328,19 @@ outer: func (s *streamer) runTcp(conn *gortsplib.ConnClient) bool { for i, media := range s.clientSdpParsed.MediaDescriptions { - interleaved := fmt.Sprintf("interleaved=%d-%d", (i * 2), (i*2)+1) - - res, err := conn.WriteRequest(&gortsplib.Request{ - Method: gortsplib.SETUP, - Url: func() *url.URL { - control := sdpFindAttribute(media.Attributes, "control") - - // no control attribute - if control == "" { - return s.ur - } - - // absolute path - if strings.HasPrefix(control, "rtsp://") { - ur, err := url.Parse(control) - if err != nil { - return s.ur - } - return ur - } - - // relative path - return &url.URL{ - Scheme: "rtsp", - Host: s.ur.Host, - Path: func() string { - ret := s.ur.Path - - if len(ret) == 0 || ret[len(ret)-1] != '/' { - ret += "/" - } - - control := sdpFindAttribute(media.Attributes, "control") - if control != "" { - ret += control - } else { - ret += "trackID=" + strconv.FormatInt(int64(i+1), 10) - } - - return ret - }(), - RawQuery: s.ur.RawQuery, - } - }(), - Header: gortsplib.Header{ - "Transport": []string{strings.Join([]string{ - "RTP/AVP/TCP", - "unicast", - interleaved, - }, ";")}, - }, - }) + _, err := conn.SetupTcp(s.u, media, i) if err != nil { s.log("ERR: %s", err) return true } - - if res.StatusCode != gortsplib.StatusOK { - s.log("ERR: SETUP returned code %d (%s)", res.StatusCode, res.StatusMessage) - return true - } - - tsRaw, ok := res.Header["Transport"] - if !ok || len(tsRaw) != 1 { - s.log("ERR: transport header not provided") - return true - } - - th := gortsplib.ReadHeaderTransport(tsRaw[0]) - - _, ok = th[interleaved] - if !ok { - s.log("ERR: transport header does not have %s (%s)", interleaved, tsRaw[0]) - return true - } } - err := conn.WriteRequestNoResponse(&gortsplib.Request{ - Method: gortsplib.PLAY, - Url: &url.URL{ - Scheme: "rtsp", - Host: s.ur.Host, - Path: s.ur.Path, - RawQuery: s.ur.RawQuery, - }, - }) + _, err := conn.Play(s.u) if err != nil { s.log("ERR: %s", err) return true } - frame := &gortsplib.InterleavedFrame{} - -outer1: - for { - frame.Content = s.readBuf.swap() - frame.Content = frame.Content[:cap(frame.Content)] - - recv, err := conn.ReadInterleavedFrameOrResponse(frame) - if err != nil { - s.log("ERR: %s", err) - return true - } - - switch recvt := recv.(type) { - case *gortsplib.Response: - if recvt.StatusCode != gortsplib.StatusOK { - s.log("ERR: PLAY returned code %d (%s)", recvt.StatusCode, recvt.StatusMessage) - return true - } - break outer1 - - case *gortsplib.InterleavedFrame: - // ignore the frames sent before the response - } - } - s.rtcpReceivers = make([]*rtcpReceiver, len(s.clientSdpParsed.MediaDescriptions)) for trackId := range s.clientSdpParsed.MediaDescriptions { s.rtcpReceivers[trackId] = newRtcpReceiver() @@ -597,57 +348,52 @@ outer1: s.p.events <- programEventStreamerReady{s} + frame := &gortsplib.InterleavedFrame{} + chanConnError := make(chan struct{}) go func() { for { frame.Content = s.readBuf.swap() frame.Content = frame.Content[:cap(frame.Content)] - err := conn.ReadInterleavedFrame(frame) + + err := conn.ReadFrame(frame) if err != nil { s.log("ERR: %s", err) close(chanConnError) break } - trackId, trackFlowType := interleavedChannelToTrackFlowType(frame.Channel) + trackId, streamType := gortsplib.ConvChannelToTrackIdAndStreamType(frame.Channel) - s.rtcpReceivers[trackId].onFrame(trackFlowType, frame.Content) - s.p.events <- programEventStreamerFrame{s, trackId, trackFlowType, frame.Content} + s.rtcpReceivers[trackId].onFrame(streamType, frame.Content) + s.p.events <- programEventStreamerFrame{s, trackId, streamType, frame.Content} } }() - checkStreamTicker := time.NewTicker(_STREAMER_CHECK_STREAM_INTERVAL) + // a ticker to check the stream is not needed since there's already a deadline + // on the RTSP reads receiverReportTicker := time.NewTicker(_STREAMER_RECEIVER_REPORT_INTERVAL) var ret bool -outer2: +outer: for { select { case <-s.terminate: ret = false - break outer2 + break outer case <-chanConnError: ret = true - break outer2 - - case <-checkStreamTicker.C: - for trackId := range s.clientSdpParsed.MediaDescriptions { - if time.Since(s.rtcpReceivers[trackId].lastFrameTime()) >= s.p.conf.StreamDeadAfter { - s.log("ERR: stream is dead") - ret = true - break outer2 - } - } + break outer case <-receiverReportTicker.C: for trackId := range s.clientSdpParsed.MediaDescriptions { frame := s.rtcpReceivers[trackId].report() - channel := trackFlowTypeToInterleavedChannel(trackId, _TRACK_FLOW_TYPE_RTCP) + channel := gortsplib.ConvTrackIdAndStreamTypeToChannel(trackId, gortsplib.StreamTypeRtcp) - conn.WriteInterleavedFrame(&gortsplib.InterleavedFrame{ + conn.WriteFrame(&gortsplib.InterleavedFrame{ Channel: channel, Content: frame, }) @@ -655,7 +401,6 @@ outer2: } } - checkStreamTicker.Stop() receiverReportTicker.Stop() s.p.events <- programEventStreamerNotReady{s} diff --git a/utils.go b/utils.go index 895f0f72..6573d283 100644 --- a/utils.go +++ b/utils.go @@ -7,6 +7,7 @@ import ( "strconv" "time" + "github.com/aler9/gortsplib" "github.com/pion/rtcp" "github.com/pion/sdp" ) @@ -52,27 +53,6 @@ func ipEqualOrInRange(ip net.IP, ips []interface{}) bool { return false } -type trackFlowType int - -const ( - _TRACK_FLOW_TYPE_RTP trackFlowType = iota - _TRACK_FLOW_TYPE_RTCP -) - -func interleavedChannelToTrackFlowType(channel uint8) (int, trackFlowType) { - if (channel % 2) == 0 { - return int(channel / 2), _TRACK_FLOW_TYPE_RTP - } - return int((channel - 1) / 2), _TRACK_FLOW_TYPE_RTCP -} - -func trackFlowTypeToInterleavedChannel(id int, trackFlowType trackFlowType) uint8 { - if trackFlowType == _TRACK_FLOW_TYPE_RTP { - return uint8(id * 2) - } - return uint8((id * 2) + 1) -} - type doubleBuffer struct { buf1 []byte buf2 []byte @@ -198,8 +178,8 @@ func (rr *rtcpReceiver) close() { <-rr.done } -func (rr *rtcpReceiver) onFrame(trackFlowType trackFlowType, buf []byte) { - if trackFlowType == _TRACK_FLOW_TYPE_RTP { +func (rr *rtcpReceiver) onFrame(streamType gortsplib.StreamType, buf []byte) { + if streamType == gortsplib.StreamTypeRtp { // extract sequence number of first frame if len(buf) >= 3 { sequenceNumber := uint16(uint16(buf[2])<<8 | uint16(buf[1])) @@ -233,16 +213,7 @@ func (rr *rtcpReceiver) report() []byte { return <-res } -func sdpFindAttribute(attributes []sdp.Attribute, key string) string { - for _, attr := range attributes { - if attr.Key == key { - return attr.Value - } - } - return "" -} - -func sdpForServer(sin *sdp.SessionDescription, bytsin []byte) (*sdp.SessionDescription, []byte) { +func sdpForServer(sin *sdp.SessionDescription) (*sdp.SessionDescription, []byte) { sout := &sdp.SessionDescription{ SessionName: "Stream", Origin: sdp.Origin{