diff --git a/go.mod b/go.mod index 7c0c307f..9e5b887c 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.15 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20201128100241-0c292dec9d3f + github.com/aler9/gortsplib v0.0.0-20201128222126-daebb85421c6 github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.4.9 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 diff --git a/go.sum b/go.sum index 87e456b3..a4ffc67c 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20201128100241-0c292dec9d3f h1:jGDsaO6o4Iwuk+QbVgN78/DlVhA/pWS6l58XqNZfIq4= -github.com/aler9/gortsplib v0.0.0-20201128100241-0c292dec9d3f/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I= +github.com/aler9/gortsplib v0.0.0-20201128222126-daebb85421c6 h1:Gm50pyakl7N4eTf8w7KVHMK1MXsk+x9e3cj+8hg/0i8= +github.com/aler9/gortsplib v0.0.0-20201128222126-daebb85421c6/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/internal/client/client.go b/internal/client/client.go index 1e173fe7..4a216ac0 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -117,7 +117,7 @@ type Client struct { authFailures int streamProtocol gortsplib.StreamProtocol streamTracks map[int]*streamTrack - rtcpReceivers []*rtcpreceiver.RtcpReceiver + rtcpReceivers map[int]*rtcpreceiver.RtcpReceiver udpLastFrameTimes []*int64 describeCSeq base.HeaderValue describeUrl string @@ -160,10 +160,11 @@ func New( WriteTimeout: writeTimeout, ReadBufferCount: 1, }), - parent: parent, - state: stateInitial, - streamTracks: make(map[int]*streamTrack), - terminate: make(chan struct{}), + parent: parent, + state: stateInitial, + streamTracks: make(map[int]*streamTrack), + rtcpReceivers: make(map[int]*rtcpreceiver.RtcpReceiver), + terminate: make(chan struct{}), } atomic.AddInt64(c.stats.CountClients, 1) @@ -275,7 +276,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{ // vlc with login prompt sends 4 requests: // 1) without credentials - // 2) with password but without the username + // 2) with password but without username // 3) without credentials // 4) with password and username // hence we must allow up to 3 failures @@ -429,6 +430,12 @@ func (c *Client) handleRequest(req *base.Request) error { return errStateTerminate } + basePath, ok := req.URL.BasePath() + if !ok { + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("unable to find base path (%s)", req.URL)) + return errStateTerminate + } + ct, ok := req.Header["Content-Type"] if !ok || len(ct) != 1 { c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("Content-Type header missing")) @@ -451,10 +458,12 @@ func (c *Client) handleRequest(req *base.Request) error { return errStateTerminate } - basePath, ok := req.URL.BasePath() - if !ok { - c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("unable to find base path (%s)", req.URL)) - return errStateTerminate + for trackId, t := range tracks { + _, err := t.ClockRate() + if err != nil { + c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("unable to get clock rate of track %d", trackId)) + return errStateTerminate + } } path, err := c.parent.OnClientAnnounce(c, basePath, tracks, req) @@ -474,6 +483,11 @@ func (c *Client) handleRequest(req *base.Request) error { } } + for trackId, t := range tracks { + clockRate, _ := t.ClockRate() + c.rtcpReceivers[trackId] = rtcpreceiver.New(nil, clockRate) + } + c.path = path c.state = statePreRecord @@ -688,7 +702,8 @@ func (c *Client) handleRequest(req *base.Request) error { } c.streamProtocol = gortsplib.StreamProtocolUDP - c.streamTracks[len(c.streamTracks)] = &streamTrack{ + trackId := len(c.streamTracks) + c.streamTracks[trackId] = &streamTrack{ rtpPort: (*th.ClientPorts)[0], rtcpPort: (*th.ClientPorts)[1], } @@ -743,7 +758,8 @@ func (c *Client) handleRequest(req *base.Request) error { } c.streamProtocol = gortsplib.StreamProtocolTCP - c.streamTracks[len(c.streamTracks)] = &streamTrack{ + trackId := len(c.streamTracks) + c.streamTracks[trackId] = &streamTrack{ rtpPort: 0, rtcpPort: 0, } @@ -1191,11 +1207,6 @@ func (c *Client) runRecord() bool { return "tracks" }(), c.streamProtocol) - c.rtcpReceivers = make([]*rtcpreceiver.RtcpReceiver, len(c.streamTracks)) - for trackId := range c.streamTracks { - c.rtcpReceivers[trackId] = rtcpreceiver.New(nil) - } - if c.streamProtocol == gortsplib.StreamProtocolUDP { c.udpLastFrameTimes = make([]*int64, len(c.streamTracks)) for trackId := range c.streamTracks { @@ -1348,9 +1359,10 @@ func (c *Client) runRecordUDP() bool { } case <-receiverReportTicker.C: + now := time.Now() for trackId := range c.streamTracks { - frame := c.rtcpReceivers[trackId].Report() - c.serverUdpRtcp.Write(frame, &net.UDPAddr{ + r := c.rtcpReceivers[trackId].Report(now) + c.serverUdpRtcp.Write(r, &net.UDPAddr{ IP: c.ip(), Zone: c.zone(), Port: c.streamTracks[trackId].rtcpPort, @@ -1399,7 +1411,7 @@ func (c *Client) runRecordTCP() bool { return } - c.rtcpReceivers[recvt.TrackId].OnFrame(recvt.StreamType, recvt.Content) + c.rtcpReceivers[recvt.TrackId].OnFrame(time.Now(), recvt.StreamType, recvt.Content) c.path.OnFrame(recvt.TrackId, recvt.StreamType, recvt.Content) case *base.Request: @@ -1452,9 +1464,10 @@ func (c *Client) runRecordTCP() bool { return onError(err) case <-receiverReportTicker.C: + now := time.Now() for trackId := range c.streamTracks { - frame := c.rtcpReceivers[trackId].Report() - c.conn.WriteFrameTCP(trackId, gortsplib.StreamTypeRtcp, frame) + r := c.rtcpReceivers[trackId].Report(now) + c.conn.WriteFrameTCP(trackId, gortsplib.StreamTypeRtcp, r) } case <-c.terminate: @@ -1476,9 +1489,9 @@ func (c *Client) runRecordTCP() bool { // OnUdpPublisherFrame implements serverudp.Publisher. func (c *Client) OnUdpPublisherFrame(trackId int, streamType base.StreamType, buf []byte) { - atomic.StoreInt64(c.udpLastFrameTimes[trackId], time.Now().Unix()) - - c.rtcpReceivers[trackId].OnFrame(streamType, buf) + now := time.Now() + atomic.StoreInt64(c.udpLastFrameTimes[trackId], now.Unix()) + c.rtcpReceivers[trackId].OnFrame(now, streamType, buf) c.path.OnFrame(trackId, streamType, buf) } diff --git a/internal/sourcertmp/source.go b/internal/sourcertmp/source.go index 71edf525..1a7ec60a 100644 --- a/internal/sourcertmp/source.go +++ b/internal/sourcertmp/source.go @@ -298,7 +298,7 @@ func (s *Source) runInner() bool { } // encode into RTP/H264 format - frames, err := h264Encoder.Write(nalus, pkt.Time+pkt.CTime) + frames, err := h264Encoder.Write(pkt.Time+pkt.CTime, nalus) if err != nil { readerDone <- err return @@ -315,7 +315,7 @@ func (s *Source) runInner() bool { return } - frames, err := aacEncoder.Write(pkt.Data, pkt.Time+pkt.CTime) + frames, err := aacEncoder.Write(pkt.Time+pkt.CTime, pkt.Data) if err != nil { readerDone <- err return