Browse Source

api, metrics: add number of bytes received and sent from/to all entities (#1235)

* API: number of bytes received/sent from/to RTSP connections
* API: number of bytes received/sent from/to RTSP sessions
* API: number of bytes received/sent from/to RTMP connections
* API: number of bytes sent to HLS connections
* API: number of bytes received from paths
* metrics of all the above
pull/1245/head
Alessandro Ros 3 years ago committed by GitHub
parent
commit
8bee4af86a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 62
      README.md
  2. 53
      apidocs/openapi.yaml
  3. 2
      go.mod
  4. 4
      go.sum
  5. 61
      internal/core/api_test.go
  6. 38
      internal/core/hls_muxer.go
  7. 13
      internal/core/hls_server.go
  8. 112
      internal/core/metrics.go
  9. 57
      internal/core/metrics_test.go
  10. 21
      internal/core/path.go
  11. 10
      internal/core/rtmp_server.go
  12. 22
      internal/core/rtsp_server.go
  13. 32
      internal/core/rtsp_session.go
  14. 10
      internal/core/stream.go
  15. 4
      internal/core/streamtrack_h264.go
  16. 4
      internal/core/streamtrack_mpeg4audio.go
  17. 38
      internal/rtmp/bytecounter/reader.go
  18. 2
      internal/rtmp/bytecounter/reader_test.go
  19. 17
      internal/rtmp/bytecounter/writer.go
  20. 2
      internal/rtmp/bytecounter/writer_test.go
  21. 21
      internal/rtmp/conn.go
  22. 8
      internal/rtmp/conn_test.go
  23. 13
      internal/rtmp/rawmessage/reader.go
  24. 9
      internal/rtmp/rawmessage/reader_test.go
  25. 2
      internal/rtmp/rawmessage/writer.go

62
README.md

@ -433,7 +433,7 @@ Full documentation of the API is available on the [dedicated site](https://aler9 @@ -433,7 +433,7 @@ Full documentation of the API is available on the [dedicated site](https://aler9
### Metrics
A metrics exporter, compatible with Prometheus, can be enabled with the parameter `metrics: yes`; then the server can be queried for metrics with Prometheus or with a simple HTTP request:
A metrics exporter, compatible with [Prometheus](https://prometheus.io/), can be enabled with the parameter `metrics: yes`; then the server can be queried for metrics with Prometheus or with a simple HTTP request:
```
wget -qO- localhost:9998/metrics
@ -441,34 +441,40 @@ wget -qO- localhost:9998/metrics @@ -441,34 +441,40 @@ wget -qO- localhost:9998/metrics
Obtaining:
```ini
# metrics of every path
paths{name="[path_name]",state="[state]"} 1
paths_bytes_received{name="[path_name]",state="[state]"} 1234
# metrics of every RTSP connection
rtsp_conns{id="[id]"} 1
rtsp_conns_bytes_received{id="[id]"} 1234
rtsp_conns_bytes_sent{id="[id]"} 187
# metrics of every RTSP session
rtsp_sessions{id="[id]",state="idle"} 1
rtsp_sessions_bytes_received{id="[id]",state="[state]"} 1234
rtsp_sessions_bytes_sent{id="[id]",state="[state]"} 187
# metrics of every RTSPS connection
rtsps_conns{id="[id]"} 1
rtsps_conns_bytes_received{id="[id]"} 1234
rtsps_conns_bytes_sent{id="[id]"} 187
# metrics of every RTSPS session
rtsps_sessions{id="[id]",state="[state]"} 1
rtsps_sessions_bytes_received{id="[id]",state="[state]"} 1234
rtsps_sessions_bytes_sent{id="[id]",state="[state]"} 187
# metrics of every RTMP connection
rtmp_conns{id="[id]",state="[state]"} 1
rtmp_conns_bytes_received{id="[id]",state="[state]"} 1234
rtmp_conns_bytes_sent{id="[id]",state="[state]"} 187
# metrics of every HLS muxer
hls_muxers{name="[name]"} 1
hls_muxers_bytes_sent{name="[name]"} 187
```
paths{name="<path_name>",state="ready"} 1
rtsp_conns 1
rtsp_sessions{state="idle"} 0
rtsp_sessions{state="read"} 0
rtsp_sessions{state="publish"} 1
rtsps_sessions{state="idle"} 0
rtsps_sessions{state="read"} 0
rtsps_sessions{state="publish"} 0
rtmp_conns{state="idle"} 0
rtmp_conns{state="read"} 0
rtmp_conns{state="publish"} 1
hls_muxers{name="<name>"} 1
```
where:
* `paths{name="<path_name>",state="ready"} 1` is replicated for every path and shows the name and state of every path
* `rtsp_sessions{state="idle"}` is the count of RTSP sessions that are idle
* `rtsp_sessions{state="read"}` is the count of RTSP sessions that are reading
* `rtsp_sessions{state="publish"}` is the counf ot RTSP sessions that are publishing
* `rtsps_sessions{state="idle"}` is the count of RTSPS sessions that are idle
* `rtsps_sessions{state="read"}` is the count of RTSPS sessions that are reading
* `rtsps_sessions{state="publish"}` is the counf ot RTSPS sessions that are publishing
* `rtmp_conns{state="idle"}` is the count of RTMP connections that are idle
* `rtmp_conns{state="read"}` is the count of RTMP connections that are reading
* `rtmp_conns{state="publish"}` is the count of RTMP connections that are publishing
* `hls_muxers{name="<name>"}` is replicated for every HLS muxer and shows the name and state of every HLS muxer
### pprof

53
apidocs/openapi.yaml

@ -268,6 +268,8 @@ components: @@ -268,6 +268,8 @@ components:
- PCMU
- VP8
- VP9
bytesReceived:
type: number
readers:
type: array
items:
@ -382,6 +384,10 @@ components: @@ -382,6 +384,10 @@ components:
type: string
remoteAddr:
type: string
bytesReceived:
type: number
bytesSent:
type: number
RTSPSession:
type: object
@ -393,6 +399,10 @@ components: @@ -393,6 +399,10 @@ components:
state:
type: string
enum: [idle, read, publish]
bytesReceived:
type: number
bytesSent:
type: number
RTMPConn:
type: object
@ -404,17 +414,10 @@ components: @@ -404,17 +414,10 @@ components:
state:
type: string
enum: [idle, read, publish]
RTMPSConn:
type: object
properties:
created:
type: string
remoteAddr:
type: string
state:
type: string
enum: [idle, read, publish]
bytesReceived:
type: number
bytesSent:
type: number
HLSMuxer:
type: object
@ -423,6 +426,8 @@ components: @@ -423,6 +426,8 @@ components:
type: string
lastRequest:
type: string
bytesSent:
type: number
PathsList:
type: object
@ -464,14 +469,6 @@ components: @@ -464,14 +469,6 @@ components:
additionalProperties:
$ref: '#/components/schemas/RTMPConn'
RTMPSConnsList:
type: object
properties:
items:
type: object
additionalProperties:
$ref: '#/components/schemas/RTMPSConn'
HLSMuxersList:
type: object
properties:
@ -592,7 +589,7 @@ paths: @@ -592,7 +589,7 @@ paths:
/v1/paths/list:
get:
operationId: pathsList
summary: returns all active paths.
summary: returns all paths.
description: ''
responses:
'200':
@ -609,7 +606,7 @@ paths: @@ -609,7 +606,7 @@ paths:
/v1/rtspconns/list:
get:
operationId: rtspConnsList
summary: returns all active RTSP connections.
summary: returns all RTSP connections.
description: ''
responses:
'200':
@ -626,7 +623,7 @@ paths: @@ -626,7 +623,7 @@ paths:
/v1/rtspsessions/list:
get:
operationId: rtspSessionsList
summary: returns all active RTSP sessions.
summary: returns all RTSP sessions.
description: ''
responses:
'200':
@ -643,7 +640,7 @@ paths: @@ -643,7 +640,7 @@ paths:
/v1/rtspsconns/list:
get:
operationId: rtspsConnsList
summary: returns all active RTSPS connections.
summary: returns all RTSPS connections.
description: ''
responses:
'200':
@ -680,7 +677,7 @@ paths: @@ -680,7 +677,7 @@ paths:
/v1/rtspssessions/list:
get:
operationId: rtspsSessionsList
summary: returns all active RTSPS sessions.
summary: returns all RTSPS sessions.
description: ''
responses:
'200':
@ -717,7 +714,7 @@ paths: @@ -717,7 +714,7 @@ paths:
/v1/rtmpconns/list:
get:
operationId: rtmpConnsList
summary: returns all active RTMP connections.
summary: returns all RTMP connections.
description: ''
responses:
'200':
@ -754,7 +751,7 @@ paths: @@ -754,7 +751,7 @@ paths:
/v1/rtmpsconns/list:
get:
operationId: rtmpsConnsList
summary: returns all active RTMPS connections.
summary: returns all RTMPS connections.
description: ''
responses:
'200':
@ -762,7 +759,7 @@ paths: @@ -762,7 +759,7 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/RTMPSConnsList'
$ref: '#/components/schemas/RTMPConnsList'
'400':
description: invalid request.
'500':
@ -791,7 +788,7 @@ paths: @@ -791,7 +788,7 @@ paths:
/v1/hlsmuxers/list:
get:
operationId: hlsMuxersList
summary: returns all active HLS muxers.
summary: returns all HLS muxers.
description: ''
responses:
'200':

2
go.mod

@ -5,7 +5,7 @@ go 1.18 @@ -5,7 +5,7 @@ go 1.18
require (
code.cloudfoundry.org/bytefmt v0.0.0-20211005130812-5bb3c17173e5
github.com/abema/go-mp4 v0.8.0
github.com/aler9/gortsplib v0.0.0-20221105162652-b1ed0a8abb48
github.com/aler9/gortsplib v0.0.0-20221110211534-12c8845fef0d
github.com/asticode/go-astits v1.10.1-0.20220319093903-4abe66a9b757
github.com/fsnotify/fsnotify v1.4.9
github.com/gin-gonic/gin v1.8.1

4
go.sum

@ -6,8 +6,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo @@ -6,8 +6,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20221105162652-b1ed0a8abb48 h1:SVcUqrsR+RXT8cxopOahKQaj8kb02adaEeRexLLKcMc=
github.com/aler9/gortsplib v0.0.0-20221105162652-b1ed0a8abb48/go.mod h1:BOWNZ/QBkY/eVcRqUzJbPFEsRJshwxaxBT01K260Jeo=
github.com/aler9/gortsplib v0.0.0-20221110211534-12c8845fef0d h1:fRx79L1YMXaoiSMkB32xgVCUMbOcmQ4JfySaUv7XZpc=
github.com/aler9/gortsplib v0.0.0-20221110211534-12c8845fef0d/go.mod h1:BOWNZ/QBkY/eVcRqUzJbPFEsRJshwxaxBT01K260Jeo=
github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82 h1:9WgSzBLo3a9ToSVV7sRTBYZ1GGOZUpq4+5H3SN0UZq4=
github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82/go.mod h1:qsMrZCbeBf/mCLOeF16KDkPu4gktn/pOWyaq1aYQE7U=
github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8=

61
internal/core/api_test.go

@ -15,6 +15,7 @@ import ( @@ -15,6 +15,7 @@ import (
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/mpeg4audio"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
"github.com/aler9/rtsp-simple-server/internal/rtmp"
@ -170,9 +171,10 @@ func TestAPIPathsList(t *testing.T) { @@ -170,9 +171,10 @@ func TestAPIPathsList(t *testing.T) {
}
type path struct {
Source pathSource `json:"source"`
SourceReady bool `json:"sourceReady"`
Tracks []string `json:"tracks"`
Source pathSource `json:"source"`
SourceReady bool `json:"sourceReady"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
}
type pathList struct {
@ -186,30 +188,38 @@ func TestAPIPathsList(t *testing.T) { @@ -186,30 +188,38 @@ func TestAPIPathsList(t *testing.T) {
require.Equal(t, true, ok)
defer p.Close()
tracks := gortsplib.Tracks{
&gortsplib.TrackH264{
PayloadType: 96,
SPS: []byte{0x01, 0x02, 0x03, 0x04},
PPS: []byte{0x01, 0x02, 0x03, 0x04},
},
&gortsplib.TrackMPEG4Audio{
PayloadType: 97,
Config: &mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
},
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
},
}
source := gortsplib.Client{}
err := source.StartPublishing("rtsp://localhost:8554/mypath", tracks)
err := source.StartPublishing(
"rtsp://localhost:8554/mypath",
gortsplib.Tracks{
&gortsplib.TrackH264{
PayloadType: 96,
SPS: []byte{0x01, 0x02, 0x03, 0x04},
PPS: []byte{0x01, 0x02, 0x03, 0x04},
},
&gortsplib.TrackMPEG4Audio{
PayloadType: 96,
Config: &mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
},
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
},
})
require.NoError(t, err)
defer source.Close()
source.WritePacketRTP(0, &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}, true)
var out pathList
err = httpRequest(http.MethodGet, "http://localhost:9997/v1/paths/list", nil, &out)
require.NoError(t, err)
@ -219,8 +229,9 @@ func TestAPIPathsList(t *testing.T) { @@ -219,8 +229,9 @@ func TestAPIPathsList(t *testing.T) {
Source: pathSource{
Type: "rtspSession",
},
SourceReady: true,
Tracks: []string{"H264", "MPEG4Audio"},
SourceReady: true,
Tracks: []string{"H264", "MPEG4Audio"},
BytesReceived: 16,
},
},
}, out)

38
internal/core/hls_muxer.go

@ -94,11 +94,16 @@ window.addEventListener('DOMContentLoaded', create); @@ -94,11 +94,16 @@ window.addEventListener('DOMContentLoaded', create);
</html>
`
type hlsMuxerResponse struct {
muxer *hlsMuxer
cb func() *hls.MuxerFileResponse
}
type hlsMuxerRequest struct {
dir string
file string
ctx *gin.Context
res chan func() *hls.MuxerFileResponse
res chan hlsMuxerResponse
}
type hlsMuxerPathManager interface {
@ -133,6 +138,7 @@ type hlsMuxer struct { @@ -133,6 +138,7 @@ type hlsMuxer struct {
lastRequestTime *int64
muxer *hls.Muxer
requests []*hlsMuxerRequest
bytesSent *uint64
// in
chRequest chan *hlsMuxerRequest
@ -179,6 +185,7 @@ func newHLSMuxer( @@ -179,6 +185,7 @@ func newHLSMuxer(
v := time.Now().UnixNano()
return &v
}(),
bytesSent: new(uint64),
chRequest: make(chan *hlsMuxerRequest),
chAPIHLSMuxersList: make(chan hlsServerAPIMuxersListSubReq),
}
@ -235,7 +242,10 @@ func (m *hlsMuxer) run() { @@ -235,7 +242,10 @@ func (m *hlsMuxer) run() {
case req := <-m.chRequest:
if isReady {
req.res <- m.handleRequest(req)
req.res <- hlsMuxerResponse{
muxer: m,
cb: m.handleRequest(req),
}
} else {
m.requests = append(m.requests, req)
}
@ -244,13 +254,17 @@ func (m *hlsMuxer) run() { @@ -244,13 +254,17 @@ func (m *hlsMuxer) run() {
req.data.Items[m.name] = hlsServerAPIMuxersListItem{
Created: m.created,
LastRequest: time.Unix(0, atomic.LoadInt64(m.lastRequestTime)),
BytesSent: atomic.LoadUint64(m.bytesSent),
}
close(req.res)
case <-innerReady:
isReady = true
for _, req := range m.requests {
req.res <- m.handleRequest(req)
req.res <- hlsMuxerResponse{
muxer: m,
cb: m.handleRequest(req),
}
}
m.requests = nil
@ -264,8 +278,11 @@ func (m *hlsMuxer) run() { @@ -264,8 +278,11 @@ func (m *hlsMuxer) run() {
m.ctxCancel()
for _, req := range m.requests {
req.res <- func() *hls.MuxerFileResponse {
return &hls.MuxerFileResponse{Status: http.StatusNotFound}
req.res <- hlsMuxerResponse{
muxer: m,
cb: func() *hls.MuxerFileResponse {
return &hls.MuxerFileResponse{Status: http.StatusNotFound}
},
}
}
@ -547,13 +564,20 @@ func (m *hlsMuxer) authenticate(ctx *gin.Context) error { @@ -547,13 +564,20 @@ func (m *hlsMuxer) authenticate(ctx *gin.Context) error {
return nil
}
func (m *hlsMuxer) addSentBytes(n uint64) {
atomic.AddUint64(m.bytesSent, n)
}
// request is called by hlsserver.Server (forwarded from ServeHTTP).
func (m *hlsMuxer) request(req *hlsMuxerRequest) {
select {
case m.chRequest <- req:
case <-m.ctx.Done():
req.res <- func() *hls.MuxerFileResponse {
return &hls.MuxerFileResponse{Status: http.StatusInternalServerError}
req.res <- hlsMuxerResponse{
muxer: m,
cb: func() *hls.MuxerFileResponse {
return &hls.MuxerFileResponse{Status: http.StatusInternalServerError}
},
}
}
}

13
internal/core/hls_server.go

@ -17,7 +17,6 @@ import ( @@ -17,7 +17,6 @@ import (
"github.com/gin-gonic/gin"
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/hls"
"github.com/aler9/rtsp-simple-server/internal/logger"
)
@ -30,6 +29,7 @@ func (nilWriter) Write(p []byte) (int, error) { @@ -30,6 +29,7 @@ func (nilWriter) Write(p []byte) (int, error) {
type hlsServerAPIMuxersListItem struct {
Created time.Time `json:"created"`
LastRequest time.Time `json:"lastRequest"`
BytesSent uint64 `json:"bytesSent"`
}
type hlsServerAPIMuxersListData struct {
@ -311,19 +311,17 @@ func (s *hlsServer) onRequest(ctx *gin.Context) { @@ -311,19 +311,17 @@ func (s *hlsServer) onRequest(ctx *gin.Context) {
dir = strings.TrimSuffix(dir, "/")
cres := make(chan func() *hls.MuxerFileResponse)
hreq := &hlsMuxerRequest{
dir: dir,
file: fname,
ctx: ctx,
res: cres,
res: make(chan hlsMuxerResponse),
}
select {
case s.request <- hreq:
cb := <-cres
res := cb()
res1 := <-hreq.res
res := res1.cb()
for k, v := range res.Header {
ctx.Writer.Header().Set(k, v)
@ -332,7 +330,8 @@ func (s *hlsServer) onRequest(ctx *gin.Context) { @@ -332,7 +330,8 @@ func (s *hlsServer) onRequest(ctx *gin.Context) {
ctx.Writer.WriteHeader(res.Status)
if res.Body != nil {
io.Copy(ctx.Writer, res.Body)
n, _ := io.Copy(ctx.Writer, res.Body)
res1.muxer.addSentBytes(uint64(n))
}
case <-s.ctx.Done():

112
internal/core/metrics.go

@ -93,83 +93,68 @@ func (m *metrics) onMetrics(ctx *gin.Context) { @@ -93,83 +93,68 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
res := m.pathManager.apiPathsList()
if res.err == nil {
for name, p := range res.data.Items {
if p.SourceReady {
out += metric("paths{name=\""+name+"\",state=\"ready\"}", 1)
for name, i := range res.data.Items {
var state string
if i.SourceReady {
state = "ready"
} else {
out += metric("paths{name=\""+name+"\",state=\"notReady\"}", 1)
state = "notReady"
}
tags := "{name=\"" + name + "\",state=\"" + state + "\"}"
out += metric("paths"+tags, 1)
out += metric("paths_bytes_received"+tags, int64(i.BytesReceived))
}
}
if !interfaceIsEmpty(m.rtspServer) {
if !interfaceIsEmpty(m.rtspServer) { //nolint:dupl
func() {
res := m.rtspServer.apiConnsList()
if res.err == nil {
out += metric("rtsp_conns", int64(len(res.data.Items)))
for id, i := range res.data.Items {
tags := "{id=\"" + id + "\"}"
out += metric("rtsp_conns"+tags, 1)
out += metric("rtsp_conns_bytes_received"+tags, int64(i.BytesReceived))
out += metric("rtsp_conns_bytes_sent"+tags, int64(i.BytesSent))
}
}
}()
func() {
res := m.rtspServer.apiSessionsList()
if res.err == nil {
idleCount := int64(0)
readCount := int64(0)
publishCount := int64(0)
for _, i := range res.data.Items {
switch i.State {
case "idle":
idleCount++
case "read":
readCount++
case "publish":
publishCount++
}
for id, i := range res.data.Items {
tags := "{id=\"" + id + "\",state=\"" + i.State + "\"}"
out += metric("rtsp_sessions"+tags, 1)
out += metric("rtsp_sessions_bytes_received"+tags, int64(i.BytesReceived))
out += metric("rtsp_sessions_bytes_sent"+tags, int64(i.BytesSent))
}
out += metric("rtsp_sessions{state=\"idle\"}",
idleCount)
out += metric("rtsp_sessions{state=\"read\"}",
readCount)
out += metric("rtsp_sessions{state=\"publish\"}",
publishCount)
}
}()
}
if !interfaceIsEmpty(m.rtspsServer) {
if !interfaceIsEmpty(m.rtspsServer) { //nolint:dupl
func() {
res := m.rtspsServer.apiConnsList()
if res.err == nil {
out += metric("rtsps_conns", int64(len(res.data.Items)))
for id, i := range res.data.Items {
tags := "{id=\"" + id + "\"}"
out += metric("rtsps_conns"+tags, 1)
out += metric("rtsps_conns_bytes_received"+tags, int64(i.BytesReceived))
out += metric("rtsps_conns_bytes_sent"+tags, int64(i.BytesSent))
}
}
}()
func() {
res := m.rtspsServer.apiSessionsList()
if res.err == nil {
idleCount := int64(0)
readCount := int64(0)
publishCount := int64(0)
for _, i := range res.data.Items {
switch i.State {
case "idle":
idleCount++
case "read":
readCount++
case "publish":
publishCount++
}
for id, i := range res.data.Items {
tags := "{id=\"" + id + "\",state=\"" + i.State + "\"}"
out += metric("rtsps_sessions"+tags, 1)
out += metric("rtsps_sessions_bytes_received"+tags, int64(i.BytesReceived))
out += metric("rtsps_sessions_bytes_sent"+tags, int64(i.BytesSent))
}
out += metric("rtsps_sessions{state=\"idle\"}",
idleCount)
out += metric("rtsps_sessions{state=\"read\"}",
readCount)
out += metric("rtsps_sessions{state=\"publish\"}",
publishCount)
}
}()
}
@ -177,35 +162,22 @@ func (m *metrics) onMetrics(ctx *gin.Context) { @@ -177,35 +162,22 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
if !interfaceIsEmpty(m.rtmpServer) {
res := m.rtmpServer.apiConnsList()
if res.err == nil {
idleCount := int64(0)
readCount := int64(0)
publishCount := int64(0)
for _, i := range res.data.Items {
switch i.State {
case "idle":
idleCount++
case "read":
readCount++
case "publish":
publishCount++
}
for id, i := range res.data.Items {
tags := "{id=\"" + id + "\",state=\"" + i.State + "\"}"
out += metric("rtmp_conns"+tags, 1)
out += metric("rtmp_conns_bytes_received"+tags, int64(i.BytesReceived))
out += metric("rtmp_conns_bytes_sent"+tags, int64(i.BytesSent))
}
out += metric("rtmp_conns{state=\"idle\"}",
idleCount)
out += metric("rtmp_conns{state=\"read\"}",
readCount)
out += metric("rtmp_conns{state=\"publish\"}",
publishCount)
}
}
if !interfaceIsEmpty(m.hlsServer) {
res := m.hlsServer.apiHLSMuxersList()
if res.err == nil {
for name := range res.data.Items {
out += metric("hls_muxers{name=\""+name+"\"}", 1)
for name, i := range res.data.Items {
tags := "{name=\"" + name + "\"}"
out += metric("hls_muxers"+tags, 1)
out += metric("hls_muxers_bytes_sent"+tags, int64(i.BytesSent))
}
}
}

57
internal/core/metrics_test.go

@ -1,12 +1,12 @@ @@ -1,12 +1,12 @@
package core
import (
"crypto/tls"
"io"
"net"
"net/http"
"net/url"
"os"
"strings"
"testing"
"github.com/aler9/gortsplib"
@ -40,12 +40,17 @@ func TestMetrics(t *testing.T) { @@ -40,12 +40,17 @@ func TestMetrics(t *testing.T) {
}
source := gortsplib.Client{}
err = source.StartPublishing("rtsp://localhost:8554/rtsp_path",
gortsplib.Tracks{track})
require.NoError(t, err)
defer source.Close()
source2 := gortsplib.Client{TLSConfig: &tls.Config{InsecureSkipVerify: true}}
err = source2.StartPublishing("rtsps://localhost:8322/rtsps_path",
gortsplib.Tracks{track})
require.NoError(t, err)
defer source2.Close()
u, err := url.Parse("rtmp://localhost:1935/rtmp_path")
require.NoError(t, err)
@ -88,27 +93,29 @@ func TestMetrics(t *testing.T) { @@ -88,27 +93,29 @@ func TestMetrics(t *testing.T) {
bo, err := io.ReadAll(res.Body)
require.NoError(t, err)
vals := make(map[string]string)
lines := strings.Split(string(bo), "\n")
for _, l := range lines[:len(lines)-1] {
fields := strings.Split(l, " ")
vals[fields[0]] = fields[1]
}
require.Equal(t, map[string]string{
"hls_muxers{name=\"rtsp_path\"}": "1",
"paths{name=\"rtsp_path\",state=\"ready\"}": "1",
"paths{name=\"rtmp_path\",state=\"ready\"}": "1",
"rtmp_conns{state=\"idle\"}": "0",
"rtmp_conns{state=\"publish\"}": "1",
"rtmp_conns{state=\"read\"}": "0",
"rtsp_conns": "1",
"rtsp_sessions{state=\"idle\"}": "0",
"rtsp_sessions{state=\"publish\"}": "1",
"rtsp_sessions{state=\"read\"}": "0",
"rtsps_conns": "0",
"rtsps_sessions{state=\"idle\"}": "0",
"rtsps_sessions{state=\"publish\"}": "0",
"rtsps_sessions{state=\"read\"}": "0",
}, vals)
require.Regexp(t,
`^paths\{name=".*?",state="ready"\} 1`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} 0`+"\n"+
`paths\{name=".*?",state="ready"\} 1`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} 0`+"\n"+
`paths\{name=".*?",state="ready"\} 1`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} 0`+"\n"+
`rtsp_conns\{id=".*?"\} 1`+"\n"+
`rtsp_conns_bytes_received\{id=".*?"\} [0-9]+`+"\n"+
`rtsp_conns_bytes_sent\{id=".*?"\} [0-9]+`+"\n"+
`rtsp_sessions\{id=".*?",state="publish"\} 1`+"\n"+
`rtsp_sessions_bytes_received\{id=".*?",state="publish"\} 0`+"\n"+
`rtsp_sessions_bytes_sent\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`rtsps_conns\{id=".*?"\} 1`+"\n"+
`rtsps_conns_bytes_received\{id=".*?"\} [0-9]+`+"\n"+
`rtsps_conns_bytes_sent\{id=".*?"\} [0-9]+`+"\n"+
`rtsps_sessions\{id=".*?",state="publish"\} 1`+"\n"+
`rtsps_sessions_bytes_received\{id=".*?",state="publish"\} 0`+"\n"+
`rtsps_sessions_bytes_sent\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`rtmp_conns\{id=".*?",state="publish"\} 1`+"\n"+
`rtmp_conns_bytes_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`rtmp_conns_bytes_sent\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`hls_muxers\{name="rtsp_path"\} 1`+"\n"+
`hls_muxers_bytes_sent\{name="rtsp_path"\} [0-9]+`+"\n"+"$",
string(bo))
}

21
internal/core/path.go

@ -7,6 +7,7 @@ import ( @@ -7,6 +7,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/aler9/gortsplib"
@ -178,12 +179,13 @@ type pathPublisherStopReq struct { @@ -178,12 +179,13 @@ type pathPublisherStopReq struct {
}
type pathAPIPathsListItem struct {
ConfName string `json:"confName"`
Conf *conf.PathConf `json:"conf"`
Source interface{} `json:"source"`
SourceReady bool `json:"sourceReady"`
Tracks []string `json:"tracks"`
Readers []interface{} `json:"readers"`
ConfName string `json:"confName"`
Conf *conf.PathConf `json:"conf"`
Source interface{} `json:"source"`
SourceReady bool `json:"sourceReady"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
Readers []interface{} `json:"readers"`
}
type pathAPIPathsListData struct {
@ -221,6 +223,7 @@ type path struct { @@ -221,6 +223,7 @@ type path struct {
ctx context.Context
ctxCancel func()
source source
bytesReceived *uint64
stream *stream
readers map[reader]pathReaderState
describeRequestsOnHold []pathDescribeReq
@ -279,6 +282,7 @@ func newPath( @@ -279,6 +282,7 @@ func newPath(
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
bytesReceived: new(uint64),
readers: make(map[reader]pathReaderState),
onDemandStaticSourceReadyTimer: newEmptyTimer(),
onDemandStaticSourceCloseTimer: newEmptyTimer(),
@ -663,8 +667,8 @@ func (pa *path) onDemandPublisherStop() { @@ -663,8 +667,8 @@ func (pa *path) onDemandPublisherStop() {
}
}
func (pa *path) sourceSetReady(tracks gortsplib.Tracks, generateRTPPackets bool) error {
stream, err := newStream(tracks, generateRTPPackets)
func (pa *path) sourceSetReady(tracks gortsplib.Tracks, allocateEncoder bool) error {
stream, err := newStream(tracks, allocateEncoder, pa.bytesReceived)
if err != nil {
return err
}
@ -957,6 +961,7 @@ func (pa *path) handleAPIPathsList(req pathAPIPathsListSubReq) { @@ -957,6 +961,7 @@ func (pa *path) handleAPIPathsList(req pathAPIPathsListSubReq) {
}
return sourceTrackNames(pa.stream.tracks())
}(),
BytesReceived: atomic.LoadUint64(pa.bytesReceived),
Readers: func() []interface{} {
ret := []interface{}{}
for r := range pa.readers {

10
internal/core/rtmp_server.go

@ -14,9 +14,11 @@ import ( @@ -14,9 +14,11 @@ import (
)
type rtmpServerAPIConnsListItem struct {
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
State string `json:"state"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
State string `json:"state"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
type rtmpServerAPIConnsListData struct {
@ -236,6 +238,8 @@ outer: @@ -236,6 +238,8 @@ outer:
}
return "idle"
}(),
BytesReceived: c.conn.BytesReceived(),
BytesSent: c.conn.BytesSent(),
}
}

22
internal/core/rtsp_server.go

@ -19,8 +19,10 @@ import ( @@ -19,8 +19,10 @@ import (
)
type rtspServerAPIConnsListItem struct {
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
type rtspServerAPIConnsListData struct {
@ -33,9 +35,11 @@ type rtspServerAPIConnsListRes struct { @@ -33,9 +35,11 @@ type rtspServerAPIConnsListRes struct {
}
type rtspServerAPISessionsListItem struct {
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
State string `json:"state"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
State string `json:"state"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
type rtspServerAPISessionsListData struct {
@ -374,8 +378,10 @@ func (s *rtspServer) apiConnsList() rtspServerAPIConnsListRes { @@ -374,8 +378,10 @@ func (s *rtspServer) apiConnsList() rtspServerAPIConnsListRes {
for _, c := range s.conns {
data.Items[c.uuid.String()] = rtspServerAPIConnsListItem{
Created: c.created,
RemoteAddr: c.remoteAddr().String(),
Created: c.created,
RemoteAddr: c.remoteAddr().String(),
BytesReceived: c.conn.BytesReceived(),
BytesSent: c.conn.BytesSent(),
}
}
@ -413,6 +419,8 @@ func (s *rtspServer) apiSessionsList() rtspServerAPISessionsListRes { @@ -413,6 +419,8 @@ func (s *rtspServer) apiSessionsList() rtspServerAPISessionsListRes {
}
return "idle"
}(),
BytesReceived: s.session.BytesReceived(),
BytesSent: s.session.BytesSent(),
}
}

32
internal/core/rtsp_session.go

@ -34,7 +34,7 @@ type rtspSessionParent interface { @@ -34,7 +34,7 @@ type rtspSessionParent interface {
type rtspSession struct {
isTLS bool
protocols map[conf.Protocol]struct{}
ss *gortsplib.ServerSession
session *gortsplib.ServerSession
author *gortsplib.ServerConn
externalCmdPool *externalcmd.Pool
pathManager rtspSessionPathManager
@ -52,7 +52,7 @@ type rtspSession struct { @@ -52,7 +52,7 @@ type rtspSession struct {
func newRTSPSession(
isTLS bool,
protocols map[conf.Protocol]struct{},
ss *gortsplib.ServerSession,
session *gortsplib.ServerSession,
sc *gortsplib.ServerConn,
externalCmdPool *externalcmd.Pool,
pathManager rtspSessionPathManager,
@ -61,7 +61,7 @@ func newRTSPSession( @@ -61,7 +61,7 @@ func newRTSPSession(
s := &rtspSession{
isTLS: isTLS,
protocols: protocols,
ss: ss,
session: session,
author: sc,
externalCmdPool: externalCmdPool,
pathManager: pathManager,
@ -77,7 +77,7 @@ func newRTSPSession( @@ -77,7 +77,7 @@ func newRTSPSession(
// Close closes a Session.
func (s *rtspSession) close() {
s.ss.Close()
s.session.Close()
}
// isRTSPSession implements pathRTSPSession.
@ -100,7 +100,7 @@ func (s *rtspSession) log(level logger.Level, format string, args ...interface{} @@ -100,7 +100,7 @@ func (s *rtspSession) log(level logger.Level, format string, args ...interface{}
// onClose is called by rtspServer.
func (s *rtspSession) onClose(err error) {
if s.ss.State() == gortsplib.ServerSessionStatePlay {
if s.session.State() == gortsplib.ServerSessionStatePlay {
if s.onReadCmd != nil {
s.onReadCmd.Close()
s.onReadCmd = nil
@ -108,7 +108,7 @@ func (s *rtspSession) onClose(err error) { @@ -108,7 +108,7 @@ func (s *rtspSession) onClose(err error) {
}
}
switch s.ss.State() {
switch s.session.State() {
case gortsplib.ServerSessionStatePrePlay, gortsplib.ServerSessionStatePlay:
s.path.readerRemove(pathReaderRemoveReq{author: s})
@ -181,7 +181,7 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt @@ -181,7 +181,7 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
}
}
switch s.ss.State() {
switch s.session.State() {
case gortsplib.ServerSessionStateInitial, gortsplib.ServerSessionStatePrePlay: // play
res := s.pathManager.readerAdd(pathReaderAddReq{
author: s,
@ -247,19 +247,19 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt @@ -247,19 +247,19 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
func (s *rtspSession) onPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
h := make(base.Header)
if s.ss.State() == gortsplib.ServerSessionStatePrePlay {
if s.session.State() == gortsplib.ServerSessionStatePrePlay {
s.path.readerStart(pathReaderStartReq{author: s})
tracks := make(gortsplib.Tracks, len(s.ss.SetuppedTracks()))
tracks := make(gortsplib.Tracks, len(s.session.SetuppedTracks()))
n := 0
for id := range s.ss.SetuppedTracks() {
for id := range s.session.SetuppedTracks() {
tracks[n] = s.stream.tracks()[id]
n++
}
s.log(logger.Info, "is reading from path '%s', with %s, %s",
s.path.Name(),
s.ss.SetuppedTransport(),
s.session.SetuppedTransport(),
sourceTrackInfo(tracks))
if s.path.Conf().RunOnRead != "" {
@ -289,7 +289,7 @@ func (s *rtspSession) onPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo @@ -289,7 +289,7 @@ func (s *rtspSession) onPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo
func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) {
res := s.path.publisherStart(pathPublisherStartReq{
author: s,
tracks: s.ss.AnnouncedTracks(),
tracks: s.session.AnnouncedTracks(),
generateRTPPackets: false,
})
if res.err != nil {
@ -300,8 +300,8 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R @@ -300,8 +300,8 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R
s.log(logger.Info, "is publishing to path '%s', with %s, %s",
s.path.Name(),
s.ss.SetuppedTransport(),
sourceTrackInfo(s.ss.AnnouncedTracks()))
s.session.SetuppedTransport(),
sourceTrackInfo(s.session.AnnouncedTracks()))
s.stream = res.stream
@ -316,7 +316,7 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R @@ -316,7 +316,7 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R
// onPause is called by rtspServer.
func (s *rtspSession) onPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Response, error) {
switch s.ss.State() {
switch s.session.State() {
case gortsplib.ServerSessionStatePlay:
if s.onReadCmd != nil {
s.log(logger.Info, "runOnRead command stopped")
@ -381,7 +381,7 @@ func (s *rtspSession) apiSourceDescribe() interface{} { @@ -381,7 +381,7 @@ func (s *rtspSession) apiSourceDescribe() interface{} {
func (s *rtspSession) onPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx) {
var err error
switch s.ss.AnnouncedTracks()[ctx.TrackID].(type) {
switch s.session.AnnouncedTracks()[ctx.TrackID].(type) {
case *gortsplib.TrackH264:
err = s.stream.writeData(&dataH264{
trackID: ctx.TrackID,

10
internal/core/stream.go

@ -2,6 +2,7 @@ package core @@ -2,6 +2,7 @@ package core
import (
"sync"
"sync/atomic"
"github.com/aler9/gortsplib"
)
@ -51,13 +52,19 @@ func (m *streamNonRTSPReadersMap) hasReaders() bool { @@ -51,13 +52,19 @@ func (m *streamNonRTSPReadersMap) hasReaders() bool {
}
type stream struct {
bytesReceived *uint64
nonRTSPReaders *streamNonRTSPReadersMap
rtspStream *gortsplib.ServerStream
streamTracks []streamTrack
}
func newStream(tracks gortsplib.Tracks, generateRTPPackets bool) (*stream, error) {
func newStream(
tracks gortsplib.Tracks,
generateRTPPackets bool,
bytesReceived *uint64,
) (*stream, error) {
s := &stream{
bytesReceived: bytesReceived,
nonRTSPReaders: newStreamNonRTSPReadersMap(),
rtspStream: gortsplib.NewServerStream(tracks),
}
@ -104,6 +111,7 @@ func (s *stream) writeData(data data) error { @@ -104,6 +111,7 @@ func (s *stream) writeData(data data) error {
// forward RTP packets to RTSP readers
for _, pkt := range data.getRTPPackets() {
atomic.AddUint64(s.bytesReceived, uint64(pkt.MarshalSize()))
s.rtspStream.WritePacketRTP(data.getTrackID(), pkt, data.getPTSEqualsDTS())
}

4
internal/core/streamtrack_h264.go

@ -70,13 +70,13 @@ type streamTrackH264 struct { @@ -70,13 +70,13 @@ type streamTrackH264 struct {
func newStreamTrackH264(
track *gortsplib.TrackH264,
generateRTPPackets bool,
allocateEncoder bool,
) *streamTrackH264 {
t := &streamTrackH264{
track: track,
}
if generateRTPPackets {
if allocateEncoder {
t.encoder = &rtph264.Encoder{PayloadType: 96}
t.encoder.Init()
}

4
internal/core/streamtrack_mpeg4audio.go

@ -15,13 +15,13 @@ type streamTrackMPEG4Audio struct { @@ -15,13 +15,13 @@ type streamTrackMPEG4Audio struct {
func newStreamTrackMPEG4Audio(
track *gortsplib.TrackMPEG4Audio,
generateRTPPackets bool,
allocateEncoder bool,
) *streamTrackMPEG4Audio {
t := &streamTrackMPEG4Audio{
track: track,
}
if generateRTPPackets {
if allocateEncoder {
t.encoder = &rtpmpeg4audio.Encoder{
PayloadType: 96,
SampleRate: track.ClockRate(),

38
internal/rtmp/bytecounter/reader.go

@ -1,42 +1,36 @@ @@ -1,42 +1,36 @@
package bytecounter
import (
"bufio"
"io"
"sync/atomic"
)
type readerInner struct {
r io.Reader
count uint32
}
func (r *readerInner) Read(p []byte) (int, error) {
n, err := r.r.Read(p)
r.count += uint32(n)
return n, err
}
// Reader allows to count read bytes.
type Reader struct {
ri *readerInner
*bufio.Reader
r io.Reader
count uint64
}
// NewReader allocates a Reader.
func NewReader(r io.Reader) *Reader {
ri := &readerInner{r: r}
return &Reader{
ri: ri,
Reader: bufio.NewReader(ri),
r: r,
}
}
// Count returns read bytes.
func (r Reader) Count() uint32 {
return r.ri.count
// Read implements io.Reader.
func (r *Reader) Read(p []byte) (int, error) {
n, err := r.r.Read(p)
atomic.AddUint64(&r.count, uint64(n))
return n, err
}
// Count returns received bytes.
func (r *Reader) Count() uint64 {
return atomic.LoadUint64(&r.count)
}
// SetCount sets read bytes.
func (r *Reader) SetCount(v uint32) {
r.ri.count = v
func (r *Reader) SetCount(v uint64) {
atomic.StoreUint64(&r.count, v)
}

2
internal/rtmp/bytecounter/reader_test.go

@ -19,5 +19,5 @@ func TestReader(t *testing.T) { @@ -19,5 +19,5 @@ func TestReader(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 64, n)
require.Equal(t, uint32(100+1024), r.Count())
require.Equal(t, uint64(100+64), r.Count())
}

17
internal/rtmp/bytecounter/writer.go

@ -2,12 +2,13 @@ package bytecounter @@ -2,12 +2,13 @@ package bytecounter
import (
"io"
"sync/atomic"
)
// Writer allows to count written bytes.
type Writer struct {
w io.Writer
count uint32
count uint64
}
// NewWriter allocates a Writer.
@ -20,16 +21,16 @@ func NewWriter(w io.Writer) *Writer { @@ -20,16 +21,16 @@ func NewWriter(w io.Writer) *Writer {
// Write implements io.Writer.
func (w *Writer) Write(p []byte) (int, error) {
n, err := w.w.Write(p)
w.count += uint32(n)
atomic.AddUint64(&w.count, uint64(n))
return n, err
}
// Count returns written bytes.
func (w Writer) Count() uint32 {
return w.count
// Count returns sent bytes.
func (w *Writer) Count() uint64 {
return atomic.LoadUint64(&w.count)
}
// SetCount sets written bytes.
func (w *Writer) SetCount(v uint32) {
w.count = v
// SetCount sets sent bytes.
func (w *Writer) SetCount(v uint64) {
atomic.StoreUint64(&w.count, v)
}

2
internal/rtmp/bytecounter/writer_test.go

@ -14,5 +14,5 @@ func TestWriter(t *testing.T) { @@ -14,5 +14,5 @@ func TestWriter(t *testing.T) {
w.SetCount(100)
w.Write(bytes.Repeat([]byte{0x01}, 64))
require.Equal(t, uint32(100+64), w.Count())
require.Equal(t, uint64(100+64), w.Count())
}

21
internal/rtmp/conn.go

@ -114,10 +114,19 @@ type Conn struct { @@ -114,10 +114,19 @@ type Conn struct {
// NewConn initializes a connection.
func NewConn(rw io.ReadWriter) *Conn {
c := &Conn{}
c.bc = bytecounter.NewReadWriter(rw)
c.mrw = message.NewReadWriter(c.bc, false)
return c
return &Conn{
bc: bytecounter.NewReadWriter(rw),
}
}
// BytesReceived returns the number of bytes received.
func (c *Conn) BytesReceived() uint64 {
return c.bc.Reader.Count()
}
// BytesSent returns the number of bytes sent.
func (c *Conn) BytesSent() uint64 {
return c.bc.Writer.Count()
}
func (c *Conn) readCommand() (*message.MsgCommandAMF0, error) {
@ -161,6 +170,8 @@ func (c *Conn) InitializeClient(u *url.URL, isPublishing bool) error { @@ -161,6 +170,8 @@ func (c *Conn) InitializeClient(u *url.URL, isPublishing bool) error {
return err
}
c.mrw = message.NewReadWriter(c.bc, false)
err = c.mrw.Write(&message.MsgSetWindowAckSize{
Value: 2500000,
})
@ -319,6 +330,8 @@ func (c *Conn) InitializeServer() (*url.URL, bool, error) { @@ -319,6 +330,8 @@ func (c *Conn) InitializeServer() (*url.URL, bool, error) {
return nil, false, err
}
c.mrw = message.NewReadWriter(c.bc, false)
cmd, err := c.readCommand()
if err != nil {
return nil, false, err

8
internal/rtmp/conn_test.go

@ -244,6 +244,14 @@ func TestInitializeClient(t *testing.T) { @@ -244,6 +244,14 @@ func TestInitializeClient(t *testing.T) {
err = conn.InitializeClient(u, ca == "publish")
require.NoError(t, err)
if ca == "read" {
require.Equal(t, uint64(3421), conn.BytesReceived())
require.Equal(t, uint64(3409), conn.BytesSent())
} else {
require.Equal(t, uint64(3427), conn.BytesReceived())
require.Equal(t, uint64(3466), conn.BytesSent())
}
<-done
})
}

13
internal/rtmp/rawmessage/reader.go

@ -1,6 +1,7 @@ @@ -1,6 +1,7 @@
package rawmessage
import (
"bufio"
"errors"
"fmt"
"time"
@ -22,14 +23,14 @@ type readerChunkStream struct { @@ -22,14 +23,14 @@ type readerChunkStream struct {
}
func (rc *readerChunkStream) readChunk(c chunk.Chunk, chunkBodySize uint32) error {
err := c.Read(rc.mr.r, chunkBodySize)
err := c.Read(rc.mr.br, chunkBodySize)
if err != nil {
return err
}
// check if an ack is needed
if rc.mr.ackWindowSize != 0 {
count := rc.mr.r.Count()
count := uint32(rc.mr.r.Count())
diff := count - rc.mr.lastAckCount
if diff > (rc.mr.ackWindowSize) {
@ -210,6 +211,7 @@ type Reader struct { @@ -210,6 +211,7 @@ type Reader struct {
r *bytecounter.Reader
onAckNeeded func(uint32) error
br *bufio.Reader
chunkSize uint32
ackWindowSize uint32
lastAckCount uint32
@ -223,8 +225,11 @@ type Reader struct { @@ -223,8 +225,11 @@ type Reader struct {
// NewReader allocates a Reader.
func NewReader(r *bytecounter.Reader, onAckNeeded func(uint32) error) *Reader {
br := bufio.NewReader(r)
return &Reader{
r: r,
br: br,
onAckNeeded: onAckNeeded,
chunkSize: 128,
chunkStreams: make(map[byte]*readerChunkStream),
@ -244,7 +249,7 @@ func (r *Reader) SetWindowAckSize(v uint32) { @@ -244,7 +249,7 @@ func (r *Reader) SetWindowAckSize(v uint32) {
// Read reads a Message.
func (r *Reader) Read() (*Message, error) {
for {
byt, err := r.r.ReadByte()
byt, err := r.br.ReadByte()
if err != nil {
return nil, err
}
@ -258,7 +263,7 @@ func (r *Reader) Read() (*Message, error) { @@ -258,7 +263,7 @@ func (r *Reader) Read() (*Message, error) {
r.chunkStreams[chunkStreamID] = rc
}
r.r.UnreadByte()
r.br.UnreadByte()
msg, err := rc.readMessage(typ)
if err != nil {

9
internal/rtmp/rawmessage/reader_test.go

@ -198,8 +198,7 @@ func TestReader(t *testing.T) { @@ -198,8 +198,7 @@ func TestReader(t *testing.T) {
for _, ca := range cases {
t.Run(ca.name, func(t *testing.T) {
var buf bytes.Buffer
bcr := bytecounter.NewReader(&buf)
r := NewReader(bcr, func(count uint32) error {
r := NewReader(bytecounter.NewReader(&buf), func(count uint32) error {
return nil
})
@ -224,14 +223,14 @@ func TestReaderAcknowledge(t *testing.T) { @@ -224,14 +223,14 @@ func TestReaderAcknowledge(t *testing.T) {
onAckCalled := make(chan struct{})
var buf bytes.Buffer
bcr := bytecounter.NewReader(&buf)
r := NewReader(bcr, func(count uint32) error {
bc := bytecounter.NewReader(&buf)
r := NewReader(bc, func(count uint32) error {
close(onAckCalled)
return nil
})
if ca == "overflow" {
bcr.SetCount(4294967096)
bc.SetCount(4294967096)
r.lastAckCount = 4294967096
}

2
internal/rtmp/rawmessage/writer.go

@ -20,7 +20,7 @@ type writerChunkStream struct { @@ -20,7 +20,7 @@ type writerChunkStream struct {
func (wc *writerChunkStream) writeChunk(c chunk.Chunk) error {
// check if we received an acknowledge
if wc.mw.checkAcknowledge && wc.mw.ackWindowSize != 0 {
diff := wc.mw.w.Count() - wc.mw.ackValue
diff := uint32(wc.mw.w.Count()) - wc.mw.ackValue
if diff > (wc.mw.ackWindowSize * 3 / 2) {
return fmt.Errorf("no acknowledge received within window")

Loading…
Cancel
Save