diff --git a/client.go b/client.go index 45afbdfa..902ef0de 100644 --- a/client.go +++ b/client.go @@ -887,14 +887,30 @@ func (c *client) runPlay(path string) { } else { readDone := make(chan error) go func() { - buf := make([]byte, 2048) + frame := &gortsplib.InterleavedFrame{} + readBuf := make([]byte, clientTcpReadBufferSize) for { - _, err := c.conn.NetConn().Read(buf) + frame.Content = readBuf + frame.Content = frame.Content[:cap(frame.Content)] + + recv, err := c.conn.ReadFrameOrRequest(frame, false) if err != nil { readDone <- err break } + + switch recvt := recv.(type) { + case *gortsplib.InterleavedFrame: + // rtcp feedback is handled by gortsplib + + case *gortsplib.Request: + ok := c.handleRequest(recvt) + if !ok { + readDone <- nil + break + } + } } }() @@ -902,7 +918,7 @@ func (c *client) runPlay(path string) { for { select { case err := <-readDone: - if err != io.EOF { + if err != nil && err != io.EOF { c.log("ERR: %s", err) } break outer @@ -1034,7 +1050,7 @@ func (c *client) runRecord(path string) { frame.Content = readBuf.next() frame.Content = frame.Content[:cap(frame.Content)] - recv, err := c.conn.ReadFrameOrRequest(frame) + recv, err := c.conn.ReadFrameOrRequest(frame, true) if err != nil { readDone <- err break diff --git a/go.mod b/go.mod index 5a7c45e4..382a48a8 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.12 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20200817164925-b45d2000a637 + github.com/aler9/gortsplib v0.0.0-20200829115535-1b7b4ea7dc77 github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436 github.com/davecgh/go-spew v1.1.1 // indirect github.com/stretchr/testify v1.6.1 diff --git a/go.sum b/go.sum index 266e9079..df521822 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20200817164925-b45d2000a637 h1:nRL12W8of4IVFqFCJRsF4Iy0NQAhSbNdu4+dBzdu3n8= -github.com/aler9/gortsplib v0.0.0-20200817164925-b45d2000a637/go.mod h1:kBMvjIdOHRjLdV+oT28JD72JUPpJuwxOc9u72GG8GpY= +github.com/aler9/gortsplib v0.0.0-20200829115535-1b7b4ea7dc77 h1:brN94N74bmrW9MsURf1QY6Mpph9P9aUUO730BavHshg= +github.com/aler9/gortsplib v0.0.0-20200829115535-1b7b4ea7dc77/go.mod h1:kBMvjIdOHRjLdV+oT28JD72JUPpJuwxOc9u72GG8GpY= github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436 h1:W0iNErWKvSAyJBNVx+qQoyFrWOFVgS6f/WEME/D3EZc= github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436/go.mod h1:OnlEK3QI7YtM+ShZWtGajmOHLZ3bjU80AcIS5e34i1U= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= diff --git a/source.go b/source.go index 9cc69820..9bf1b89f 100644 --- a/source.go +++ b/source.go @@ -170,16 +170,16 @@ func (s *source) doInner(terminate chan struct{}) bool { return true } - defer conn.Close() - _, err = conn.Options(s.confp.sourceUrl) if err != nil { + conn.Close() s.log("ERR: %s", err) return true } tracks, _, err := conn.Describe(s.confp.sourceUrl) if err != nil { + conn.Close() s.log("ERR: %s", err) return true } @@ -199,24 +199,17 @@ func (s *source) doInner(terminate chan struct{}) bool { } func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) bool { - type trackListenerPair struct { - serverRtp *gortsplib.ConnClientUdpListener - serverRtcp *gortsplib.ConnClientUdpListener - } - var listeners []*trackListenerPair + var rtpReads []gortsplib.UdpReadFunc + var rtcpReads []gortsplib.UdpReadFunc for _, track := range s.tracks { - var serverRtp *gortsplib.ConnClientUdpListener - var serverRtcp *gortsplib.ConnClientUdpListener - var err error - for { // choose two consecutive ports in range 65536-10000 // rtp must be even and rtcp odd rtpPort := (rand.Intn((65535-10000)/2) * 2) + 10000 rtcpPort := rtpPort + 1 - serverRtp, serverRtcp, _, err = conn.SetupUdp(s.confp.sourceUrl, track, rtpPort, rtcpPort) + rtpRead, rtcpRead, _, err := conn.SetupUdp(s.confp.sourceUrl, track, rtpPort, rtcpPort) if err != nil { // retry if it's a bind error if nerr, ok := err.(*net.OpError); ok { @@ -227,21 +220,20 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo } } + conn.Close() s.log("ERR: %s", err) return true } + rtpReads = append(rtpReads, rtpRead) + rtcpReads = append(rtcpReads, rtcpRead) break } - - listeners = append(listeners, &trackListenerPair{ - serverRtp: serverRtp, - serverRtcp: serverRtcp, - }) } _, err := conn.Play(s.confp.sourceUrl) if err != nil { + conn.Close() s.log("ERR: %s", err) return true } @@ -250,42 +242,44 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo var wg sync.WaitGroup - for trackId, lp := range listeners { - wg.Add(2) - - // receive RTP packets - go func(trackId int, l *gortsplib.ConnClientUdpListener) { + // receive RTP packets + for trackId, rtpRead := range rtpReads { + wg.Add(1) + go func(trackId int, rtpRead gortsplib.UdpReadFunc) { defer wg.Done() multiBuf := newMultiBuffer(3, sourceUdpReadBufferSize) for { buf := multiBuf.next() - n, err := l.Read(buf) + n, err := rtpRead(buf) if err != nil { break } s.p.events <- programEventSourceFrame{s, trackId, gortsplib.StreamTypeRtp, buf[:n]} } - }(trackId, lp.serverRtp) + }(trackId, rtpRead) + } - // receive RTCP packets - go func(trackId int, l *gortsplib.ConnClientUdpListener) { + // receive RTCP packets + for trackId, rtcpRead := range rtcpReads { + wg.Add(1) + go func(trackId int, rtcpRead gortsplib.UdpReadFunc) { defer wg.Done() multiBuf := newMultiBuffer(3, sourceUdpReadBufferSize) for { buf := multiBuf.next() - n, err := l.Read(buf) + n, err := rtcpRead(buf) if err != nil { break } s.p.events <- programEventSourceFrame{s, trackId, gortsplib.StreamTypeRtcp, buf[:n]} } - }(trackId, lp.serverRtcp) + }(trackId, rtcpRead) } tcpConnDone := make(chan error) @@ -299,26 +293,23 @@ outer: for { select { case <-terminate: - conn.NetConn().Close() + conn.Close() <-tcpConnDone ret = false break outer case err := <-tcpConnDone: + conn.Close() s.log("ERR: %s", err) ret = true break outer } } - s.p.events <- programEventSourceNotReady{s} - - for _, lp := range listeners { - lp.serverRtp.Close() - lp.serverRtcp.Close() - } wg.Wait() + s.p.events <- programEventSourceNotReady{s} + return ret } @@ -326,6 +317,7 @@ func (s *source) runTcp(terminate chan struct{}, conn *gortsplib.ConnClient) boo for _, track := range s.tracks { _, err := conn.SetupTcp(s.confp.sourceUrl, track) if err != nil { + conn.Close() s.log("ERR: %s", err) return true } @@ -333,6 +325,7 @@ func (s *source) runTcp(terminate chan struct{}, conn *gortsplib.ConnClient) boo _, err := conn.Play(s.confp.sourceUrl) if err != nil { + conn.Close() s.log("ERR: %s", err) return true } @@ -364,12 +357,13 @@ outer: for { select { case <-terminate: - conn.NetConn().Close() + conn.Close() <-tcpConnDone ret = false break outer case err := <-tcpConnDone: + conn.Close() s.log("ERR: %s", err) ret = true break outer