Browse Source

make sure all clients are closed before closing server

pull/169/head
aler9 6 years ago
parent
commit
bcab6aabcb
  1. 13
      client.go
  2. 15
      main.go
  3. 8
      path.go
  4. 4
      servertcp.go
  5. 4
      serverudp.go
  6. 3
      sourcertmp.go
  7. 3
      sourcertsp.go

13
client.go

@ -115,10 +115,9 @@ type client struct {
describe chan describeRes describe chan describeRes
tcpFrame chan *base.InterleavedFrame tcpFrame chan *base.InterleavedFrame
terminate chan struct{} terminate chan struct{}
done chan struct{}
} }
func newClient(p *program, nconn net.Conn) *client { func newClient(p *program, nconn net.Conn) {
c := &client{ c := &client{
p: p, p: p,
conn: gortsplib.NewConnServer(gortsplib.ConnServerConf{ conn: gortsplib.NewConnServer(gortsplib.ConnServerConf{
@ -132,11 +131,14 @@ func newClient(p *program, nconn net.Conn) *client {
describe: make(chan describeRes), describe: make(chan describeRes),
tcpFrame: make(chan *base.InterleavedFrame), tcpFrame: make(chan *base.InterleavedFrame),
terminate: make(chan struct{}), terminate: make(chan struct{}),
done: make(chan struct{}),
} }
p.clients[c] = struct{}{}
atomic.AddInt64(p.countClients, 1)
c.log("connected")
p.clientsWg.Add(1)
go c.run() go c.run()
return c
} }
func (c *client) close() { func (c *client) close() {
@ -194,6 +196,8 @@ var errRunPlay = errors.New("play")
var errRunRecord = errors.New("record") var errRunRecord = errors.New("record")
func (c *client) run() { func (c *client) run() {
defer c.p.clientsWg.Done()
var onConnectCmd *externalcmd.ExternalCmd var onConnectCmd *externalcmd.ExternalCmd
if c.p.conf.RunOnConnect != "" { if c.p.conf.RunOnConnect != "" {
var err error var err error
@ -215,7 +219,6 @@ func (c *client) run() {
close(c.describe) close(c.describe)
close(c.tcpFrame) close(c.tcpFrame)
close(c.done)
} }
func (c *client) writeResError(cseq base.HeaderValue, code base.StatusCode, err error) { func (c *client) writeResError(cseq base.HeaderValue, code base.StatusCode, err error) {

15
main.go

@ -5,6 +5,7 @@ import (
"log" "log"
"net" "net"
"os" "os"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -31,6 +32,7 @@ type program struct {
serverUdpRtcp *serverUDP serverUdpRtcp *serverUDP
serverTcp *serverTCP serverTcp *serverTCP
clients map[*client]struct{} clients map[*client]struct{}
clientsWg sync.WaitGroup
udpPublishersMap *udpPublishersMap udpPublishersMap *udpPublishersMap
readersMap *readersMap readersMap *readersMap
// use pointers to avoid a crash on 32bit platforms // use pointers to avoid a crash on 32bit platforms
@ -186,6 +188,8 @@ func (p *program) log(format string, args ...interface{}) {
} }
func (p *program) run() { func (p *program) run() {
defer close(p.done)
if p.metrics != nil { if p.metrics != nil {
go p.metrics.run() go p.metrics.run()
} }
@ -220,10 +224,7 @@ outer:
} }
case conn := <-p.clientNew: case conn := <-p.clientNew:
c := newClient(p, conn) newClient(p, conn)
p.clients[c] = struct{}{}
atomic.AddInt64(p.countClients, 1)
c.log("connected")
case client := <-p.clientClose: case client := <-p.clientClose:
if _, ok := p.clients[client]; !ok { if _, ok := p.clients[client]; !ok {
@ -352,7 +353,7 @@ outer:
p.readersMap.clear() p.readersMap.clear()
for _, p := range p.paths { for _, p := range p.paths {
p.onClose(true) p.onClose()
} }
p.serverTcp.close() p.serverTcp.close()
@ -367,9 +368,10 @@ outer:
for c := range p.clients { for c := range p.clients {
c.close() c.close()
<-c.done
} }
p.clientsWg.Wait()
if p.metrics != nil { if p.metrics != nil {
p.metrics.close() p.metrics.close()
} }
@ -389,7 +391,6 @@ outer:
close(p.clientRecord) close(p.clientRecord)
close(p.sourceRtspReady) close(p.sourceRtspReady)
close(p.sourceRtspNotReady) close(p.sourceRtspNotReady)
close(p.done)
} }
func (p *program) close() { func (p *program) close() {

8
path.go

@ -77,7 +77,7 @@ func (pa *path) onInit() {
} }
} }
func (pa *path) onClose(wait bool) { func (pa *path) onClose() {
if source, ok := pa.source.(*sourceRtsp); ok { if source, ok := pa.source.(*sourceRtsp); ok {
close(source.terminate) close(source.terminate)
<-source.done <-source.done
@ -105,10 +105,6 @@ func (pa *path) onClose(wait bool) {
c.describe <- describeRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)} c.describe <- describeRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)}
} else { } else {
c.close() c.close()
if wait {
<-c.done
}
} }
} }
} }
@ -193,7 +189,7 @@ func (pa *path) onCheck() {
if pa.conf.Regexp != nil && if pa.conf.Regexp != nil &&
pa.source == nil && pa.source == nil &&
!pa.hasClients() { !pa.hasClients() {
pa.onClose(false) pa.onClose()
delete(pa.p.paths, pa.name) delete(pa.p.paths, pa.name)
} }
} }

4
servertcp.go

@ -34,6 +34,8 @@ func (l *serverTCP) log(format string, args ...interface{}) {
} }
func (l *serverTCP) run() { func (l *serverTCP) run() {
defer close(l.done)
for { for {
conn, err := l.listener.AcceptTCP() conn, err := l.listener.AcceptTCP()
if err != nil { if err != nil {
@ -42,8 +44,6 @@ func (l *serverTCP) run() {
l.p.clientNew <- conn l.p.clientNew <- conn
} }
close(l.done)
} }
func (l *serverTCP) close() { func (l *serverTCP) close() {

4
serverudp.go

@ -60,6 +60,8 @@ func (l *serverUDP) log(format string, args ...interface{}) {
} }
func (l *serverUDP) run() { func (l *serverUDP) run() {
defer close(l.done)
writeDone := make(chan struct{}) writeDone := make(chan struct{})
go func() { go func() {
defer close(writeDone) defer close(writeDone)
@ -99,8 +101,6 @@ func (l *serverUDP) run() {
close(l.writec) close(l.writec)
<-writeDone <-writeDone
close(l.done)
} }
func (l *serverUDP) close() { func (l *serverUDP) close() {

3
sourcertmp.go

@ -62,6 +62,8 @@ func newSourceRtmp(p *program, path *path) *sourceRtmp {
func (s *sourceRtmp) isSource() {} func (s *sourceRtmp) isSource() {}
func (s *sourceRtmp) run(initialState sourceRtmpState) { func (s *sourceRtmp) run(initialState sourceRtmpState) {
defer close(s.done)
s.applyState(initialState) s.applyState(initialState)
outer: outer:
@ -81,7 +83,6 @@ outer:
} }
close(s.setState) close(s.setState)
close(s.done)
} }
func (s *sourceRtmp) applyState(state sourceRtmpState) { func (s *sourceRtmp) applyState(state sourceRtmpState) {

3
sourcertsp.go

@ -57,6 +57,8 @@ func newSourceRtsp(p *program, path *path) *sourceRtsp {
func (s *sourceRtsp) isSource() {} func (s *sourceRtsp) isSource() {}
func (s *sourceRtsp) run(initialState sourceRtspState) { func (s *sourceRtsp) run(initialState sourceRtspState) {
defer close(s.done)
s.applyState(initialState) s.applyState(initialState)
outer: outer:
@ -76,7 +78,6 @@ outer:
} }
close(s.setState) close(s.setState)
close(s.done)
} }
func (s *sourceRtsp) applyState(state sourceRtspState) { func (s *sourceRtsp) applyState(state sourceRtspState) {

Loading…
Cancel
Save