Browse Source

srt: add additional metrics (#2962)

* first attempt at the srt metrics

* Updated variables to better match mediamtx

* Prometheus metrics for SRT

* Update readme for SRT metrics

* Switch openapi to number from float

* SRT metrics metrics response fix

* Make the metric test less strict, and nolint line length on the SRT metrics struct

* remove nolint

* move BytesSent and BytesReceived into APISRTConnMetrics

* merge APISRTConn and APISRTConnMetrics

* improve tests

---------

Co-authored-by: slowe <slowe@clairglobal.com>
Co-authored-by: aler9 <46489434+aler9@users.noreply.github.com>
pull/3035/head
Spencer Lowe 2 years ago committed by GitHub
parent
commit
e5e029a7f4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 51
      README.md
  2. 201
      apidocs/openapi.yaml
  3. 580
      internal/core/api_test.go
  4. 51
      internal/core/metrics_test.go
  5. 135
      internal/defs/api.go
  6. 55
      internal/metrics/metrics.go
  7. 79
      internal/servers/srt/conn.go

51
README.md

@ -1612,8 +1612,57 @@ rtmps_conns_bytes_sent{id="[id]",state="[state]"} 187 @@ -1612,8 +1612,57 @@ rtmps_conns_bytes_sent{id="[id]",state="[state]"} 187
# metrics of every SRT connection
srt_conns{id="[id]",state="[state]"} 1
srt_conns_bytes_received{id="[id]",state="[state]"} 1234
srt_conns_packets_sent{id="[id]",state="[state]"} 123
srt_conns_packets_received{id="[id]",state="[state]"} 123
srt_conns_packets_sent_unique{id="[id]",state="[state]"} 123
srt_conns_packets_received_unique{id="[id]",state="[state]"} 123
srt_conns_packets_send_loss{id="[id]",state="[state]"} 123
srt_conns_packets_received_loss{id="[id]",state="[state]"} 123
srt_conns_packets_retrans{id="[id]",state="[state]"} 123
srt_conns_packets_received_retrans{id="[id]",state="[state]"} 123
srt_conns_packets_sent_ack{id="[id]",state="[state]"} 123
srt_conns_packets_received_ack{id="[id]",state="[state]"} 123
srt_conns_packets_sent_nak{id="[id]",state="[state]"} 123
srt_conns_packets_received_nak{id="[id]",state="[state]"} 123
srt_conns_packets_sent_km{id="[id]",state="[state]"} 123
srt_conns_packets_received_km{id="[id]",state="[state]"} 123
srt_conns_us_snd_duration{id="[id]",state="[state]"} 123
srt_conns_packets_send_drop{id="[id]",state="[state]"} 123
srt_conns_packets_received_drop{id="[id]",state="[state]"} 123
srt_conns_packets_received_undecrypt{id="[id]",state="[state]"} 123
srt_conns_bytes_sent{id="[id]",state="[state]"} 187
srt_conns_bytes_received{id="[id]",state="[state]"} 1234
srt_conns_bytes_sent_unique{id="[id]",state="[state]"} 123
srt_conns_bytes_received_unique{id="[id]",state="[state]"} 123
srt_conns_bytes_received_loss{id="[id]",state="[state]"} 123
srt_conns_bytes_retrans{id="[id]",state="[state]"} 123
srt_conns_bytes_received_retrans{id="[id]",state="[state]"} 123
srt_conns_bytes_send_drop{id="[id]",state="[state]"} 123
srt_conns_bytes_received_drop{id="[id]",state="[state]"} 123
srt_conns_bytes_received_undecrypt{id="[id]",state="[state]"} 123
srt_conns_us_packets_send_period{id="[id]",state="[state]"} 123.123
srt_conns_packets_flow_window{id="[id]",state="[state]"} 123
srt_conns_packets_flight_size{id="[id]",state="[state]"} 123
srt_conns_ms_rtt{id="[id]",state="[state]"} 123.123
srt_conns_mbps_send_rate{id="[id]",state="[state]"} 123
srt_conns_mbps_receive_rate{id="[id]",state="[state]"} 123.123
srt_conns_mbps_link_capacity{id="[id]",state="[state]"} 123.123
srt_conns_bytes_avail_send_buf{id="[id]",state="[state]"} 123
srt_conns_bytes_avail_receive_buf{id="[id]",state="[state]"} 123
srt_conns_mbps_max_bw{id="[id]",state="[state]"} -123
srt_conns_bytes_mss{id="[id]",state="[state]"} 123
srt_conns_packets_send_buf{id="[id]",state="[state]"} 123
srt_conns_bytes_send_buf{id="[id]",state="[state]"} 123
srt_conns_ms_send_buf{id="[id]",state="[state]"} 123
srt_conns_ms_send_tsb_pd_delay{id="[id]",state="[state]"} 123
srt_conns_packets_receive_buf{id="[id]",state="[state]"} 123
srt_conns_bytes_receive_buf{id="[id]",state="[state]"} 123
srt_conns_ms_receive_buf{id="[id]",state="[state]"} 123
srt_conns_ms_receive_tsb_pd_delay{id="[id]",state="[state]"} 123
srt_conns_packets_reorder_tolerance{id="[id]",state="[state]"} 123
srt_conns_packets_received_avg_belated_time{id="[id]",state="[state]"} 123
srt_conns_packets_send_loss_rate{id="[id]",state="[state]"} 123
srt_conns_packets_received_loss_rate{id="[id]",state="[state]"} 123
# metrics of every WebRTC session
webrtc_sessions{id="[id]",state="[state]"} 1

201
apidocs/openapi.yaml

@ -626,12 +626,211 @@ components: @@ -626,12 +626,211 @@ components:
type: string
query:
type: string
bytesReceived:
packetsSent:
type: integer
format: int64
description: The total number of sent DATA packets, including retransmitted packets
packetsReceived:
type: integer
format: int64
description: The total number of received DATA packets, including retransmitted packets
packetsSentUnique:
type: integer
format: int64
description: The total number of unique DATA packets sent by the SRT sender
packetsReceivedUnique:
type: integer
format: int64
description: The total number of unique original, retransmitted or recovered by the packet filter DATA packets received in time, decrypted without errors and, as a result, scheduled for delivery to the upstream application by the SRT receiver.
packetsSendLoss:
type: integer
format: int64
description: The total number of data packets considered or reported as lost at the sender side. Does not correspond to the packets detected as lost at the receiver side.
packetsReceivedLoss:
type: integer
format: int64
description: The total number of SRT DATA packets detected as presently missing (either reordered or lost) at the receiver side
packetsRetrans:
type: integer
format: int64
description: The total number of retransmitted packets sent by the SRT sender
packetsReceivedRetrans:
type: integer
format: int64
description: The total number of retransmitted packets registered at the receiver side
packetsSentACK:
type: integer
format: int64
description: The total number of sent ACK (Acknowledgement) control packets
packetsReceivedACK:
type: integer
format: int64
description: The total number of received ACK (Acknowledgement) control packets
packetsSentNAK:
type: integer
format: int64
description: The total number of sent NAK (Negative Acknowledgement) control packets
packetsReceivedNAK:
type: integer
format: int64
description: The total number of received NAK (Negative Acknowledgement) control packets
packetsSentKM:
type: integer
format: int64
description: The total number of sent KM (Key Material) control packets
packetsReceivedKM:
type: integer
format: int64
description: The total number of received KM (Key Material) control packets
usSndDuration:
type: integer
format: int64
description: The total accumulated time in microseconds, during which the SRT sender has some data to transmit, including packets that have been sent, but not yet acknowledged
packetsSendDrop:
type: integer
format: int64
description: The total number of dropped by the SRT sender DATA packets that have no chance to be delivered in time
packetsReceivedDrop:
type: integer
format: int64
description: The total number of dropped by the SRT receiver and, as a result, not delivered to the upstream application DATA packets
packetsReceivedUndecrypt:
type: integer
format: int64
description: The total number of packets that failed to be decrypted at the receiver side
bytesSent:
type: integer
format: int64
description: Same as packetsSent, but expressed in bytes, including payload and all the headers (IP, TCP, SRT)
bytesReceived:
type: integer
format: int64
description: Same as packetsReceived, but expressed in bytes, including payload and all the headers (IP, TCP, SRT)
bytesSentUnique:
type: integer
format: int64
description: Same as packetsSentUnique, but expressed in bytes, including payload and all the headers (IP, TCP, SRT)
bytesReceivedUnique:
type: integer
format: int64
description: Same as packetsReceivedUnique, but expressed in bytes, including payload and all the headers (IP, TCP, SRT)
bytesReceivedLoss:
type: integer
format: int64
description: Same as packetsReceivedLoss, but expressed in bytes, including payload and all the headers (IP, TCP, SRT), bytes for the presently missing (either reordered or lost) packets' payloads are estimated based on the average packet size
bytesRetrans:
type: integer
format: int64
description: Same as packetsRetrans, but expressed in bytes, including payload and all the headers (IP, TCP, SRT)
bytesReceivedRetrans:
type: integer
format: int64
description: Same as packetsReceivedRetrans, but expressed in bytes, including payload and all the headers (IP, TCP, SRT)
bytesSendDrop:
type: integer
format: int64
description: Same as packetsSendDrop, but expressed in bytes, including payload and all the headers (IP, TCP, SRT)
bytesReceivedDrop:
type: integer
format: int64
description: Same as packetsReceivedDrop, but expressed in bytes, including payload and all the headers (IP, TCP, SRT)
bytesReceivedUndecrypt:
type: integer
format: int64
description: Same as packetsReceivedUndecrypt, but expressed in bytes, including payload and all the headers (IP, TCP, SRT)
usPacketsSendPeriod:
type: number
format: float64
description: Current minimum time interval between which consecutive packets are sent, in microseconds
packetsFlowWindow:
type: integer
format: int64
description: The maximum number of packets that can be "in flight"
packetsFlightSize:
type: integer
format: int64
description: The number of packets in flight
msRTT:
type: number
format: float64
description: Smoothed round-trip time (SRTT), an exponentially-weighted moving average (EWMA) of an endpoint's RTT samples, in milliseconds
mbpsSendRate:
type: number
format: float64
description: Current transmission bandwidth, in Mbps
mbpsReceiveRate:
type: number
format: float64
description: Current receiving bandwidth, in Mbps
mbpsLinkCapacity:
type: number
format: float64
description: Estimated capacity of the network link, in Mbps
bytesAvailSendBuf:
type: integer
format: int64
description: The available space in the sender's buffer, in bytes
bytesAvailReceiveBuf:
type: integer
format: int64
description: The available space in the receiver's buffer, in bytes
mbpsMaxBW:
type: number
format: float64
description: Transmission bandwidth limit, in Mbps
byteMSS:
type: integer
format: int64
description: Maximum Segment Size (MSS), in bytes
packetsSendBuf:
type: integer
format: int64
description: The number of packets in the sender's buffer that are already scheduled for sending or even possibly sent, but not yet acknowledged
bytesSendBuf:
type: integer
format: int64
description: Instantaneous (current) value of packetsSndBuf, but expressed in bytes, including payload and all headers (IP, TCP, SRT)
msSendBuf:
type: integer
format: int64
description: The timespan (msec) of packets in the sender's buffer (unacknowledged packets)
msSendTsbPdDelay:
type: integer
format: int64
description: Timestamp-based Packet Delivery Delay value of the peer
packetsReceiveBuf:
type: integer
format: int64
description: The number of acknowledged packets in receiver's buffer
bytesReceiveBuf:
type: integer
format: int64
description: Instantaneous (current) value of packetsRcvBuf, expressed in bytes, including payload and all headers (IP, TCP, SRT)
msReceiveBuf:
type: integer
format: int64
description: The timespan (msec) of acknowledged packets in the receiver's buffer
msReceiveTsbPdDelay:
type: integer
format: int64
description: Timestamp-based Packet Delivery Delay value set on the socket via SRTO_RCVLATENCY or SRTO_LATENCY
packetsReorderTolerance:
type: integer
format: int64
description: Instant value of the packet reorder tolerance
packetsReceivedAvgBelatedTime:
type: integer
format: int64
description: Accumulated difference between the current time and the time-to-play of a packet that is received late
packetsSendLossRate:
type: number
format: float64
description: Percentage of resent data vs. sent data
packetsReceivedLossRate:
type: number
format: float64
description: Percentage of retransmitted data vs. received data
SRTConnList:
type: object

580
internal/core/api_test.go

@ -1,3 +1,4 @@ @@ -1,3 +1,4 @@
//nolint:dupl,lll
package core
import (
@ -6,7 +7,6 @@ import ( @@ -6,7 +7,6 @@ import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
@ -339,7 +339,7 @@ func TestAPIPathsGet(t *testing.T) { @@ -339,7 +339,7 @@ func TestAPIPathsGet(t *testing.T) {
}
}
func TestAPIProtocolList(t *testing.T) {
func TestAPIProtocolListGet(t *testing.T) {
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
@ -552,388 +552,262 @@ func TestAPIProtocolList(t *testing.T) { @@ -552,388 +552,262 @@ func TestAPIProtocolList(t *testing.T) {
time.Sleep(500 * time.Millisecond)
}
var pa string
switch ca {
case "rtsp conns", "rtsp sessions", "rtsps conns", "rtsps sessions", "rtmp", "rtmps", "srt":
var pa string
switch ca {
case "rtsp conns":
pa = "rtspconns"
case "rtsp sessions":
pa = "rtspsessions"
case "rtsps conns":
pa = "rtspsconns"
case "rtsps sessions":
pa = "rtspssessions"
case "rtsp conns":
pa = "rtspconns"
case "rtmp":
pa = "rtmpconns"
case "rtsp sessions":
pa = "rtspsessions"
case "rtmps":
pa = "rtmpsconns"
case "rtsps conns":
pa = "rtspsconns"
case "srt":
pa = "srtconns"
}
case "rtsps sessions":
pa = "rtspssessions"
type item struct {
State string `json:"state"`
Path string `json:"path"`
Query string `json:"query"`
}
case "rtmp":
pa = "rtmpconns"
var out struct {
ItemCount int `json:"itemCount"`
Items []item `json:"items"`
}
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/"+pa+"/list", nil, &out)
if ca != "rtsp conns" && ca != "rtsps conns" {
require.Equal(t, item{
State: "publish",
Path: "mypath",
Query: "key=val",
}, out.Items[0])
}
case "rtmps":
pa = "rtmpsconns"
case "hls":
type item struct {
Created string `json:"created"`
LastRequest string `json:"lastRequest"`
}
var out struct {
ItemCount int `json:"itemCount"`
Items []item `json:"items"`
}
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/hlsmuxers/list", nil, &out)
s := fmt.Sprintf("^%d-", time.Now().Year())
require.Regexp(t, s, out.Items[0].Created)
require.Regexp(t, s, out.Items[0].LastRequest)
pa = "hlsmuxers"
case "webrtc":
type item struct {
PeerConnectionEstablished bool `json:"peerConnectionEstablished"`
State string `json:"state"`
Path string `json:"path"`
Query string `json:"query"`
}
pa = "webrtcsessions"
var out struct {
ItemCount int `json:"itemCount"`
Items []item `json:"items"`
}
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/webrtcsessions/list", nil, &out)
require.Equal(t, item{
PeerConnectionEstablished: true,
State: "read",
Path: "mypath",
Query: "key=val",
}, out.Items[0])
case "srt":
pa = "srtconns"
}
})
}
}
func TestAPIProtocolGet(t *testing.T) {
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(serverKey)
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
for _, ca := range []string{
"rtsp conns",
"rtsp sessions",
"rtsps conns",
"rtsps sessions",
"rtmp",
"rtmps",
"hls",
"webrtc",
"srt",
} {
t.Run(ca, func(t *testing.T) {
conf := "api: yes\n"
var out1 interface{}
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/"+pa+"/list", nil, &out1)
switch ca {
case "rtsps conns", "rtsps sessions":
conf += "protocols: [tcp]\n" +
"encryption: strict\n" +
"serverCert: " + serverCertFpath + "\n" +
"serverKey: " + serverKeyFpath + "\n"
case "rtmps":
conf += "rtmpEncryption: strict\n" +
"rtmpServerCert: " + serverCertFpath + "\n" +
"rtmpServerKey: " + serverKeyFpath + "\n"
}
conf += "paths:\n" +
" all_others:\n"
p, ok := newInstance(conf)
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
medi := testMediaH264
switch ca { //nolint:dupl
case "rtsp conns", "rtsp sessions":
source := gortsplib.Client{}
err := source.StartRecording("rtsp://localhost:8554/mypath",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer source.Close()
case "rtsps conns", "rtsps sessions":
source := gortsplib.Client{
TLSConfig: &tls.Config{InsecureSkipVerify: true},
}
err := source.StartRecording("rtsps://localhost:8322/mypath",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer source.Close()
case "rtmp", "rtmps":
var port string
if ca == "rtmp" {
port = "1935"
} else {
port = "1936"
}
case "rtsp conns":
require.Equal(t, map[string]interface{}{
"pageCount": float64(1),
"itemCount": float64(1),
"items": []interface{}{
map[string]interface{}{
"bytesReceived": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["bytesReceived"],
"bytesSent": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["bytesSent"],
"created": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["created"],
"id": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["id"],
"remoteAddr": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["remoteAddr"],
},
},
}, out1)
u, err := url.Parse("rtmp://127.0.0.1:" + port + "/mypath")
require.NoError(t, err)
case "rtsp sessions":
require.Equal(t, map[string]interface{}{
"pageCount": float64(1),
"itemCount": float64(1),
"items": []interface{}{
map[string]interface{}{
"bytesReceived": float64(0),
"bytesSent": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["bytesSent"],
"created": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["created"],
"id": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["id"],
"path": "mypath",
"query": "key=val",
"remoteAddr": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["remoteAddr"],
"state": "publish",
"transport": "UDP",
},
},
}, out1)
nconn, err := func() (net.Conn, error) {
if ca == "rtmp" {
return net.Dial("tcp", u.Host)
}
return tls.Dial("tcp", u.Host, &tls.Config{InsecureSkipVerify: true})
}()
require.NoError(t, err)
defer nconn.Close()
case "rtsps conns":
require.Equal(t, map[string]interface{}{
"pageCount": float64(1),
"itemCount": float64(1),
"items": []interface{}{
map[string]interface{}{
"bytesReceived": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["bytesReceived"],
"bytesSent": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["bytesSent"],
"created": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["created"],
"id": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["id"],
"remoteAddr": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["remoteAddr"],
},
},
}, out1)
conn, err := rtmp.NewClientConn(nconn, u, true)
require.NoError(t, err)
case "rtsps sessions":
require.Equal(t, map[string]interface{}{
"pageCount": float64(1),
"itemCount": float64(1),
"items": []interface{}{
map[string]interface{}{
"bytesReceived": float64(0),
"bytesSent": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["bytesSent"],
"created": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["created"],
"id": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["id"],
"path": "mypath",
"query": "key=val",
"remoteAddr": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["remoteAddr"],
"state": "publish",
"transport": "TCP",
},
},
}, out1)
_, err = rtmp.NewWriter(conn, test.FormatH264, nil)
require.NoError(t, err)
case "rtmp":
require.Equal(t, map[string]interface{}{
"pageCount": float64(1),
"itemCount": float64(1),
"items": []interface{}{
map[string]interface{}{
"bytesReceived": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["bytesReceived"],
"bytesSent": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["bytesSent"],
"created": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["created"],
"id": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["id"],
"path": "mypath",
"query": "key=val",
"remoteAddr": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["remoteAddr"],
"state": "publish",
},
},
}, out1)
time.Sleep(500 * time.Millisecond)
case "rtmps":
require.Equal(t, map[string]interface{}{
"pageCount": float64(1),
"itemCount": float64(1),
"items": []interface{}{
map[string]interface{}{
"bytesReceived": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["bytesReceived"],
"bytesSent": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["bytesSent"],
"created": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["created"],
"id": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["id"],
"path": "mypath",
"query": "key=val",
"remoteAddr": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["remoteAddr"],
"state": "publish",
},
},
}, out1)
case "hls":
source := gortsplib.Client{}
err := source.StartRecording("rtsp://localhost:8554/mypath",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer source.Close()
go func() {
time.Sleep(500 * time.Millisecond)
for i := 0; i < 3; i++ {
/*source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123 + uint16(i),
Timestamp: 45343 + uint32(i)*90000,
SSRC: 563423,
},
Payload: []byte{
testSPS,
0x05,
},
})
[]byte{ // 1920x1080 baseline
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20,
},*/
err := source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123 + uint16(i),
Timestamp: 45343 + uint32(i)*90000,
SSRC: 563423,
},
Payload: []byte{
// testSPS,
0x05,
},
})
require.NoError(t, err)
}
}()
func() {
res, err := hc.Get("http://localhost:8888/mypath/index.m3u8")
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, 200, res.StatusCode)
}()
require.Equal(t, map[string]interface{}{
"itemCount": float64(1),
"pageCount": float64(1),
"items": []interface{}{
map[string]interface{}{
"bytesSent": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["bytesSent"],
"created": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["created"],
"lastRequest": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["lastRequest"],
"path": "mypath",
},
},
}, out1)
case "webrtc":
source := gortsplib.Client{}
err := source.StartRecording("rtsp://localhost:8554/mypath",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer source.Close()
u, err := url.Parse("http://localhost:8889/mypath/whep")
require.NoError(t, err)
go func() {
time.Sleep(500 * time.Millisecond)
err := source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
require.Equal(t, map[string]interface{}{
"itemCount": float64(1),
"pageCount": float64(1),
"items": []interface{}{
map[string]interface{}{
"bytesReceived": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["bytesReceived"],
"bytesSent": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["bytesSent"],
"created": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["created"],
"id": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["id"],
"localCandidate": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["localCandidate"],
"path": "mypath",
"peerConnectionEstablished": true,
"query": "key=val",
"remoteAddr": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["remoteAddr"],
"remoteCandidate": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["remoteCandidate"],
"state": "read",
},
Payload: []byte{5, 1, 2, 3, 4},
})
require.NoError(t, err)
}()
c := &webrtc.WHIPClient{
HTTPClient: hc,
URL: u,
Log: test.NilLogger{},
}
_, err = c.Read(context.Background())
require.NoError(t, err)
defer checkClose(t, c.Close)
},
}, out1)
case "srt":
conf := srt.DefaultConfig()
conf.StreamId = "publish:mypath"
conn, err := srt.Dial("srt", "localhost:8890", conf)
require.NoError(t, err)
defer conn.Close()
track := &mpegts.Track{
Codec: &mpegts.CodecH264{},
}
bw := bufio.NewWriter(conn)
w := mpegts.NewWriter(bw, []*mpegts.Track{track})
require.NoError(t, err)
err = w.WriteH26x(track, 0, 0, true, [][]byte{{1}})
require.NoError(t, err)
err = bw.Flush()
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)
require.Equal(t, map[string]interface{}{
"itemCount": float64(1),
"pageCount": float64(1),
"items": []interface{}{
map[string]interface{}{
"byteMSS": float64(1500),
"bytesAvailReceiveBuf": float64(0),
"bytesAvailSendBuf": float64(0),
"bytesReceiveBuf": float64(0),
"bytesReceived": float64(628),
"bytesReceivedBelated": float64(0),
"bytesReceivedDrop": float64(0),
"bytesReceivedLoss": float64(0),
"bytesReceivedRetrans": float64(0),
"bytesReceivedUndecrypt": float64(0),
"bytesReceivedUnique": float64(628),
"bytesRetrans": float64(0),
"bytesSendBuf": float64(0),
"bytesSendDrop": float64(0),
"bytesSent": float64(0),
"bytesSentUnique": float64(0),
"created": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["created"],
"id": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["id"],
"mbpsLinkCapacity": float64(0),
"mbpsMaxBW": float64(-1),
"mbpsReceiveRate": float64(0),
"mbpsSendRate": float64(0),
"msRTT": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["msRTT"],
"msReceiveBuf": float64(0),
"msReceiveTsbPdDelay": float64(120),
"msSendBuf": float64(0),
"msSendTsbPdDelay": float64(120),
"packetsFlightSize": float64(0),
"packetsFlowWindow": float64(25600),
"packetsReceiveBuf": float64(0),
"packetsReceived": float64(1),
"packetsReceivedACK": float64(0),
"packetsReceivedAvgBelatedTime": float64(0),
"packetsReceivedBelated": float64(0),
"packetsReceivedDrop": float64(0),
"packetsReceivedKM": float64(0),
"packetsReceivedLoss": float64(0),
"packetsReceivedLossRate": float64(0),
"packetsReceivedNAK": float64(0),
"packetsReceivedRetrans": float64(0),
"packetsReceivedUndecrypt": float64(0),
"packetsReceivedUnique": float64(1),
"packetsReorderTolerance": float64(0),
"packetsRetrans": float64(0),
"packetsSendBuf": float64(0),
"packetsSendDrop": float64(0),
"packetsSendLoss": float64(0),
"packetsSendLossRate": float64(0),
"packetsSent": float64(0),
"packetsSentACK": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["packetsSentACK"],
"packetsSentKM": float64(0),
"packetsSentNAK": float64(0),
"packetsSentUnique": float64(0),
"path": "mypath",
"query": "key=val",
"remoteAddr": out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["remoteAddr"],
"state": "publish",
"usPacketsSendPeriod": float64(10.967254638671875),
"usSndDuration": float64(0),
},
},
}, out1)
}
switch ca {
case "rtsp conns", "rtsp sessions", "rtsps conns", "rtsps sessions", "rtmp", "rtmps", "srt":
var pa string
switch ca {
case "rtsp conns":
pa = "rtspconns"
case "rtsp sessions":
pa = "rtspsessions"
case "rtsps conns":
pa = "rtspsconns"
case "rtsps sessions":
pa = "rtspssessions"
var out2 interface{}
case "rtmp":
pa = "rtmpconns"
case "rtmps":
pa = "rtmpsconns"
case "srt":
pa = "srtconns"
}
type item struct {
ID string `json:"id"`
State string `json:"state"`
}
var out1 struct {
Items []item `json:"items"`
}
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/"+pa+"/list", nil, &out1)
if ca != "rtsp conns" && ca != "rtsps conns" {
require.Equal(t, "publish", out1.Items[0].State)
}
var out2 item
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/"+pa+"/get/"+out1.Items[0].ID, nil, &out2)
if ca != "rtsp conns" && ca != "rtsps conns" {
require.Equal(t, "publish", out2.State)
}
case "hls":
type item struct {
Created string `json:"created"`
LastRequest string `json:"lastRequest"`
}
var out item
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/hlsmuxers/get/mypath", nil, &out)
s := fmt.Sprintf("^%d-", time.Now().Year())
require.Regexp(t, s, out.Created)
require.Regexp(t, s, out.LastRequest)
case "webrtc":
type item struct {
ID string `json:"id"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
PeerConnectionEstablished bool `json:"peerConnectionEstablished"`
LocalCandidate string `json:"localCandidate"`
RemoteCandidate string `json:"remoteCandidate"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
var out1 struct {
Items []item `json:"items"`
}
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/webrtcsessions/list", nil, &out1)
var out2 item
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/webrtcsessions/get/"+out1.Items[0].ID, nil, &out2)
require.Equal(t, true, out2.PeerConnectionEstablished)
if ca == "hls" {
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/"+pa+"/get/"+
out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["path"].(string),
nil, &out2)
} else {
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/"+pa+"/get/"+
out1.(map[string]interface{})["items"].([]interface{})[0].(map[string]interface{})["id"].(string),
nil, &out2)
}
require.Equal(t, out1.(map[string]interface{})["items"].([]interface{})[0], out2)
})
}
}

51
internal/core/metrics_test.go

@ -285,8 +285,57 @@ webrtc_sessions_bytes_sent 0 @@ -285,8 +285,57 @@ webrtc_sessions_bytes_sent 0
`rtmps_conns_bytes_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`rtmps_conns_bytes_sent\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns\{id=".*?",state="publish"\} 1`+"\n"+
`srt_conns_bytes_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_sent\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_sent_unique\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_received_unique\{id=".*?",state="publish"\} 1`+"\n"+
`srt_conns_packets_send_loss\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_received_loss\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_retrans\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_received_retrans\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_sent_ack\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_received_ack\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_sent_nak\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_received_nak\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_sent_km\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_received_km\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_us_snd_duration\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_send_drop\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_received_drop\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_received_undecrypt\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_bytes_sent\{id=".*?",state="publish"\} 0`+"\n"+
`srt_conns_bytes_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_bytes_sent_unique\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_bytes_received_unique\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_bytes_received_loss\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_bytes_retrans\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_bytes_received_retrans\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_bytes_send_drop\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_bytes_received_drop\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_bytes_received_undecrypt\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_us_packets_send_period\{id=".*?",state="publish"\} \d+\.\d+`+"\n"+
`srt_conns_packets_flow_window\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_flight_size\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_ms_rtt\{id=".*?",state="publish"\} \d+\.\d+`+"\n"+
`srt_conns_mbps_send_rate\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_mbps_receive_rate\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_mbps_link_capacity\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_bytes_avail_send_buf\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_bytes_avail_receive_buf\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_mbps_max_bw\{id=".*?",state="publish"\} -1`+"\n"+
`srt_conns_bytes_mss\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_send_buf\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_bytes_send_buf\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_ms_send_buf\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_ms_send_tsb_pd_delay\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_receive_buf\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_bytes_receive_buf\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_ms_receive_buf\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_ms_receive_tsb_pd_delay\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_reorder_tolerance\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_received_avg_belated_time\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_send_loss_rate\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_packets_received_loss_rate\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`webrtc_sessions\{id=".*?",state="publish"\} 1`+"\n"+
`webrtc_sessions_bytes_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`webrtc_sessions_bytes_sent\{id=".*?",state="publish"\} [0-9]+`+"\n"+

135
internal/defs/api.go

@ -148,14 +148,133 @@ const ( @@ -148,14 +148,133 @@ const (
// APISRTConn is a SRT connection.
type APISRTConn struct {
ID uuid.UUID `json:"id"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
State APISRTConnState `json:"state"`
Path string `json:"path"`
Query string `json:"query"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
ID uuid.UUID `json:"id"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
State APISRTConnState `json:"state"`
Path string `json:"path"`
Query string `json:"query"`
// The metric names/comments are pulled from GoSRT
// The total number of sent DATA packets, including retransmitted packets
PacketsSent uint64 `json:"packetsSent"`
// The total number of received DATA packets, including retransmitted packets
PacketsReceived uint64 `json:"packetsReceived"`
// The total number of unique DATA packets sent by the SRT sender
PacketsSentUnique uint64 `json:"packetsSentUnique"`
// The total number of unique original, retransmitted or recovered by the packet filter DATA packets
// received in time, decrypted without errors and, as a result, scheduled for delivery to the
// upstream application by the SRT receiver.
PacketsReceivedUnique uint64 `json:"packetsReceivedUnique"`
// The total number of data packets considered or reported as lost at the sender side.
// Does not correspond to the packets detected as lost at the receiver side.
PacketsSendLoss uint64 `json:"packetsSendLoss"`
// The total number of SRT DATA packets detected as presently missing (either reordered or lost) at the receiver side
PacketsReceivedLoss uint64 `json:"packetsReceivedLoss"`
// The total number of retransmitted packets sent by the SRT sender
PacketsRetrans uint64 `json:"packetsRetrans"`
// The total number of retransmitted packets registered at the receiver side
PacketsReceivedRetrans uint64 `json:"packetsReceivedRetrans"`
// The total number of sent ACK (Acknowledgement) control packets
PacketsSentACK uint64 `json:"packetsSentACK"`
// The total number of received ACK (Acknowledgement) control packets
PacketsReceivedACK uint64 `json:"packetsReceivedACK"`
// The total number of sent NAK (Negative Acknowledgement) control packets
PacketsSentNAK uint64 `json:"packetsSentNAK"`
// The total number of received NAK (Negative Acknowledgement) control packets
PacketsReceivedNAK uint64 `json:"packetsReceivedNAK"`
// The total number of sent KM (Key Material) control packets
PacketsSentKM uint64 `json:"packetsSentKM"`
// The total number of received KM (Key Material) control packets
PacketsReceivedKM uint64 `json:"packetsReceivedKM"`
// The total accumulated time in microseconds, during which the SRT sender has some data to transmit,
// including packets that have been sent, but not yet acknowledged
UsSndDuration uint64 `json:"usSndDuration"`
// ??
PacketsReceivedBelated uint64 `json:"packetsReceivedBelated"`
// The total number of dropped by the SRT sender DATA packets that have no chance to be delivered in time
PacketsSendDrop uint64 `json:"packetsSendDrop"`
// The total number of dropped by the SRT receiver and, as a result,
// not delivered to the upstream application DATA packets
PacketsReceivedDrop uint64 `json:"packetsReceivedDrop"`
// The total number of packets that failed to be decrypted at the receiver side
PacketsReceivedUndecrypt uint64 `json:"packetsReceivedUndecrypt"`
// Same as packetsReceived, but expressed in bytes, including payload and all the headers (IP, TCP, SRT)
BytesReceived uint64 `json:"bytesReceived"`
// Same as packetsSent, but expressed in bytes, including payload and all the headers (IP, TCP, SRT)
BytesSent uint64 `json:"bytesSent"`
// Same as packetsSentUnique, but expressed in bytes, including payload and all the headers (IP, TCP, SRT)
BytesSentUnique uint64 `json:"bytesSentUnique"`
// Same as packetsReceivedUnique, but expressed in bytes, including payload and all the headers (IP, TCP, SRT)
BytesReceivedUnique uint64 `json:"bytesReceivedUnique"`
// Same as packetsReceivedLoss, but expressed in bytes, including payload and all the headers (IP, TCP, SRT),
// bytes for the presently missing (either reordered or lost) packets' payloads are estimated
// based on the average packet size
BytesReceivedLoss uint64 `json:"bytesReceivedLoss"`
// Same as packetsRetrans, but expressed in bytes, including payload and all the headers (IP, TCP, SRT)
BytesRetrans uint64 `json:"bytesRetrans"`
// Same as packetsReceivedRetrans, but expressed in bytes, including payload and all the headers (IP, TCP, SRT)
BytesReceivedRetrans uint64 `json:"bytesReceivedRetrans"`
// Same as PacketsReceivedBelated, but expressed in bytes, including payload and all the headers (IP, TCP, SRT)
BytesReceivedBelated uint64 `json:"bytesReceivedBelated"`
// Same as packetsSendDrop, but expressed in bytes, including payload and all the headers (IP, TCP, SRT)
BytesSendDrop uint64 `json:"bytesSendDrop"`
// Same as packetsReceivedDrop, but expressed in bytes, including payload and all the headers (IP, TCP, SRT)
BytesReceivedDrop uint64 `json:"bytesReceivedDrop"`
// Same as packetsReceivedUndecrypt, but expressed in bytes, including payload and all the headers (IP, TCP, SRT)
BytesReceivedUndecrypt uint64 `json:"bytesReceivedUndecrypt"`
// Current minimum time interval between which consecutive packets are sent, in microseconds
UsPacketsSendPeriod float64 `json:"usPacketsSendPeriod"`
// The maximum number of packets that can be "in flight"
PacketsFlowWindow uint64 `json:"packetsFlowWindow"`
// The number of packets in flight
PacketsFlightSize uint64 `json:"packetsFlightSize"`
// Smoothed round-trip time (SRTT), an exponentially-weighted moving average (EWMA)
// of an endpoint's RTT samples, in milliseconds
MsRTT float64 `json:"msRTT"`
// Current transmission bandwidth, in Mbps
MbpsSendRate float64 `json:"mbpsSendRate"`
// Current receiving bandwidth, in Mbps
MbpsReceiveRate float64 `json:"mbpsReceiveRate"`
// Estimated capacity of the network link, in Mbps
MbpsLinkCapacity float64 `json:"mbpsLinkCapacity"`
// The available space in the sender's buffer, in bytes
BytesAvailSendBuf uint64 `json:"bytesAvailSendBuf"`
// The available space in the receiver's buffer, in bytes
BytesAvailReceiveBuf uint64 `json:"bytesAvailReceiveBuf"`
// Transmission bandwidth limit, in Mbps
MbpsMaxBW float64 `json:"mbpsMaxBW"`
// Maximum Segment Size (MSS), in bytes
ByteMSS uint64 `json:"byteMSS"`
// The number of packets in the sender's buffer that are already scheduled
// for sending or even possibly sent, but not yet acknowledged
PacketsSendBuf uint64 `json:"packetsSendBuf"`
// Instantaneous (current) value of packetsSndBuf, but expressed in bytes,
// including payload and all headers (IP, TCP, SRT)
BytesSendBuf uint64 `json:"bytesSendBuf"`
// The timespan (msec) of packets in the sender's buffer (unacknowledged packets)
MsSendBuf uint64 `json:"msSendBuf"`
// Timestamp-based Packet Delivery Delay value of the peer
MsSendTsbPdDelay uint64 `json:"msSendTsbPdDelay"`
// The number of acknowledged packets in receiver's buffer
PacketsReceiveBuf uint64 `json:"packetsReceiveBuf"`
// Instantaneous (current) value of packetsRcvBuf, expressed in bytes, including payload and all headers (IP, TCP, SRT)
BytesReceiveBuf uint64 `json:"bytesReceiveBuf"`
// The timespan (msec) of acknowledged packets in the receiver's buffer
MsReceiveBuf uint64 `json:"msReceiveBuf"`
// Timestamp-based Packet Delivery Delay value set on the socket via SRTO_RCVLATENCY or SRTO_LATENCY
MsReceiveTsbPdDelay uint64 `json:"msReceiveTsbPdDelay"`
// Instant value of the packet reorder tolerance
PacketsReorderTolerance uint64 `json:"packetsReorderTolerance"`
// Accumulated difference between the current time and the time-to-play of a packet that is received late
PacketsReceivedAvgBelatedTime uint64 `json:"packetsReceivedAvgBelatedTime"`
// Percentage of resent data vs. sent data
PacketsSendLossRate float64 `json:"packetsSendLossRate"`
// Percentage of retransmitted data vs. received data
PacketsReceivedLossRate float64 `json:"packetsReceivedLossRate"`
}
// APISRTConnList is a list of SRT connections.

55
internal/metrics/metrics.go

@ -26,6 +26,10 @@ func metric(key string, tags string, value int64) string { @@ -26,6 +26,10 @@ func metric(key string, tags string, value int64) string {
return key + tags + " " + strconv.FormatInt(value, 10) + "\n"
}
func metricFloat(key string, tags string, value float64) string {
return key + tags + " " + strconv.FormatFloat(value, 'f', -1, 64) + "\n"
}
type metricsParent interface {
logger.Writer
}
@ -229,8 +233,57 @@ func (m *Metrics) onMetrics(ctx *gin.Context) { @@ -229,8 +233,57 @@ func (m *Metrics) onMetrics(ctx *gin.Context) {
for _, i := range data.Items {
tags := "{id=\"" + i.ID.String() + "\",state=\"" + string(i.State) + "\"}"
out += metric("srt_conns", tags, 1)
out += metric("srt_conns_bytes_received", tags, int64(i.BytesReceived))
out += metric("srt_conns_packets_sent", tags, int64(i.PacketsSent))
out += metric("srt_conns_packets_received", tags, int64(i.PacketsReceived))
out += metric("srt_conns_packets_sent_unique", tags, int64(i.PacketsSentUnique))
out += metric("srt_conns_packets_received_unique", tags, int64(i.PacketsReceivedUnique))
out += metric("srt_conns_packets_send_loss", tags, int64(i.PacketsSendLoss))
out += metric("srt_conns_packets_received_loss", tags, int64(i.PacketsReceivedLoss))
out += metric("srt_conns_packets_retrans", tags, int64(i.PacketsRetrans))
out += metric("srt_conns_packets_received_retrans", tags, int64(i.PacketsReceivedRetrans))
out += metric("srt_conns_packets_sent_ack", tags, int64(i.PacketsSentACK))
out += metric("srt_conns_packets_received_ack", tags, int64(i.PacketsReceivedACK))
out += metric("srt_conns_packets_sent_nak", tags, int64(i.PacketsSentNAK))
out += metric("srt_conns_packets_received_nak", tags, int64(i.PacketsReceivedNAK))
out += metric("srt_conns_packets_sent_km", tags, int64(i.PacketsSentKM))
out += metric("srt_conns_packets_received_km", tags, int64(i.PacketsReceivedKM))
out += metric("srt_conns_us_snd_duration", tags, int64(i.UsSndDuration))
out += metric("srt_conns_packets_send_drop", tags, int64(i.PacketsSendDrop))
out += metric("srt_conns_packets_received_drop", tags, int64(i.PacketsReceivedDrop))
out += metric("srt_conns_packets_received_undecrypt", tags, int64(i.PacketsReceivedUndecrypt))
out += metric("srt_conns_bytes_sent", tags, int64(i.BytesSent))
out += metric("srt_conns_bytes_received", tags, int64(i.BytesReceived))
out += metric("srt_conns_bytes_sent_unique", tags, int64(i.BytesSentUnique))
out += metric("srt_conns_bytes_received_unique", tags, int64(i.BytesReceivedUnique))
out += metric("srt_conns_bytes_received_loss", tags, int64(i.BytesReceivedLoss))
out += metric("srt_conns_bytes_retrans", tags, int64(i.BytesRetrans))
out += metric("srt_conns_bytes_received_retrans", tags, int64(i.BytesReceivedRetrans))
out += metric("srt_conns_bytes_send_drop", tags, int64(i.BytesSendDrop))
out += metric("srt_conns_bytes_received_drop", tags, int64(i.BytesReceivedDrop))
out += metric("srt_conns_bytes_received_undecrypt", tags, int64(i.BytesReceivedUndecrypt))
out += metricFloat("srt_conns_us_packets_send_period", tags, i.UsPacketsSendPeriod)
out += metric("srt_conns_packets_flow_window", tags, int64(i.PacketsFlowWindow))
out += metric("srt_conns_packets_flight_size", tags, int64(i.PacketsFlightSize))
out += metricFloat("srt_conns_ms_rtt", tags, i.MsRTT)
out += metricFloat("srt_conns_mbps_send_rate", tags, i.MbpsSendRate)
out += metricFloat("srt_conns_mbps_receive_rate", tags, i.MbpsReceiveRate)
out += metricFloat("srt_conns_mbps_link_capacity", tags, i.MbpsLinkCapacity)
out += metric("srt_conns_bytes_avail_send_buf", tags, int64(i.BytesAvailSendBuf))
out += metric("srt_conns_bytes_avail_receive_buf", tags, int64(i.BytesAvailReceiveBuf))
out += metricFloat("srt_conns_mbps_max_bw", tags, i.MbpsMaxBW)
out += metric("srt_conns_bytes_mss", tags, int64(i.ByteMSS))
out += metric("srt_conns_packets_send_buf", tags, int64(i.PacketsSendBuf))
out += metric("srt_conns_bytes_send_buf", tags, int64(i.BytesSendBuf))
out += metric("srt_conns_ms_send_buf", tags, int64(i.MsSendBuf))
out += metric("srt_conns_ms_send_tsb_pd_delay", tags, int64(i.MsSendTsbPdDelay))
out += metric("srt_conns_packets_receive_buf", tags, int64(i.PacketsReceiveBuf))
out += metric("srt_conns_bytes_receive_buf", tags, int64(i.BytesReceiveBuf))
out += metric("srt_conns_ms_receive_buf", tags, int64(i.MsReceiveBuf))
out += metric("srt_conns_ms_receive_tsb_pd_delay", tags, int64(i.MsReceiveTsbPdDelay))
out += metric("srt_conns_packets_reorder_tolerance", tags, int64(i.PacketsReorderTolerance))
out += metric("srt_conns_packets_received_avg_belated_time", tags, int64(i.PacketsReceivedAvgBelatedTime))
out += metricFloat("srt_conns_packets_send_loss_rate", tags, i.PacketsSendLossRate)
out += metricFloat("srt_conns_packets_received_loss_rate", tags, i.PacketsReceivedLossRate)
}
} else {
out += metric("srt_conns", "", 0)

79
internal/servers/srt/conn.go

@ -389,17 +389,7 @@ func (c *conn) apiItem() *defs.APISRTConn { @@ -389,17 +389,7 @@ func (c *conn) apiItem() *defs.APISRTConn {
c.mutex.RLock()
defer c.mutex.RUnlock()
bytesReceived := uint64(0)
bytesSent := uint64(0)
if c.sconn != nil {
var s srt.Statistics
c.sconn.Stats(&s)
bytesReceived = s.Accumulated.ByteRecv
bytesSent = s.Accumulated.ByteSent
}
return &defs.APISRTConn{
item := &defs.APISRTConn{
ID: c.uuid,
Created: c.created,
RemoteAddr: c.connReq.RemoteAddr().String(),
@ -415,9 +405,68 @@ func (c *conn) apiItem() *defs.APISRTConn { @@ -415,9 +405,68 @@ func (c *conn) apiItem() *defs.APISRTConn {
return defs.APISRTConnStateIdle
}
}(),
Path: c.pathName,
Query: c.query,
BytesReceived: bytesReceived,
BytesSent: bytesSent,
Path: c.pathName,
Query: c.query,
}
if c.sconn != nil {
var s srt.Statistics
c.sconn.Stats(&s)
item.PacketsSent = s.Accumulated.PktSent
item.PacketsReceived = s.Accumulated.PktRecv
item.PacketsSentUnique = s.Accumulated.PktSentUnique
item.PacketsReceivedUnique = s.Accumulated.PktRecvUnique
item.PacketsSendLoss = s.Accumulated.PktSendLoss
item.PacketsReceivedLoss = s.Accumulated.PktRecvLoss
item.PacketsRetrans = s.Accumulated.PktRetrans
item.PacketsReceivedRetrans = s.Accumulated.PktRecvRetrans
item.PacketsSentACK = s.Accumulated.PktSentACK
item.PacketsReceivedACK = s.Accumulated.PktRecvACK
item.PacketsSentNAK = s.Accumulated.PktSentNAK
item.PacketsReceivedNAK = s.Accumulated.PktRecvNAK
item.PacketsSentKM = s.Accumulated.PktSentKM
item.PacketsReceivedKM = s.Accumulated.PktRecvKM
item.UsSndDuration = s.Accumulated.UsSndDuration
item.PacketsReceivedBelated = s.Accumulated.PktRecvBelated
item.PacketsSendDrop = s.Accumulated.PktSendDrop
item.PacketsReceivedDrop = s.Accumulated.PktRecvDrop
item.PacketsReceivedUndecrypt = s.Accumulated.PktRecvUndecrypt
item.BytesSent = s.Accumulated.ByteSent
item.BytesReceived = s.Accumulated.ByteRecv
item.BytesSentUnique = s.Accumulated.ByteSentUnique
item.BytesReceivedUnique = s.Accumulated.ByteRecvUnique
item.BytesReceivedLoss = s.Accumulated.ByteRecvLoss
item.BytesRetrans = s.Accumulated.ByteRetrans
item.BytesReceivedRetrans = s.Accumulated.ByteRecvRetrans
item.BytesReceivedBelated = s.Accumulated.ByteRecvBelated
item.BytesSendDrop = s.Accumulated.ByteSendDrop
item.BytesReceivedDrop = s.Accumulated.ByteRecvDrop
item.BytesReceivedUndecrypt = s.Accumulated.ByteRecvUndecrypt
item.UsPacketsSendPeriod = s.Instantaneous.UsPktSendPeriod
item.PacketsFlowWindow = s.Instantaneous.PktFlowWindow
item.PacketsFlightSize = s.Instantaneous.PktFlightSize
item.MsRTT = s.Instantaneous.MsRTT
item.MbpsSendRate = s.Instantaneous.MbpsSentRate
item.MbpsReceiveRate = s.Instantaneous.MbpsRecvRate
item.MbpsLinkCapacity = s.Instantaneous.MbpsLinkCapacity
item.BytesAvailSendBuf = s.Instantaneous.ByteAvailSendBuf
item.BytesAvailReceiveBuf = s.Instantaneous.ByteAvailRecvBuf
item.MbpsMaxBW = s.Instantaneous.MbpsMaxBW
item.ByteMSS = s.Instantaneous.ByteMSS
item.PacketsSendBuf = s.Instantaneous.PktSendBuf
item.BytesSendBuf = s.Instantaneous.ByteSendBuf
item.MsSendBuf = s.Instantaneous.MsSendBuf
item.MsSendTsbPdDelay = s.Instantaneous.MsSendTsbPdDelay
item.PacketsReceiveBuf = s.Instantaneous.PktRecvBuf
item.BytesReceiveBuf = s.Instantaneous.ByteRecvBuf
item.MsReceiveBuf = s.Instantaneous.MsRecvBuf
item.MsReceiveTsbPdDelay = s.Instantaneous.MsRecvTsbPdDelay
item.PacketsReorderTolerance = s.Instantaneous.PktReorderTolerance
item.PacketsReceivedAvgBelatedTime = s.Instantaneous.PktRecvAvgBelatedTime
item.PacketsSendLossRate = s.Instantaneous.PktSendLossRate
item.PacketsReceivedLossRate = s.Instantaneous.PktRecvLossRate
}
return item
}

Loading…
Cancel
Save