Browse Source

move RTCP receivers into gortsplib

pull/181/head
aler9 4 years ago
parent
commit
d590805cda
  1. 2
      go.mod
  2. 4
      go.sum
  3. 126
      internal/client/client.go

2
go.mod

@ -5,7 +5,7 @@ go 1.15 @@ -5,7 +5,7 @@ go 1.15
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20210106201702-d17ef3fcc3ff
github.com/aler9/gortsplib v0.0.0-20210106212707-7b8b8e7c84d6
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51

4
go.sum

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20210106201702-d17ef3fcc3ff h1:rXe4QSWV7QwDaOW1NCqEOa7T4p5N86Q13urvo82TuPg=
github.com/aler9/gortsplib v0.0.0-20210106201702-d17ef3fcc3ff/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I=
github.com/aler9/gortsplib v0.0.0-20210106212707-7b8b8e7c84d6 h1:rqUzaMn1Yg2LK/yr+gaRXnMC/o1ciZuW4wZubzO8BGo=
github.com/aler9/gortsplib v0.0.0-20210106212707-7b8b8e7c84d6/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

126
internal/client/client.go

@ -15,7 +15,6 @@ import ( @@ -15,7 +15,6 @@ import (
"github.com/aler9/gortsplib/pkg/auth"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers"
"github.com/aler9/gortsplib/pkg/rtcpreceiver"
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/externalcmd"
@ -24,10 +23,8 @@ import ( @@ -24,10 +23,8 @@ import (
)
const (
checkStreamInterval = 5 * time.Second
receiverReportInterval = 10 * time.Second
sessionID = "12345678"
pauseAfterAuthError = 2 * time.Second
sessionID = "12345678"
pauseAfterAuthError = 2 * time.Second
)
type describeData struct {
@ -69,22 +66,17 @@ type Client struct { @@ -69,22 +66,17 @@ type Client struct {
conn *gortsplib.ServerConn
parent Parent
path Path
authUser string
authPass string
authValidator *auth.Validator
authFailures int
rtcpReceivers map[int]*rtcpreceiver.RTCPReceiver
udpLastFrameTimes []*int64
onReadCmd *externalcmd.Cmd
onPublishCmd *externalcmd.Cmd
path Path
authUser string
authPass string
authValidator *auth.Validator
authFailures int
onReadCmd *externalcmd.Cmd
onPublishCmd *externalcmd.Cmd
// in
describeData chan describeData // from path
terminate chan struct{}
backgroundRecordTerminate chan struct{}
backgroundRecordDone chan struct{}
}
// New allocates a Client.
@ -110,7 +102,6 @@ func New( @@ -110,7 +102,6 @@ func New(
stats: stats,
conn: conn,
parent: parent,
rtcpReceivers: make(map[int]*rtcpreceiver.RTCPReceiver),
terminate: make(chan struct{}),
}
@ -282,11 +273,6 @@ func (c *Client) run() { @@ -282,11 +273,6 @@ func (c *Client) run() {
}
}
for trackID, t := range tracks {
clockRate, _ := t.ClockRate()
c.rtcpReceivers[trackID] = rtcpreceiver.New(nil, clockRate)
}
c.path = path
return &base.Response{
@ -498,12 +484,6 @@ func (c *Client) run() { @@ -498,12 +484,6 @@ func (c *Client) run() {
}, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), basePath)
}
if c.conn.TracksLen() != c.path.SourceTrackCount() {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("not all tracks have been setup")
}
c.startRecord()
return &base.Response{
@ -538,16 +518,7 @@ func (c *Client) run() { @@ -538,16 +518,7 @@ func (c *Client) run() {
return
}
if *c.conn.TracksProtocol() == gortsplib.StreamProtocolUDP {
now := time.Now()
atomic.StoreInt64(c.udpLastFrameTimes[trackID], now.Unix())
c.rtcpReceivers[trackID].ProcessFrame(now, streamType, payload)
c.path.OnFrame(trackID, streamType, payload)
} else {
c.rtcpReceivers[trackID].ProcessFrame(time.Now(), streamType, payload)
c.path.OnFrame(trackID, streamType, payload)
}
c.path.OnFrame(trackID, streamType, payload)
}
readDone := c.conn.Read(gortsplib.ServerConnReadHandlers{
@ -718,97 +689,20 @@ func (c *Client) startRecord() { @@ -718,97 +689,20 @@ func (c *Client) startRecord() {
return "tracks"
}(), *c.conn.TracksProtocol())
if *c.conn.TracksProtocol() == gortsplib.StreamProtocolUDP {
c.udpLastFrameTimes = make([]*int64, c.conn.TracksLen())
for trackID := range c.conn.Tracks() {
v := time.Now().Unix()
c.udpLastFrameTimes[trackID] = &v
}
}
if c.path.Conf().RunOnPublish != "" {
c.onPublishCmd = externalcmd.New(c.path.Conf().RunOnPublish, c.path.Conf().RunOnPublishRestart, externalcmd.Environment{
Path: c.path.Name(),
Port: strconv.FormatInt(int64(c.rtspPort), 10),
})
}
c.backgroundRecordTerminate = make(chan struct{})
c.backgroundRecordDone = make(chan struct{})
if *c.conn.TracksProtocol() == gortsplib.StreamProtocolUDP {
go c.backgroundRecordUDP()
} else {
go c.backgroundRecordTCP()
}
}
func (c *Client) stopRecord() {
close(c.backgroundRecordTerminate)
<-c.backgroundRecordDone
if c.path.Conf().RunOnPublish != "" {
c.onPublishCmd.Close()
}
}
func (c *Client) backgroundRecordUDP() {
defer close(c.backgroundRecordDone)
checkStreamTicker := time.NewTicker(checkStreamInterval)
defer checkStreamTicker.Stop()
receiverReportTicker := time.NewTicker(receiverReportInterval)
defer receiverReportTicker.Stop()
for {
select {
case <-checkStreamTicker.C:
now := time.Now()
for _, lastUnix := range c.udpLastFrameTimes {
last := time.Unix(atomic.LoadInt64(lastUnix), 0)
if now.Sub(last) >= c.readTimeout {
c.log(logger.Info, "ERR: no UDP packets received recently (maybe there's a firewall/NAT in between)")
c.conn.Close()
return
}
}
case <-receiverReportTicker.C:
now := time.Now()
for trackID := range c.conn.Tracks() {
r := c.rtcpReceivers[trackID].Report(now)
c.conn.WriteFrame(trackID, gortsplib.StreamTypeRTP, r)
}
case <-c.backgroundRecordTerminate:
return
}
}
}
func (c *Client) backgroundRecordTCP() {
defer close(c.backgroundRecordDone)
receiverReportTicker := time.NewTicker(receiverReportInterval)
defer receiverReportTicker.Stop()
for {
select {
case <-receiverReportTicker.C:
now := time.Now()
for trackID := range c.conn.Tracks() {
r := c.rtcpReceivers[trackID].Report(now)
c.conn.WriteFrame(trackID, gortsplib.StreamTypeRTCP, r)
}
case <-c.backgroundRecordTerminate:
return
}
}
}
// OnReaderFrame implements path.Reader.
func (c *Client) OnReaderFrame(trackID int, streamType base.StreamType, buf []byte) {
if !c.conn.HasTrack(trackID) {

Loading…
Cancel
Save