Browse Source

drastically improve performance when reading streams with TCP

pull/169/head
aler9 5 years ago
parent
commit
eaf115f604
  1. 2
      go.mod
  2. 4
      go.sum
  3. 376
      internal/client/client.go

2
go.mod

@ -5,7 +5,7 @@ go 1.15 @@ -5,7 +5,7 @@ go 1.15
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20201119110120-5019561d3fae
github.com/aler9/gortsplib v0.0.0-20201120083135-e66459731e97
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51

4
go.sum

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20201119110120-5019561d3fae h1:FF6+/D0sjbx90ayB6kR3OqFTrynC/2eLIOdY0jB5/io=
github.com/aler9/gortsplib v0.0.0-20201119110120-5019561d3fae/go.mod h1:6yKsTNIrCapRz90WHQtyFV/rKK0TT+QapxUXNqSJi9M=
github.com/aler9/gortsplib v0.0.0-20201120083135-e66459731e97 h1:sefesnUXzUHF4fhS+rnpON5MpMOjka5YFka9P5qiS5s=
github.com/aler9/gortsplib v0.0.0-20201120083135-e66459731e97/go.mod h1:6yKsTNIrCapRz90WHQtyFV/rKK0TT+QapxUXNqSJi9M=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

376
internal/client/client.go

@ -29,9 +29,9 @@ const ( @@ -29,9 +29,9 @@ const (
sessionId = "12345678"
)
type readRequestPair struct {
type readReq struct {
req *base.Request
res chan error
res chan bool
}
type streamTrack struct {
@ -121,10 +121,11 @@ type Client struct { @@ -121,10 +121,11 @@ type Client struct {
udpLastFrameTimes []*int64
describeCSeq base.HeaderValue
describeUrl string
tcpWriteMutex sync.Mutex
tcpWriteOk bool
// in
describeData chan describeData // from path
tcpFrame chan *base.InterleavedFrame // from source
describeData chan describeData // from path
terminate chan struct{}
}
@ -988,7 +989,7 @@ func (c *Client) runWaitingDescribe() bool { @@ -988,7 +989,7 @@ func (c *Client) runWaitingDescribe() bool {
func (c *Client) runPlay() bool {
if c.streamProtocol == gortsplib.StreamProtocolTCP {
c.tcpFrame = make(chan *base.InterleavedFrame)
c.tcpWriteOk = true
}
// start sending frames only after replying to the PLAY request
@ -1002,29 +1003,25 @@ func (c *Client) runPlay() bool { @@ -1002,29 +1003,25 @@ func (c *Client) runPlay() bool {
return "tracks"
}(), c.streamProtocol)
var onReadCmd *externalcmd.ExternalCmd
if c.path.Conf().RunOnRead != "" {
onReadCmd = externalcmd.New(c.path.Conf().RunOnRead, c.path.Conf().RunOnReadRestart, externalcmd.Environment{
onReadCmd := externalcmd.New(c.path.Conf().RunOnRead, c.path.Conf().RunOnReadRestart, externalcmd.Environment{
Path: c.path.Name(),
Port: strconv.FormatInt(int64(c.rtspPort), 10),
})
defer onReadCmd.Close()
}
var ret bool
if c.streamProtocol == gortsplib.StreamProtocolUDP {
ret = c.runPlayUDP()
return c.runPlayUDP()
} else {
ret = c.runPlayTCP()
return c.runPlayTCP()
}
if onReadCmd != nil {
onReadCmd.Close()
}
return ret
}
func (c *Client) runPlayUDP() bool {
readerRequest := make(chan readReq)
defer close(readerRequest)
readerDone := make(chan error)
go func() {
for {
@ -1034,48 +1031,70 @@ func (c *Client) runPlayUDP() bool { @@ -1034,48 +1031,70 @@ func (c *Client) runPlayUDP() bool {
return
}
err = c.handleRequest(req)
if err != nil {
readerDone <- err
okc := make(chan bool)
readerRequest <- readReq{req, okc}
ok := <-okc
if !ok {
readerDone <- nil
return
}
}
}()
select {
case err := <-readerDone:
onError := func(err error) bool {
if err == errStateInitial {
c.state = statePrePlay
c.path.OnClientPause(c)
return true
}
} else {
c.path.OnClientRemove(c)
c.path = nil
c.conn.Close()
if err != io.EOF && err != errStateTerminate {
c.log("ERR: %s", err)
}
c.parent.OnClientClose(c)
<-c.terminate
return false
c.conn.Close()
if err != io.EOF && err != errStateTerminate {
c.log("ERR: %s", err)
}
case <-c.terminate:
c.path.OnClientRemove(c)
c.path = nil
c.conn.Close()
<-readerDone
c.parent.OnClientClose(c)
<-c.terminate
return false
}
for {
select {
case req := <-readerRequest:
err := c.handleRequest(req.req)
if err != nil {
req.res <- false
<-readerDone
return onError(err)
}
req.res <- true
case err := <-readerDone:
return onError(err)
case <-c.terminate:
go func() {
for req := range readerRequest {
req.res <- false
}
}()
c.path.OnClientRemove(c)
c.path = nil
c.conn.Close()
<-readerDone
return false
}
}
}
func (c *Client) runPlayTCP() bool {
readRequest := make(chan readRequestPair)
defer close(readRequest)
readerRequest := make(chan readReq)
defer close(readerRequest)
readerDone := make(chan error)
go func() {
@ -1091,80 +1110,65 @@ func (c *Client) runPlayTCP() bool { @@ -1091,80 +1110,65 @@ func (c *Client) runPlayTCP() bool {
// rtcp feedback is handled by gortsplib
case *base.Request:
res := make(chan error)
readRequest <- readRequestPair{recvt, res}
err := <-res
if err != nil {
readerDone <- err
okc := make(chan bool)
readerRequest <- readReq{recvt, okc}
ok := <-okc
if !ok {
readerDone <- nil
return
}
}
}
}()
for {
select {
// responses must be written in the same routine of frames
case req := <-readRequest:
req.res <- c.handleRequest(req.req)
case err := <-readerDone:
if err == errStateInitial {
ch := c.tcpFrame
go func() {
for range ch {
}
}()
c.state = statePrePlay
c.path.OnClientPause(c)
close(c.tcpFrame)
return true
} else {
ch := c.tcpFrame
go func() {
for range ch {
}
}()
onError := func(err error) bool {
if err == errStateInitial {
c.state = statePrePlay
c.path.OnClientPause(c)
return true
}
c.path.OnClientRemove(c)
c.path = nil
c.conn.Close()
if err != io.EOF && err != errStateTerminate {
c.log("ERR: %s", err)
}
close(c.tcpFrame)
c.path.OnClientRemove(c)
c.path = nil
c.conn.Close()
if err != io.EOF && err != errStateTerminate {
c.log("ERR: %s", err)
}
c.parent.OnClientClose(c)
<-c.terminate
return false
}
c.parent.OnClientClose(c)
<-c.terminate
return false
for {
select {
case req := <-readerRequest:
c.tcpWriteMutex.Lock()
err := c.handleRequest(req.req)
if err != nil {
c.tcpWriteOk = false
c.tcpWriteMutex.Unlock()
req.res <- false
<-readerDone
return onError(err)
}
c.tcpWriteMutex.Unlock()
req.res <- true
case frame := <-c.tcpFrame:
c.conn.WriteFrameTCP(frame.TrackId, frame.StreamType, frame.Content)
case err := <-readerDone:
return onError(err)
case <-c.terminate:
go func() {
for req := range readRequest {
req.res <- fmt.Errorf("terminated")
}
}()
ch := c.tcpFrame
go func() {
for range ch {
for req := range readerRequest {
req.res <- false
}
}()
c.path.OnClientRemove(c)
c.path = nil
close(c.tcpFrame)
c.conn.Close()
<-readerDone
return false
@ -1220,29 +1224,25 @@ func (c *Client) runRecord() bool { @@ -1220,29 +1224,25 @@ func (c *Client) runRecord() bool {
}
}
var onPublishCmd *externalcmd.ExternalCmd
if c.path.Conf().RunOnPublish != "" {
onPublishCmd = externalcmd.New(c.path.Conf().RunOnPublish, c.path.Conf().RunOnPublishRestart, externalcmd.Environment{
onPublishCmd := externalcmd.New(c.path.Conf().RunOnPublish, c.path.Conf().RunOnPublishRestart, externalcmd.Environment{
Path: c.path.Name(),
Port: strconv.FormatInt(int64(c.rtspPort), 10),
})
defer onPublishCmd.Close()
}
var ret bool
if c.streamProtocol == gortsplib.StreamProtocolUDP {
ret = c.runRecordUDP()
return c.runRecordUDP()
} else {
ret = c.runRecordTCP()
return c.runRecordTCP()
}
if onPublishCmd != nil {
onPublishCmd.Close()
}
return ret
}
func (c *Client) runRecordUDP() bool {
readerRequest := make(chan readReq)
defer close(readerRequest)
readerDone := make(chan error)
go func() {
for {
@ -1252,9 +1252,11 @@ func (c *Client) runRecordUDP() bool { @@ -1252,9 +1252,11 @@ func (c *Client) runRecordUDP() bool {
return
}
err = c.handleRequest(req)
if err != nil {
readerDone <- err
okc := make(chan bool)
readerRequest <- readReq{req, okc}
ok := <-okc
if !ok {
readerDone <- nil
return
}
}
@ -1266,38 +1268,49 @@ func (c *Client) runRecordUDP() bool { @@ -1266,38 +1268,49 @@ func (c *Client) runRecordUDP() bool {
receiverReportTicker := time.NewTicker(receiverReportInterval)
defer receiverReportTicker.Stop()
for {
select {
case err := <-readerDone:
if err == errStateInitial {
for _, track := range c.streamTracks {
c.serverUdpRtp.RemovePublisher(c.ip(), track.rtpPort, c)
c.serverUdpRtcp.RemovePublisher(c.ip(), track.rtcpPort, c)
}
onError := func(err error) bool {
if err == errStateInitial {
for _, track := range c.streamTracks {
c.serverUdpRtp.RemovePublisher(c.ip(), track.rtpPort, c)
c.serverUdpRtcp.RemovePublisher(c.ip(), track.rtcpPort, c)
}
c.state = statePreRecord
c.path.OnClientPause(c)
c.state = statePreRecord
c.path.OnClientPause(c)
return true
}
return true
c.conn.Close()
if err != io.EOF && err != errStateTerminate {
c.log("ERR: %s", err)
}
} else {
for _, track := range c.streamTracks {
c.serverUdpRtp.RemovePublisher(c.ip(), track.rtpPort, c)
c.serverUdpRtcp.RemovePublisher(c.ip(), track.rtcpPort, c)
}
for _, track := range c.streamTracks {
c.serverUdpRtp.RemovePublisher(c.ip(), track.rtpPort, c)
c.serverUdpRtcp.RemovePublisher(c.ip(), track.rtcpPort, c)
}
c.path.OnClientRemove(c)
c.path = nil
c.path.OnClientRemove(c)
c.path = nil
c.conn.Close()
if err != io.EOF && err != errStateTerminate {
c.log("ERR: %s", err)
}
c.parent.OnClientClose(c)
<-c.terminate
return false
}
c.parent.OnClientClose(c)
<-c.terminate
return false
for {
select {
case req := <-readerRequest:
err := c.handleRequest(req.req)
if err != nil {
req.res <- false
<-readerDone
return onError(err)
}
req.res <- true
case err := <-readerDone:
return onError(err)
case <-checkStreamTicker.C:
now := time.Now()
@ -1306,21 +1319,26 @@ func (c *Client) runRecordUDP() bool { @@ -1306,21 +1319,26 @@ func (c *Client) runRecordUDP() bool {
last := time.Unix(atomic.LoadInt64(lastUnix), 0)
if now.Sub(last) >= c.readTimeout {
for _, track := range c.streamTracks {
c.serverUdpRtp.RemovePublisher(c.ip(), track.rtpPort, c)
c.serverUdpRtcp.RemovePublisher(c.ip(), track.rtcpPort, c)
}
go func() {
for req := range readerRequest {
req.res <- false
}
}()
c.log("ERR: no packets received recently (maybe there's a firewall/NAT in between)")
c.conn.Close()
<-readerDone
for _, track := range c.streamTracks {
c.serverUdpRtp.RemovePublisher(c.ip(), track.rtpPort, c)
c.serverUdpRtcp.RemovePublisher(c.ip(), track.rtcpPort, c)
}
c.path.OnClientRemove(c)
c.path = nil
c.parent.OnClientClose(c)
<-c.terminate
return false
}
}
@ -1336,25 +1354,30 @@ func (c *Client) runRecordUDP() bool { @@ -1336,25 +1354,30 @@ func (c *Client) runRecordUDP() bool {
}
case <-c.terminate:
go func() {
for req := range readerRequest {
req.res <- false
}
}()
c.conn.Close()
<-readerDone
for _, track := range c.streamTracks {
c.serverUdpRtp.RemovePublisher(c.ip(), track.rtpPort, c)
c.serverUdpRtcp.RemovePublisher(c.ip(), track.rtcpPort, c)
}
c.conn.Close()
<-readerDone
c.path.OnClientRemove(c)
c.path = nil
return false
}
}
}
func (c *Client) runRecordTCP() bool {
readRequest := make(chan readRequestPair)
defer close(readRequest)
readerRequest := make(chan readReq)
defer close(readerRequest)
readerDone := make(chan error)
go func() {
@ -1376,9 +1399,11 @@ func (c *Client) runRecordTCP() bool { @@ -1376,9 +1399,11 @@ func (c *Client) runRecordTCP() bool {
c.path.OnFrame(recvt.TrackId, recvt.StreamType, recvt.Content)
case *base.Request:
err := c.handleRequest(recvt)
if err != nil {
readerDone <- err
okc := make(chan bool)
readerRequest <- readReq{recvt, okc}
ok := <-okc
if !ok {
readerDone <- nil
return
}
}
@ -1388,33 +1413,39 @@ func (c *Client) runRecordTCP() bool { @@ -1388,33 +1413,39 @@ func (c *Client) runRecordTCP() bool {
receiverReportTicker := time.NewTicker(receiverReportInterval)
defer receiverReportTicker.Stop()
for {
select {
// responses must be written in the same routine of receiver reports
case req := <-readRequest:
req.res <- c.handleRequest(req.req)
case err := <-readerDone:
if err == errStateInitial {
c.state = statePreRecord
c.path.OnClientPause(c)
return true
onError := func(err error) bool {
if err == errStateInitial {
c.state = statePreRecord
c.path.OnClientPause(c)
return true
}
} else {
c.path.OnClientRemove(c)
c.path = nil
c.conn.Close()
if err != io.EOF && err != errStateTerminate {
c.log("ERR: %s", err)
}
c.conn.Close()
if err != io.EOF && err != errStateTerminate {
c.log("ERR: %s", err)
}
c.path.OnClientRemove(c)
c.path = nil
c.parent.OnClientClose(c)
<-c.terminate
c.parent.OnClientClose(c)
<-c.terminate
return false
}
return false
for {
select {
case req := <-readerRequest:
err := c.handleRequest(req.req)
if err != nil {
req.res <- false
<-readerDone
return onError(err)
}
req.res <- true
case err := <-readerDone:
return onError(err)
case <-receiverReportTicker.C:
for trackId := range c.streamTracks {
@ -1424,8 +1455,8 @@ func (c *Client) runRecordTCP() bool { @@ -1424,8 +1455,8 @@ func (c *Client) runRecordTCP() bool {
case <-c.terminate:
go func() {
for req := range readRequest {
req.res <- fmt.Errorf("terminated")
for req := range readerRequest {
req.res <- false
}
}()
@ -1434,7 +1465,6 @@ func (c *Client) runRecordTCP() bool { @@ -1434,7 +1465,6 @@ func (c *Client) runRecordTCP() bool {
c.path.OnClientRemove(c)
c.path = nil
return false
}
}
@ -1472,11 +1502,11 @@ func (c *Client) OnReaderFrame(trackId int, streamType base.StreamType, buf []by @@ -1472,11 +1502,11 @@ func (c *Client) OnReaderFrame(trackId int, streamType base.StreamType, buf []by
}
} else {
c.tcpFrame <- &base.InterleavedFrame{
TrackId: trackId,
StreamType: streamType,
Content: buf,
c.tcpWriteMutex.Lock()
if c.tcpWriteOk {
c.conn.WriteFrameTCP(trackId, streamType, buf)
}
c.tcpWriteMutex.Unlock()
}
}

Loading…
Cancel
Save