diff --git a/client.go b/client.go index 2b9a3bd0..7653c915 100644 --- a/client.go +++ b/client.go @@ -9,6 +9,7 @@ import ( "os/exec" "strconv" "strings" + "sync/atomic" "time" "github.com/aler9/gortsplib" @@ -171,12 +172,14 @@ func newClient(p *program, nconn net.Conn) *client { func (c *client) close() { delete(c.p.clients, c) + atomic.AddInt64(&c.p.countClient, -1) + switch c.state { case clientStatePlay: - c.p.readerCount -= 1 + atomic.AddInt64(&c.p.countReader, -1) case clientStateRecord: - c.p.publisherCount -= 1 + atomic.AddInt64(&c.p.countPublisher, -1) if c.streamProtocol == gortsplib.StreamProtocolUdp { for _, track := range c.streamTracks { diff --git a/main.go b/main.go index 64608953..3220ec42 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "log" "net" "os" + "sync/atomic" "time" "github.com/aler9/gortsplib" @@ -36,8 +37,9 @@ type program struct { serverRtsp *serverTcp clients map[*client]struct{} udpClientsByAddr map[udpClientAddr]*udpClient - publisherCount int - readerCount int + countClient int64 + countPublisher int64 + countReader int64 metricsGather chan metricsGatherReq clientNew chan net.Conn @@ -150,8 +152,12 @@ func newProgram(args []string, stdin io.Reader) (*program, error) { } func (p *program) log(format string, args ...interface{}) { - line := fmt.Sprintf("[%d/%d/%d] "+format, append([]interface{}{len(p.clients), - p.publisherCount, p.readerCount}, args...)...) + countClient := atomic.LoadInt64(&p.countClient) + countPublisher := atomic.LoadInt64(&p.countPublisher) + countReader := atomic.LoadInt64(&p.countReader) + + line := fmt.Sprintf("[%d/%d/%d] "+format, append([]interface{}{countClient, + countPublisher, countReader}, args...)...) if _, ok := p.conf.logDestinationsParsed[logDestinationStdout]; ok { log.Println(line) @@ -198,14 +204,15 @@ outer: case req := <-p.metricsGather: req.res <- &metricsData{ - clientCount: len(p.clients), - publisherCount: p.publisherCount, - readerCount: p.readerCount, + countClient: p.countClient, + countPublisher: p.countPublisher, + countReader: p.countReader, } case conn := <-p.clientNew: c := newClient(p, conn) p.clients[c] = struct{}{} + atomic.AddInt64(&p.countClient, 1) c.log("connected") case client := <-p.clientClose: @@ -259,11 +266,11 @@ outer: req.res <- nil case client := <-p.clientPlay: - p.readerCount += 1 + atomic.AddInt64(&p.countReader, 1) client.state = clientStatePlay case client := <-p.clientRecord: - p.publisherCount += 1 + atomic.AddInt64(&p.countPublisher, 1) client.state = clientStateRecord if client.streamProtocol == gortsplib.StreamProtocolUdp { diff --git a/metrics.go b/metrics.go index be83648f..09ab92e5 100644 --- a/metrics.go +++ b/metrics.go @@ -14,9 +14,9 @@ const ( ) type metricsData struct { - clientCount int - publisherCount int - readerCount int + countClient int64 + countPublisher int64 + countReader int64 } type metricsGatherReq struct { @@ -76,9 +76,9 @@ func (m *metrics) onMetrics(w http.ResponseWriter, req *http.Request) { out := "" now := time.Now().UnixNano() / 1000000 - out += fmt.Sprintf("clients %d %v\n", data.clientCount, now) - out += fmt.Sprintf("publishers %d %v\n", data.publisherCount, now) - out += fmt.Sprintf("readers %d %v\n", data.readerCount, now) + out += fmt.Sprintf("clients %d %v\n", data.countClient, now) + out += fmt.Sprintf("publishers %d %v\n", data.countPublisher, now) + out += fmt.Sprintf("readers %d %v\n", data.countReader, now) w.WriteHeader(http.StatusOK) io.WriteString(w, out) diff --git a/path.go b/path.go index b006ef9c..1043e6b5 100644 --- a/path.go +++ b/path.go @@ -106,7 +106,7 @@ func (pa *path) onClose(wait bool) { c.close() if wait { - <- c.done + <-c.done } } }