Browse Source

metrics: add paths_bytes_sent, srt_conns, srt_conns_bytes_received, srt_conns_bytes_sent (#2620) (#2619) (#2629)

* add missing Prometheus exports (#2620, #2619):
paths_bytes_sent, srt_conns, srt_conns_bytes_received, srt_conns_bytes_sent

* protect Stream.BytesSent()

* add tests

---------

Co-authored-by: aler9 <46489434+aler9@users.noreply.github.com>
pull/2662/head
Dr. Ralf S. Engelschall 2 years ago committed by GitHub
parent
commit
4bf0d10079
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      README.md
  2. 3
      apidocs/openapi.yaml
  3. 2
      internal/core/api_test.go
  4. 1
      internal/core/core.go
  5. 25
      internal/core/metrics.go
  6. 197
      internal/core/metrics_test.go
  7. 6
      internal/core/path.go
  8. 7
      internal/core/srt_server.go
  9. 1
      internal/defs/api.go
  10. 17
      internal/stream/stream.go
  11. 5
      internal/stream/stream_format.go

6
README.md

@ -1450,6 +1450,7 @@ Obtaining: @@ -1450,6 +1450,7 @@ Obtaining:
# metrics of every path
paths{name="[path_name]",state="[state]"} 1
paths_bytes_received{name="[path_name]",state="[state]"} 1234
paths_bytes_sent{name="[path_name]",state="[state]"} 1234
# metrics of every HLS muxer
hls_muxers{name="[name]"} 1
@ -1480,6 +1481,11 @@ rtmp_conns{id="[id]",state="[state]"} 1 @@ -1480,6 +1481,11 @@ rtmp_conns{id="[id]",state="[state]"} 1
rtmp_conns_bytes_received{id="[id]",state="[state]"} 1234
rtmp_conns_bytes_sent{id="[id]",state="[state]"} 187
# metrics of every SRT connection
srt_conns{id="[id]",state="[state]"} 1
srt_conns_bytes_received{id="[id]",state="[state]"} 1234
srt_conns_bytes_sent{id="[id]",state="[state]"} 187
# metrics of every WebRTC session
webrtc_sessions{id="[id]"} 1
webrtc_sessions_bytes_received{id="[id]",state="[state]"} 1234

3
apidocs/openapi.yaml

@ -395,6 +395,9 @@ components: @@ -395,6 +395,9 @@ components:
bytesReceived:
type: integer
format: int64
bytesSent:
type: integer
format: int64
readers:
type: array
items:

2
internal/core/api_test.go

@ -422,6 +422,7 @@ func TestAPIPathsList(t *testing.T) { @@ -422,6 +422,7 @@ func TestAPIPathsList(t *testing.T) {
Ready bool `json:"ready"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
type pathList struct {
@ -625,6 +626,7 @@ func TestAPIPathsGet(t *testing.T) { @@ -625,6 +626,7 @@ func TestAPIPathsGet(t *testing.T) {
Ready bool `json:"Ready"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
var pathName string

1
internal/core/core.go

@ -509,6 +509,7 @@ func (p *Core) createResources(initial bool) error { @@ -509,6 +509,7 @@ func (p *Core) createResources(initial bool) error {
p.conf.RunOnConnectRestart,
p.conf.RunOnDisconnect,
p.externalCmdPool,
p.metrics,
p.pathManager,
p,
)

25
internal/core/metrics.go

@ -32,6 +32,7 @@ type metrics struct { @@ -32,6 +32,7 @@ type metrics struct {
rtspServer apiRTSPServer
rtspsServer apiRTSPServer
rtmpServer apiRTMPServer
srtServer apiSRTServer
hlsManager apiHLSManager
webRTCManager apiWebRTCManager
}
@ -96,6 +97,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) { @@ -96,6 +97,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
tags := "{name=\"" + i.Name + "\",state=\"" + state + "\"}"
out += metric("paths", tags, 1)
out += metric("paths_bytes_received", tags, int64(i.BytesReceived))
out += metric("paths_bytes_sent", tags, int64(i.BytesSent))
}
} else {
out += metric("paths", "", 0)
@ -199,6 +201,22 @@ func (m *metrics) onMetrics(ctx *gin.Context) { @@ -199,6 +201,22 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
}
}
if !interfaceIsEmpty(m.srtServer) {
data, err := m.srtServer.apiConnsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
tags := "{id=\"" + i.ID.String() + "\",state=\"" + string(i.State) + "\"}"
out += metric("srt_conns", tags, 1)
out += metric("srt_conns_bytes_received", tags, int64(i.BytesReceived))
out += metric("srt_conns_bytes_sent", tags, int64(i.BytesSent))
}
} else {
out += metric("srt_conns", "", 0)
out += metric("srt_conns_bytes_received", "", 0)
out += metric("srt_conns_bytes_sent", "", 0)
}
}
if !interfaceIsEmpty(m.webRTCManager) {
data, err := m.webRTCManager.apiSessionsList()
if err == nil && len(data.Items) != 0 {
@ -254,6 +272,13 @@ func (m *metrics) rtmpServerSet(s apiRTMPServer) { @@ -254,6 +272,13 @@ func (m *metrics) rtmpServerSet(s apiRTMPServer) {
m.rtmpServer = s
}
// srtServerSet is called by srtServer.
func (m *metrics) srtServerSet(s apiSRTServer) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.srtServer = s
}
// webRTCManagerSet is called by webRTCManager.
func (m *metrics) webRTCManagerSet(s apiWebRTCManager) {
m.mutex.Lock()

197
internal/core/metrics_test.go

@ -1,20 +1,27 @@ @@ -1,20 +1,27 @@
package core
import (
"bufio"
"context"
"crypto/tls"
"net"
"net/http"
"net/url"
"os"
"sync"
"testing"
"time"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
"github.com/datarhei/gosrt"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
"github.com/bluenviron/mediamtx/internal/protocols/rtmp"
"github.com/bluenviron/mediamtx/internal/protocols/webrtc"
)
func TestMetrics(t *testing.T) {
@ -60,48 +67,133 @@ rtsps_sessions_bytes_sent 0 @@ -60,48 +67,133 @@ rtsps_sessions_bytes_sent 0
rtmp_conns 0
rtmp_conns_bytes_received 0
rtmp_conns_bytes_sent 0
srt_conns 0
srt_conns_bytes_received 0
srt_conns_bytes_sent 0
webrtc_sessions 0
webrtc_sessions_bytes_received 0
webrtc_sessions_bytes_sent 0
`, string(bo))
medi := testMediaH264
source := gortsplib.Client{}
err = source.StartRecording("rtsp://localhost:8554/rtsp_path",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer source.Close()
source2 := gortsplib.Client{TLSConfig: &tls.Config{InsecureSkipVerify: true}}
err = source2.StartRecording("rtsps://localhost:8322/rtsps_path",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer source2.Close()
u, err := url.Parse("rtmp://localhost:1935/rtmp_path")
require.NoError(t, err)
nconn, err := net.Dial("tcp", u.Host)
require.NoError(t, err)
defer nconn.Close()
conn, err := rtmp.NewClientConn(nconn, u, true)
require.NoError(t, err)
videoTrack := &format.H264{
PayloadTyp: 96,
SPS: []byte{ // 1920x1080 baseline
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20,
},
PPS: []byte{0x08, 0x06, 0x07, 0x08},
PacketizationMode: 1,
}
_, err = rtmp.NewWriter(conn, videoTrack, nil)
require.NoError(t, err)
terminate := make(chan struct{})
var wg sync.WaitGroup
wg.Add(5)
go func() {
defer wg.Done()
source := gortsplib.Client{}
err := source.StartRecording("rtsp://localhost:8554/rtsp_path",
&description.Session{Medias: []*description.Media{{
Type: description.MediaTypeVideo,
Formats: []format.Format{testFormatH264},
}}})
require.NoError(t, err)
defer source.Close()
<-terminate
}()
go func() {
defer wg.Done()
source2 := gortsplib.Client{TLSConfig: &tls.Config{InsecureSkipVerify: true}}
err := source2.StartRecording("rtsps://localhost:8322/rtsps_path",
&description.Session{Medias: []*description.Media{{
Type: description.MediaTypeVideo,
Formats: []format.Format{testFormatH264},
}}})
require.NoError(t, err)
defer source2.Close()
<-terminate
}()
go func() {
defer wg.Done()
u, err := url.Parse("rtmp://localhost:1935/rtmp_path")
require.NoError(t, err)
nconn, err := net.Dial("tcp", u.Host)
require.NoError(t, err)
defer nconn.Close()
conn, err := rtmp.NewClientConn(nconn, u, true)
require.NoError(t, err)
_, err = rtmp.NewWriter(conn, testFormatH264, nil)
require.NoError(t, err)
<-terminate
}()
go func() {
defer wg.Done()
su, err := url.Parse("http://localhost:8889/webrtc_path/whip")
require.NoError(t, err)
s := &webrtc.WHIPClient{
HTTPClient: &http.Client{Transport: &http.Transport{}},
URL: su,
}
tracks, err := s.Publish(context.Background(), testMediaH264.Formats[0], nil)
require.NoError(t, err)
defer checkClose(t, s.Close)
err = tracks[0].WriteRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{1},
})
require.NoError(t, err)
<-terminate
}()
go func() {
defer wg.Done()
srtConf := srt.DefaultConfig()
address, err := srtConf.UnmarshalURL("srt://localhost:8890?streamid=publish:srt_path")
require.NoError(t, err)
err = srtConf.Validate()
require.NoError(t, err)
publisher, err := srt.Dial("srt", address, srtConf)
require.NoError(t, err)
defer publisher.Close()
track := &mpegts.Track{
Codec: &mpegts.CodecH264{},
}
bw := bufio.NewWriter(publisher)
w := mpegts.NewWriter(bw, []*mpegts.Track{track})
require.NoError(t, err)
err = w.WriteH26x(track, 0, 0, true, [][]byte{
{ // SPS
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9,
0x20,
},
{ // PPS
0x08, 0x06, 0x07, 0x08,
},
{ // IDR
0x05, 1,
},
})
require.NoError(t, err)
err = bw.Flush()
require.NoError(t, err)
<-terminate
}()
time.Sleep(500 * time.Millisecond)
@ -109,11 +201,24 @@ webrtc_sessions_bytes_sent 0 @@ -109,11 +201,24 @@ webrtc_sessions_bytes_sent 0
require.Regexp(t,
`^paths\{name=".*?",state="ready"\} 1`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} 0`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+
`paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+
`paths\{name=".*?",state="ready"\} 1`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+
`paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+
`paths\{name=".*?",state="ready"\} 1`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} 0`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+
`paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+
`paths\{name=".*?",state="ready"\} 1`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} 0`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+
`paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+
`paths\{name=".*?",state="ready"\} 1`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+
`paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+
`hls_muxers\{name=".*?"\} 1`+"\n"+
`hls_muxers_bytes_sent\{name=".*?"\} [0-9]+`+"\n"+
`hls_muxers\{name=".*?"\} 1`+"\n"+
`hls_muxers_bytes_sent\{name=".*?"\} [0-9]+`+"\n"+
`hls_muxers\{name=".*?"\} 1`+"\n"+
`hls_muxers_bytes_sent\{name=".*?"\} [0-9]+`+"\n"+
`hls_muxers\{name=".*?"\} 1`+"\n"+
@ -135,9 +240,15 @@ webrtc_sessions_bytes_sent 0 @@ -135,9 +240,15 @@ webrtc_sessions_bytes_sent 0
`rtmp_conns\{id=".*?",state="publish"\} 1`+"\n"+
`rtmp_conns_bytes_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`rtmp_conns_bytes_sent\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`webrtc_sessions 0`+"\n"+
`webrtc_sessions_bytes_received 0`+"\n"+
`webrtc_sessions_bytes_sent 0`+"\n"+
`srt_conns\{id=".*?",state="publish"\} 1`+"\n"+
`srt_conns_bytes_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_bytes_sent\{id=".*?",state="publish"\} 0`+"\n"+
`webrtc_sessions\{id=".*?"\} 1`+"\n"+
`webrtc_sessions_bytes_received\{id=".*?"\} [0-9]+`+"\n"+
`webrtc_sessions_bytes_sent\{id=".*?"\} [0-9]+`+"\n"+
"$",
string(bo))
close(terminate)
wg.Wait()
}

6
internal/core/path.go

@ -718,6 +718,12 @@ func (pa *path) doAPIPathsGet(req pathAPIPathsGetReq) { @@ -718,6 +718,12 @@ func (pa *path) doAPIPathsGet(req pathAPIPathsGetReq) {
}
return pa.stream.BytesReceived()
}(),
BytesSent: func() uint64 {
if pa.stream == nil {
return 0
}
return pa.stream.BytesSent()
}(),
Readers: func() []defs.APIPathSourceOrReader {
ret := []defs.APIPathSourceOrReader{}
for r := range pa.readers {

7
internal/core/srt_server.go

@ -67,6 +67,7 @@ type srtServer struct { @@ -67,6 +67,7 @@ type srtServer struct {
runOnConnectRestart bool
runOnDisconnect string
externalCmdPool *externalcmd.Pool
metrics *metrics
pathManager *pathManager
parent srtServerParent
@ -96,6 +97,7 @@ func newSRTServer( @@ -96,6 +97,7 @@ func newSRTServer(
runOnConnectRestart bool,
runOnDisconnect string,
externalCmdPool *externalcmd.Pool,
metrics *metrics,
pathManager *pathManager,
parent srtServerParent,
) (*srtServer, error) {
@ -120,6 +122,7 @@ func newSRTServer( @@ -120,6 +122,7 @@ func newSRTServer(
runOnConnectRestart: runOnConnectRestart,
runOnDisconnect: runOnDisconnect,
externalCmdPool: externalCmdPool,
metrics: metrics,
pathManager: pathManager,
parent: parent,
ctx: ctx,
@ -136,6 +139,10 @@ func newSRTServer( @@ -136,6 +139,10 @@ func newSRTServer(
s.Log(logger.Info, "listener opened on "+address+" (UDP)")
if s.metrics != nil {
s.metrics.srtServerSet(s)
}
newSRTListener(
s.ln,
&s.wg,

1
internal/defs/api.go

@ -35,6 +35,7 @@ type APIPath struct { @@ -35,6 +35,7 @@ type APIPath struct {
ReadyTime *time.Time `json:"readyTime"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
Readers []APIPathSourceOrReader `json:"readers"`
}

17
internal/stream/stream.go

@ -24,6 +24,7 @@ type Stream struct { @@ -24,6 +24,7 @@ type Stream struct {
desc *description.Session
bytesReceived *uint64
bytesSent *uint64
smedias map[*description.Media]*streamMedia
mutex sync.RWMutex
rtspStream *gortsplib.ServerStream
@ -40,6 +41,7 @@ func New( @@ -40,6 +41,7 @@ func New(
s := &Stream{
desc: desc,
bytesReceived: new(uint64),
bytesSent: new(uint64),
}
s.smedias = make(map[*description.Media]*streamMedia)
@ -75,6 +77,21 @@ func (s *Stream) BytesReceived() uint64 { @@ -75,6 +77,21 @@ func (s *Stream) BytesReceived() uint64 {
return atomic.LoadUint64(s.bytesReceived)
}
// BytesSent returns sent bytes.
func (s *Stream) BytesSent() uint64 {
s.mutex.RLock()
defer s.mutex.RUnlock()
bytesSent := atomic.LoadUint64(s.bytesSent)
if s.rtspStream != nil {
bytesSent += s.rtspStream.BytesSent()
}
if s.rtspsStream != nil {
bytesSent += s.rtspsStream.BytesSent()
}
return bytesSent
}
// RTSPStream returns the RTSP stream.
func (s *Stream) RTSPStream(server *gortsplib.Server) *gortsplib.ServerStream {
s.mutex.Lock()

5
internal/stream/stream_format.go

@ -85,7 +85,9 @@ func (sf *streamFormat) writeRTPPacket( @@ -85,7 +85,9 @@ func (sf *streamFormat) writeRTPPacket(
}
func (sf *streamFormat) writeUnitInner(s *Stream, medi *description.Media, u unit.Unit) {
atomic.AddUint64(s.bytesReceived, unitSize(u))
size := unitSize(u)
atomic.AddUint64(s.bytesReceived, size)
if s.rtspStream != nil {
for _, pkt := range u.GetRTPPackets() {
@ -102,6 +104,7 @@ func (sf *streamFormat) writeUnitInner(s *Stream, medi *description.Media, u uni @@ -102,6 +104,7 @@ func (sf *streamFormat) writeUnitInner(s *Stream, medi *description.Media, u uni
for writer, cb := range sf.readers {
ccb := cb
writer.Push(func() error {
atomic.AddUint64(s.bytesSent, size)
return ccb(u)
})
}

Loading…
Cancel
Save