Browse Source

Merge branch 'master' into dev

pull/101/head
aler9 5 years ago
parent
commit
a71bac5983
  1. 6
      client.go
  2. 44
      main.go
  3. 10
      metrics.go
  4. 4
      path.go
  5. 4
      proxy.go

6
client.go

@ -135,15 +135,15 @@ func newClient(p *program, nconn net.Conn) *client {
func (c *client) close() { func (c *client) close() {
delete(c.p.clients, c) delete(c.p.clients, c)
atomic.AddInt64(&c.p.countClients, -1) atomic.AddInt64(c.p.countClients, -1)
switch c.state { switch c.state {
case clientStatePlay: case clientStatePlay:
atomic.AddInt64(&c.p.countReaders, -1) atomic.AddInt64(c.p.countReaders, -1)
c.p.readersMap.remove(c) c.p.readersMap.remove(c)
case clientStateRecord: case clientStateRecord:
atomic.AddInt64(&c.p.countPublishers, -1) atomic.AddInt64(c.p.countPublishers, -1)
if c.streamProtocol == gortsplib.StreamProtocolUDP { if c.streamProtocol == gortsplib.StreamProtocolUDP {
for _, track := range c.streamTracks { for _, track := range c.streamTracks {

44
main.go

@ -31,11 +31,13 @@ type program struct {
clients map[*client]struct{} clients map[*client]struct{}
udpPublishersMap *udpPublishersMap udpPublishersMap *udpPublishersMap
readersMap *readersMap readersMap *readersMap
countClients int64 // use pointers to avoid a crash on 32bit platforms
countPublishers int64 // https://github.com/golang/go/issues/9959
countReaders int64 countClients *int64
countProxies int64 countPublishers *int64
countProxiesRunning int64 countReaders *int64
countProxies *int64
countProxiesRunning *int64
clientNew chan net.Conn clientNew chan net.Conn
clientClose chan *client clientClose chan *client
@ -81,6 +83,26 @@ func newProgram(args []string, stdin io.Reader) (*program, error) {
clients: make(map[*client]struct{}), clients: make(map[*client]struct{}),
udpPublishersMap: newUdpPublisherMap(), udpPublishersMap: newUdpPublisherMap(),
readersMap: newReadersMap(), readersMap: newReadersMap(),
countClients: func() *int64 {
v := int64(0)
return &v
}(),
countPublishers: func() *int64 {
v := int64(0)
return &v
}(),
countReaders: func() *int64 {
v := int64(0)
return &v
}(),
countProxies: func() *int64 {
v := int64(0)
return &v
}(),
countProxiesRunning: func() *int64 {
v := int64(0)
return &v
}(),
clientNew: make(chan net.Conn), clientNew: make(chan net.Conn),
clientClose: make(chan *client), clientClose: make(chan *client),
clientDescribe: make(chan clientDescribeReq), clientDescribe: make(chan clientDescribeReq),
@ -139,9 +161,9 @@ func newProgram(args []string, stdin io.Reader) (*program, error) {
} }
func (p *program) log(format string, args ...interface{}) { func (p *program) log(format string, args ...interface{}) {
countClients := atomic.LoadInt64(&p.countClients) countClients := atomic.LoadInt64(p.countClients)
countPublishers := atomic.LoadInt64(&p.countPublishers) countPublishers := atomic.LoadInt64(p.countPublishers)
countReaders := atomic.LoadInt64(&p.countReaders) countReaders := atomic.LoadInt64(p.countReaders)
log.Printf(fmt.Sprintf("[%d/%d/%d] "+format, append([]interface{}{countClients, log.Printf(fmt.Sprintf("[%d/%d/%d] "+format, append([]interface{}{countClients,
countPublishers, countReaders}, args...)...)) countPublishers, countReaders}, args...)...))
@ -184,7 +206,7 @@ outer:
case conn := <-p.clientNew: case conn := <-p.clientNew:
c := newClient(p, conn) c := newClient(p, conn)
p.clients[c] = struct{}{} p.clients[c] = struct{}{}
atomic.AddInt64(&p.countClients, 1) atomic.AddInt64(p.countClients, 1)
c.log("connected") c.log("connected")
case client := <-p.clientClose: case client := <-p.clientClose:
@ -238,12 +260,12 @@ outer:
req.res <- nil req.res <- nil
case client := <-p.clientPlay: case client := <-p.clientPlay:
atomic.AddInt64(&p.countReaders, 1) atomic.AddInt64(p.countReaders, 1)
client.state = clientStatePlay client.state = clientStatePlay
p.readersMap.add(client) p.readersMap.add(client)
case client := <-p.clientRecord: case client := <-p.clientRecord:
atomic.AddInt64(&p.countPublishers, 1) atomic.AddInt64(p.countPublishers, 1)
client.state = clientStateRecord client.state = clientStateRecord
if client.streamProtocol == gortsplib.StreamProtocolUDP { if client.streamProtocol == gortsplib.StreamProtocolUDP {

10
metrics.go

@ -57,11 +57,11 @@ func (m *metrics) close() {
func (m *metrics) onMetrics(w http.ResponseWriter, req *http.Request) { func (m *metrics) onMetrics(w http.ResponseWriter, req *http.Request) {
now := time.Now().UnixNano() / 1000000 now := time.Now().UnixNano() / 1000000
countClients := atomic.LoadInt64(&m.p.countClients) countClients := atomic.LoadInt64(m.p.countClients)
countPublishers := atomic.LoadInt64(&m.p.countPublishers) countPublishers := atomic.LoadInt64(m.p.countPublishers)
countReaders := atomic.LoadInt64(&m.p.countReaders) countReaders := atomic.LoadInt64(m.p.countReaders)
countProxies := atomic.LoadInt64(&m.p.countProxies) countProxies := atomic.LoadInt64(m.p.countProxies)
countProxiesRunning := atomic.LoadInt64(&m.p.countProxiesRunning) countProxiesRunning := atomic.LoadInt64(m.p.countProxiesRunning)
out := "" out := ""
out += fmt.Sprintf("rtsp_clients{state=\"idle\"} %d %v\n", out += fmt.Sprintf("rtsp_clients{state=\"idle\"} %d %v\n",

4
path.go

@ -149,7 +149,7 @@ func (pa *path) onCheck() {
!pa.hasClients() && !pa.hasClients() &&
time.Since(pa.lastDescribeReq) >= proxyStopAfterDescribeSecs { time.Since(pa.lastDescribeReq) >= proxyStopAfterDescribeSecs {
pa.log("stopping on demand proxy (not requested anymore)") pa.log("stopping on demand proxy (not requested anymore)")
atomic.AddInt64(&pa.p.countProxiesRunning, -1) atomic.AddInt64(pa.p.countProxiesRunning, -1)
pa.proxy.state = proxyStateStopped pa.proxy.state = proxyStateStopped
pa.proxy.setState <- pa.proxy.state pa.proxy.setState <- pa.proxy.state
} }
@ -243,7 +243,7 @@ func (pa *path) onDescribe(client *client) {
if pa.proxy != nil && pa.proxy.state == proxyStateStopped { // start if needed if pa.proxy != nil && pa.proxy.state == proxyStateStopped { // start if needed
pa.log("starting on demand proxy") pa.log("starting on demand proxy")
pa.lastDescribeActivation = time.Now() pa.lastDescribeActivation = time.Now()
atomic.AddInt64(&pa.p.countProxiesRunning, +1) atomic.AddInt64(pa.p.countProxiesRunning, +1)
pa.proxy.state = proxyStateRunning pa.proxy.state = proxyStateRunning
pa.proxy.setState <- pa.proxy.state pa.proxy.setState <- pa.proxy.state
} }

4
proxy.go

@ -44,13 +44,13 @@ func newProxy(p *program, path *path, pathConf *pathConf) *proxy {
done: make(chan struct{}), done: make(chan struct{}),
} }
atomic.AddInt64(&p.countProxies, +1) atomic.AddInt64(p.countProxies, +1)
if pathConf.SourceOnDemand { if pathConf.SourceOnDemand {
s.state = proxyStateStopped s.state = proxyStateStopped
} else { } else {
s.state = proxyStateRunning s.state = proxyStateRunning
atomic.AddInt64(&p.countProxiesRunning, +1) atomic.AddInt64(p.countProxiesRunning, +1)
} }
return s return s

Loading…
Cancel
Save