Browse Source

rename source into proxy

pull/101/head
aler9 6 years ago
parent
commit
f6e846ca45
  1. 28
      main.go
  2. 42
      path.go
  3. 84
      proxy.go
  4. 2
      rtsp-simple-server.yml

28
main.go

@ -44,8 +44,8 @@ type program struct {
clientSetupPlay chan clientSetupPlayReq clientSetupPlay chan clientSetupPlayReq
clientPlay chan *client clientPlay chan *client
clientRecord chan *client clientRecord chan *client
sourceReady chan *source proxyReady chan *proxy
sourceNotReady chan *source proxyNotReady chan *proxy
terminate chan struct{} terminate chan struct{}
done chan struct{} done chan struct{}
} }
@ -88,8 +88,8 @@ func newProgram(args []string, stdin io.Reader) (*program, error) {
clientSetupPlay: make(chan clientSetupPlayReq), clientSetupPlay: make(chan clientSetupPlayReq),
clientPlay: make(chan *client), clientPlay: make(chan *client),
clientRecord: make(chan *client), clientRecord: make(chan *client),
sourceReady: make(chan *source), proxyReady: make(chan *proxy),
sourceNotReady: make(chan *source), proxyNotReady: make(chan *proxy),
terminate: make(chan struct{}), terminate: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
} }
@ -266,13 +266,13 @@ outer:
client.path.onPublisherSetReady() client.path.onPublisherSetReady()
case source := <-p.sourceReady: case proxy := <-p.proxyReady:
source.path.log("source ready") proxy.path.log("proxy ready")
source.path.onPublisherSetReady() proxy.path.onPublisherSetReady()
case source := <-p.sourceNotReady: case proxy := <-p.proxyNotReady:
source.path.log("source not ready") proxy.path.log("proxy not ready")
source.path.onPublisherSetNotReady() proxy.path.onPublisherSetNotReady()
case <-p.terminate: case <-p.terminate:
break outer break outer
@ -298,8 +298,8 @@ outer:
case <-p.clientPlay: case <-p.clientPlay:
case <-p.clientRecord: case <-p.clientRecord:
case <-p.sourceReady: case <-p.proxyReady:
case <-p.sourceNotReady: case <-p.proxyNotReady:
} }
} }
}() }()
@ -343,8 +343,8 @@ outer:
close(p.clientSetupPlay) close(p.clientSetupPlay)
close(p.clientPlay) close(p.clientPlay)
close(p.clientRecord) close(p.clientRecord)
close(p.sourceReady) close(p.proxyReady)
close(p.sourceNotReady) close(p.proxyNotReady)
close(p.done) close(p.done)
} }

42
path.go

@ -8,11 +8,11 @@ import (
const ( const (
describeTimeout = 5 * time.Second describeTimeout = 5 * time.Second
sourceStopAfterDescribeSecs = 10 * time.Second proxyStopAfterDescribeSecs = 10 * time.Second
onDemandCmdStopAfterDescribeSecs = 10 * time.Second onDemandCmdStopAfterDescribeSecs = 10 * time.Second
) )
// a publisher is either a client or a source // a publisher is either a client or a proxy
type publisher interface { type publisher interface {
isPublisher() isPublisher()
} }
@ -21,7 +21,7 @@ type path struct {
p *program p *program
name string name string
conf *pathConf conf *pathConf
source *source proxy *proxy
publisher publisher publisher publisher
publisherReady bool publisherReady bool
publisherTrackCount int publisherTrackCount int
@ -40,8 +40,8 @@ func newPath(p *program, name string, conf *pathConf) *path {
} }
if conf.Source != "record" { if conf.Source != "record" {
s := newSource(p, pa, conf) s := newProxy(p, pa, conf)
pa.source = s pa.proxy = s
pa.publisher = s pa.publisher = s
} }
@ -53,8 +53,8 @@ func (pa *path) log(format string, args ...interface{}) {
} }
func (pa *path) onInit() { func (pa *path) onInit() {
if pa.source != nil { if pa.proxy != nil {
go pa.source.run(pa.source.state) go pa.proxy.run(pa.proxy.state)
} }
if pa.conf.RunOnInit != "" { if pa.conf.RunOnInit != "" {
@ -69,9 +69,9 @@ func (pa *path) onInit() {
} }
func (pa *path) onClose(wait bool) { func (pa *path) onClose(wait bool) {
if pa.source != nil { if pa.proxy != nil {
close(pa.source.terminate) close(pa.proxy.terminate)
<-pa.source.done <-pa.proxy.done
} }
if pa.onInitCmd != nil { if pa.onInitCmd != nil {
@ -142,16 +142,16 @@ func (pa *path) onCheck() {
} }
} }
// stop on demand source if needed // stop on demand proxy if needed
if pa.source != nil && if pa.proxy != nil &&
pa.conf.SourceOnDemand && pa.conf.SourceOnDemand &&
pa.source.state == sourceStateRunning && pa.proxy.state == proxyStateRunning &&
!pa.hasClients() && !pa.hasClients() &&
time.Since(pa.lastDescribeReq) >= sourceStopAfterDescribeSecs { time.Since(pa.lastDescribeReq) >= proxyStopAfterDescribeSecs {
pa.log("stopping on demand source (not requested anymore)") pa.log("stopping on demand proxy (not requested anymore)")
atomic.AddInt64(&pa.p.countProxiesRunning, -1) atomic.AddInt64(&pa.p.countProxiesRunning, -1)
pa.source.state = sourceStateStopped pa.proxy.state = proxyStateStopped
pa.source.setState <- pa.source.state pa.proxy.setState <- pa.proxy.state
} }
// stop on demand command if needed // stop on demand command if needed
@ -240,12 +240,12 @@ func (pa *path) onDescribe(client *client) {
// publisher was found but is not ready: put the client on hold // publisher was found but is not ready: put the client on hold
} else if !pa.publisherReady { } else if !pa.publisherReady {
if pa.source != nil && pa.source.state == sourceStateStopped { // start if needed if pa.proxy != nil && pa.proxy.state == proxyStateStopped { // start if needed
pa.log("starting on demand source") pa.log("starting on demand proxy")
pa.lastDescribeActivation = time.Now() pa.lastDescribeActivation = time.Now()
atomic.AddInt64(&pa.p.countProxiesRunning, +1) atomic.AddInt64(&pa.p.countProxiesRunning, +1)
pa.source.state = sourceStateRunning pa.proxy.state = proxyStateRunning
pa.source.setState <- pa.source.state pa.proxy.setState <- pa.proxy.state
} }
client.path = pa client.path = pa

84
source.go → proxy.go

@ -10,39 +10,39 @@ import (
) )
const ( const (
sourceRetryInterval = 5 * time.Second proxyRetryInterval = 5 * time.Second
sourceUDPReadBufferSize = 2048 proxyUDPReadBufferSize = 2048
sourceTCPReadBufferSize = 128 * 1024 proxyTCPReadBufferSize = 128 * 1024
) )
type sourceState int type proxyState int
const ( const (
sourceStateStopped sourceState = iota proxyStateStopped proxyState = iota
sourceStateRunning proxyStateRunning
) )
type source struct { type proxy struct {
p *program p *program
path *path path *path
pathConf *pathConf pathConf *pathConf
state sourceState state proxyState
tracks []*gortsplib.Track tracks []*gortsplib.Track
innerRunning bool innerRunning bool
innerTerminate chan struct{} innerTerminate chan struct{}
innerDone chan struct{} innerDone chan struct{}
setState chan sourceState setState chan proxyState
terminate chan struct{} terminate chan struct{}
done chan struct{} done chan struct{}
} }
func newSource(p *program, path *path, pathConf *pathConf) *source { func newProxy(p *program, path *path, pathConf *pathConf) *proxy {
s := &source{ s := &proxy{
p: p, p: p,
path: path, path: path,
pathConf: pathConf, pathConf: pathConf,
setState: make(chan sourceState), setState: make(chan proxyState),
terminate: make(chan struct{}), terminate: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
} }
@ -50,18 +50,18 @@ func newSource(p *program, path *path, pathConf *pathConf) *source {
atomic.AddInt64(&p.countProxies, +1) atomic.AddInt64(&p.countProxies, +1)
if pathConf.SourceOnDemand { if pathConf.SourceOnDemand {
s.state = sourceStateStopped s.state = proxyStateStopped
} else { } else {
s.state = sourceStateRunning s.state = proxyStateRunning
atomic.AddInt64(&p.countProxiesRunning, +1) atomic.AddInt64(&p.countProxiesRunning, +1)
} }
return s return s
} }
func (s *source) isPublisher() {} func (s *proxy) isPublisher() {}
func (s *source) run(initialState sourceState) { func (s *proxy) run(initialState proxyState) {
s.applyState(initialState) s.applyState(initialState)
outer: outer:
@ -84,10 +84,10 @@ outer:
close(s.done) close(s.done)
} }
func (s *source) applyState(state sourceState) { func (s *proxy) applyState(state proxyState) {
if state == sourceStateRunning { if state == proxyStateRunning {
if !s.innerRunning { if !s.innerRunning {
s.path.log("source started") s.path.log("proxy started")
s.innerRunning = true s.innerRunning = true
s.innerTerminate = make(chan struct{}) s.innerTerminate = make(chan struct{})
s.innerDone = make(chan struct{}) s.innerDone = make(chan struct{})
@ -98,12 +98,12 @@ func (s *source) applyState(state sourceState) {
close(s.innerTerminate) close(s.innerTerminate)
<-s.innerDone <-s.innerDone
s.innerRunning = false s.innerRunning = false
s.path.log("source stopped") s.path.log("proxy stopped")
} }
} }
} }
func (s *source) runInner() { func (s *proxy) runInner() {
defer close(s.innerDone) defer close(s.innerDone)
for { for {
@ -113,7 +113,7 @@ func (s *source) runInner() {
return false return false
} }
t := time.NewTimer(sourceRetryInterval) t := time.NewTimer(proxyRetryInterval)
defer t.Stop() defer t.Stop()
select { select {
@ -130,8 +130,8 @@ func (s *source) runInner() {
} }
} }
func (s *source) runInnerInner() bool { func (s *proxy) runInnerInner() bool {
s.path.log("source connecting") s.path.log("proxy connecting")
var conn *gortsplib.ConnClient var conn *gortsplib.ConnClient
var err error var err error
@ -152,21 +152,21 @@ func (s *source) runInnerInner() bool {
} }
if err != nil { if err != nil {
s.path.log("source ERR: %s", err) s.path.log("proxy ERR: %s", err)
return true return true
} }
_, err = conn.Options(s.pathConf.sourceUrl) _, err = conn.Options(s.pathConf.sourceUrl)
if err != nil { if err != nil {
conn.Close() conn.Close()
s.path.log("source ERR: %s", err) s.path.log("proxy ERR: %s", err)
return true return true
} }
tracks, _, err := conn.Describe(s.pathConf.sourceUrl) tracks, _, err := conn.Describe(s.pathConf.sourceUrl)
if err != nil { if err != nil {
conn.Close() conn.Close()
s.path.log("source ERR: %s", err) s.path.log("proxy ERR: %s", err)
return true return true
} }
@ -184,7 +184,7 @@ func (s *source) runInnerInner() bool {
} }
} }
func (s *source) runUDP(conn *gortsplib.ConnClient) bool { func (s *proxy) runUDP(conn *gortsplib.ConnClient) bool {
var rtpReads []gortsplib.UDPReadFunc var rtpReads []gortsplib.UDPReadFunc
var rtcpReads []gortsplib.UDPReadFunc var rtcpReads []gortsplib.UDPReadFunc
@ -202,7 +202,7 @@ func (s *source) runUDP(conn *gortsplib.ConnClient) bool {
} }
conn.Close() conn.Close()
s.path.log("source ERR: %s", err) s.path.log("proxy ERR: %s", err)
return true return true
} }
@ -215,11 +215,11 @@ func (s *source) runUDP(conn *gortsplib.ConnClient) bool {
_, err := conn.Play(s.pathConf.sourceUrl) _, err := conn.Play(s.pathConf.sourceUrl)
if err != nil { if err != nil {
conn.Close() conn.Close()
s.path.log("source ERR: %s", err) s.path.log("proxy ERR: %s", err)
return true return true
} }
s.p.sourceReady <- s s.p.proxyReady <- s
var wg sync.WaitGroup var wg sync.WaitGroup
@ -229,7 +229,7 @@ func (s *source) runUDP(conn *gortsplib.ConnClient) bool {
go func(trackId int, rtpRead gortsplib.UDPReadFunc) { go func(trackId int, rtpRead gortsplib.UDPReadFunc) {
defer wg.Done() defer wg.Done()
multiBuf := newMultiBuffer(2, sourceUDPReadBufferSize) multiBuf := newMultiBuffer(2, proxyUDPReadBufferSize)
for { for {
buf := multiBuf.next() buf := multiBuf.next()
@ -250,7 +250,7 @@ func (s *source) runUDP(conn *gortsplib.ConnClient) bool {
go func(trackId int, rtcpRead gortsplib.UDPReadFunc) { go func(trackId int, rtcpRead gortsplib.UDPReadFunc) {
defer wg.Done() defer wg.Done()
multiBuf := newMultiBuffer(2, sourceUDPReadBufferSize) multiBuf := newMultiBuffer(2, proxyUDPReadBufferSize)
for { for {
buf := multiBuf.next() buf := multiBuf.next()
@ -283,7 +283,7 @@ outer:
case err := <-tcpConnDone: case err := <-tcpConnDone:
conn.Close() conn.Close()
s.path.log("source ERR: %s", err) s.path.log("proxy ERR: %s", err)
ret = true ret = true
break outer break outer
} }
@ -291,17 +291,17 @@ outer:
wg.Wait() wg.Wait()
s.p.sourceNotReady <- s s.p.proxyNotReady <- s
return ret return ret
} }
func (s *source) runTCP(conn *gortsplib.ConnClient) bool { func (s *proxy) runTCP(conn *gortsplib.ConnClient) bool {
for _, track := range s.tracks { for _, track := range s.tracks {
_, err := conn.SetupTCP(s.pathConf.sourceUrl, track) _, err := conn.SetupTCP(s.pathConf.sourceUrl, track)
if err != nil { if err != nil {
conn.Close() conn.Close()
s.path.log("source ERR: %s", err) s.path.log("proxy ERR: %s", err)
return true return true
} }
} }
@ -309,14 +309,14 @@ func (s *source) runTCP(conn *gortsplib.ConnClient) bool {
_, err := conn.Play(s.pathConf.sourceUrl) _, err := conn.Play(s.pathConf.sourceUrl)
if err != nil { if err != nil {
conn.Close() conn.Close()
s.path.log("source ERR: %s", err) s.path.log("proxy ERR: %s", err)
return true return true
} }
s.p.sourceReady <- s s.p.proxyReady <- s
frame := &gortsplib.InterleavedFrame{} frame := &gortsplib.InterleavedFrame{}
multiBuf := newMultiBuffer(2, sourceTCPReadBufferSize) multiBuf := newMultiBuffer(2, proxyTCPReadBufferSize)
tcpConnDone := make(chan error) tcpConnDone := make(chan error)
go func() { go func() {
@ -347,13 +347,13 @@ outer:
case err := <-tcpConnDone: case err := <-tcpConnDone:
conn.Close() conn.Close()
s.path.log("source ERR: %s", err) s.path.log("proxy ERR: %s", err)
ret = true ret = true
break outer break outer
} }
} }
s.p.sourceNotReady <- s s.p.proxyNotReady <- s
return ret return ret
} }

2
rtsp-simple-server.yml

@ -39,7 +39,7 @@ paths:
all: all:
# source of the stream - this can be: # source of the stream - this can be:
# * record -> the stream is provided by a client through the RECORD command (like ffmpeg) # * record -> the stream is provided by a client through the RECORD command (like ffmpeg)
# * rtsp://original-url -> the stream is pulled from another RTSP server # * rtsp://original-url -> the stream is pulled from another RTSP server (proxy mode)
source: record source: record
# if the source is an RTSP url, this is the protocol that will be used to pull the stream # if the source is an RTSP url, this is the protocol that will be used to pull the stream
sourceProtocol: udp sourceProtocol: udp

Loading…
Cancel
Save