Browse Source

rewrite metrics system to provide more data (#492)

pull/509/head
aler9 4 years ago
parent
commit
f1a812ba5d
  1. 87
      README.md
  2. 70
      internal/core/api.go
  3. 25
      internal/core/core.go
  4. 9
      internal/core/hls_server.go
  5. 168
      internal/core/metrics.go
  6. 80
      internal/core/metrics_test.go
  7. 10
      internal/core/path.go
  8. 28
      internal/core/path_manager.go
  9. 3
      internal/core/rtmp_conn.go
  10. 24
      internal/core/rtmp_server.go
  11. 3
      internal/core/rtmp_source.go
  12. 3
      internal/core/rtsp_conn.go
  13. 36
      internal/core/rtsp_server.go
  14. 3
      internal/core/rtsp_source.go
  15. 16
      internal/core/stats.go

87
README.md

@ -56,8 +56,9 @@ Plus: @@ -56,8 +56,9 @@ Plus:
* [Fallback stream](#fallback-stream)
* [Start on boot with systemd](#start-on-boot-with-systemd)
* [Corrupted frames](#corrupted-frames)
* [Monitoring](#monitoring)
* [HTTP API](#http-api)
* [Metrics](#metrics)
* [Pprof](#pprof)
* [Command-line usage](#command-line-usage)
* [Compile and run from source](#compile-and-run-from-source)
* [Links](#links)
@ -497,62 +498,70 @@ In some scenarios, the server can send incomplete or corrupted frames. This can @@ -497,62 +498,70 @@ In some scenarios, the server can send incomplete or corrupted frames. This can
readBufferSize: 8192
```
### Monitoring
### HTTP API
There are multiple ways to monitor the server usage over time:
The server can be queried and controlled with an HTTP API, that must be enabled by setting the `api` parameter in the configuration:
* Use the [HTTP API](#http-api), described below.
```yml
api: yes
```
* A metrics exporter, compatible with Prometheus, can be enabled with the parameter `metrics: yes`; then the server can be queried for metrics with Prometheus or with a simple HTTP request:
The API listens on `apiAddress`, that by default is `127.0.0.1:9997`; for instance, to obtain a list of active paths, run:
```
wget -qO- localhost:9998/metrics
```
```
curl http//127.0.0.1:9997/list
```
Obtaining:
Full documentation of the API is available on the [dedicated site](https://aler9.github.io/rtsp-simple-server/).
```
rtsp_clients{state="publishing"} 15 1596122687740
rtsp_clients{state="reading"} 8 1596122687740
rtsp_sources{type="rtsp",state="idle"} 3 1596122687740
rtsp_sources{type="rtsp",state="running"} 2 1596122687740
rtsp_sources{type="rtmp",state="idle"} 1 1596122687740
rtsp_sources{type="rtmp",state="running"} 0 1596122687740
```
### Metrics
where:
A metrics exporter, compatible with Prometheus, can be enabled with the parameter `metrics: yes`; then the server can be queried for metrics with Prometheus or with a simple HTTP request:
* `rtsp_clients{state="publishing"}` is the count of clients that are publishing
* `rtsp_clients{state="reading"}` is the count of clients that are reading
* `rtsp_sources{type="rtsp",state="idle"}` is the count of rtsp sources that are not running
* `rtsp_sources{type="rtsp",state="running"}` is the count of rtsp sources that are running
* `rtsp_sources{type="rtmp",state="idle"}` is the count of rtmp sources that are not running
* `rtsp_sources{type="rtmp",state="running"}` is the count of rtmp sources that are running
```
wget -qO- localhost:9998/metrics
```
* A performance monitor, compatible with pprof, can be enabled with the parameter `pprof: yes`; then the server can be queried for metrics with pprof-compatible tools, like:
Obtaining:
```
go tool pprof -text http://localhost:9999/debug/pprof/goroutine
go tool pprof -text http://localhost:9999/debug/pprof/heap
go tool pprof -text http://localhost:9999/debug/pprof/profile?seconds=30
```
```
paths{state="ready"} 2 1628760831152
paths{state="notReady"} 0 1628760831152
rtsp_sessions{state="idle"} 0 1628760831152
rtsp_sessions{state="read"} 0 1628760831152
rtsp_sessions{state="publish"} 1 1628760831152
rtsps_sessions{state="idle"} 0 1628760831152
rtsps_sessions{state="read"} 0 1628760831152
rtsps_sessions{state="publish"} 0 1628760831152
rtmp_conns{state="idle"} 0 1628760831152
rtmp_conns{state="read"} 0 1628760831152
rtmp_conns{state="publish"} 1 1628760831152
```
### HTTP API
where:
The server can be queried and controlled with an HTTP API, that must be enabled by setting the `api` parameter in the configuration:
* `paths{state="ready"}` is the count of paths that are ready
* `paths{state="notReady"}` is the count of paths that are not ready
* `rtsp_sessions{state="idle"}` is the count of RTSP sessions that are idle
* `rtsp_sessions{state="read"}` is the count of RTSP sessions that are reading
* `rtsp_sessions{state="publish"}` is the counf ot RTSP sessions that are publishing
* `rtsps_sessions{state="idle"}` is the count of RTSPS sessions that are idle
* `rtsps_sessions{state="read"}` is the count of RTSPS sessions that are reading
* `rtsps_sessions{state="publish"}` is the counf ot RTSPS sessions that are publishing
* `rtmp_conns{state="idle"}` is the count of RTMP connections that are idle
* `rtmp_conns{state="read"}` is the count of RTMP connections that are reading
* `rtmp_conns{state="publish"}` is the count of RTMP connections that are publishing
```yml
api: yes
```
### PProf
The API listens on `apiAddress`, that by default is `127.0.0.1:9997`; for instance, to obtain a list of active paths, run:
A performance monitor, compatible with pprof, can be enabled with the parameter `pprof: yes`; then the server can be queried for metrics with pprof-compatible tools, like:
```
curl http//127.0.0.1:9997/list
go tool pprof -text http://localhost:9999/debug/pprof/goroutine
go tool pprof -text http://localhost:9999/debug/pprof/heap
go tool pprof -text http://localhost:9999/debug/pprof/profile?seconds=30
```
Full documentation of the API is available on the [dedicated site](https://aler9.github.io/rtsp-simple-server/).
### Command-line usage
```

70
internal/core/api.go

@ -18,6 +18,10 @@ import ( @@ -18,6 +18,10 @@ import (
"github.com/aler9/rtsp-simple-server/internal/logger"
)
func interfaceIsEmpty(i interface{}) bool {
return reflect.ValueOf(i).Kind() != reflect.Ptr || reflect.ValueOf(i).IsNil()
}
func fillStruct(dest interface{}, source interface{}) {
rvsource := reflect.ValueOf(source)
rvdest := reflect.ValueOf(dest)
@ -149,6 +153,7 @@ type apiPathsListData struct { @@ -149,6 +153,7 @@ type apiPathsListData struct {
}
type apiPathsListRes1 struct {
Data *apiPathsListData
Paths map[string]*path
Err error
}
@ -157,13 +162,9 @@ type apiPathsListReq1 struct { @@ -157,13 +162,9 @@ type apiPathsListReq1 struct {
Res chan apiPathsListRes1
}
type apiPathsListRes2 struct {
Err error
}
type apiPathsListReq2 struct {
Data *apiPathsListData
Res chan apiPathsListRes2
Res chan struct{}
}
type apiRTSPSessionsListItem struct {
@ -176,13 +177,12 @@ type apiRTSPSessionsListData struct { @@ -176,13 +177,12 @@ type apiRTSPSessionsListData struct {
}
type apiRTSPSessionsListRes struct {
Err error
}
type apiRTSPSessionsListReq struct {
Data *apiRTSPSessionsListData
Err error
}
type apiRTSPSessionsListReq struct{}
type apiRTSPSessionsKickRes struct {
Err error
}
@ -201,12 +201,12 @@ type apiRTMPConnsListData struct { @@ -201,12 +201,12 @@ type apiRTMPConnsListData struct {
}
type apiRTMPConnsListRes struct {
Err error
Data *apiRTMPConnsListData
Err error
}
type apiRTMPConnsListReq struct {
Data *apiRTMPConnsListData
Res chan apiRTMPConnsListRes
Res chan apiRTMPConnsListRes
}
type apiRTMPConnsKickRes struct {
@ -495,44 +495,32 @@ func (a *api) onConfigPathsDelete(ctx *gin.Context) { @@ -495,44 +495,32 @@ func (a *api) onConfigPathsDelete(ctx *gin.Context) {
}
func (a *api) onPathsList(ctx *gin.Context) {
data := apiPathsListData{
Items: make(map[string]apiPathsItem),
}
res := a.pathManager.OnAPIPathsList(apiPathsListReq1{})
if res.Err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
for _, pa := range res.Paths {
pa.OnAPIPathsList(apiPathsListReq2{Data: &data})
}
ctx.JSON(http.StatusOK, data)
ctx.JSON(http.StatusOK, res.Data)
}
func (a *api) onRTSPSessionsList(ctx *gin.Context) {
if reflect.ValueOf(a.rtspServer).IsNil() {
if interfaceIsEmpty(a.rtspServer) {
ctx.AbortWithStatus(http.StatusNotFound)
return
}
data := apiRTSPSessionsListData{
Items: make(map[string]apiRTSPSessionsListItem),
}
res := a.rtspServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{Data: &data})
res := a.rtspServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{})
if res.Err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
ctx.JSON(http.StatusOK, data)
ctx.JSON(http.StatusOK, res.Data)
}
func (a *api) onRTSPSessionsKick(ctx *gin.Context) {
if reflect.ValueOf(a.rtspServer).IsNil() {
if interfaceIsEmpty(a.rtspServer) {
ctx.AbortWithStatus(http.StatusNotFound)
return
}
@ -549,26 +537,22 @@ func (a *api) onRTSPSessionsKick(ctx *gin.Context) { @@ -549,26 +537,22 @@ func (a *api) onRTSPSessionsKick(ctx *gin.Context) {
}
func (a *api) onRTSPSSessionsList(ctx *gin.Context) {
if reflect.ValueOf(a.rtspsServer).IsNil() {
if interfaceIsEmpty(a.rtspsServer) {
ctx.AbortWithStatus(http.StatusNotFound)
return
}
data := apiRTSPSessionsListData{
Items: make(map[string]apiRTSPSessionsListItem),
}
res := a.rtspsServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{Data: &data})
res := a.rtspsServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{})
if res.Err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
ctx.JSON(http.StatusOK, data)
ctx.JSON(http.StatusOK, res.Data)
}
func (a *api) onRTSPSSessionsKick(ctx *gin.Context) {
if reflect.ValueOf(a.rtspsServer).IsNil() {
if interfaceIsEmpty(a.rtspsServer) {
ctx.AbortWithStatus(http.StatusNotFound)
return
}
@ -585,22 +569,18 @@ func (a *api) onRTSPSSessionsKick(ctx *gin.Context) { @@ -585,22 +569,18 @@ func (a *api) onRTSPSSessionsKick(ctx *gin.Context) {
}
func (a *api) onRTMPConnsList(ctx *gin.Context) {
if reflect.ValueOf(a.rtmpServer).IsNil() {
if interfaceIsEmpty(a.rtmpServer) {
ctx.AbortWithStatus(http.StatusNotFound)
return
}
data := apiRTMPConnsListData{
Items: make(map[string]apiRTMPConnsListItem),
}
res := a.rtmpServer.OnAPIRTMPConnsList(apiRTMPConnsListReq{Data: &data})
res := a.rtmpServer.OnAPIRTMPConnsList(apiRTMPConnsListReq{})
if res.Err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
ctx.JSON(http.StatusOK, data)
ctx.JSON(http.StatusOK, res.Data)
}
// OnConfReload is called by core.
@ -611,7 +591,7 @@ func (a *api) OnConfReload(conf *conf.Conf) { @@ -611,7 +591,7 @@ func (a *api) OnConfReload(conf *conf.Conf) {
}
func (a *api) onRTMPConnsKick(ctx *gin.Context) {
if reflect.ValueOf(a.rtmpServer).IsNil() {
if interfaceIsEmpty(a.rtmpServer) {
ctx.AbortWithStatus(http.StatusNotFound)
return
}

25
internal/core/core.go

@ -200,7 +200,6 @@ func (p *Core) createResources(initial bool) error { @@ -200,7 +200,6 @@ func (p *Core) createResources(initial bool) error {
if p.metrics == nil {
p.metrics, err = newMetrics(
p.conf.MetricsAddress,
p.stats,
p)
if err != nil {
return err
@ -229,6 +228,7 @@ func (p *Core) createResources(initial bool) error { @@ -229,6 +228,7 @@ func (p *Core) createResources(initial bool) error {
p.conf.ReadBufferSize,
p.conf.Paths,
p.stats,
p.metrics,
p)
}
@ -260,7 +260,7 @@ func (p *Core) createResources(initial bool) error { @@ -260,7 +260,7 @@ func (p *Core) createResources(initial bool) error {
p.conf.ProtocolsParsed,
p.conf.RunOnConnect,
p.conf.RunOnConnectRestart,
p.stats,
p.metrics,
p.pathManager,
p)
if err != nil {
@ -295,7 +295,7 @@ func (p *Core) createResources(initial bool) error { @@ -295,7 +295,7 @@ func (p *Core) createResources(initial bool) error {
p.conf.ProtocolsParsed,
p.conf.RunOnConnect,
p.conf.RunOnConnectRestart,
p.stats,
p.metrics,
p.pathManager,
p)
if err != nil {
@ -315,7 +315,7 @@ func (p *Core) createResources(initial bool) error { @@ -315,7 +315,7 @@ func (p *Core) createResources(initial bool) error {
p.conf.RTSPAddress,
p.conf.RunOnConnect,
p.conf.RunOnConnectRestart,
p.stats,
p.metrics,
p.pathManager,
p)
if err != nil {
@ -334,7 +334,6 @@ func (p *Core) createResources(initial bool) error { @@ -334,7 +334,6 @@ func (p *Core) createResources(initial bool) error {
p.conf.HLSSegmentDuration,
p.conf.HLSAllowOrigin,
p.conf.ReadBufferCount,
p.stats,
p.pathManager,
p)
if err != nil {
@ -378,16 +377,14 @@ func (p *Core) closeResources(newConf *conf.Conf) { @@ -378,16 +377,14 @@ func (p *Core) closeResources(newConf *conf.Conf) {
closeMetrics := false
if newConf == nil ||
newConf.Metrics != p.conf.Metrics ||
newConf.MetricsAddress != p.conf.MetricsAddress ||
closeStats {
newConf.MetricsAddress != p.conf.MetricsAddress {
closeMetrics = true
}
closePPROF := false
if newConf == nil ||
newConf.PPROF != p.conf.PPROF ||
newConf.PPROFAddress != p.conf.PPROFAddress ||
closeStats {
newConf.PPROFAddress != p.conf.PPROFAddress {
closePPROF = true
}
@ -398,7 +395,8 @@ func (p *Core) closeResources(newConf *conf.Conf) { @@ -398,7 +395,8 @@ func (p *Core) closeResources(newConf *conf.Conf) {
newConf.WriteTimeout != p.conf.WriteTimeout ||
newConf.ReadBufferCount != p.conf.ReadBufferCount ||
newConf.ReadBufferSize != p.conf.ReadBufferSize ||
closeStats {
closeStats ||
closeMetrics {
closePathManager = true
} else if !reflect.DeepEqual(newConf.Paths, p.conf.Paths) {
p.pathManager.OnConfReload(newConf.Paths)
@ -423,7 +421,7 @@ func (p *Core) closeResources(newConf *conf.Conf) { @@ -423,7 +421,7 @@ func (p *Core) closeResources(newConf *conf.Conf) {
!reflect.DeepEqual(newConf.ProtocolsParsed, p.conf.ProtocolsParsed) ||
newConf.RunOnConnect != p.conf.RunOnConnect ||
newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart ||
closeStats ||
closeMetrics ||
closePathManager {
closeRTSPServer = true
}
@ -443,7 +441,7 @@ func (p *Core) closeResources(newConf *conf.Conf) { @@ -443,7 +441,7 @@ func (p *Core) closeResources(newConf *conf.Conf) {
!reflect.DeepEqual(newConf.ProtocolsParsed, p.conf.ProtocolsParsed) ||
newConf.RunOnConnect != p.conf.RunOnConnect ||
newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart ||
closeStats ||
closeMetrics ||
closePathManager {
closeRTSPSServer = true
}
@ -458,7 +456,7 @@ func (p *Core) closeResources(newConf *conf.Conf) { @@ -458,7 +456,7 @@ func (p *Core) closeResources(newConf *conf.Conf) {
newConf.RTSPAddress != p.conf.RTSPAddress ||
newConf.RunOnConnect != p.conf.RunOnConnect ||
newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart ||
closeStats ||
closeMetrics ||
closePathManager {
closeRTMPServer = true
}
@ -472,7 +470,6 @@ func (p *Core) closeResources(newConf *conf.Conf) { @@ -472,7 +470,6 @@ func (p *Core) closeResources(newConf *conf.Conf) {
newConf.HLSSegmentDuration != p.conf.HLSSegmentDuration ||
newConf.HLSAllowOrigin != p.conf.HLSAllowOrigin ||
newConf.ReadBufferCount != p.conf.ReadBufferCount ||
closeStats ||
closePathManager {
closeHLSServer = true
}

9
internal/core/hls_server.go

@ -23,7 +23,6 @@ type hlsServer struct { @@ -23,7 +23,6 @@ type hlsServer struct {
hlsSegmentDuration time.Duration
hlsAllowOrigin string
readBufferCount int
stats *stats
pathManager *pathManager
parent hlsServerParent
@ -47,7 +46,6 @@ func newHLSServer( @@ -47,7 +46,6 @@ func newHLSServer(
hlsSegmentDuration time.Duration,
hlsAllowOrigin string,
readBufferCount int,
stats *stats,
pathManager *pathManager,
parent hlsServerParent,
) (*hlsServer, error) {
@ -64,7 +62,6 @@ func newHLSServer( @@ -64,7 +62,6 @@ func newHLSServer(
hlsSegmentDuration: hlsSegmentDuration,
hlsAllowOrigin: hlsAllowOrigin,
readBufferCount: readBufferCount,
stats: stats,
pathManager: pathManager,
parent: parent,
ctx: ctx,
@ -78,11 +75,11 @@ func newHLSServer( @@ -78,11 +75,11 @@ func newHLSServer(
s.Log(logger.Info, "listener opened on "+address)
s.pathManager.OnHLSServerSet(s)
s.wg.Add(1)
go s.run()
s.pathManager.OnHLSServer(s)
return s, nil
}
@ -130,7 +127,7 @@ outer: @@ -130,7 +127,7 @@ outer:
hs.Shutdown(context.Background())
s.pathManager.OnHLSServer(nil)
s.pathManager.OnHLSServerSet(nil)
}
// ServeHTTP implements http.Handler.

168
internal/core/metrics.go

@ -6,7 +6,7 @@ import ( @@ -6,7 +6,7 @@ import (
"net"
"net/http"
"strconv"
"sync/atomic"
"sync"
"time"
"github.com/aler9/rtsp-simple-server/internal/logger"
@ -17,21 +17,36 @@ func formatMetric(key string, value int64, nowUnix int64) string { @@ -17,21 +17,36 @@ func formatMetric(key string, value int64, nowUnix int64) string {
strconv.FormatInt(nowUnix, 10) + "\n"
}
type metricsPathManager interface {
OnAPIPathsList(req apiPathsListReq1) apiPathsListRes1
}
type metricsRTSPServer interface {
OnAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSessionsListRes
}
type metricsRTMPServer interface {
OnAPIRTMPConnsList(req apiRTMPConnsListReq) apiRTMPConnsListRes
}
type metricsParent interface {
Log(logger.Level, string, ...interface{})
}
type metrics struct {
stats *stats
listener net.Listener
mux *http.ServeMux
server *http.Server
mutex sync.Mutex
pathManager metricsPathManager
rtspServer metricsRTSPServer
rtspsServer metricsRTSPServer
rtmpServer metricsRTMPServer
}
func newMetrics(
address string,
stats *stats,
parent metricsParent,
) (*metrics, error) {
listener, err := net.Listen("tcp", address)
@ -40,7 +55,6 @@ func newMetrics( @@ -40,7 +55,6 @@ func newMetrics(
}
m := &metrics{
stats: stats,
listener: listener,
}
@ -72,30 +86,136 @@ func (m *metrics) run() { @@ -72,30 +86,136 @@ func (m *metrics) run() {
func (m *metrics) onMetrics(w http.ResponseWriter, req *http.Request) {
nowUnix := time.Now().UnixNano() / 1000000
countPublishers := atomic.LoadInt64(m.stats.CountPublishers)
countReaders := atomic.LoadInt64(m.stats.CountReaders)
countSourcesRTSP := atomic.LoadInt64(m.stats.CountSourcesRTSP)
countSourcesRTSPRunning := atomic.LoadInt64(m.stats.CountSourcesRTSPRunning)
countSourcesRTMP := atomic.LoadInt64(m.stats.CountSourcesRTMP)
countSourcesRTMPRunning := atomic.LoadInt64(m.stats.CountSourcesRTMPRunning)
out := ""
out += formatMetric("rtsp_clients{state=\"publishing\"}",
countPublishers, nowUnix)
out += formatMetric("rtsp_clients{state=\"reading\"}",
countReaders, nowUnix)
res := m.pathManager.OnAPIPathsList(apiPathsListReq1{})
if res.Err == nil {
readyCount := int64(0)
notReadyCount := int64(0)
for _, p := range res.Data.Items {
if p.SourceReady {
readyCount++
} else {
notReadyCount++
}
}
out += formatMetric("paths{state=\"ready\"}",
readyCount, nowUnix)
out += formatMetric("paths{state=\"notReady\"}",
notReadyCount, nowUnix)
}
if !interfaceIsEmpty(m.rtspServer) {
res := m.rtspServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{})
if res.Err == nil {
idleCount := int64(0)
readCount := int64(0)
publishCount := int64(0)
for _, i := range res.Data.Items {
switch i.State {
case "idle":
idleCount++
case "read":
readCount++
case "publish":
publishCount++
}
}
out += formatMetric("rtsp_sessions{state=\"idle\"}",
idleCount, nowUnix)
out += formatMetric("rtsp_sessions{state=\"read\"}",
readCount, nowUnix)
out += formatMetric("rtsp_sessions{state=\"publish\"}",
publishCount, nowUnix)
}
}
out += formatMetric("rtsp_sources{type=\"rtsp\",state=\"idle\"}",
countSourcesRTSP-countSourcesRTSPRunning, nowUnix)
out += formatMetric("rtsp_sources{type=\"rtsp\",state=\"running\"}",
countSourcesRTSPRunning, nowUnix)
if !interfaceIsEmpty(m.rtspsServer) {
res := m.rtspsServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{})
if res.Err == nil {
idleCount := int64(0)
readCount := int64(0)
publishCount := int64(0)
for _, i := range res.Data.Items {
switch i.State {
case "idle":
idleCount++
case "read":
readCount++
case "publish":
publishCount++
}
}
out += formatMetric("rtsps_sessions{state=\"idle\"}",
idleCount, nowUnix)
out += formatMetric("rtsps_sessions{state=\"read\"}",
readCount, nowUnix)
out += formatMetric("rtsps_sessions{state=\"publish\"}",
publishCount, nowUnix)
}
}
out += formatMetric("rtsp_sources{type=\"rtmp\",state=\"idle\"}",
countSourcesRTMP-countSourcesRTMPRunning, nowUnix)
out += formatMetric("rtsp_sources{type=\"rtmp\",state=\"running\"}",
countSourcesRTMPRunning, nowUnix)
if !interfaceIsEmpty(m.rtmpServer) {
res := m.rtmpServer.OnAPIRTMPConnsList(apiRTMPConnsListReq{})
if res.Err == nil {
idleCount := int64(0)
readCount := int64(0)
publishCount := int64(0)
for _, i := range res.Data.Items {
switch i.State {
case "idle":
idleCount++
case "read":
readCount++
case "publish":
publishCount++
}
}
out += formatMetric("rtmp_conns{state=\"idle\"}",
idleCount, nowUnix)
out += formatMetric("rtmp_conns{state=\"read\"}",
readCount, nowUnix)
out += formatMetric("rtmp_conns{state=\"publish\"}",
publishCount, nowUnix)
}
}
w.WriteHeader(http.StatusOK)
io.WriteString(w, out)
}
// OnPathManagerSet is called by pathManager.
func (m *metrics) OnPathManagerSet(s metricsPathManager) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.pathManager = s
}
// OnRTSPServer is called by rtspServer (plain).
func (m *metrics) OnRTSPServerSet(s metricsRTSPServer) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.rtspServer = s
}
// OnRTSPServer is called by rtspServer (plain).
func (m *metrics) OnRTSPSServerSet(s metricsRTSPServer) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.rtspsServer = s
}
// OnRTMPServerSet is called by rtmpServer.
func (m *metrics) OnRTMPServerSet(s metricsRTMPServer) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.rtmpServer = s
}

80
internal/core/metrics_test.go

@ -0,0 +1,80 @@ @@ -0,0 +1,80 @@
package core
import (
"io/ioutil"
"net/http"
"os"
"strings"
"testing"
"github.com/aler9/gortsplib"
"github.com/stretchr/testify/require"
)
func TestMetrics(t *testing.T) {
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(serverKey)
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
p, ok := newInstance("metrics: yes\n" +
"encryption: optional\n" +
"serverCert: " + serverCertFpath + "\n" +
"serverKey: " + serverKeyFpath + "\n")
require.Equal(t, true, ok)
defer p.close()
track, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
source, err := gortsplib.DialPublish("rtsp://localhost:8554/mypath",
gortsplib.Tracks{track})
require.NoError(t, err)
defer source.Close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.mkv",
"-c", "copy",
"-f", "flv",
"rtmp://localhost:1935/test1/test2",
})
require.NoError(t, err)
defer cnt1.close()
req, err := http.NewRequest(http.MethodGet, "http://localhost:9998/metrics", nil)
require.NoError(t, err)
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, http.StatusOK, res.StatusCode)
bo, err := ioutil.ReadAll(res.Body)
require.NoError(t, err)
vals := make(map[string]string)
lines := strings.Split(string(bo), "\n")
for _, l := range lines[:len(lines)-1] {
fields := strings.Split(l, " ")
vals[fields[0]] = fields[1]
}
require.Equal(t, map[string]string{
"paths{state=\"notReady\"}": "0",
"paths{state=\"ready\"}": "2",
"rtmp_conns{state=\"idle\"}": "0",
"rtmp_conns{state=\"publish\"}": "1",
"rtmp_conns{state=\"read\"}": "0",
"rtsp_sessions{state=\"idle\"}": "0",
"rtsp_sessions{state=\"publish\"}": "1",
"rtsp_sessions{state=\"read\"}": "0",
"rtsps_sessions{state=\"idle\"}": "0",
"rtsps_sessions{state=\"publish\"}": "0",
"rtsps_sessions{state=\"read\"}": "0",
}, vals)
}

10
internal/core/path.go

@ -820,7 +820,7 @@ func (pa *path) handleAPIPathsList(req apiPathsListReq2) { @@ -820,7 +820,7 @@ func (pa *path) handleAPIPathsList(req apiPathsListReq2) {
return ret
}(),
}
req.Res <- apiPathsListRes2{}
close(req.Res)
}
// OnSourceStaticSetReady is called by a sourceStatic.
@ -936,12 +936,12 @@ func (pa *path) OnReaderPause(req pathReaderPauseReq) { @@ -936,12 +936,12 @@ func (pa *path) OnReaderPause(req pathReaderPauseReq) {
}
// OnAPIPathsList is called by api.
func (pa *path) OnAPIPathsList(req apiPathsListReq2) apiPathsListRes2 {
req.Res = make(chan apiPathsListRes2)
func (pa *path) OnAPIPathsList(req apiPathsListReq2) {
req.Res = make(chan struct{})
select {
case pa.apiPathsList <- req:
return <-req.Res
<-req.Res
case <-pa.ctx.Done():
return apiPathsListRes2{Err: fmt.Errorf("terminated")}
}
}

28
internal/core/path_manager.go

@ -29,6 +29,7 @@ type pathManager struct { @@ -29,6 +29,7 @@ type pathManager struct {
readBufferSize int
pathConfs map[string]*conf.PathConf
stats *stats
metrics *metrics
parent pathManagerParent
ctx context.Context
@ -57,6 +58,7 @@ func newPathManager( @@ -57,6 +58,7 @@ func newPathManager(
readBufferSize int,
pathConfs map[string]*conf.PathConf,
stats *stats,
metrics *metrics,
parent pathManagerParent) *pathManager {
ctx, ctxCancel := context.WithCancel(parentCtx)
@ -68,6 +70,7 @@ func newPathManager( @@ -68,6 +70,7 @@ func newPathManager(
readBufferSize: readBufferSize,
pathConfs: pathConfs,
stats: stats,
metrics: metrics,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
@ -88,6 +91,10 @@ func newPathManager( @@ -88,6 +91,10 @@ func newPathManager(
}
}
if pm.metrics != nil {
pm.metrics.OnPathManagerSet(pm)
}
pm.wg.Add(1)
go pm.run()
@ -261,6 +268,10 @@ outer: @@ -261,6 +268,10 @@ outer:
}
pm.ctxCancel()
if pm.metrics != nil {
pm.metrics.OnPathManagerSet(nil)
}
}
func (pm *pathManager) createPath(confName string, conf *conf.PathConf, name string) {
@ -406,8 +417,8 @@ func (pm *pathManager) OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderS @@ -406,8 +417,8 @@ func (pm *pathManager) OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderS
}
}
// OnHLSServer is called by hlsServer.
func (pm *pathManager) OnHLSServer(s pathManagerHLSServer) {
// OnHLSServerSet is called by hlsServer.
func (pm *pathManager) OnHLSServerSet(s pathManagerHLSServer) {
select {
case pm.hlsServerSet <- s:
case <-pm.ctx.Done():
@ -419,7 +430,18 @@ func (pm *pathManager) OnAPIPathsList(req apiPathsListReq1) apiPathsListRes1 { @@ -419,7 +430,18 @@ func (pm *pathManager) OnAPIPathsList(req apiPathsListReq1) apiPathsListRes1 {
req.Res = make(chan apiPathsListRes1)
select {
case pm.apiPathsList <- req:
return <-req.Res
res1 := <-req.Res
res1.Data = &apiPathsListData{
Items: make(map[string]apiPathsItem),
}
for _, pa := range res1.Paths {
pa.OnAPIPathsList(apiPathsListReq2{Data: res1.Data})
}
return res1
case <-pm.ctx.Done():
return apiPathsListRes1{Err: fmt.Errorf("terminated")}
}

3
internal/core/rtmp_conn.go

@ -66,7 +66,6 @@ type rtmpConn struct { @@ -66,7 +66,6 @@ type rtmpConn struct {
runOnConnect string
runOnConnectRestart bool
wg *sync.WaitGroup
stats *stats
conn *rtmp.Conn
pathManager rtmpConnPathManager
parent rtmpConnParent
@ -89,7 +88,6 @@ func newRTMPConn( @@ -89,7 +88,6 @@ func newRTMPConn(
runOnConnect string,
runOnConnectRestart bool,
wg *sync.WaitGroup,
stats *stats,
nconn net.Conn,
pathManager rtmpConnPathManager,
parent rtmpConnParent) *rtmpConn {
@ -104,7 +102,6 @@ func newRTMPConn( @@ -104,7 +102,6 @@ func newRTMPConn(
runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart,
wg: wg,
stats: stats,
conn: rtmp.NewServerConn(nconn),
pathManager: pathManager,
parent: parent,

24
internal/core/rtmp_server.go

@ -26,7 +26,7 @@ type rtmpServer struct { @@ -26,7 +26,7 @@ type rtmpServer struct {
rtspAddress string
runOnConnect string
runOnConnectRestart bool
stats *stats
metrics *metrics
pathManager *pathManager
parent rtmpServerParent
@ -51,7 +51,7 @@ func newRTMPServer( @@ -51,7 +51,7 @@ func newRTMPServer(
rtspAddress string,
runOnConnect string,
runOnConnectRestart bool,
stats *stats,
metrics *metrics,
pathManager *pathManager,
parent rtmpServerParent) (*rtmpServer, error) {
l, err := net.Listen("tcp", address)
@ -68,7 +68,7 @@ func newRTMPServer( @@ -68,7 +68,7 @@ func newRTMPServer(
rtspAddress: rtspAddress,
runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart,
stats: stats,
metrics: metrics,
pathManager: pathManager,
parent: parent,
ctx: ctx,
@ -82,6 +82,10 @@ func newRTMPServer( @@ -82,6 +82,10 @@ func newRTMPServer(
s.Log(logger.Info, "listener opened on %s", address)
if s.metrics != nil {
s.metrics.OnRTMPServerSet(s)
}
s.wg.Add(1)
go s.run()
@ -147,7 +151,6 @@ outer: @@ -147,7 +151,6 @@ outer:
s.runOnConnect,
s.runOnConnectRestart,
&s.wg,
s.stats,
nconn,
s.pathManager,
s)
@ -160,8 +163,12 @@ outer: @@ -160,8 +163,12 @@ outer:
delete(s.conns, c)
case req := <-s.apiRTMPConnsList:
data := &apiRTMPConnsListData{
Items: make(map[string]apiRTMPConnsListItem),
}
for c := range s.conns {
req.Data.Items[c.ID()] = apiRTMPConnsListItem{
data.Items[c.ID()] = apiRTMPConnsListItem{
RemoteAddr: c.RemoteAddr().String(),
State: func() string {
switch c.safeState() {
@ -175,7 +182,8 @@ outer: @@ -175,7 +182,8 @@ outer:
}(),
}
}
req.Res <- apiRTMPConnsListRes{}
req.Res <- apiRTMPConnsListRes{Data: data}
case req := <-s.apiRTMPConnsKick:
res := func() bool {
@ -202,6 +210,10 @@ outer: @@ -202,6 +210,10 @@ outer:
s.ctxCancel()
s.l.Close()
if s.metrics != nil {
s.metrics.OnRTMPServerSet(s)
}
}
func (s *rtmpServer) newConnID() (string, error) {

3
internal/core/rtmp_source.go

@ -4,7 +4,6 @@ import ( @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/aler9/gortsplib"
@ -61,7 +60,6 @@ func newRTMPSource( @@ -61,7 +60,6 @@ func newRTMPSource(
ctxCancel: ctxCancel,
}
atomic.AddInt64(s.stats.CountSourcesRTMP, +1)
s.log(logger.Info, "started")
s.wg.Add(1)
@ -72,7 +70,6 @@ func newRTMPSource( @@ -72,7 +70,6 @@ func newRTMPSource(
// Close closes a Source.
func (s *rtmpSource) Close() {
atomic.AddInt64(s.stats.CountSourcesRTMPRunning, -1)
s.log(logger.Info, "stopped")
s.ctxCancel()
}

3
internal/core/rtsp_conn.go

@ -41,7 +41,6 @@ type rtspConn struct { @@ -41,7 +41,6 @@ type rtspConn struct {
runOnConnect string
runOnConnectRestart bool
pathManager *pathManager
stats *stats
conn *gortsplib.ServerConn
parent rtspConnParent
@ -59,7 +58,6 @@ func newRTSPConn( @@ -59,7 +58,6 @@ func newRTSPConn(
runOnConnect string,
runOnConnectRestart bool,
pathManager *pathManager,
stats *stats,
conn *gortsplib.ServerConn,
parent rtspConnParent) *rtspConn {
c := &rtspConn{
@ -69,7 +67,6 @@ func newRTSPConn( @@ -69,7 +67,6 @@ func newRTSPConn(
runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart,
pathManager: pathManager,
stats: stats,
conn: conn,
parent: parent,
}

36
internal/core/rtsp_server.go

@ -30,7 +30,7 @@ type rtspServer struct { @@ -30,7 +30,7 @@ type rtspServer struct {
protocols map[conf.Protocol]struct{}
runOnConnect string
runOnConnectRestart bool
stats *stats
metrics *metrics
pathManager *pathManager
parent rtspServerParent
@ -65,7 +65,7 @@ func newRTSPServer( @@ -65,7 +65,7 @@ func newRTSPServer(
protocols map[conf.Protocol]struct{},
runOnConnect string,
runOnConnectRestart bool,
stats *stats,
metrics *metrics,
pathManager *pathManager,
parent rtspServerParent) (*rtspServer, error) {
ctx, ctxCancel := context.WithCancel(parentCtx)
@ -76,7 +76,7 @@ func newRTSPServer( @@ -76,7 +76,7 @@ func newRTSPServer(
isTLS: isTLS,
rtspAddress: rtspAddress,
protocols: protocols,
stats: stats,
metrics: metrics,
pathManager: pathManager,
parent: parent,
ctx: ctx,
@ -128,6 +128,14 @@ func newRTSPServer( @@ -128,6 +128,14 @@ func newRTSPServer(
s.Log(logger.Info, "TCP listener opened on %s", address)
if s.metrics != nil {
if !isTLS {
s.metrics.OnRTSPServerSet(s)
} else {
s.metrics.OnRTSPSServerSet(s)
}
}
s.wg.Add(1)
go s.run()
@ -179,6 +187,14 @@ outer: @@ -179,6 +187,14 @@ outer:
s.ctxCancel()
s.srv.Close()
if s.metrics != nil {
if !s.isTLS {
s.metrics.OnRTSPServerSet(nil)
} else {
s.metrics.OnRTSPSServerSet(nil)
}
}
}
func (s *rtspServer) newSessionID() (string, error) {
@ -218,7 +234,6 @@ func (s *rtspServer) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) { @@ -218,7 +234,6 @@ func (s *rtspServer) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
s.runOnConnect,
s.runOnConnectRestart,
s.pathManager,
s.stats,
ctx.Conn,
s)
@ -344,7 +359,7 @@ func (s *rtspServer) OnFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) { @@ -344,7 +359,7 @@ func (s *rtspServer) OnFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) {
se.OnFrame(ctx)
}
// OnAPIRTSPSessionsList is called by api.
// OnAPIRTSPSessionsList is called by api and metrics.
func (s *rtspServer) OnAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSessionsListRes {
select {
case <-s.ctx.Done():
@ -353,8 +368,14 @@ func (s *rtspServer) OnAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSe @@ -353,8 +368,14 @@ func (s *rtspServer) OnAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSe
}
s.mutex.RLock()
defer s.mutex.RUnlock()
data := &apiRTSPSessionsListData{
Items: make(map[string]apiRTSPSessionsListItem),
}
for _, s := range s.sessions {
req.Data.Items[s.ID()] = apiRTSPSessionsListItem{
data.Items[s.ID()] = apiRTSPSessionsListItem{
RemoteAddr: s.RemoteAddr().String(),
State: func() string {
switch s.safeState() {
@ -370,9 +391,8 @@ func (s *rtspServer) OnAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSe @@ -370,9 +391,8 @@ func (s *rtspServer) OnAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSe
}(),
}
}
s.mutex.RUnlock()
return apiRTSPSessionsListRes{}
return apiRTSPSessionsListRes{Data: data}
}
// OnAPIRTSPSessionsKick is called by api.

3
internal/core/rtsp_source.go

@ -8,7 +8,6 @@ import ( @@ -8,7 +8,6 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/aler9/gortsplib"
@ -75,7 +74,6 @@ func newRTSPSource( @@ -75,7 +74,6 @@ func newRTSPSource(
ctxCancel: ctxCancel,
}
atomic.AddInt64(s.stats.CountSourcesRTSP, +1)
s.log(logger.Info, "started")
s.wg.Add(1)
@ -85,7 +83,6 @@ func newRTSPSource( @@ -85,7 +83,6 @@ func newRTSPSource(
}
func (s *rtspSource) Close() {
atomic.AddInt64(s.stats.CountSourcesRTSP, -1)
s.log(logger.Info, "stopped")
s.ctxCancel()
}

16
internal/core/stats.go

@ -8,22 +8,14 @@ func ptrInt64() *int64 { @@ -8,22 +8,14 @@ func ptrInt64() *int64 {
type stats struct {
// use pointers to avoid a crash on 32bit platforms
// https://github.com/golang/go/issues/9959
CountPublishers *int64
CountReaders *int64
CountSourcesRTSP *int64
CountSourcesRTSPRunning *int64
CountSourcesRTMP *int64
CountSourcesRTMPRunning *int64
CountPublishers *int64
CountReaders *int64
}
func newStats() *stats {
return &stats{
CountPublishers: ptrInt64(),
CountReaders: ptrInt64(),
CountSourcesRTSP: ptrInt64(),
CountSourcesRTSPRunning: ptrInt64(),
CountSourcesRTMP: ptrInt64(),
CountSourcesRTMPRunning: ptrInt64(),
CountPublishers: ptrInt64(),
CountReaders: ptrInt64(),
}
}

Loading…
Cancel
Save