Browse Source

rename metrics (#61)

pull/101/head
aler9 6 years ago
parent
commit
2085944586
  1. 6
      README.md
  2. 6
      client.go
  3. 36
      main.go
  4. 33
      metrics.go

6
README.md

@ -207,9 +207,9 @@ There are multiple ways to monitor the server usage over time:
``` ```
Obtaining: Obtaining:
``` ```
clients 23 1596122687740 rtsp_clients{state="idle"} 2 1596122687740
publishers 15 1596122687740 rtsp_clients{state="publishing"} 15 1596122687740
readers 8 1596122687740 rtsp_clients{state="reading"} 8 1596122687740
``` ```
* A performance monitor, compatible with pprof, can be enabled with the option `pprof: yes`; then the server can be queried for metrics with pprof-compatible tools, like: * A performance monitor, compatible with pprof, can be enabled with the option `pprof: yes`; then the server can be queried for metrics with pprof-compatible tools, like:

6
client.go

@ -138,15 +138,15 @@ func newClient(p *program, nconn net.Conn) *client {
func (c *client) close() { func (c *client) close() {
delete(c.p.clients, c) delete(c.p.clients, c)
atomic.AddInt64(&c.p.countClient, -1) atomic.AddInt64(&c.p.countClients, -1)
switch c.state { switch c.state {
case clientStatePlay: case clientStatePlay:
atomic.AddInt64(&c.p.countReader, -1) atomic.AddInt64(&c.p.countReaders, -1)
c.p.readersMap.remove(c) c.p.readersMap.remove(c)
case clientStateRecord: case clientStateRecord:
atomic.AddInt64(&c.p.countPublisher, -1) atomic.AddInt64(&c.p.countPublishers, -1)
if c.streamProtocol == gortsplib.StreamProtocolUDP { if c.streamProtocol == gortsplib.StreamProtocolUDP {
for _, track := range c.streamTracks { for _, track := range c.streamTracks {

36
main.go

@ -31,11 +31,10 @@ type program struct {
clients map[*client]struct{} clients map[*client]struct{}
udpPublishersMap *udpPublishersMap udpPublishersMap *udpPublishersMap
readersMap *readersMap readersMap *readersMap
countClient int64 countClients int64
countPublisher int64 countPublishers int64
countReader int64 countReaders int64
metricsGather chan metricsGatherReq
clientNew chan net.Conn clientNew chan net.Conn
clientClose chan *client clientClose chan *client
clientDescribe chan clientDescribeReq clientDescribe chan clientDescribeReq
@ -80,7 +79,6 @@ func newProgram(args []string, stdin io.Reader) (*program, error) {
clients: make(map[*client]struct{}), clients: make(map[*client]struct{}),
udpPublishersMap: newUdpPublisherMap(), udpPublishersMap: newUdpPublisherMap(),
readersMap: newReadersMap(), readersMap: newReadersMap(),
metricsGather: make(chan metricsGatherReq),
clientNew: make(chan net.Conn), clientNew: make(chan net.Conn),
clientClose: make(chan *client), clientClose: make(chan *client),
clientDescribe: make(chan clientDescribeReq), clientDescribe: make(chan clientDescribeReq),
@ -139,12 +137,12 @@ func newProgram(args []string, stdin io.Reader) (*program, error) {
} }
func (p *program) log(format string, args ...interface{}) { func (p *program) log(format string, args ...interface{}) {
countClient := atomic.LoadInt64(&p.countClient) countClients := atomic.LoadInt64(&p.countClients)
countPublisher := atomic.LoadInt64(&p.countPublisher) countPublishers := atomic.LoadInt64(&p.countPublishers)
countReader := atomic.LoadInt64(&p.countReader) countReaders := atomic.LoadInt64(&p.countReaders)
log.Printf(fmt.Sprintf("[%d/%d/%d] "+format, append([]interface{}{countClient, log.Printf(fmt.Sprintf("[%d/%d/%d] "+format, append([]interface{}{countClients,
countPublisher, countReader}, args...)...)) countPublishers, countReaders}, args...)...))
} }
func (p *program) run() { func (p *program) run() {
@ -181,17 +179,10 @@ outer:
path.onCheck() path.onCheck()
} }
case req := <-p.metricsGather:
req.res <- &metricsData{
countClient: p.countClient,
countPublisher: p.countPublisher,
countReader: p.countReader,
}
case conn := <-p.clientNew: case conn := <-p.clientNew:
c := newClient(p, conn) c := newClient(p, conn)
p.clients[c] = struct{}{} p.clients[c] = struct{}{}
atomic.AddInt64(&p.countClient, 1) atomic.AddInt64(&p.countClients, 1)
c.log("connected") c.log("connected")
case client := <-p.clientClose: case client := <-p.clientClose:
@ -245,12 +236,12 @@ outer:
req.res <- nil req.res <- nil
case client := <-p.clientPlay: case client := <-p.clientPlay:
atomic.AddInt64(&p.countReader, 1) atomic.AddInt64(&p.countReaders, 1)
client.state = clientStatePlay client.state = clientStatePlay
p.readersMap.add(client) p.readersMap.add(client)
case client := <-p.clientRecord: case client := <-p.clientRecord:
atomic.AddInt64(&p.countPublisher, 1) atomic.AddInt64(&p.countPublishers, 1)
client.state = clientStateRecord client.state = clientStateRecord
if client.streamProtocol == gortsplib.StreamProtocolUDP { if client.streamProtocol == gortsplib.StreamProtocolUDP {
@ -289,13 +280,11 @@ outer:
go func() { go func() {
for { for {
select { select {
case req, ok := <-p.metricsGather: case _, ok := <-p.clientNew:
if !ok { if !ok {
return return
} }
req.res <- nil
case <-p.clientNew:
case <-p.clientClose: case <-p.clientClose:
case <-p.clientDescribe: case <-p.clientDescribe:
@ -345,7 +334,6 @@ outer:
p.logHandler.close() p.logHandler.close()
close(p.metricsGather)
close(p.clientNew) close(p.clientNew)
close(p.clientClose) close(p.clientClose)
close(p.clientDescribe) close(p.clientDescribe)

33
metrics.go

@ -6,6 +6,7 @@ import (
"io" "io"
"net" "net"
"net/http" "net/http"
"sync/atomic"
"time" "time"
) )
@ -13,16 +14,6 @@ const (
metricsAddress = ":9998" metricsAddress = ":9998"
) )
type metricsData struct {
countClient int64
countPublisher int64
countReader int64
}
type metricsGatherReq struct {
res chan *metricsData
}
type metrics struct { type metrics struct {
p *program p *program
listener net.Listener listener net.Listener
@ -64,21 +55,19 @@ func (m *metrics) close() {
} }
func (m *metrics) onMetrics(w http.ResponseWriter, req *http.Request) { func (m *metrics) onMetrics(w http.ResponseWriter, req *http.Request) {
res := make(chan *metricsData) now := time.Now().UnixNano() / 1000000
m.p.metricsGather <- metricsGatherReq{res}
data := <-res
if data == nil { countClients := atomic.LoadInt64(&m.p.countClients)
w.WriteHeader(http.StatusInternalServerError) countPublishers := atomic.LoadInt64(&m.p.countPublishers)
return countReaders := atomic.LoadInt64(&m.p.countReaders)
}
out := "" out := ""
now := time.Now().UnixNano() / 1000000 out += fmt.Sprintf("rtsp_clients{state=\"idle\"} %d %v\n",
countClients-countPublishers-countReaders, now)
out += fmt.Sprintf("clients %d %v\n", data.countClient, now) out += fmt.Sprintf("rtsp_clients{state=\"publishing\"} %d %v\n",
out += fmt.Sprintf("publishers %d %v\n", data.countPublisher, now) countPublishers, now)
out += fmt.Sprintf("readers %d %v\n", data.countReader, now) out += fmt.Sprintf("rtsp_clients{state=\"reading\"} %d %v\n",
countReaders, now)
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
io.WriteString(w, out) io.WriteString(w, out)

Loading…
Cancel
Save