From c8710fee19842ab464beb043e394009689b002df Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Mon, 31 Aug 2020 15:46:03 +0200 Subject: [PATCH] cleanup source --- source.go | 109 +++++++++++++++++++++++++++--------------------------- 1 file changed, 54 insertions(+), 55 deletions(-) diff --git a/source.go b/source.go index 2116e877..53d2cbb7 100644 --- a/source.go +++ b/source.go @@ -24,25 +24,28 @@ const ( ) type source struct { - p *program - path *path - confp *confPath - state sourceState - tracks []*gortsplib.Track - - setState chan sourceState - terminate chan struct{} - done chan struct{} + p *program + path *path + confp *confPath + state sourceState + tracks []*gortsplib.Track + innerRunning bool + + innerTerminate chan struct{} + innerDone chan struct{} + setState chan sourceState + terminate chan struct{} + done chan struct{} } func newSource(p *program, path *path, confp *confPath) *source { s := &source{ - p: p, - path: path, - confp: confp, - setState: make(chan sourceState), - terminate: make(chan struct{}), - done: make(chan struct{}), + p: p, + path: path, + confp: confp, + setState: make(chan sourceState), + terminate: make(chan struct{}), + done: make(chan struct{}), } if confp.SourceOnDemand { @@ -61,57 +64,53 @@ func (s *source) log(format string, args ...interface{}) { func (s *source) isPublisher() {} func (s *source) run() { - running := false - var doTerminate chan struct{} - var doDone chan struct{} - - applyState := func(state sourceState) { - if state == sourceStateRunning { - if !running { - s.log("started") - running = true - doTerminate = make(chan struct{}) - doDone = make(chan struct{}) - go s.do(doTerminate, doDone) - } - } else { - if running { - close(doTerminate) - <-doDone - running = false - s.log("stopped") - } - } - } - - applyState(s.state) + s.applyState(s.state) outer: for { select { case state := <-s.setState: - applyState(state) + s.applyState(state) case <-s.terminate: break outer } } - if running { - close(doTerminate) - <-doDone + if s.innerRunning { + close(s.innerTerminate) + <-s.innerDone } close(s.setState) close(s.done) } -func (s *source) do(terminate chan struct{}, done chan struct{}) { - defer close(done) +func (s *source) applyState(state sourceState) { + if state == sourceStateRunning { + if !s.innerRunning { + s.log("started") + s.innerRunning = true + s.innerTerminate = make(chan struct{}) + s.innerDone = make(chan struct{}) + go s.runInner() + } + } else { + if s.innerRunning { + close(s.innerTerminate) + <-s.innerDone + s.innerRunning = false + s.log("stopped") + } + } +} + +func (s *source) runInner() { + defer close(s.innerDone) for { ok := func() bool { - ok := s.doInner(terminate) + ok := s.runInnerInner() if !ok { return false } @@ -120,7 +119,7 @@ func (s *source) do(terminate chan struct{}, done chan struct{}) { defer t.Stop() select { - case <-terminate: + case <-s.innerTerminate: return false case <-t.C: } @@ -133,7 +132,7 @@ func (s *source) do(terminate chan struct{}, done chan struct{}) { } } -func (s *source) doInner(terminate chan struct{}) bool { +func (s *source) runInnerInner() bool { s.log("connecting") var conn *gortsplib.ConnClient @@ -149,7 +148,7 @@ func (s *source) doInner(terminate chan struct{}) bool { }() select { - case <-terminate: + case <-s.innerTerminate: return false case <-dialDone: } @@ -181,13 +180,13 @@ func (s *source) doInner(terminate chan struct{}) bool { s.path.publisherSdpParsed = serverSdpParsed if s.confp.sourceProtocolParsed == gortsplib.StreamProtocolUdp { - return s.runUdp(terminate, conn) + return s.runUdp(conn) } else { - return s.runTcp(terminate, conn) + return s.runTcp(conn) } } -func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) bool { +func (s *source) runUdp(conn *gortsplib.ConnClient) bool { var rtpReads []gortsplib.UdpReadFunc var rtcpReads []gortsplib.UdpReadFunc @@ -281,7 +280,7 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo outer: for { select { - case <-terminate: + case <-s.innerTerminate: conn.Close() <-tcpConnDone ret = false @@ -302,7 +301,7 @@ outer: return ret } -func (s *source) runTcp(terminate chan struct{}, conn *gortsplib.ConnClient) bool { +func (s *source) runTcp(conn *gortsplib.ConnClient) bool { for _, track := range s.tracks { _, err := conn.SetupTcp(s.confp.sourceUrl, track) if err != nil { @@ -345,7 +344,7 @@ func (s *source) runTcp(terminate chan struct{}, conn *gortsplib.ConnClient) boo outer: for { select { - case <-terminate: + case <-s.innerTerminate: conn.Close() <-tcpConnDone ret = false