From ae1b7f4dedfaf6a4639677e330c53214d742e9a6 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 25 Oct 2020 11:41:41 +0100 Subject: [PATCH] manage external sources with a common interface --- path/path.go | 101 ++++++++++++++----------------------------- sourcertmp/source.go | 75 +++++++++++++++----------------- sourcertsp/source.go | 77 +++++++++++++++------------------ 3 files changed, 104 insertions(+), 149 deletions(-) diff --git a/path/path.go b/path/path.go index af246bb5..e27959db 100644 --- a/path/path.go +++ b/path/path.go @@ -31,11 +31,19 @@ type Parent interface { OnPathClientClose(*client.Client) } -// a source can be a client, a sourcertsp.Source or a sourcertmp.Source +// a source is either a client.Client, a sourcertsp.Source or a sourcertmp.Source type source interface { IsSource() } +// a sourceExternal is either a sourcertsp.Source or a sourcertmp.Source +type sourceExternal interface { + IsSource() + Close() + IsRunning() bool + SetRunning(bool) +} + type ClientDescribeRes struct { Path client.Path Err error @@ -183,46 +191,26 @@ func (pa *Path) run() { defer pa.wg.Done() if strings.HasPrefix(pa.conf.Source, "rtsp://") { - state := sourcertsp.StateStopped - if !pa.conf.SourceOnDemand { - state = sourcertsp.StateRunning + state := !pa.conf.SourceOnDemand + if state { + pa.Log("starting source") } - s := sourcertsp.New( - pa.conf.Source, - pa.conf.SourceProtocolParsed, - pa.readTimeout, - pa.writeTimeout, - state, - pa) - pa.source = s - - atomic.AddInt64(pa.stats.CountSourcesRtsp, +1) - if !pa.conf.SourceOnDemand { - atomic.AddInt64(pa.stats.CountSourcesRtspRunning, +1) - } + pa.source = sourcertsp.New(pa.conf.Source, pa.conf.SourceProtocolParsed, + pa.readTimeout, pa.writeTimeout, state, pa.stats, pa) } else if strings.HasPrefix(pa.conf.Source, "rtmp://") { - state := sourcertmp.StateStopped - if !pa.conf.SourceOnDemand { - state = sourcertmp.StateRunning + state := !pa.conf.SourceOnDemand + if state { + pa.Log("starting source") } - s := sourcertmp.New( - pa.conf.Source, - state, - pa) - pa.source = s - - atomic.AddInt64(pa.stats.CountSourcesRtmp, +1) - if !pa.conf.SourceOnDemand { - atomic.AddInt64(pa.stats.CountSourcesRtmpRunning, +1) - } + pa.source = sourcertmp.New(pa.conf.Source, state, + pa.stats, pa) } if pa.conf.RunOnInit != "" { pa.Log("starting on init command") - var err error pa.onInitCmd, err = externalcmd.New(pa.conf.RunOnInit, pa.name) if err != nil { @@ -312,10 +300,10 @@ outer: pa.onInitCmd.Close() } - if source, ok := pa.source.(*sourcertsp.Source); ok { - source.Close() - - } else if source, ok := pa.source.(*sourcertmp.Source); ok { + if source, ok := pa.source.(sourceExternal); ok { + if source.IsRunning() { + pa.Log("stopping on demand source (closing)") + } source.Close() } @@ -451,26 +439,14 @@ func (pa *Path) onCheck() bool { } } - // stop on demand rtsp source if needed - if source, ok := pa.source.(*sourcertsp.Source); ok { + // stop on demand source if needed + if source, ok := pa.source.(sourceExternal); ok { if pa.conf.SourceOnDemand && - source.State() == sourcertsp.StateRunning && + source.IsRunning() && !pa.hasClients() && time.Since(pa.lastDescribeReq) >= sourceStopAfterDescribePeriod { - pa.Log("stopping on demand rtsp source (not requested anymore)") - atomic.AddInt64(pa.stats.CountSourcesRtspRunning, -1) - source.SetState(sourcertsp.StateStopped) - } - - // stop on demand rtmp source if needed - } else if source, ok := pa.source.(*sourcertmp.Source); ok { - if pa.conf.SourceOnDemand && - source.State() == sourcertmp.StateRunning && - !pa.hasClients() && - time.Since(pa.lastDescribeReq) >= sourceStopAfterDescribePeriod { - pa.Log("stopping on demand rtmp source (not requested anymore)") - atomic.AddInt64(pa.stats.CountSourcesRtmpRunning, -1) - source.SetState(sourcertmp.StateStopped) + pa.Log("stopping on demand source (not requested anymore)") + source.SetRunning(false) } } @@ -528,7 +504,6 @@ func (pa *Path) onClientDescribe(c *client.Client) { if pa.onDemandCmd == nil { // start if needed pa.Log("starting on demand command") pa.lastDescribeActivation = time.Now() - var err error pa.onDemandCmd, err = externalcmd.New(pa.conf.RunOnDemand, pa.name) if err != nil { @@ -549,22 +524,12 @@ func (pa *Path) onClientDescribe(c *client.Client) { // publisher was found but is not ready: put the client on hold } else if !pa.sourceReady { - // start rtsp source if needed - if source, ok := pa.source.(*sourcertsp.Source); ok { - if source.State() == sourcertsp.StateStopped { - pa.Log("starting on demand rtsp source") - pa.lastDescribeActivation = time.Now() - atomic.AddInt64(pa.stats.CountSourcesRtspRunning, +1) - source.SetState(sourcertsp.StateRunning) - } - - // start rtmp source if needed - } else if source, ok := pa.source.(*sourcertmp.Source); ok { - if source.State() == sourcertmp.StateStopped { - pa.Log("starting on demand rtmp source") + // start source if needed + if source, ok := pa.source.(sourceExternal); ok { + if !source.IsRunning() { + pa.Log("starting on demand source") pa.lastDescribeActivation = time.Now() - atomic.AddInt64(pa.stats.CountSourcesRtmpRunning, +1) - source.SetState(sourcertmp.StateRunning) + source.SetRunning(true) } } diff --git a/sourcertmp/source.go b/sourcertmp/source.go index 18996e56..f169739d 100644 --- a/sourcertmp/source.go +++ b/sourcertmp/source.go @@ -12,6 +12,8 @@ import ( "github.com/notedit/rtmp/av" "github.com/notedit/rtmp/codec/h264" "github.com/notedit/rtmp/format/rtmp" + + "github.com/aler9/rtsp-simple-server/stats" ) const ( @@ -25,24 +27,18 @@ type Parent interface { OnFrame(int, gortsplib.StreamType, []byte) } -type State int - -const ( - StateStopped State = iota - StateRunning -) - type Source struct { ur string - state State + state bool + stats *stats.Stats parent Parent - innerRunning bool + innerState bool // in innerTerminate chan struct{} innerDone chan struct{} - stateChange chan State + stateChange chan bool terminate chan struct{} // out @@ -50,18 +46,23 @@ type Source struct { } func New(ur string, - state State, + state bool, + stats *stats.Stats, parent Parent) *Source { s := &Source{ ur: ur, state: state, + stats: stats, parent: parent, - stateChange: make(chan State), + stateChange: make(chan bool), terminate: make(chan struct{}), done: make(chan struct{}), } - go s.run(s.state) + atomic.AddInt64(s.stats.CountSourcesRtmp, +1) + + go s.run() + s.SetRunning(s.state) return s } @@ -72,32 +73,46 @@ func (s *Source) Close() { func (s *Source) IsSource() {} -func (s *Source) State() State { +func (s *Source) IsRunning() bool { return s.state } -func (s *Source) SetState(state State) { +func (s *Source) SetRunning(state bool) { s.state = state s.stateChange <- s.state } -func (s *Source) run(initialState State) { +func (s *Source) run() { defer close(s.done) - s.applyState(initialState) - outer: for { select { case state := <-s.stateChange: - s.applyState(state) + if state { + if !s.innerState { + atomic.AddInt64(s.stats.CountSourcesRtmpRunning, +1) + s.innerState = true + s.innerTerminate = make(chan struct{}) + s.innerDone = make(chan struct{}) + go s.runInner() + } + } else { + if s.innerState { + atomic.AddInt64(s.stats.CountSourcesRtmpRunning, -1) + close(s.innerTerminate) + <-s.innerDone + s.innerState = false + } + } case <-s.terminate: break outer } } - if s.innerRunning { + if s.innerState { + atomic.AddInt64(s.stats.CountSourcesRtmpRunning, -1) close(s.innerTerminate) <-s.innerDone } @@ -105,25 +120,6 @@ outer: close(s.stateChange) } -func (s *Source) applyState(state State) { - if state == StateRunning { - if !s.innerRunning { - s.parent.Log("rtmp source started") - s.innerRunning = true - s.innerTerminate = make(chan struct{}) - s.innerDone = make(chan struct{}) - go s.runInner() - } - } else { - if s.innerRunning { - close(s.innerTerminate) - <-s.innerDone - s.innerRunning = false - s.parent.Log("rtmp source stopped") - } - } -} - func (s *Source) runInner() { defer close(s.innerDone) @@ -349,7 +345,6 @@ outer: } s.parent.OnSourceNotReady() - s.parent.Log("rtmp source not ready") return ret } diff --git a/sourcertsp/source.go b/sourcertsp/source.go index f91fe8ec..d29f0ecf 100644 --- a/sourcertsp/source.go +++ b/sourcertsp/source.go @@ -3,9 +3,12 @@ package sourcertsp import ( "net/url" "sync" + "sync/atomic" "time" "github.com/aler9/gortsplib" + + "github.com/aler9/rtsp-simple-server/stats" ) const ( @@ -19,27 +22,21 @@ type Parent interface { OnFrame(int, gortsplib.StreamType, []byte) } -type State int - -const ( - StateStopped State = iota - StateRunning -) - type Source struct { ur string proto gortsplib.StreamProtocol readTimeout time.Duration writeTimeout time.Duration - state State + state bool + stats *stats.Stats parent Parent - innerRunning bool + innerState bool // in innerTerminate chan struct{} innerDone chan struct{} - stateChange chan State + stateChange chan bool terminate chan struct{} // out @@ -50,7 +47,8 @@ func New(ur string, proto gortsplib.StreamProtocol, readTimeout time.Duration, writeTimeout time.Duration, - state State, + state bool, + stats *stats.Stats, parent Parent) *Source { s := &Source{ ur: ur, @@ -58,13 +56,17 @@ func New(ur string, readTimeout: readTimeout, writeTimeout: writeTimeout, state: state, + stats: stats, parent: parent, - stateChange: make(chan State), + stateChange: make(chan bool), terminate: make(chan struct{}), done: make(chan struct{}), } - go s.run(s.state) + atomic.AddInt64(s.stats.CountSourcesRtsp, +1) + + go s.run() + s.SetRunning(s.state) return s } @@ -75,32 +77,46 @@ func (s *Source) Close() { func (s *Source) IsSource() {} -func (s *Source) State() State { +func (s *Source) IsRunning() bool { return s.state } -func (s *Source) SetState(state State) { +func (s *Source) SetRunning(state bool) { s.state = state s.stateChange <- s.state } -func (s *Source) run(initialState State) { +func (s *Source) run() { defer close(s.done) - s.applyState(initialState) - outer: for { select { case state := <-s.stateChange: - s.applyState(state) + if state { + if !s.innerState { + atomic.AddInt64(s.stats.CountSourcesRtspRunning, +1) + s.innerState = true + s.innerTerminate = make(chan struct{}) + s.innerDone = make(chan struct{}) + go s.runInner() + } + } else { + if s.innerState { + atomic.AddInt64(s.stats.CountSourcesRtspRunning, -1) + close(s.innerTerminate) + <-s.innerDone + s.innerState = false + } + } case <-s.terminate: break outer } } - if s.innerRunning { + if s.innerState { + atomic.AddInt64(s.stats.CountSourcesRtspRunning, -1) close(s.innerTerminate) <-s.innerDone } @@ -108,25 +124,6 @@ outer: close(s.stateChange) } -func (s *Source) applyState(state State) { - if state == StateRunning { - if !s.innerRunning { - s.parent.Log("rtsp source started") - s.innerRunning = true - s.innerTerminate = make(chan struct{}) - s.innerDone = make(chan struct{}) - go s.runInner() - } - } else { - if s.innerRunning { - close(s.innerTerminate) - <-s.innerDone - s.innerRunning = false - s.parent.Log("rtsp source stopped") - } - } -} - func (s *Source) runInner() { defer close(s.innerDone) @@ -281,7 +278,6 @@ outer: wg.Wait() s.parent.OnSourceNotReady() - s.parent.Log("rtsp source not ready") return ret } @@ -339,7 +335,6 @@ outer: } s.parent.OnSourceNotReady() - s.parent.Log("rtsp source not ready") return ret }