Browse Source

fix race condition

pull/80/head
aler9 5 years ago
parent
commit
f40f3a34ab
  1. 7
      client.go
  2. 25
      main.go
  3. 12
      metrics.go
  4. 2
      path.go

7
client.go

@ -9,6 +9,7 @@ import ( @@ -9,6 +9,7 @@ import (
"os/exec"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/aler9/gortsplib"
@ -171,12 +172,14 @@ func newClient(p *program, nconn net.Conn) *client { @@ -171,12 +172,14 @@ func newClient(p *program, nconn net.Conn) *client {
func (c *client) close() {
delete(c.p.clients, c)
atomic.AddInt64(&c.p.countClient, -1)
switch c.state {
case clientStatePlay:
c.p.readerCount -= 1
atomic.AddInt64(&c.p.countReader, -1)
case clientStateRecord:
c.p.publisherCount -= 1
atomic.AddInt64(&c.p.countPublisher, -1)
if c.streamProtocol == gortsplib.StreamProtocolUdp {
for _, track := range c.streamTracks {

25
main.go

@ -6,6 +6,7 @@ import ( @@ -6,6 +6,7 @@ import (
"log"
"net"
"os"
"sync/atomic"
"time"
"github.com/aler9/gortsplib"
@ -36,8 +37,9 @@ type program struct { @@ -36,8 +37,9 @@ type program struct {
serverRtsp *serverTcp
clients map[*client]struct{}
udpClientsByAddr map[udpClientAddr]*udpClient
publisherCount int
readerCount int
countClient int64
countPublisher int64
countReader int64
metricsGather chan metricsGatherReq
clientNew chan net.Conn
@ -150,8 +152,12 @@ func newProgram(args []string, stdin io.Reader) (*program, error) { @@ -150,8 +152,12 @@ func newProgram(args []string, stdin io.Reader) (*program, error) {
}
func (p *program) log(format string, args ...interface{}) {
line := fmt.Sprintf("[%d/%d/%d] "+format, append([]interface{}{len(p.clients),
p.publisherCount, p.readerCount}, args...)...)
countClient := atomic.LoadInt64(&p.countClient)
countPublisher := atomic.LoadInt64(&p.countPublisher)
countReader := atomic.LoadInt64(&p.countReader)
line := fmt.Sprintf("[%d/%d/%d] "+format, append([]interface{}{countClient,
countPublisher, countReader}, args...)...)
if _, ok := p.conf.logDestinationsParsed[logDestinationStdout]; ok {
log.Println(line)
@ -198,14 +204,15 @@ outer: @@ -198,14 +204,15 @@ outer:
case req := <-p.metricsGather:
req.res <- &metricsData{
clientCount: len(p.clients),
publisherCount: p.publisherCount,
readerCount: p.readerCount,
countClient: p.countClient,
countPublisher: p.countPublisher,
countReader: p.countReader,
}
case conn := <-p.clientNew:
c := newClient(p, conn)
p.clients[c] = struct{}{}
atomic.AddInt64(&p.countClient, 1)
c.log("connected")
case client := <-p.clientClose:
@ -259,11 +266,11 @@ outer: @@ -259,11 +266,11 @@ outer:
req.res <- nil
case client := <-p.clientPlay:
p.readerCount += 1
atomic.AddInt64(&p.countReader, 1)
client.state = clientStatePlay
case client := <-p.clientRecord:
p.publisherCount += 1
atomic.AddInt64(&p.countPublisher, 1)
client.state = clientStateRecord
if client.streamProtocol == gortsplib.StreamProtocolUdp {

12
metrics.go

@ -14,9 +14,9 @@ const ( @@ -14,9 +14,9 @@ const (
)
type metricsData struct {
clientCount int
publisherCount int
readerCount int
countClient int64
countPublisher int64
countReader int64
}
type metricsGatherReq struct {
@ -76,9 +76,9 @@ func (m *metrics) onMetrics(w http.ResponseWriter, req *http.Request) { @@ -76,9 +76,9 @@ func (m *metrics) onMetrics(w http.ResponseWriter, req *http.Request) {
out := ""
now := time.Now().UnixNano() / 1000000
out += fmt.Sprintf("clients %d %v\n", data.clientCount, now)
out += fmt.Sprintf("publishers %d %v\n", data.publisherCount, now)
out += fmt.Sprintf("readers %d %v\n", data.readerCount, now)
out += fmt.Sprintf("clients %d %v\n", data.countClient, now)
out += fmt.Sprintf("publishers %d %v\n", data.countPublisher, now)
out += fmt.Sprintf("readers %d %v\n", data.countReader, now)
w.WriteHeader(http.StatusOK)
io.WriteString(w, out)

2
path.go

@ -106,7 +106,7 @@ func (pa *path) onClose(wait bool) { @@ -106,7 +106,7 @@ func (pa *path) onClose(wait bool) {
c.close()
if wait {
<- c.done
<-c.done
}
}
}

Loading…
Cancel
Save