diff --git a/client.go b/client.go index f2c45a51..e0e3d551 100644 --- a/client.go +++ b/client.go @@ -135,15 +135,15 @@ func newClient(p *program, nconn net.Conn) *client { func (c *client) close() { delete(c.p.clients, c) - atomic.AddInt64(&c.p.countClients, -1) + atomic.AddInt64(c.p.countClients, -1) switch c.state { case clientStatePlay: - atomic.AddInt64(&c.p.countReaders, -1) + atomic.AddInt64(c.p.countReaders, -1) c.p.readersMap.remove(c) case clientStateRecord: - atomic.AddInt64(&c.p.countPublishers, -1) + atomic.AddInt64(c.p.countPublishers, -1) if c.streamProtocol == gortsplib.StreamProtocolUDP { for _, track := range c.streamTracks { diff --git a/main.go b/main.go index e347b281..1cc7639c 100644 --- a/main.go +++ b/main.go @@ -20,22 +20,24 @@ const ( ) type program struct { - conf *conf - logHandler *logHandler - metrics *metrics - pprof *pprof - paths map[string]*path - serverRtp *serverUDP - serverRtcp *serverUDP - serverRtsp *serverTCP - clients map[*client]struct{} - udpPublishersMap *udpPublishersMap - readersMap *readersMap - countClients int64 - countPublishers int64 - countReaders int64 - countProxies int64 - countProxiesRunning int64 + conf *conf + logHandler *logHandler + metrics *metrics + pprof *pprof + paths map[string]*path + serverRtp *serverUDP + serverRtcp *serverUDP + serverRtsp *serverTCP + clients map[*client]struct{} + udpPublishersMap *udpPublishersMap + readersMap *readersMap + // use pointers to avoid a crash on 32bit platforms + // https://github.com/golang/go/issues/9959 + countClients *int64 + countPublishers *int64 + countReaders *int64 + countProxies *int64 + countProxiesRunning *int64 clientNew chan net.Conn clientClose chan *client @@ -81,17 +83,37 @@ func newProgram(args []string, stdin io.Reader) (*program, error) { clients: make(map[*client]struct{}), udpPublishersMap: newUdpPublisherMap(), readersMap: newReadersMap(), - clientNew: make(chan net.Conn), - clientClose: make(chan *client), - clientDescribe: make(chan clientDescribeReq), - clientAnnounce: make(chan clientAnnounceReq), - clientSetupPlay: make(chan clientSetupPlayReq), - clientPlay: make(chan *client), - clientRecord: make(chan *client), - proxyReady: make(chan *proxy), - proxyNotReady: make(chan *proxy), - terminate: make(chan struct{}), - done: make(chan struct{}), + countClients: func() *int64 { + v := int64(0) + return &v + }(), + countPublishers: func() *int64 { + v := int64(0) + return &v + }(), + countReaders: func() *int64 { + v := int64(0) + return &v + }(), + countProxies: func() *int64 { + v := int64(0) + return &v + }(), + countProxiesRunning: func() *int64 { + v := int64(0) + return &v + }(), + clientNew: make(chan net.Conn), + clientClose: make(chan *client), + clientDescribe: make(chan clientDescribeReq), + clientAnnounce: make(chan clientAnnounceReq), + clientSetupPlay: make(chan clientSetupPlayReq), + clientPlay: make(chan *client), + clientRecord: make(chan *client), + proxyReady: make(chan *proxy), + proxyNotReady: make(chan *proxy), + terminate: make(chan struct{}), + done: make(chan struct{}), } p.log("rtsp-simple-server %s", Version) @@ -139,9 +161,9 @@ func newProgram(args []string, stdin io.Reader) (*program, error) { } func (p *program) log(format string, args ...interface{}) { - countClients := atomic.LoadInt64(&p.countClients) - countPublishers := atomic.LoadInt64(&p.countPublishers) - countReaders := atomic.LoadInt64(&p.countReaders) + countClients := atomic.LoadInt64(p.countClients) + countPublishers := atomic.LoadInt64(p.countPublishers) + countReaders := atomic.LoadInt64(p.countReaders) log.Printf(fmt.Sprintf("[%d/%d/%d] "+format, append([]interface{}{countClients, countPublishers, countReaders}, args...)...)) @@ -184,7 +206,7 @@ outer: case conn := <-p.clientNew: c := newClient(p, conn) p.clients[c] = struct{}{} - atomic.AddInt64(&p.countClients, 1) + atomic.AddInt64(p.countClients, 1) c.log("connected") case client := <-p.clientClose: @@ -238,12 +260,12 @@ outer: req.res <- nil case client := <-p.clientPlay: - atomic.AddInt64(&p.countReaders, 1) + atomic.AddInt64(p.countReaders, 1) client.state = clientStatePlay p.readersMap.add(client) case client := <-p.clientRecord: - atomic.AddInt64(&p.countPublishers, 1) + atomic.AddInt64(p.countPublishers, 1) client.state = clientStateRecord if client.streamProtocol == gortsplib.StreamProtocolUDP { diff --git a/metrics.go b/metrics.go index 753df7a5..83e64a27 100644 --- a/metrics.go +++ b/metrics.go @@ -57,11 +57,11 @@ func (m *metrics) close() { func (m *metrics) onMetrics(w http.ResponseWriter, req *http.Request) { now := time.Now().UnixNano() / 1000000 - countClients := atomic.LoadInt64(&m.p.countClients) - countPublishers := atomic.LoadInt64(&m.p.countPublishers) - countReaders := atomic.LoadInt64(&m.p.countReaders) - countProxies := atomic.LoadInt64(&m.p.countProxies) - countProxiesRunning := atomic.LoadInt64(&m.p.countProxiesRunning) + countClients := atomic.LoadInt64(m.p.countClients) + countPublishers := atomic.LoadInt64(m.p.countPublishers) + countReaders := atomic.LoadInt64(m.p.countReaders) + countProxies := atomic.LoadInt64(m.p.countProxies) + countProxiesRunning := atomic.LoadInt64(m.p.countProxiesRunning) out := "" out += fmt.Sprintf("rtsp_clients{state=\"idle\"} %d %v\n", diff --git a/path.go b/path.go index 88b3bf17..f00cf16c 100644 --- a/path.go +++ b/path.go @@ -149,7 +149,7 @@ func (pa *path) onCheck() { !pa.hasClients() && time.Since(pa.lastDescribeReq) >= proxyStopAfterDescribeSecs { pa.log("stopping on demand proxy (not requested anymore)") - atomic.AddInt64(&pa.p.countProxiesRunning, -1) + atomic.AddInt64(pa.p.countProxiesRunning, -1) pa.proxy.state = proxyStateStopped pa.proxy.setState <- pa.proxy.state } @@ -243,7 +243,7 @@ func (pa *path) onDescribe(client *client) { if pa.proxy != nil && pa.proxy.state == proxyStateStopped { // start if needed pa.log("starting on demand proxy") pa.lastDescribeActivation = time.Now() - atomic.AddInt64(&pa.p.countProxiesRunning, +1) + atomic.AddInt64(pa.p.countProxiesRunning, +1) pa.proxy.state = proxyStateRunning pa.proxy.setState <- pa.proxy.state } diff --git a/proxy.go b/proxy.go index 53a3e83b..2dcf3f8c 100644 --- a/proxy.go +++ b/proxy.go @@ -44,13 +44,13 @@ func newProxy(p *program, path *path, pathConf *pathConf) *proxy { done: make(chan struct{}), } - atomic.AddInt64(&p.countProxies, +1) + atomic.AddInt64(p.countProxies, +1) if pathConf.SourceOnDemand { s.state = proxyStateStopped } else { s.state = proxyStateRunning - atomic.AddInt64(&p.countProxiesRunning, +1) + atomic.AddInt64(p.countProxiesRunning, +1) } return s