Browse Source

add rtsp_proxies metrics (#61)

pull/101/head
aler9 6 years ago
parent
commit
a3a52101c0
  1. 2
      README.md
  2. 2
      conf.go
  3. 2
      main.go
  4. 6
      metrics.go
  5. 5
      path.go
  6. 4
      source.go

2
README.md

@ -216,6 +216,8 @@ There are multiple ways to monitor the server usage over time:
* `rtsp_clients{state="idle"}` is the count of clients that are neither publishing nor reading * `rtsp_clients{state="idle"}` is the count of clients that are neither publishing nor reading
* `rtsp_clients{state="publishing"}` is the count of clients that are publishing * `rtsp_clients{state="publishing"}` is the count of clients that are publishing
* `rtsp_clients{state="reading"}` is the count of clients that are reading * `rtsp_clients{state="reading"}` is the count of clients that are reading
* `rtsp_proxies{state="idle"}` is the count of proxy sources that are not running
* `rtsp_proxies{state="running"}` is the count of proxy sources that are running
* A performance monitor, compatible with pprof, can be enabled with the option `pprof: yes`; then the server can be queried for metrics with pprof-compatible tools, like: * A performance monitor, compatible with pprof, can be enabled with the option `pprof: yes`; then the server can be queried for metrics with pprof-compatible tools, like:
``` ```

2
conf.go

@ -216,7 +216,7 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) {
if pconf.Source != "record" { if pconf.Source != "record" {
if pconf.regexp != nil { if pconf.regexp != nil {
return nil, fmt.Errorf("a path with a regular expression cannot have a RTSP source; use another path") return nil, fmt.Errorf("a path with a regular expression (or path 'all') cannot have a RTSP source; use another path")
} }
pconf.sourceUrl, err = url.Parse(pconf.Source) pconf.sourceUrl, err = url.Parse(pconf.Source)

2
main.go

@ -34,6 +34,8 @@ type program struct {
countClients int64 countClients int64
countPublishers int64 countPublishers int64
countReaders int64 countReaders int64
countProxies int64
countProxiesRunning int64
clientNew chan net.Conn clientNew chan net.Conn
clientClose chan *client clientClose chan *client

6
metrics.go

@ -60,6 +60,8 @@ func (m *metrics) onMetrics(w http.ResponseWriter, req *http.Request) {
countClients := atomic.LoadInt64(&m.p.countClients) countClients := atomic.LoadInt64(&m.p.countClients)
countPublishers := atomic.LoadInt64(&m.p.countPublishers) countPublishers := atomic.LoadInt64(&m.p.countPublishers)
countReaders := atomic.LoadInt64(&m.p.countReaders) countReaders := atomic.LoadInt64(&m.p.countReaders)
countProxies := atomic.LoadInt64(&m.p.countProxies)
countProxiesRunning := atomic.LoadInt64(&m.p.countProxiesRunning)
out := "" out := ""
out += fmt.Sprintf("rtsp_clients{state=\"idle\"} %d %v\n", out += fmt.Sprintf("rtsp_clients{state=\"idle\"} %d %v\n",
@ -68,6 +70,10 @@ func (m *metrics) onMetrics(w http.ResponseWriter, req *http.Request) {
countPublishers, now) countPublishers, now)
out += fmt.Sprintf("rtsp_clients{state=\"reading\"} %d %v\n", out += fmt.Sprintf("rtsp_clients{state=\"reading\"} %d %v\n",
countReaders, now) countReaders, now)
out += fmt.Sprintf("rtsp_proxies{state=\"idle\"} %d %v\n",
countProxies, now)
out += fmt.Sprintf("rtsp_proxies{state=\"running\"} %d %v\n",
countProxiesRunning, now)
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
io.WriteString(w, out) io.WriteString(w, out)

5
path.go

@ -2,6 +2,7 @@ package main
import ( import (
"fmt" "fmt"
"sync/atomic"
"time" "time"
) )
@ -147,7 +148,8 @@ func (pa *path) onCheck() {
pa.source.state == sourceStateRunning && pa.source.state == sourceStateRunning &&
!pa.hasClients() && !pa.hasClients() &&
time.Since(pa.lastDescribeReq) >= sourceStopAfterDescribeSecs { time.Since(pa.lastDescribeReq) >= sourceStopAfterDescribeSecs {
pa.log("stopping on demand source since (not requested anymore)") pa.log("stopping on demand source (not requested anymore)")
atomic.AddInt64(&pa.p.countProxiesRunning, -1)
pa.source.state = sourceStateStopped pa.source.state = sourceStateStopped
pa.source.setState <- pa.source.state pa.source.setState <- pa.source.state
} }
@ -241,6 +243,7 @@ func (pa *path) onDescribe(client *client) {
if pa.source != nil && pa.source.state == sourceStateStopped { // start if needed if pa.source != nil && pa.source.state == sourceStateStopped { // start if needed
pa.log("starting on demand source") pa.log("starting on demand source")
pa.lastDescribeActivation = time.Now() pa.lastDescribeActivation = time.Now()
atomic.AddInt64(&pa.p.countProxiesRunning, +1)
pa.source.state = sourceStateRunning pa.source.state = sourceStateRunning
pa.source.setState <- pa.source.state pa.source.setState <- pa.source.state
} }

4
source.go

@ -3,6 +3,7 @@ package main
import ( import (
"math/rand" "math/rand"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
@ -46,10 +47,13 @@ func newSource(p *program, path *path, pathConf *pathConf) *source {
done: make(chan struct{}), done: make(chan struct{}),
} }
atomic.AddInt64(&p.countProxies, +1)
if pathConf.SourceOnDemand { if pathConf.SourceOnDemand {
s.state = sourceStateStopped s.state = sourceStateStopped
} else { } else {
s.state = sourceStateRunning s.state = sourceStateRunning
atomic.AddInt64(&p.countProxiesRunning, +1)
} }
return s return s

Loading…
Cancel
Save