Browse Source

manage external sources with a common interface

pull/169/head
aler9 5 years ago
parent
commit
ae1b7f4ded
  1. 101
      path/path.go
  2. 75
      sourcertmp/source.go
  3. 77
      sourcertsp/source.go

101
path/path.go

@ -31,11 +31,19 @@ type Parent interface {
OnPathClientClose(*client.Client) OnPathClientClose(*client.Client)
} }
// a source can be a client, a sourcertsp.Source or a sourcertmp.Source // a source is either a client.Client, a sourcertsp.Source or a sourcertmp.Source
type source interface { type source interface {
IsSource() IsSource()
} }
// a sourceExternal is either a sourcertsp.Source or a sourcertmp.Source
type sourceExternal interface {
IsSource()
Close()
IsRunning() bool
SetRunning(bool)
}
type ClientDescribeRes struct { type ClientDescribeRes struct {
Path client.Path Path client.Path
Err error Err error
@ -183,46 +191,26 @@ func (pa *Path) run() {
defer pa.wg.Done() defer pa.wg.Done()
if strings.HasPrefix(pa.conf.Source, "rtsp://") { if strings.HasPrefix(pa.conf.Source, "rtsp://") {
state := sourcertsp.StateStopped state := !pa.conf.SourceOnDemand
if !pa.conf.SourceOnDemand { if state {
state = sourcertsp.StateRunning pa.Log("starting source")
} }
s := sourcertsp.New( pa.source = sourcertsp.New(pa.conf.Source, pa.conf.SourceProtocolParsed,
pa.conf.Source, pa.readTimeout, pa.writeTimeout, state, pa.stats, pa)
pa.conf.SourceProtocolParsed,
pa.readTimeout,
pa.writeTimeout,
state,
pa)
pa.source = s
atomic.AddInt64(pa.stats.CountSourcesRtsp, +1)
if !pa.conf.SourceOnDemand {
atomic.AddInt64(pa.stats.CountSourcesRtspRunning, +1)
}
} else if strings.HasPrefix(pa.conf.Source, "rtmp://") { } else if strings.HasPrefix(pa.conf.Source, "rtmp://") {
state := sourcertmp.StateStopped state := !pa.conf.SourceOnDemand
if !pa.conf.SourceOnDemand { if state {
state = sourcertmp.StateRunning pa.Log("starting source")
} }
s := sourcertmp.New( pa.source = sourcertmp.New(pa.conf.Source, state,
pa.conf.Source, pa.stats, pa)
state,
pa)
pa.source = s
atomic.AddInt64(pa.stats.CountSourcesRtmp, +1)
if !pa.conf.SourceOnDemand {
atomic.AddInt64(pa.stats.CountSourcesRtmpRunning, +1)
}
} }
if pa.conf.RunOnInit != "" { if pa.conf.RunOnInit != "" {
pa.Log("starting on init command") pa.Log("starting on init command")
var err error var err error
pa.onInitCmd, err = externalcmd.New(pa.conf.RunOnInit, pa.name) pa.onInitCmd, err = externalcmd.New(pa.conf.RunOnInit, pa.name)
if err != nil { if err != nil {
@ -312,10 +300,10 @@ outer:
pa.onInitCmd.Close() pa.onInitCmd.Close()
} }
if source, ok := pa.source.(*sourcertsp.Source); ok { if source, ok := pa.source.(sourceExternal); ok {
source.Close() if source.IsRunning() {
pa.Log("stopping on demand source (closing)")
} else if source, ok := pa.source.(*sourcertmp.Source); ok { }
source.Close() source.Close()
} }
@ -451,26 +439,14 @@ func (pa *Path) onCheck() bool {
} }
} }
// stop on demand rtsp source if needed // stop on demand source if needed
if source, ok := pa.source.(*sourcertsp.Source); ok { if source, ok := pa.source.(sourceExternal); ok {
if pa.conf.SourceOnDemand && if pa.conf.SourceOnDemand &&
source.State() == sourcertsp.StateRunning && source.IsRunning() &&
!pa.hasClients() && !pa.hasClients() &&
time.Since(pa.lastDescribeReq) >= sourceStopAfterDescribePeriod { time.Since(pa.lastDescribeReq) >= sourceStopAfterDescribePeriod {
pa.Log("stopping on demand rtsp source (not requested anymore)") pa.Log("stopping on demand source (not requested anymore)")
atomic.AddInt64(pa.stats.CountSourcesRtspRunning, -1) source.SetRunning(false)
source.SetState(sourcertsp.StateStopped)
}
// stop on demand rtmp source if needed
} else if source, ok := pa.source.(*sourcertmp.Source); ok {
if pa.conf.SourceOnDemand &&
source.State() == sourcertmp.StateRunning &&
!pa.hasClients() &&
time.Since(pa.lastDescribeReq) >= sourceStopAfterDescribePeriod {
pa.Log("stopping on demand rtmp source (not requested anymore)")
atomic.AddInt64(pa.stats.CountSourcesRtmpRunning, -1)
source.SetState(sourcertmp.StateStopped)
} }
} }
@ -528,7 +504,6 @@ func (pa *Path) onClientDescribe(c *client.Client) {
if pa.onDemandCmd == nil { // start if needed if pa.onDemandCmd == nil { // start if needed
pa.Log("starting on demand command") pa.Log("starting on demand command")
pa.lastDescribeActivation = time.Now() pa.lastDescribeActivation = time.Now()
var err error var err error
pa.onDemandCmd, err = externalcmd.New(pa.conf.RunOnDemand, pa.name) pa.onDemandCmd, err = externalcmd.New(pa.conf.RunOnDemand, pa.name)
if err != nil { if err != nil {
@ -549,22 +524,12 @@ func (pa *Path) onClientDescribe(c *client.Client) {
// publisher was found but is not ready: put the client on hold // publisher was found but is not ready: put the client on hold
} else if !pa.sourceReady { } else if !pa.sourceReady {
// start rtsp source if needed // start source if needed
if source, ok := pa.source.(*sourcertsp.Source); ok { if source, ok := pa.source.(sourceExternal); ok {
if source.State() == sourcertsp.StateStopped { if !source.IsRunning() {
pa.Log("starting on demand rtsp source") pa.Log("starting on demand source")
pa.lastDescribeActivation = time.Now()
atomic.AddInt64(pa.stats.CountSourcesRtspRunning, +1)
source.SetState(sourcertsp.StateRunning)
}
// start rtmp source if needed
} else if source, ok := pa.source.(*sourcertmp.Source); ok {
if source.State() == sourcertmp.StateStopped {
pa.Log("starting on demand rtmp source")
pa.lastDescribeActivation = time.Now() pa.lastDescribeActivation = time.Now()
atomic.AddInt64(pa.stats.CountSourcesRtmpRunning, +1) source.SetRunning(true)
source.SetState(sourcertmp.StateRunning)
} }
} }

75
sourcertmp/source.go

@ -12,6 +12,8 @@ import (
"github.com/notedit/rtmp/av" "github.com/notedit/rtmp/av"
"github.com/notedit/rtmp/codec/h264" "github.com/notedit/rtmp/codec/h264"
"github.com/notedit/rtmp/format/rtmp" "github.com/notedit/rtmp/format/rtmp"
"github.com/aler9/rtsp-simple-server/stats"
) )
const ( const (
@ -25,24 +27,18 @@ type Parent interface {
OnFrame(int, gortsplib.StreamType, []byte) OnFrame(int, gortsplib.StreamType, []byte)
} }
type State int
const (
StateStopped State = iota
StateRunning
)
type Source struct { type Source struct {
ur string ur string
state State state bool
stats *stats.Stats
parent Parent parent Parent
innerRunning bool innerState bool
// in // in
innerTerminate chan struct{} innerTerminate chan struct{}
innerDone chan struct{} innerDone chan struct{}
stateChange chan State stateChange chan bool
terminate chan struct{} terminate chan struct{}
// out // out
@ -50,18 +46,23 @@ type Source struct {
} }
func New(ur string, func New(ur string,
state State, state bool,
stats *stats.Stats,
parent Parent) *Source { parent Parent) *Source {
s := &Source{ s := &Source{
ur: ur, ur: ur,
state: state, state: state,
stats: stats,
parent: parent, parent: parent,
stateChange: make(chan State), stateChange: make(chan bool),
terminate: make(chan struct{}), terminate: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
} }
go s.run(s.state) atomic.AddInt64(s.stats.CountSourcesRtmp, +1)
go s.run()
s.SetRunning(s.state)
return s return s
} }
@ -72,32 +73,46 @@ func (s *Source) Close() {
func (s *Source) IsSource() {} func (s *Source) IsSource() {}
func (s *Source) State() State { func (s *Source) IsRunning() bool {
return s.state return s.state
} }
func (s *Source) SetState(state State) { func (s *Source) SetRunning(state bool) {
s.state = state s.state = state
s.stateChange <- s.state s.stateChange <- s.state
} }
func (s *Source) run(initialState State) { func (s *Source) run() {
defer close(s.done) defer close(s.done)
s.applyState(initialState)
outer: outer:
for { for {
select { select {
case state := <-s.stateChange: case state := <-s.stateChange:
s.applyState(state) if state {
if !s.innerState {
atomic.AddInt64(s.stats.CountSourcesRtmpRunning, +1)
s.innerState = true
s.innerTerminate = make(chan struct{})
s.innerDone = make(chan struct{})
go s.runInner()
}
} else {
if s.innerState {
atomic.AddInt64(s.stats.CountSourcesRtmpRunning, -1)
close(s.innerTerminate)
<-s.innerDone
s.innerState = false
}
}
case <-s.terminate: case <-s.terminate:
break outer break outer
} }
} }
if s.innerRunning { if s.innerState {
atomic.AddInt64(s.stats.CountSourcesRtmpRunning, -1)
close(s.innerTerminate) close(s.innerTerminate)
<-s.innerDone <-s.innerDone
} }
@ -105,25 +120,6 @@ outer:
close(s.stateChange) close(s.stateChange)
} }
func (s *Source) applyState(state State) {
if state == StateRunning {
if !s.innerRunning {
s.parent.Log("rtmp source started")
s.innerRunning = true
s.innerTerminate = make(chan struct{})
s.innerDone = make(chan struct{})
go s.runInner()
}
} else {
if s.innerRunning {
close(s.innerTerminate)
<-s.innerDone
s.innerRunning = false
s.parent.Log("rtmp source stopped")
}
}
}
func (s *Source) runInner() { func (s *Source) runInner() {
defer close(s.innerDone) defer close(s.innerDone)
@ -349,7 +345,6 @@ outer:
} }
s.parent.OnSourceNotReady() s.parent.OnSourceNotReady()
s.parent.Log("rtmp source not ready")
return ret return ret
} }

77
sourcertsp/source.go

@ -3,9 +3,12 @@ package sourcertsp
import ( import (
"net/url" "net/url"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
"github.com/aler9/rtsp-simple-server/stats"
) )
const ( const (
@ -19,27 +22,21 @@ type Parent interface {
OnFrame(int, gortsplib.StreamType, []byte) OnFrame(int, gortsplib.StreamType, []byte)
} }
type State int
const (
StateStopped State = iota
StateRunning
)
type Source struct { type Source struct {
ur string ur string
proto gortsplib.StreamProtocol proto gortsplib.StreamProtocol
readTimeout time.Duration readTimeout time.Duration
writeTimeout time.Duration writeTimeout time.Duration
state State state bool
stats *stats.Stats
parent Parent parent Parent
innerRunning bool innerState bool
// in // in
innerTerminate chan struct{} innerTerminate chan struct{}
innerDone chan struct{} innerDone chan struct{}
stateChange chan State stateChange chan bool
terminate chan struct{} terminate chan struct{}
// out // out
@ -50,7 +47,8 @@ func New(ur string,
proto gortsplib.StreamProtocol, proto gortsplib.StreamProtocol,
readTimeout time.Duration, readTimeout time.Duration,
writeTimeout time.Duration, writeTimeout time.Duration,
state State, state bool,
stats *stats.Stats,
parent Parent) *Source { parent Parent) *Source {
s := &Source{ s := &Source{
ur: ur, ur: ur,
@ -58,13 +56,17 @@ func New(ur string,
readTimeout: readTimeout, readTimeout: readTimeout,
writeTimeout: writeTimeout, writeTimeout: writeTimeout,
state: state, state: state,
stats: stats,
parent: parent, parent: parent,
stateChange: make(chan State), stateChange: make(chan bool),
terminate: make(chan struct{}), terminate: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
} }
go s.run(s.state) atomic.AddInt64(s.stats.CountSourcesRtsp, +1)
go s.run()
s.SetRunning(s.state)
return s return s
} }
@ -75,32 +77,46 @@ func (s *Source) Close() {
func (s *Source) IsSource() {} func (s *Source) IsSource() {}
func (s *Source) State() State { func (s *Source) IsRunning() bool {
return s.state return s.state
} }
func (s *Source) SetState(state State) { func (s *Source) SetRunning(state bool) {
s.state = state s.state = state
s.stateChange <- s.state s.stateChange <- s.state
} }
func (s *Source) run(initialState State) { func (s *Source) run() {
defer close(s.done) defer close(s.done)
s.applyState(initialState)
outer: outer:
for { for {
select { select {
case state := <-s.stateChange: case state := <-s.stateChange:
s.applyState(state) if state {
if !s.innerState {
atomic.AddInt64(s.stats.CountSourcesRtspRunning, +1)
s.innerState = true
s.innerTerminate = make(chan struct{})
s.innerDone = make(chan struct{})
go s.runInner()
}
} else {
if s.innerState {
atomic.AddInt64(s.stats.CountSourcesRtspRunning, -1)
close(s.innerTerminate)
<-s.innerDone
s.innerState = false
}
}
case <-s.terminate: case <-s.terminate:
break outer break outer
} }
} }
if s.innerRunning { if s.innerState {
atomic.AddInt64(s.stats.CountSourcesRtspRunning, -1)
close(s.innerTerminate) close(s.innerTerminate)
<-s.innerDone <-s.innerDone
} }
@ -108,25 +124,6 @@ outer:
close(s.stateChange) close(s.stateChange)
} }
func (s *Source) applyState(state State) {
if state == StateRunning {
if !s.innerRunning {
s.parent.Log("rtsp source started")
s.innerRunning = true
s.innerTerminate = make(chan struct{})
s.innerDone = make(chan struct{})
go s.runInner()
}
} else {
if s.innerRunning {
close(s.innerTerminate)
<-s.innerDone
s.innerRunning = false
s.parent.Log("rtsp source stopped")
}
}
}
func (s *Source) runInner() { func (s *Source) runInner() {
defer close(s.innerDone) defer close(s.innerDone)
@ -281,7 +278,6 @@ outer:
wg.Wait() wg.Wait()
s.parent.OnSourceNotReady() s.parent.OnSourceNotReady()
s.parent.Log("rtsp source not ready")
return ret return ret
} }
@ -339,7 +335,6 @@ outer:
} }
s.parent.OnSourceNotReady() s.parent.OnSourceNotReady()
s.parent.Log("rtsp source not ready")
return ret return ret
} }

Loading…
Cancel
Save