Browse Source

cleanup source

pull/80/head
aler9 5 years ago
parent
commit
c8710fee19
  1. 109
      source.go

109
source.go

@ -24,25 +24,28 @@ const (
) )
type source struct { type source struct {
p *program p *program
path *path path *path
confp *confPath confp *confPath
state sourceState state sourceState
tracks []*gortsplib.Track tracks []*gortsplib.Track
innerRunning bool
setState chan sourceState
terminate chan struct{} innerTerminate chan struct{}
done chan struct{} innerDone chan struct{}
setState chan sourceState
terminate chan struct{}
done chan struct{}
} }
func newSource(p *program, path *path, confp *confPath) *source { func newSource(p *program, path *path, confp *confPath) *source {
s := &source{ s := &source{
p: p, p: p,
path: path, path: path,
confp: confp, confp: confp,
setState: make(chan sourceState), setState: make(chan sourceState),
terminate: make(chan struct{}), terminate: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
} }
if confp.SourceOnDemand { if confp.SourceOnDemand {
@ -61,57 +64,53 @@ func (s *source) log(format string, args ...interface{}) {
func (s *source) isPublisher() {} func (s *source) isPublisher() {}
func (s *source) run() { func (s *source) run() {
running := false s.applyState(s.state)
var doTerminate chan struct{}
var doDone chan struct{}
applyState := func(state sourceState) {
if state == sourceStateRunning {
if !running {
s.log("started")
running = true
doTerminate = make(chan struct{})
doDone = make(chan struct{})
go s.do(doTerminate, doDone)
}
} else {
if running {
close(doTerminate)
<-doDone
running = false
s.log("stopped")
}
}
}
applyState(s.state)
outer: outer:
for { for {
select { select {
case state := <-s.setState: case state := <-s.setState:
applyState(state) s.applyState(state)
case <-s.terminate: case <-s.terminate:
break outer break outer
} }
} }
if running { if s.innerRunning {
close(doTerminate) close(s.innerTerminate)
<-doDone <-s.innerDone
} }
close(s.setState) close(s.setState)
close(s.done) close(s.done)
} }
func (s *source) do(terminate chan struct{}, done chan struct{}) { func (s *source) applyState(state sourceState) {
defer close(done) if state == sourceStateRunning {
if !s.innerRunning {
s.log("started")
s.innerRunning = true
s.innerTerminate = make(chan struct{})
s.innerDone = make(chan struct{})
go s.runInner()
}
} else {
if s.innerRunning {
close(s.innerTerminate)
<-s.innerDone
s.innerRunning = false
s.log("stopped")
}
}
}
func (s *source) runInner() {
defer close(s.innerDone)
for { for {
ok := func() bool { ok := func() bool {
ok := s.doInner(terminate) ok := s.runInnerInner()
if !ok { if !ok {
return false return false
} }
@ -120,7 +119,7 @@ func (s *source) do(terminate chan struct{}, done chan struct{}) {
defer t.Stop() defer t.Stop()
select { select {
case <-terminate: case <-s.innerTerminate:
return false return false
case <-t.C: case <-t.C:
} }
@ -133,7 +132,7 @@ func (s *source) do(terminate chan struct{}, done chan struct{}) {
} }
} }
func (s *source) doInner(terminate chan struct{}) bool { func (s *source) runInnerInner() bool {
s.log("connecting") s.log("connecting")
var conn *gortsplib.ConnClient var conn *gortsplib.ConnClient
@ -149,7 +148,7 @@ func (s *source) doInner(terminate chan struct{}) bool {
}() }()
select { select {
case <-terminate: case <-s.innerTerminate:
return false return false
case <-dialDone: case <-dialDone:
} }
@ -181,13 +180,13 @@ func (s *source) doInner(terminate chan struct{}) bool {
s.path.publisherSdpParsed = serverSdpParsed s.path.publisherSdpParsed = serverSdpParsed
if s.confp.sourceProtocolParsed == gortsplib.StreamProtocolUdp { if s.confp.sourceProtocolParsed == gortsplib.StreamProtocolUdp {
return s.runUdp(terminate, conn) return s.runUdp(conn)
} else { } else {
return s.runTcp(terminate, conn) return s.runTcp(conn)
} }
} }
func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) bool { func (s *source) runUdp(conn *gortsplib.ConnClient) bool {
var rtpReads []gortsplib.UdpReadFunc var rtpReads []gortsplib.UdpReadFunc
var rtcpReads []gortsplib.UdpReadFunc var rtcpReads []gortsplib.UdpReadFunc
@ -281,7 +280,7 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo
outer: outer:
for { for {
select { select {
case <-terminate: case <-s.innerTerminate:
conn.Close() conn.Close()
<-tcpConnDone <-tcpConnDone
ret = false ret = false
@ -302,7 +301,7 @@ outer:
return ret return ret
} }
func (s *source) runTcp(terminate chan struct{}, conn *gortsplib.ConnClient) bool { func (s *source) runTcp(conn *gortsplib.ConnClient) bool {
for _, track := range s.tracks { for _, track := range s.tracks {
_, err := conn.SetupTcp(s.confp.sourceUrl, track) _, err := conn.SetupTcp(s.confp.sourceUrl, track)
if err != nil { if err != nil {
@ -345,7 +344,7 @@ func (s *source) runTcp(terminate chan struct{}, conn *gortsplib.ConnClient) boo
outer: outer:
for { for {
select { select {
case <-terminate: case <-s.innerTerminate:
conn.Close() conn.Close()
<-tcpConnDone <-tcpConnDone
ret = false ret = false

Loading…
Cancel
Save