Browse Source

new parameters: sourceOnDemandStartTimeout, sourceOnDemandCloseAfter, runOnDemandStartTimeout, runOnDemandCloseAfter (#62)

pull/169/head
aler9 5 years ago
parent
commit
20e478b8bd
  1. 2
      README.md
  2. 31
      conf/pathconf.go
  3. 4
      externalcmd/externalcmd.go
  4. 16
      main_test.go
  5. 393
      path/path.go
  6. 32
      rtsp-simple-server.yml
  7. 93
      sourcertmp/source.go
  8. 96
      sourcertsp/source.go

2
README.md

@ -83,7 +83,7 @@ Parameters in maps can be overridden by using underscores, in the following way:
RTSP_PATHS_TEST_SOURCE=rtsp://myurl ./rtsp-simple-server RTSP_PATHS_TEST_SOURCE=rtsp://myurl ./rtsp-simple-server
``` ```
The configuration can be changed dinamically when the server is running (hot reloading) by editing the configuration file: changes are detected and applied without disconnecting existing clients, if possible. The configuration can be changed dinamically when the server is running (hot reloading) by editing the configuration file, Changes are detected and applied without disconnecting existing clients, whenever is possible.
### RTSP proxy mode ### RTSP proxy mode

31
conf/pathconf.go

@ -6,6 +6,7 @@ import (
"net/url" "net/url"
"regexp" "regexp"
"strings" "strings"
"time"
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
) )
@ -20,11 +21,15 @@ type PathConf struct {
SourceProtocol string `yaml:"sourceProtocol"` SourceProtocol string `yaml:"sourceProtocol"`
SourceProtocolParsed gortsplib.StreamProtocol `yaml:"-" json:"-"` SourceProtocolParsed gortsplib.StreamProtocol `yaml:"-" json:"-"`
SourceOnDemand bool `yaml:"sourceOnDemand"` SourceOnDemand bool `yaml:"sourceOnDemand"`
SourceOnDemandStartTimeout time.Duration `yaml:"sourceOnDemandStartTimeout"`
SourceOnDemandCloseAfter time.Duration `yaml:"sourceOnDemandCloseAfter"`
SourceRedirect string `yaml:"sourceRedirect"` SourceRedirect string `yaml:"sourceRedirect"`
RunOnInit string `yaml:"runOnInit"` RunOnInit string `yaml:"runOnInit"`
RunOnInitRestart bool `yaml:"runOnInitRestart"` RunOnInitRestart bool `yaml:"runOnInitRestart"`
RunOnDemand string `yaml:"runOnDemand"` RunOnDemand string `yaml:"runOnDemand"`
RunOnDemandRestart bool `yaml:"runOnDemandRestart"` RunOnDemandRestart bool `yaml:"runOnDemandRestart"`
RunOnDemandStartTimeout time.Duration `yaml:"runOnDemandStartTimeout"`
RunOnDemandCloseAfter time.Duration `yaml:"runOnDemandCloseAfter"`
RunOnPublish string `yaml:"runOnPublish"` RunOnPublish string `yaml:"runOnPublish"`
RunOnPublishRestart bool `yaml:"runOnPublishRestart"` RunOnPublishRestart bool `yaml:"runOnPublishRestart"`
RunOnRead string `yaml:"runOnRead"` RunOnRead string `yaml:"runOnRead"`
@ -64,7 +69,9 @@ func (pconf *PathConf) fillAndCheck(name string) error {
pconf.Source = "record" pconf.Source = "record"
} }
if strings.HasPrefix(pconf.Source, "rtsp://") { if pconf.Source == "record" {
} else if strings.HasPrefix(pconf.Source, "rtsp://") {
if pconf.Regexp != nil { if pconf.Regexp != nil {
return fmt.Errorf("a path with a regular expression (or path 'all') cannot have a RTSP source; use another path") return fmt.Errorf("a path with a regular expression (or path 'all') cannot have a RTSP source; use another path")
} }
@ -117,8 +124,6 @@ func (pconf *PathConf) fillAndCheck(name string) error {
} }
} }
} else if pconf.Source == "record" {
} else if pconf.Source == "redirect" { } else if pconf.Source == "redirect" {
if pconf.SourceRedirect == "" { if pconf.SourceRedirect == "" {
return fmt.Errorf("source redirect must be filled") return fmt.Errorf("source redirect must be filled")
@ -134,7 +139,15 @@ func (pconf *PathConf) fillAndCheck(name string) error {
} }
} else { } else {
return fmt.Errorf("unsupported source: '%s'", pconf.Source) return fmt.Errorf("invalid source: '%s'", pconf.Source)
}
if pconf.SourceOnDemandStartTimeout == 0 {
pconf.SourceOnDemandStartTimeout = 10 * time.Second
}
if pconf.SourceOnDemandCloseAfter == 0 {
pconf.SourceOnDemandCloseAfter = 10 * time.Second
} }
if pconf.PublishUser != "" { if pconf.PublishUser != "" {
@ -188,10 +201,18 @@ func (pconf *PathConf) fillAndCheck(name string) error {
pconf.ReadIps = nil pconf.ReadIps = nil
} }
if pconf.Regexp != nil && pconf.RunOnInit != "" { if pconf.RunOnInit != "" && pconf.Regexp != nil {
return fmt.Errorf("a path with a regular expression does not support option 'runOnInit'; use another path") return fmt.Errorf("a path with a regular expression does not support option 'runOnInit'; use another path")
} }
if pconf.RunOnDemandStartTimeout == 0 {
pconf.RunOnDemandStartTimeout = 10 * time.Second
}
if pconf.RunOnDemandCloseAfter == 0 {
pconf.RunOnDemandCloseAfter = 10 * time.Second
}
return nil return nil
} }

4
externalcmd/externalcmd.go

@ -9,7 +9,7 @@ import (
) )
const ( const (
restartPause = 5 * time.Second retryPause = 5 * time.Second
) )
type ExternalCmd struct { type ExternalCmd struct {
@ -57,7 +57,7 @@ func (e *ExternalCmd) run() {
return false return false
} }
t := time.NewTimer(restartPause) t := time.NewTimer(retryPause)
defer t.Stop() defer t.Stop()
select { select {

16
main_test.go

@ -176,6 +176,10 @@ func TestEnvironment(t *testing.T) {
require.Equal(t, true, ok) require.Equal(t, true, ok)
require.Equal(t, &conf.PathConf{ require.Equal(t, &conf.PathConf{
Source: "record", Source: "record",
SourceOnDemandStartTimeout: 10 * time.Second,
SourceOnDemandCloseAfter: 10 * time.Second,
RunOnDemandStartTimeout: 10 * time.Second,
RunOnDemandCloseAfter: 10 * time.Second,
}, pa) }, pa)
pa, ok = p.conf.Paths["~^.*$"] pa, ok = p.conf.Paths["~^.*$"]
@ -184,8 +188,12 @@ func TestEnvironment(t *testing.T) {
Regexp: regexp.MustCompile("^.*$"), Regexp: regexp.MustCompile("^.*$"),
Source: "record", Source: "record",
SourceProtocol: "udp", SourceProtocol: "udp",
SourceOnDemandStartTimeout: 10 * time.Second,
SourceOnDemandCloseAfter: 10 * time.Second,
ReadUser: "testuser", ReadUser: "testuser",
ReadPass: "testpass", ReadPass: "testpass",
RunOnDemandStartTimeout: 10 * time.Second,
RunOnDemandCloseAfter: 10 * time.Second,
}, pa) }, pa)
pa, ok = p.conf.Paths["cam1"] pa, ok = p.conf.Paths["cam1"]
@ -195,6 +203,10 @@ func TestEnvironment(t *testing.T) {
SourceProtocol: "tcp", SourceProtocol: "tcp",
SourceProtocolParsed: gortsplib.StreamProtocolTCP, SourceProtocolParsed: gortsplib.StreamProtocolTCP,
SourceOnDemand: true, SourceOnDemand: true,
SourceOnDemandStartTimeout: 10 * time.Second,
SourceOnDemandCloseAfter: 10 * time.Second,
RunOnDemandStartTimeout: 10 * time.Second,
RunOnDemandCloseAfter: 10 * time.Second,
}, pa) }, pa)
} }
@ -211,6 +223,10 @@ func TestEnvironmentNoFile(t *testing.T) {
require.Equal(t, &conf.PathConf{ require.Equal(t, &conf.PathConf{
Source: "rtsp://testing", Source: "rtsp://testing",
SourceProtocol: "udp", SourceProtocol: "udp",
SourceOnDemandStartTimeout: 10 * time.Second,
SourceOnDemandCloseAfter: 10 * time.Second,
RunOnDemandStartTimeout: 10 * time.Second,
RunOnDemandCloseAfter: 10 * time.Second,
}, pa) }, pa)
} }

393
path/path.go

@ -18,12 +18,11 @@ import (
"github.com/aler9/rtsp-simple-server/stats" "github.com/aler9/rtsp-simple-server/stats"
) )
const ( func newEmptyTimer() *time.Timer {
pathCheckPeriod = 5 * time.Second t := time.NewTimer(0)
describeTimeout = 5 * time.Second <-t.C
sourceStopAfterDescribePeriod = 10 * time.Second return t
onDemandCmdStopAfterDescribePeriod = 10 * time.Second }
)
type Parent interface { type Parent interface {
Log(string, ...interface{}) Log(string, ...interface{})
@ -45,9 +44,8 @@ type source interface {
// * sourcertmp.Source // * sourcertmp.Source
type sourceExternal interface { type sourceExternal interface {
IsSource() IsSource()
IsSourceExternal()
Close() Close()
IsRunning() bool
SetRunning(bool)
} }
type sourceRedirect struct{} type sourceRedirect struct{}
@ -118,6 +116,14 @@ const (
clientStatePreRemove clientStatePreRemove
) )
type sourceState int
const (
sourceStateNotReady sourceState = iota
sourceStateWaitingDescribe
sourceStateReady
)
type Path struct { type Path struct {
readTimeout time.Duration readTimeout time.Duration
writeTimeout time.Duration writeTimeout time.Duration
@ -131,14 +137,20 @@ type Path struct {
clients map[*client.Client]clientState clients map[*client.Client]clientState
clientsWg sync.WaitGroup clientsWg sync.WaitGroup
source source source source
sourceReady bool
sourceTrackCount int sourceTrackCount int
sourceSdp []byte sourceSdp []byte
lastDescribeReq time.Time
lastDescribeActivation time.Time
readers *readersMap readers *readersMap
onInitCmd *externalcmd.ExternalCmd onInitCmd *externalcmd.ExternalCmd
onDemandCmd *externalcmd.ExternalCmd onDemandCmd *externalcmd.ExternalCmd
describeTimer *time.Timer
sourceCloseTimer *time.Timer
sourceCloseTimerStarted bool
sourceState sourceState
sourceWg sync.WaitGroup
runOnDemandCloseTimer *time.Timer
runOnDemandCloseTimerStarted bool
closeTimer *time.Timer
closeTimerStarted bool
// in // in
sourceSetReady chan struct{} // from source sourceSetReady chan struct{} // from source
@ -173,6 +185,10 @@ func New(
parent: parent, parent: parent,
clients: make(map[*client.Client]clientState), clients: make(map[*client.Client]clientState),
readers: newReadersMap(), readers: newReadersMap(),
describeTimer: newEmptyTimer(),
sourceCloseTimer: newEmptyTimer(),
runOnDemandCloseTimer: newEmptyTimer(),
closeTimer: newEmptyTimer(),
sourceSetReady: make(chan struct{}), sourceSetReady: make(chan struct{}),
sourceSetNotReady: make(chan struct{}), sourceSetNotReady: make(chan struct{}),
clientDescribe: make(chan ClientDescribeReq), clientDescribe: make(chan ClientDescribeReq),
@ -200,47 +216,57 @@ func (pa *Path) Log(format string, args ...interface{}) {
func (pa *Path) run() { func (pa *Path) run() {
defer pa.wg.Done() defer pa.wg.Done()
if strings.HasPrefix(pa.conf.Source, "rtsp://") { if pa.conf.Source == "redirect" {
state := !pa.conf.SourceOnDemand
if state {
pa.Log("starting source")
}
pa.source = sourcertsp.New(pa.conf.Source, pa.conf.SourceProtocolParsed,
pa.readTimeout, pa.writeTimeout, state, pa.stats, pa)
} else if strings.HasPrefix(pa.conf.Source, "rtmp://") {
state := !pa.conf.SourceOnDemand
if state {
pa.Log("starting source")
}
pa.source = sourcertmp.New(pa.conf.Source, state, pa.stats, pa)
} else if pa.conf.Source == "redirect" {
pa.source = &sourceRedirect{} pa.source = &sourceRedirect{}
} else if pa.hasExternalSource() && !pa.conf.SourceOnDemand {
pa.startExternalSource()
} }
if pa.conf.RunOnInit != "" { if pa.conf.RunOnInit != "" {
pa.Log("starting on init command") pa.Log("on init command started")
pa.onInitCmd = externalcmd.New(pa.conf.RunOnInit, pa.onInitCmd = externalcmd.New(pa.conf.RunOnInit,
pa.conf.RunOnInitRestart, pa.name) pa.conf.RunOnInitRestart, pa.name)
} }
tickerCheck := time.NewTicker(pathCheckPeriod)
defer tickerCheck.Stop()
outer: outer:
for { for {
select { select {
case <-tickerCheck.C: case <-pa.describeTimer.C:
ok := pa.onCheck() for c, state := range pa.clients {
if !ok { if state == clientStateWaitingDescribe {
pa.removeClient(c)
c.OnPathDescribeData(nil, "", fmt.Errorf("publisher of path '%s' has timed out", pa.name))
}
}
// set state after removeClient(), so schedule* works once
pa.sourceState = sourceStateNotReady
pa.scheduleSourceClose()
pa.scheduleRunOnDemandClose()
pa.scheduleClose()
case <-pa.sourceCloseTimer.C:
pa.sourceCloseTimerStarted = false
pa.source.(sourceExternal).Close()
pa.source = nil
pa.scheduleClose()
case <-pa.runOnDemandCloseTimer.C:
pa.runOnDemandCloseTimerStarted = false
pa.Log("on demand command stopped")
pa.onDemandCmd.Close()
pa.onDemandCmd = nil
pa.scheduleClose()
case <-pa.closeTimer.C:
pa.exhaustChannels() pa.exhaustChannels()
pa.parent.OnPathClose(pa) pa.parent.OnPathClose(pa)
<-pa.terminate <-pa.terminate
break outer break outer
}
case <-pa.sourceSetReady: case <-pa.sourceSetReady:
pa.onSourceSetReady() pa.onSourceSetReady()
@ -290,7 +316,7 @@ outer:
} }
if pa.clients[req.client] != clientStatePreRemove { if pa.clients[req.client] != clientStatePreRemove {
pa.onClientPreRemove(req.client) pa.removeClient(req.client)
} }
delete(pa.clients, req.client) delete(pa.clients, req.client)
@ -304,20 +330,23 @@ outer:
} }
} }
pa.describeTimer.Stop()
pa.sourceCloseTimer.Stop()
pa.runOnDemandCloseTimer.Stop()
pa.closeTimer.Stop()
if pa.onInitCmd != nil { if pa.onInitCmd != nil {
pa.Log("stopping on init command (closing)") pa.Log("on init command stopped")
pa.onInitCmd.Close() pa.onInitCmd.Close()
} }
if source, ok := pa.source.(sourceExternal); ok { if source, ok := pa.source.(sourceExternal); ok {
if source.IsRunning() {
pa.Log("stopping on demand source (closing)")
}
source.Close() source.Close()
} }
pa.sourceWg.Wait()
if pa.onDemandCmd != nil { if pa.onDemandCmd != nil {
pa.Log("stopping on demand command (closing)") pa.Log("on demand command stopped")
pa.onDemandCmd.Close() pa.onDemandCmd.Close()
} }
@ -331,7 +360,6 @@ outer:
case clientStateRecord: case clientStateRecord:
atomic.AddInt64(pa.stats.CountPublishers, -1) atomic.AddInt64(pa.stats.CountPublishers, -1)
} }
pa.parent.OnPathClientClose(c) pa.parent.OnPathClientClose(c)
} }
} }
@ -409,25 +437,31 @@ func (pa *Path) exhaustChannels() {
}() }()
} }
func (pa *Path) hasClients() bool { func (pa *Path) hasExternalSource() bool {
for _, state := range pa.clients { return strings.HasPrefix(pa.conf.Source, "rtsp://") ||
if state != clientStatePreRemove { strings.HasPrefix(pa.conf.Source, "rtmp://")
return true }
}
func (pa *Path) startExternalSource() {
if strings.HasPrefix(pa.conf.Source, "rtsp://") {
pa.source = sourcertsp.New(pa.conf.Source, pa.conf.SourceProtocolParsed,
pa.readTimeout, pa.writeTimeout, &pa.sourceWg, pa.stats, pa)
} else if strings.HasPrefix(pa.conf.Source, "rtmp://") {
pa.source = sourcertmp.New(pa.conf.Source, &pa.sourceWg, pa.stats, pa)
} }
return false
} }
func (pa *Path) hasClientsWaitingDescribe() bool { func (pa *Path) hasClients() bool {
for _, state := range pa.clients { for _, state := range pa.clients {
if state == clientStateWaitingDescribe { if state != clientStatePreRemove {
return true return true
} }
} }
return false return false
} }
func (pa *Path) hasClientReadersOrWaitingDescribe() bool { func (pa *Path) hasClientsNotSources() bool {
for c, state := range pa.clients { for c, state := range pa.clients {
if state != clientStatePreRemove && c != pa.source { if state != clientStatePreRemove && c != pa.source {
return true return true
@ -436,130 +470,149 @@ func (pa *Path) hasClientReadersOrWaitingDescribe() bool {
return false return false
} }
func (pa *Path) onCheck() bool { func (pa *Path) addClient(c *client.Client, state clientState) {
// reply to DESCRIBE requests if they are in timeout if _, ok := pa.clients[c]; ok {
if pa.hasClientsWaitingDescribe() && panic("client already added")
time.Since(pa.lastDescribeActivation) >= describeTimeout {
for c, state := range pa.clients {
if state != clientStatePreRemove && state == clientStateWaitingDescribe {
pa.clients[c] = clientStatePreRemove
c.OnPathDescribeData(nil, "", fmt.Errorf("publisher of path '%s' has timed out", pa.name))
}
}
} }
// stop on demand source if needed pa.clients[c] = state
if source, ok := pa.source.(sourceExternal); ok { pa.clientsWg.Add(1)
if pa.conf.SourceOnDemand && }
source.IsRunning() &&
!pa.hasClients() &&
time.Since(pa.lastDescribeReq) >= sourceStopAfterDescribePeriod {
pa.Log("stopping on demand source (not requested anymore)")
source.SetRunning(false)
}
}
// stop on demand command if needed func (pa *Path) removeClient(c *client.Client) {
if pa.onDemandCmd != nil && state := pa.clients[c]
!pa.hasClientReadersOrWaitingDescribe() && pa.clients[c] = clientStatePreRemove
time.Since(pa.lastDescribeReq) >= onDemandCmdStopAfterDescribePeriod {
pa.Log("stopping on demand command (not requested anymore)") switch state {
pa.onDemandCmd.Close() case clientStatePlay:
pa.onDemandCmd = nil atomic.AddInt64(pa.stats.CountReaders, -1)
pa.readers.remove(c)
case clientStateRecord:
atomic.AddInt64(pa.stats.CountPublishers, -1)
pa.onSourceSetNotReady()
} }
// remove path if is regexp, has no source, has no on-demand command and has no clients if pa.source == c {
if pa.conf.Regexp != nil && pa.source = nil
pa.source == nil &&
pa.onDemandCmd == nil && // close all clients that are reading or waiting to read
!pa.hasClients() { for oc, state := range pa.clients {
return false if state != clientStatePreRemove && state != clientStateWaitingDescribe {
pa.removeClient(oc)
pa.parent.OnPathClientClose(oc)
}
}
} }
return true pa.scheduleSourceClose()
pa.scheduleRunOnDemandClose()
pa.scheduleClose()
} }
func (pa *Path) onSourceSetReady() { func (pa *Path) onSourceSetReady() {
pa.sourceReady = true if pa.sourceState == sourceStateWaitingDescribe {
pa.describeTimer.Stop()
pa.describeTimer = newEmptyTimer()
}
pa.sourceState = sourceStateReady
// reply to all clients that are waiting for a description // reply to all clients that are waiting for a description
for c, state := range pa.clients { for c, state := range pa.clients {
if state == clientStateWaitingDescribe { if state == clientStateWaitingDescribe {
pa.clients[c] = clientStatePreRemove pa.removeClient(c)
c.OnPathDescribeData(pa.sourceSdp, "", nil) c.OnPathDescribeData(pa.sourceSdp, "", nil)
} }
} }
pa.scheduleSourceClose()
pa.scheduleRunOnDemandClose()
pa.scheduleClose()
} }
func (pa *Path) onSourceSetNotReady() { func (pa *Path) onSourceSetNotReady() {
pa.sourceReady = false pa.sourceState = sourceStateNotReady
// close all clients that are reading or waiting to read // close all clients that are reading or waiting to read
for c, state := range pa.clients { for c, state := range pa.clients {
if state != clientStatePreRemove && state != clientStateWaitingDescribe && c != pa.source { if state == clientStateWaitingDescribe {
pa.onClientPreRemove(c) panic("not possible")
}
if c != pa.source && state != clientStatePreRemove {
pa.removeClient(c)
pa.parent.OnPathClientClose(c) pa.parent.OnPathClientClose(c)
} }
} }
} }
func (pa *Path) onClientDescribe(c *client.Client) { func (pa *Path) onClientDescribe(c *client.Client) {
pa.lastDescribeReq = time.Now() // prevent on-demand source from closing
if pa.sourceCloseTimerStarted {
pa.sourceCloseTimer = newEmptyTimer()
pa.sourceCloseTimerStarted = false
}
// prevent on-demand command from closing
if pa.runOnDemandCloseTimerStarted {
pa.runOnDemandCloseTimer = newEmptyTimer()
pa.runOnDemandCloseTimerStarted = false
}
// source not found // start on-demand source
if pa.hasExternalSource() {
if pa.source == nil { if pa.source == nil {
// on demand command is available: put the client on hold pa.startExternalSource()
if pa.sourceState != sourceStateWaitingDescribe {
pa.describeTimer = time.NewTimer(pa.conf.SourceOnDemandStartTimeout)
pa.sourceState = sourceStateWaitingDescribe
}
}
}
// start on-demand command
if pa.conf.RunOnDemand != "" { if pa.conf.RunOnDemand != "" {
if pa.onDemandCmd == nil { // start if needed if pa.onDemandCmd == nil {
pa.Log("starting on demand command") pa.Log("on demand command started")
pa.lastDescribeActivation = time.Now()
pa.onDemandCmd = externalcmd.New(pa.conf.RunOnDemand, pa.onDemandCmd = externalcmd.New(pa.conf.RunOnDemand,
pa.conf.RunOnDemandRestart, pa.name) pa.conf.RunOnDemandRestart, pa.name)
}
pa.clients[c] = clientStateWaitingDescribe
pa.clientsWg.Add(1)
// no on-demand: reply with 404
} else {
pa.clients[c] = clientStatePreRemove
pa.clientsWg.Add(1)
c.OnPathDescribeData(nil, "", fmt.Errorf("no one is publishing to path '%s'", pa.name)) if pa.sourceState != sourceStateWaitingDescribe {
pa.describeTimer = time.NewTimer(pa.conf.RunOnDemandStartTimeout)
pa.sourceState = sourceStateWaitingDescribe
}
}
} }
// source found and is redirect if _, ok := pa.source.(*sourceRedirect); ok {
} else if _, ok := pa.source.(*sourceRedirect); ok { pa.addClient(c, clientStatePreRemove)
pa.clients[c] = clientStatePreRemove pa.removeClient(c)
pa.clientsWg.Add(1)
c.OnPathDescribeData(nil, pa.conf.SourceRedirect, nil) c.OnPathDescribeData(nil, pa.conf.SourceRedirect, nil)
return
// source was found but is not ready: put the client on hold
} else if !pa.sourceReady {
// start source if needed
if source, ok := pa.source.(sourceExternal); ok {
if !source.IsRunning() {
pa.Log("starting on demand source")
pa.lastDescribeActivation = time.Now()
source.SetRunning(true)
}
} }
pa.clients[c] = clientStateWaitingDescribe switch pa.sourceState {
pa.clientsWg.Add(1) case sourceStateReady:
pa.addClient(c, clientStatePreRemove)
pa.removeClient(c)
c.OnPathDescribeData(pa.sourceSdp, "", nil)
return
// source was found and is ready case sourceStateWaitingDescribe:
} else { pa.addClient(c, clientStateWaitingDescribe)
pa.clients[c] = clientStatePreRemove return
pa.clientsWg.Add(1)
c.OnPathDescribeData(pa.sourceSdp, "", nil) case sourceStateNotReady:
pa.addClient(c, clientStatePreRemove)
pa.removeClient(c)
c.OnPathDescribeData(nil, "", fmt.Errorf("no one is publishing to path '%s'", pa.name))
return
} }
} }
func (pa *Path) onClientSetupPlay(c *client.Client, trackId int) error { func (pa *Path) onClientSetupPlay(c *client.Client, trackId int) error {
if !pa.sourceReady { if pa.sourceState != sourceStateReady {
return fmt.Errorf("no one is publishing to path '%s'", pa.name) return fmt.Errorf("no one is publishing to path '%s'", pa.name)
} }
@ -568,8 +621,19 @@ func (pa *Path) onClientSetupPlay(c *client.Client, trackId int) error {
} }
if _, ok := pa.clients[c]; !ok { if _, ok := pa.clients[c]; !ok {
pa.clients[c] = clientStatePrePlay // prevent on-demand source from closing
pa.clientsWg.Add(1) if pa.sourceCloseTimerStarted {
pa.sourceCloseTimer = newEmptyTimer()
pa.sourceCloseTimerStarted = false
}
// prevent on-demand command from closing
if pa.runOnDemandCloseTimerStarted {
pa.runOnDemandCloseTimer = newEmptyTimer()
pa.runOnDemandCloseTimerStarted = false
}
pa.addClient(c, clientStatePrePlay)
} }
return nil return nil
@ -595,12 +659,11 @@ func (pa *Path) onClientAnnounce(c *client.Client, tracks gortsplib.Tracks) erro
return fmt.Errorf("already subscribed") return fmt.Errorf("already subscribed")
} }
if pa.source != nil { if pa.source != nil || pa.hasExternalSource() {
return fmt.Errorf("someone is already publishing to path '%s'", pa.name) return fmt.Errorf("someone is already publishing to path '%s'", pa.name)
} }
pa.clients[c] = clientStatePreRecord pa.addClient(c, clientStatePreRecord)
pa.clientsWg.Add(1)
pa.source = c pa.source = c
pa.sourceTrackCount = len(tracks) pa.sourceTrackCount = len(tracks)
@ -624,40 +687,58 @@ func (pa *Path) onClientRecord(c *client.Client) {
pa.onSourceSetReady() pa.onSourceSetReady()
} }
func (pa *Path) onClientPreRemove(c *client.Client) { func (pa *Path) scheduleSourceClose() {
state := pa.clients[c] if !pa.hasExternalSource() || !pa.conf.SourceOnDemand || pa.source == nil {
pa.clients[c] = clientStatePreRemove return
}
switch state {
case clientStatePlay:
atomic.AddInt64(pa.stats.CountReaders, -1)
pa.readers.remove(c)
case clientStateRecord: if pa.sourceCloseTimerStarted ||
atomic.AddInt64(pa.stats.CountPublishers, -1) pa.sourceState == sourceStateWaitingDescribe ||
pa.onSourceSetNotReady() pa.hasClients() {
return
} }
if pa.source == c { pa.sourceCloseTimer.Stop()
pa.source = nil pa.sourceCloseTimer = time.NewTimer(pa.conf.SourceOnDemandCloseAfter)
pa.sourceCloseTimerStarted = true
}
// close all clients that are reading or waiting to read func (pa *Path) scheduleRunOnDemandClose() {
for oc, state := range pa.clients { if pa.conf.RunOnDemand == "" || pa.onDemandCmd == nil {
if state != clientStatePreRemove && state != clientStateWaitingDescribe && oc != pa.source { return
pa.onClientPreRemove(oc)
pa.parent.OnPathClientClose(oc)
} }
if pa.runOnDemandCloseTimerStarted ||
pa.sourceState == sourceStateWaitingDescribe ||
pa.hasClientsNotSources() {
return
} }
pa.runOnDemandCloseTimer.Stop()
pa.runOnDemandCloseTimer = time.NewTimer(pa.conf.RunOnDemandCloseAfter)
pa.runOnDemandCloseTimerStarted = true
}
func (pa *Path) scheduleClose() {
if pa.closeTimerStarted ||
pa.conf.Regexp == nil ||
pa.hasClients() ||
pa.source != nil {
return
} }
pa.closeTimer.Stop()
pa.closeTimer = time.NewTimer(0)
pa.closeTimerStarted = true
} }
func (pa *Path) OnSourceReady(tracks gortsplib.Tracks) { func (pa *Path) OnSourceSetReady(tracks gortsplib.Tracks) {
pa.sourceSdp = tracks.Write() pa.sourceSdp = tracks.Write()
pa.sourceTrackCount = len(tracks) pa.sourceTrackCount = len(tracks)
pa.sourceSetReady <- struct{}{} pa.sourceSetReady <- struct{}{}
} }
func (pa *Path) OnSourceNotReady() { func (pa *Path) OnSourceSetNotReady() {
pa.sourceSetNotReady <- struct{}{} pa.sourceSetNotReady <- struct{}{}
} }

32
rtsp-simple-server.yml

@ -22,14 +22,14 @@ metrics: no
# enable pprof on port 9999 to monitor performances. # enable pprof on port 9999 to monitor performances.
pprof: no pprof: no
# destinations of log messages; available options are "stdout", "file" and "syslog". # destinations of log messages; available values are "stdout", "file" and "syslog".
logDestinations: [stdout] logDestinations: [stdout]
# if "file" is in logDestinations, this is the file that will receive the logs. # if "file" is in logDestinations, this is the file which will receive the logs.
logFile: rtsp-simple-server.log logFile: rtsp-simple-server.log
# command to run when a client connects to the server. # command to run when a client connects to the server.
# this is terminated with SIGINT when a client disconnects from the server. # this is terminated with SIGINT when a client disconnects from the server.
# the restart option allows to restart the command if it exits suddely. # the restart parameter allows to restart the command if it exits suddenly.
runOnConnect: runOnConnect:
runOnConnectRestart: no runOnConnectRestart: no
@ -54,6 +54,12 @@ paths:
# if the source is an RTSP or RTMP url, it will be pulled only when at least # if the source is an RTSP or RTMP url, it will be pulled only when at least
# one reader is connected, saving bandwidth. # one reader is connected, saving bandwidth.
sourceOnDemand: no sourceOnDemand: no
# if sourceOnDemand is "yes", readers will be put on hold until the source is
# ready or until this amount of time has passed.
sourceOnDemandStartTimeout: 10s
# if sourceOnDemand is "yes", the source will be closed when there are no
# readers connected and this amount of time has passed.
sourceOnDemandCloseAfter: 10s
# if the source is "redirect", this is the RTSP url which clients will be # if the source is "redirect", this is the RTSP url which clients will be
# redirected to. # redirected to.
@ -73,32 +79,38 @@ paths:
# ips or networks (x.x.x.x/24) allowed to read. # ips or networks (x.x.x.x/24) allowed to read.
readIps: [] readIps: []
# command to run when this path is loaded by the program. # command to run when this path is initialized.
# this can be used, for example, to publish a stream and keep it always opened. # this can be used to publish a stream and keep it always opened.
# this is terminated with SIGINT when the program closes. # this is terminated with SIGINT when the program closes.
# the path name is available in the RTSP_SERVER_PATH variable. # the path name is available in the RTSP_SERVER_PATH variable.
# the restart option allows to restart the command if it exits suddely. # the restart parameter allows to restart the command if it exits suddenly.
runOnInit: runOnInit:
runOnInitRestart: no runOnInitRestart: no
# command to run when this path is requested. # command to run when this path is requested.
# this can be used, for example, to publish a stream on demand. # this can be used to publish a stream on demand.
# this is terminated with SIGINT when the path is not requested anymore. # this is terminated with SIGINT when the path is not requested anymore.
# the path name is available in the RTSP_SERVER_PATH variable. # the path name is available in the RTSP_SERVER_PATH variable.
# the restart option allows to restart the command if it exits suddely. # the restart parameter allows to restart the command if it exits suddenly.
runOnDemand: runOnDemand:
runOnDemandRestart: no runOnDemandRestart: no
# readers will be put on hold until the runOnDemand command starts publishing
# or until this amount of time has passed.
runOnDemandStartTimeout: 10s
# the runOnDemand command will be closed when there are no
# readers connected and this amount of time has passed.
runOnDemandCloseAfter: 10s
# command to run when a client starts publishing. # command to run when a client starts publishing.
# this is terminated with SIGINT when a client stops publishing. # this is terminated with SIGINT when a client stops publishing.
# the path name is available in the RTSP_SERVER_PATH variable. # the path name is available in the RTSP_SERVER_PATH variable.
# the restart option allows to restart the command if it exits suddely. # the restart parameter allows to restart the command if it exits suddenly.
runOnPublish: runOnPublish:
runOnPublishRestart: no runOnPublishRestart: no
# command to run when a clients starts reading. # command to run when a clients starts reading.
# this is terminated with SIGINT when a client stops reading. # this is terminated with SIGINT when a client stops reading.
# the path name is available in the RTSP_SERVER_PATH variable. # the path name is available in the RTSP_SERVER_PATH variable.
# the restart option allows to restart the command if it exits suddely. # the restart parameter allows to restart the command if it exits suddenly.
runOnRead: runOnRead:
runOnReadRestart: no runOnReadRestart: no

93
sourcertmp/source.go

@ -3,6 +3,7 @@ package sourcertmp
import ( import (
"fmt" "fmt"
"net" "net"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -17,126 +18,74 @@ import (
) )
const ( const (
retryInterval = 5 * time.Second retryPause = 5 * time.Second
) )
type Parent interface { type Parent interface {
Log(string, ...interface{}) Log(string, ...interface{})
OnSourceReady(gortsplib.Tracks) OnSourceSetReady(gortsplib.Tracks)
OnSourceNotReady() OnSourceSetNotReady()
OnFrame(int, gortsplib.StreamType, []byte) OnFrame(int, gortsplib.StreamType, []byte)
} }
type Source struct { type Source struct {
ur string ur string
state bool state bool
wg *sync.WaitGroup
stats *stats.Stats stats *stats.Stats
parent Parent parent Parent
innerState bool
// in // in
innerTerminate chan struct{}
innerDone chan struct{}
stateChange chan bool
terminate chan struct{} terminate chan struct{}
// out
done chan struct{}
} }
func New(ur string, func New(ur string,
state bool, wg *sync.WaitGroup,
stats *stats.Stats, stats *stats.Stats,
parent Parent) *Source { parent Parent) *Source {
s := &Source{ s := &Source{
ur: ur, ur: ur,
state: state, wg: wg,
stats: stats, stats: stats,
parent: parent, parent: parent,
stateChange: make(chan bool),
terminate: make(chan struct{}), terminate: make(chan struct{}),
done: make(chan struct{}),
} }
atomic.AddInt64(s.stats.CountSourcesRtmp, +1) atomic.AddInt64(s.stats.CountSourcesRtmp, +1)
s.parent.Log("rtmp source started")
s.wg.Add(1)
go s.run() go s.run()
s.SetRunning(s.state)
return s return s
} }
func (s *Source) Close() { func (s *Source) Close() {
atomic.AddInt64(s.stats.CountSourcesRtmpRunning, -1)
s.parent.Log("rtmp source stopped")
close(s.terminate) close(s.terminate)
<-s.done
} }
func (s *Source) IsSource() {} func (s *Source) IsSource() {}
func (s *Source) IsRunning() bool { func (s *Source) IsSourceExternal() {}
return s.state
}
func (s *Source) SetRunning(state bool) {
s.state = state
s.stateChange <- s.state
}
func (s *Source) run() { func (s *Source) run() {
defer close(s.done) defer s.wg.Done()
outer:
for {
select {
case state := <-s.stateChange:
if state {
if !s.innerState {
atomic.AddInt64(s.stats.CountSourcesRtmpRunning, +1)
s.innerState = true
s.innerTerminate = make(chan struct{})
s.innerDone = make(chan struct{})
go s.runInner()
}
} else {
if s.innerState {
atomic.AddInt64(s.stats.CountSourcesRtmpRunning, -1)
close(s.innerTerminate)
<-s.innerDone
s.innerState = false
}
}
case <-s.terminate:
break outer
}
}
if s.innerState {
atomic.AddInt64(s.stats.CountSourcesRtmpRunning, -1)
close(s.innerTerminate)
<-s.innerDone
}
close(s.stateChange)
}
func (s *Source) runInner() {
defer close(s.innerDone)
for { for {
ok := func() bool { ok := func() bool {
ok := s.runInnerInner() ok := s.runInner()
if !ok { if !ok {
return false return false
} }
t := time.NewTimer(retryInterval) t := time.NewTimer(retryPause)
defer t.Stop() defer t.Stop()
select { select {
case <-t.C: case <-t.C:
return true return true
case <-s.innerTerminate: case <-s.terminate:
return false return false
} }
}() }()
@ -146,7 +95,7 @@ func (s *Source) runInner() {
} }
} }
func (s *Source) runInnerInner() bool { func (s *Source) runInner() bool {
s.parent.Log("connecting to rtmp source") s.parent.Log("connecting to rtmp source")
var conn *rtmp.Conn var conn *rtmp.Conn
@ -159,7 +108,7 @@ func (s *Source) runInnerInner() bool {
}() }()
select { select {
case <-s.innerTerminate: case <-s.terminate:
return false return false
case <-dialDone: case <-dialDone:
} }
@ -271,8 +220,8 @@ func (s *Source) runInnerInner() bool {
return true return true
} }
s.parent.OnSourceReady(tracks)
s.parent.Log("rtmp source ready") s.parent.Log("rtmp source ready")
s.parent.OnSourceSetReady(tracks)
readDone := make(chan error) readDone := make(chan error)
go func() { go func() {
@ -336,7 +285,7 @@ func (s *Source) runInnerInner() bool {
outer: outer:
for { for {
select { select {
case <-s.innerTerminate: case <-s.terminate:
nconn.Close() nconn.Close()
<-readDone <-readDone
ret = false ret = false
@ -350,7 +299,7 @@ outer:
} }
} }
s.parent.OnSourceNotReady() s.parent.OnSourceSetNotReady()
return ret return ret
} }

96
sourcertsp/source.go

@ -12,13 +12,13 @@ import (
) )
const ( const (
retryInterval = 5 * time.Second retryPause = 5 * time.Second
) )
type Parent interface { type Parent interface {
Log(string, ...interface{}) Log(string, ...interface{})
OnSourceReady(gortsplib.Tracks) OnSourceSetReady(gortsplib.Tracks)
OnSourceNotReady() OnSourceSetNotReady()
OnFrame(int, gortsplib.StreamType, []byte) OnFrame(int, gortsplib.StreamType, []byte)
} }
@ -27,16 +27,11 @@ type Source struct {
proto gortsplib.StreamProtocol proto gortsplib.StreamProtocol
readTimeout time.Duration readTimeout time.Duration
writeTimeout time.Duration writeTimeout time.Duration
state bool wg *sync.WaitGroup
stats *stats.Stats stats *stats.Stats
parent Parent parent Parent
innerState bool
// in // in
innerTerminate chan struct{}
innerDone chan struct{}
stateChange chan bool
terminate chan struct{} terminate chan struct{}
// out // out
@ -47,7 +42,7 @@ func New(ur string,
proto gortsplib.StreamProtocol, proto gortsplib.StreamProtocol,
readTimeout time.Duration, readTimeout time.Duration,
writeTimeout time.Duration, writeTimeout time.Duration,
state bool, wg *sync.WaitGroup,
stats *stats.Stats, stats *stats.Stats,
parent Parent) *Source { parent Parent) *Source {
s := &Source{ s := &Source{
@ -55,92 +50,47 @@ func New(ur string,
proto: proto, proto: proto,
readTimeout: readTimeout, readTimeout: readTimeout,
writeTimeout: writeTimeout, writeTimeout: writeTimeout,
state: state, wg: wg,
stats: stats, stats: stats,
parent: parent, parent: parent,
stateChange: make(chan bool),
terminate: make(chan struct{}), terminate: make(chan struct{}),
done: make(chan struct{}),
} }
atomic.AddInt64(s.stats.CountSourcesRtsp, +1) atomic.AddInt64(s.stats.CountSourcesRtsp, +1)
s.parent.Log("rtsp source started")
s.wg.Add(1)
go s.run() go s.run()
s.SetRunning(s.state)
return s return s
} }
func (s *Source) Close() { func (s *Source) Close() {
atomic.AddInt64(s.stats.CountSourcesRtsp, -1)
s.parent.Log("rtsp source stopped")
close(s.terminate) close(s.terminate)
<-s.done
} }
func (s *Source) IsSource() {} func (s *Source) IsSource() {}
func (s *Source) IsRunning() bool { func (s *Source) IsSourceExternal() {}
return s.state
}
func (s *Source) SetRunning(state bool) {
s.state = state
s.stateChange <- s.state
}
func (s *Source) run() { func (s *Source) run() {
defer close(s.done) defer s.wg.Done()
outer:
for {
select {
case state := <-s.stateChange:
if state {
if !s.innerState {
atomic.AddInt64(s.stats.CountSourcesRtspRunning, +1)
s.innerState = true
s.innerTerminate = make(chan struct{})
s.innerDone = make(chan struct{})
go s.runInner()
}
} else {
if s.innerState {
atomic.AddInt64(s.stats.CountSourcesRtspRunning, -1)
close(s.innerTerminate)
<-s.innerDone
s.innerState = false
}
}
case <-s.terminate:
break outer
}
}
if s.innerState {
atomic.AddInt64(s.stats.CountSourcesRtspRunning, -1)
close(s.innerTerminate)
<-s.innerDone
}
close(s.stateChange)
}
func (s *Source) runInner() {
defer close(s.innerDone)
for { for {
ok := func() bool { ok := func() bool {
ok := s.runInnerInner() ok := s.runInner()
if !ok { if !ok {
return false return false
} }
t := time.NewTimer(retryInterval) t := time.NewTimer(retryPause)
defer t.Stop() defer t.Stop()
select { select {
case <-t.C: case <-t.C:
return true return true
case <-s.innerTerminate: case <-s.terminate:
return false return false
} }
}() }()
@ -150,7 +100,7 @@ func (s *Source) runInner() {
} }
} }
func (s *Source) runInnerInner() bool { func (s *Source) runInner() bool {
s.parent.Log("connecting to rtsp source") s.parent.Log("connecting to rtsp source")
u, _ := url.Parse(s.ur) u, _ := url.Parse(s.ur)
@ -169,7 +119,7 @@ func (s *Source) runInnerInner() bool {
}() }()
select { select {
case <-s.innerTerminate: case <-s.terminate:
return false return false
case <-dialDone: case <-dialDone:
} }
@ -217,8 +167,8 @@ func (s *Source) runUDP(u *url.URL, conn *gortsplib.ConnClient, tracks gortsplib
return true return true
} }
s.parent.OnSourceReady(tracks)
s.parent.Log("rtsp source ready") s.parent.Log("rtsp source ready")
s.parent.OnSourceSetReady(tracks)
var wg sync.WaitGroup var wg sync.WaitGroup
@ -266,7 +216,7 @@ func (s *Source) runUDP(u *url.URL, conn *gortsplib.ConnClient, tracks gortsplib
outer: outer:
for { for {
select { select {
case <-s.innerTerminate: case <-s.terminate:
conn.Close() conn.Close()
<-tcpConnDone <-tcpConnDone
ret = false ret = false
@ -282,7 +232,7 @@ outer:
wg.Wait() wg.Wait()
s.parent.OnSourceNotReady() s.parent.OnSourceSetNotReady()
return ret return ret
} }
@ -304,8 +254,8 @@ func (s *Source) runTCP(u *url.URL, conn *gortsplib.ConnClient, tracks gortsplib
return true return true
} }
s.parent.OnSourceReady(tracks)
s.parent.Log("rtsp source ready") s.parent.Log("rtsp source ready")
s.parent.OnSourceSetReady(tracks)
tcpConnDone := make(chan error) tcpConnDone := make(chan error)
go func() { go func() {
@ -325,7 +275,7 @@ func (s *Source) runTCP(u *url.URL, conn *gortsplib.ConnClient, tracks gortsplib
outer: outer:
for { for {
select { select {
case <-s.innerTerminate: case <-s.terminate:
conn.Close() conn.Close()
<-tcpConnDone <-tcpConnDone
ret = false ret = false
@ -339,7 +289,7 @@ outer:
} }
} }
s.parent.OnSourceNotReady() s.parent.OnSourceSetNotReady()
return ret return ret
} }

Loading…
Cancel
Save