From eaf115f60444bd3e29f80a1191b371b26f58c2ae Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sat, 21 Nov 2020 11:57:38 +0100 Subject: [PATCH] drastically improve performance when reading streams with TCP --- go.mod | 2 +- go.sum | 4 +- internal/client/client.go | 376 ++++++++++++++++++++------------------ 3 files changed, 206 insertions(+), 176 deletions(-) diff --git a/go.mod b/go.mod index b8b467d3..35d738dd 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.15 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20201119110120-5019561d3fae + github.com/aler9/gortsplib v0.0.0-20201120083135-e66459731e97 github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.4.9 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 diff --git a/go.sum b/go.sum index 698adb42..f7871939 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20201119110120-5019561d3fae h1:FF6+/D0sjbx90ayB6kR3OqFTrynC/2eLIOdY0jB5/io= -github.com/aler9/gortsplib v0.0.0-20201119110120-5019561d3fae/go.mod h1:6yKsTNIrCapRz90WHQtyFV/rKK0TT+QapxUXNqSJi9M= +github.com/aler9/gortsplib v0.0.0-20201120083135-e66459731e97 h1:sefesnUXzUHF4fhS+rnpON5MpMOjka5YFka9P5qiS5s= +github.com/aler9/gortsplib v0.0.0-20201120083135-e66459731e97/go.mod h1:6yKsTNIrCapRz90WHQtyFV/rKK0TT+QapxUXNqSJi9M= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/internal/client/client.go b/internal/client/client.go index 7a49c920..57e7805c 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -29,9 +29,9 @@ const ( sessionId = "12345678" ) -type readRequestPair struct { +type readReq struct { req *base.Request - res chan error + res chan bool } type streamTrack struct { @@ -121,10 +121,11 @@ type Client struct { udpLastFrameTimes []*int64 describeCSeq base.HeaderValue describeUrl string + tcpWriteMutex sync.Mutex + tcpWriteOk bool // in - describeData chan describeData // from path - tcpFrame chan *base.InterleavedFrame // from source + describeData chan describeData // from path terminate chan struct{} } @@ -988,7 +989,7 @@ func (c *Client) runWaitingDescribe() bool { func (c *Client) runPlay() bool { if c.streamProtocol == gortsplib.StreamProtocolTCP { - c.tcpFrame = make(chan *base.InterleavedFrame) + c.tcpWriteOk = true } // start sending frames only after replying to the PLAY request @@ -1002,29 +1003,25 @@ func (c *Client) runPlay() bool { return "tracks" }(), c.streamProtocol) - var onReadCmd *externalcmd.ExternalCmd if c.path.Conf().RunOnRead != "" { - onReadCmd = externalcmd.New(c.path.Conf().RunOnRead, c.path.Conf().RunOnReadRestart, externalcmd.Environment{ + onReadCmd := externalcmd.New(c.path.Conf().RunOnRead, c.path.Conf().RunOnReadRestart, externalcmd.Environment{ Path: c.path.Name(), Port: strconv.FormatInt(int64(c.rtspPort), 10), }) + defer onReadCmd.Close() } - var ret bool if c.streamProtocol == gortsplib.StreamProtocolUDP { - ret = c.runPlayUDP() + return c.runPlayUDP() } else { - ret = c.runPlayTCP() + return c.runPlayTCP() } - - if onReadCmd != nil { - onReadCmd.Close() - } - - return ret } func (c *Client) runPlayUDP() bool { + readerRequest := make(chan readReq) + defer close(readerRequest) + readerDone := make(chan error) go func() { for { @@ -1034,48 +1031,70 @@ func (c *Client) runPlayUDP() bool { return } - err = c.handleRequest(req) - if err != nil { - readerDone <- err + okc := make(chan bool) + readerRequest <- readReq{req, okc} + ok := <-okc + if !ok { + readerDone <- nil return } } }() - select { - case err := <-readerDone: + onError := func(err error) bool { if err == errStateInitial { c.state = statePrePlay c.path.OnClientPause(c) return true + } - } else { - c.path.OnClientRemove(c) - c.path = nil - - c.conn.Close() - if err != io.EOF && err != errStateTerminate { - c.log("ERR: %s", err) - } - - c.parent.OnClientClose(c) - <-c.terminate - return false + c.conn.Close() + if err != io.EOF && err != errStateTerminate { + c.log("ERR: %s", err) } - case <-c.terminate: c.path.OnClientRemove(c) c.path = nil - c.conn.Close() - <-readerDone + c.parent.OnClientClose(c) + <-c.terminate return false } + + for { + select { + case req := <-readerRequest: + err := c.handleRequest(req.req) + if err != nil { + req.res <- false + <-readerDone + return onError(err) + } + req.res <- true + + case err := <-readerDone: + return onError(err) + + case <-c.terminate: + go func() { + for req := range readerRequest { + req.res <- false + } + }() + + c.path.OnClientRemove(c) + c.path = nil + + c.conn.Close() + <-readerDone + return false + } + } } func (c *Client) runPlayTCP() bool { - readRequest := make(chan readRequestPair) - defer close(readRequest) + readerRequest := make(chan readReq) + defer close(readerRequest) readerDone := make(chan error) go func() { @@ -1091,80 +1110,65 @@ func (c *Client) runPlayTCP() bool { // rtcp feedback is handled by gortsplib case *base.Request: - res := make(chan error) - readRequest <- readRequestPair{recvt, res} - err := <-res - if err != nil { - readerDone <- err + okc := make(chan bool) + readerRequest <- readReq{recvt, okc} + ok := <-okc + if !ok { + readerDone <- nil return } } } }() - for { - select { - // responses must be written in the same routine of frames - case req := <-readRequest: - req.res <- c.handleRequest(req.req) - - case err := <-readerDone: - if err == errStateInitial { - ch := c.tcpFrame - go func() { - for range ch { - } - }() - - c.state = statePrePlay - c.path.OnClientPause(c) - - close(c.tcpFrame) - return true - - } else { - ch := c.tcpFrame - go func() { - for range ch { - } - }() + onError := func(err error) bool { + if err == errStateInitial { + c.state = statePrePlay + c.path.OnClientPause(c) + return true + } - c.path.OnClientRemove(c) - c.path = nil + c.conn.Close() + if err != io.EOF && err != errStateTerminate { + c.log("ERR: %s", err) + } - close(c.tcpFrame) + c.path.OnClientRemove(c) + c.path = nil - c.conn.Close() - if err != io.EOF && err != errStateTerminate { - c.log("ERR: %s", err) - } + c.parent.OnClientClose(c) + <-c.terminate + return false + } - c.parent.OnClientClose(c) - <-c.terminate - return false + for { + select { + case req := <-readerRequest: + c.tcpWriteMutex.Lock() + err := c.handleRequest(req.req) + if err != nil { + c.tcpWriteOk = false + c.tcpWriteMutex.Unlock() + req.res <- false + <-readerDone + return onError(err) } + c.tcpWriteMutex.Unlock() + req.res <- true - case frame := <-c.tcpFrame: - c.conn.WriteFrameTCP(frame.TrackId, frame.StreamType, frame.Content) + case err := <-readerDone: + return onError(err) case <-c.terminate: go func() { - for req := range readRequest { - req.res <- fmt.Errorf("terminated") - } - }() - - ch := c.tcpFrame - go func() { - for range ch { + for req := range readerRequest { + req.res <- false } }() c.path.OnClientRemove(c) c.path = nil - close(c.tcpFrame) - c.conn.Close() <-readerDone return false @@ -1220,29 +1224,25 @@ func (c *Client) runRecord() bool { } } - var onPublishCmd *externalcmd.ExternalCmd if c.path.Conf().RunOnPublish != "" { - onPublishCmd = externalcmd.New(c.path.Conf().RunOnPublish, c.path.Conf().RunOnPublishRestart, externalcmd.Environment{ + onPublishCmd := externalcmd.New(c.path.Conf().RunOnPublish, c.path.Conf().RunOnPublishRestart, externalcmd.Environment{ Path: c.path.Name(), Port: strconv.FormatInt(int64(c.rtspPort), 10), }) + defer onPublishCmd.Close() } - var ret bool if c.streamProtocol == gortsplib.StreamProtocolUDP { - ret = c.runRecordUDP() + return c.runRecordUDP() } else { - ret = c.runRecordTCP() + return c.runRecordTCP() } - - if onPublishCmd != nil { - onPublishCmd.Close() - } - - return ret } func (c *Client) runRecordUDP() bool { + readerRequest := make(chan readReq) + defer close(readerRequest) + readerDone := make(chan error) go func() { for { @@ -1252,9 +1252,11 @@ func (c *Client) runRecordUDP() bool { return } - err = c.handleRequest(req) - if err != nil { - readerDone <- err + okc := make(chan bool) + readerRequest <- readReq{req, okc} + ok := <-okc + if !ok { + readerDone <- nil return } } @@ -1266,38 +1268,49 @@ func (c *Client) runRecordUDP() bool { receiverReportTicker := time.NewTicker(receiverReportInterval) defer receiverReportTicker.Stop() - for { - select { - case err := <-readerDone: - if err == errStateInitial { - for _, track := range c.streamTracks { - c.serverUdpRtp.RemovePublisher(c.ip(), track.rtpPort, c) - c.serverUdpRtcp.RemovePublisher(c.ip(), track.rtcpPort, c) - } + onError := func(err error) bool { + if err == errStateInitial { + for _, track := range c.streamTracks { + c.serverUdpRtp.RemovePublisher(c.ip(), track.rtpPort, c) + c.serverUdpRtcp.RemovePublisher(c.ip(), track.rtcpPort, c) + } - c.state = statePreRecord - c.path.OnClientPause(c) + c.state = statePreRecord + c.path.OnClientPause(c) + return true + } - return true + c.conn.Close() + if err != io.EOF && err != errStateTerminate { + c.log("ERR: %s", err) + } - } else { - for _, track := range c.streamTracks { - c.serverUdpRtp.RemovePublisher(c.ip(), track.rtpPort, c) - c.serverUdpRtcp.RemovePublisher(c.ip(), track.rtcpPort, c) - } + for _, track := range c.streamTracks { + c.serverUdpRtp.RemovePublisher(c.ip(), track.rtpPort, c) + c.serverUdpRtcp.RemovePublisher(c.ip(), track.rtcpPort, c) + } - c.path.OnClientRemove(c) - c.path = nil + c.path.OnClientRemove(c) + c.path = nil - c.conn.Close() - if err != io.EOF && err != errStateTerminate { - c.log("ERR: %s", err) - } + c.parent.OnClientClose(c) + <-c.terminate + return false + } - c.parent.OnClientClose(c) - <-c.terminate - return false + for { + select { + case req := <-readerRequest: + err := c.handleRequest(req.req) + if err != nil { + req.res <- false + <-readerDone + return onError(err) } + req.res <- true + + case err := <-readerDone: + return onError(err) case <-checkStreamTicker.C: now := time.Now() @@ -1306,21 +1319,26 @@ func (c *Client) runRecordUDP() bool { last := time.Unix(atomic.LoadInt64(lastUnix), 0) if now.Sub(last) >= c.readTimeout { - for _, track := range c.streamTracks { - c.serverUdpRtp.RemovePublisher(c.ip(), track.rtpPort, c) - c.serverUdpRtcp.RemovePublisher(c.ip(), track.rtcpPort, c) - } + go func() { + for req := range readerRequest { + req.res <- false + } + }() c.log("ERR: no packets received recently (maybe there's a firewall/NAT in between)") c.conn.Close() <-readerDone + for _, track := range c.streamTracks { + c.serverUdpRtp.RemovePublisher(c.ip(), track.rtpPort, c) + c.serverUdpRtcp.RemovePublisher(c.ip(), track.rtcpPort, c) + } + c.path.OnClientRemove(c) c.path = nil c.parent.OnClientClose(c) <-c.terminate - return false } } @@ -1336,25 +1354,30 @@ func (c *Client) runRecordUDP() bool { } case <-c.terminate: + go func() { + for req := range readerRequest { + req.res <- false + } + }() + + c.conn.Close() + <-readerDone + for _, track := range c.streamTracks { c.serverUdpRtp.RemovePublisher(c.ip(), track.rtpPort, c) c.serverUdpRtcp.RemovePublisher(c.ip(), track.rtcpPort, c) } - c.conn.Close() - <-readerDone - c.path.OnClientRemove(c) c.path = nil - return false } } } func (c *Client) runRecordTCP() bool { - readRequest := make(chan readRequestPair) - defer close(readRequest) + readerRequest := make(chan readReq) + defer close(readerRequest) readerDone := make(chan error) go func() { @@ -1376,9 +1399,11 @@ func (c *Client) runRecordTCP() bool { c.path.OnFrame(recvt.TrackId, recvt.StreamType, recvt.Content) case *base.Request: - err := c.handleRequest(recvt) - if err != nil { - readerDone <- err + okc := make(chan bool) + readerRequest <- readReq{recvt, okc} + ok := <-okc + if !ok { + readerDone <- nil return } } @@ -1388,33 +1413,39 @@ func (c *Client) runRecordTCP() bool { receiverReportTicker := time.NewTicker(receiverReportInterval) defer receiverReportTicker.Stop() - for { - select { - // responses must be written in the same routine of receiver reports - case req := <-readRequest: - req.res <- c.handleRequest(req.req) - - case err := <-readerDone: - if err == errStateInitial { - c.state = statePreRecord - c.path.OnClientPause(c) - - return true + onError := func(err error) bool { + if err == errStateInitial { + c.state = statePreRecord + c.path.OnClientPause(c) + return true + } - } else { - c.path.OnClientRemove(c) - c.path = nil + c.conn.Close() + if err != io.EOF && err != errStateTerminate { + c.log("ERR: %s", err) + } - c.conn.Close() - if err != io.EOF && err != errStateTerminate { - c.log("ERR: %s", err) - } + c.path.OnClientRemove(c) + c.path = nil - c.parent.OnClientClose(c) - <-c.terminate + c.parent.OnClientClose(c) + <-c.terminate + return false + } - return false + for { + select { + case req := <-readerRequest: + err := c.handleRequest(req.req) + if err != nil { + req.res <- false + <-readerDone + return onError(err) } + req.res <- true + + case err := <-readerDone: + return onError(err) case <-receiverReportTicker.C: for trackId := range c.streamTracks { @@ -1424,8 +1455,8 @@ func (c *Client) runRecordTCP() bool { case <-c.terminate: go func() { - for req := range readRequest { - req.res <- fmt.Errorf("terminated") + for req := range readerRequest { + req.res <- false } }() @@ -1434,7 +1465,6 @@ func (c *Client) runRecordTCP() bool { c.path.OnClientRemove(c) c.path = nil - return false } } @@ -1472,11 +1502,11 @@ func (c *Client) OnReaderFrame(trackId int, streamType base.StreamType, buf []by } } else { - c.tcpFrame <- &base.InterleavedFrame{ - TrackId: trackId, - StreamType: streamType, - Content: buf, + c.tcpWriteMutex.Lock() + if c.tcpWriteOk { + c.conn.WriteFrameTCP(trackId, streamType, buf) } + c.tcpWriteMutex.Unlock() } }