Browse Source

fix crash when requesting metrics and RTMP, SRT or WebRTC servers are not present anymore (#2782)

pull/2783/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
ed72fa7db1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 68
      internal/core/core.go
  2. 15
      internal/core/hls_manager.go
  3. 22
      internal/core/metrics.go
  4. 27
      internal/core/metrics_test.go
  5. 11
      internal/core/path_manager.go
  6. 11
      internal/core/rtmp_server.go
  7. 19
      internal/core/rtsp_server.go
  8. 9
      internal/core/srt_server.go
  9. 5
      internal/core/webrtc_manager.go

68
internal/core/core.go

@ -313,9 +313,12 @@ func (p *Core) createResources(initial bool) error { @@ -313,9 +313,12 @@ func (p *Core) createResources(initial bool) error {
p.conf.UDPMaxPayloadSize,
p.conf.Paths,
p.externalCmdPool,
p.metrics,
p,
)
if p.metrics != nil {
p.metrics.setPathManager(p.pathManager)
}
}
if p.conf.RTSP &&
@ -347,13 +350,16 @@ func (p *Core) createResources(initial bool) error { @@ -347,13 +350,16 @@ func (p *Core) createResources(initial bool) error {
p.conf.RunOnConnectRestart,
p.conf.RunOnDisconnect,
p.externalCmdPool,
p.metrics,
p.pathManager,
p,
)
if err != nil {
return err
}
if p.metrics != nil {
p.metrics.setRTSPServer(p.rtspServer)
}
}
if p.conf.RTSP &&
@ -382,13 +388,16 @@ func (p *Core) createResources(initial bool) error { @@ -382,13 +388,16 @@ func (p *Core) createResources(initial bool) error {
p.conf.RunOnConnectRestart,
p.conf.RunOnDisconnect,
p.externalCmdPool,
p.metrics,
p.pathManager,
p,
)
if err != nil {
return err
}
if p.metrics != nil {
p.metrics.setRTSPSServer(p.rtspsServer)
}
}
if p.conf.RTMP &&
@ -408,13 +417,16 @@ func (p *Core) createResources(initial bool) error { @@ -408,13 +417,16 @@ func (p *Core) createResources(initial bool) error {
p.conf.RunOnConnectRestart,
p.conf.RunOnDisconnect,
p.externalCmdPool,
p.metrics,
p.pathManager,
p,
)
if err != nil {
return err
}
if p.metrics != nil {
p.metrics.setRTMPServer(p.rtmpServer)
}
}
if p.conf.RTMP &&
@ -434,7 +446,6 @@ func (p *Core) createResources(initial bool) error { @@ -434,7 +446,6 @@ func (p *Core) createResources(initial bool) error {
p.conf.RunOnConnectRestart,
p.conf.RunOnDisconnect,
p.externalCmdPool,
p.metrics,
p.pathManager,
p,
)
@ -463,12 +474,17 @@ func (p *Core) createResources(initial bool) error { @@ -463,12 +474,17 @@ func (p *Core) createResources(initial bool) error {
p.conf.ReadTimeout,
p.conf.WriteQueueSize,
p.pathManager,
p.metrics,
p,
)
if err != nil {
return err
}
p.pathManager.setHLSManager(p.hlsManager)
if p.metrics != nil {
p.metrics.setHLSManager(p.hlsManager)
}
}
if p.conf.WebRTC &&
@ -490,7 +506,6 @@ func (p *Core) createResources(initial bool) error { @@ -490,7 +506,6 @@ func (p *Core) createResources(initial bool) error {
ICEServers: p.conf.WebRTCICEServers2,
ExternalCmdPool: p.externalCmdPool,
PathManager: p.pathManager,
Metrics: p.metrics,
Parent: p,
}
err = p.webRTCManager.initialize()
@ -498,6 +513,10 @@ func (p *Core) createResources(initial bool) error { @@ -498,6 +513,10 @@ func (p *Core) createResources(initial bool) error {
p.webRTCManager = nil
return err
}
if p.metrics != nil {
p.metrics.setWebRTCManager(p.webRTCManager)
}
}
if p.conf.SRT &&
@ -513,13 +532,16 @@ func (p *Core) createResources(initial bool) error { @@ -513,13 +532,16 @@ func (p *Core) createResources(initial bool) error {
p.conf.RunOnConnectRestart,
p.conf.RunOnDisconnect,
p.externalCmdPool,
p.metrics,
p.pathManager,
p,
)
if err != nil {
return err
}
if p.metrics != nil {
p.metrics.setSRTServer(p.srtServer)
}
}
if p.conf.API &&
@ -747,16 +769,30 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { @@ -747,16 +769,30 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
}
if closeSRTServer && p.srtServer != nil {
if p.metrics != nil {
p.metrics.setSRTServer(nil)
}
p.srtServer.close()
p.srtServer = nil
}
if closeWebRTCManager && p.webRTCManager != nil {
if p.metrics != nil {
p.metrics.setWebRTCManager(nil)
}
p.webRTCManager.close()
p.webRTCManager = nil
}
if closeHLSManager && p.hlsManager != nil {
if p.metrics != nil {
p.metrics.setHLSManager(nil)
}
p.pathManager.setHLSManager(nil)
p.hlsManager.close()
p.hlsManager = nil
}
@ -767,21 +803,37 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { @@ -767,21 +803,37 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
}
if closeRTMPServer && p.rtmpServer != nil {
if p.metrics != nil {
p.metrics.setRTMPServer(nil)
}
p.rtmpServer.close()
p.rtmpServer = nil
}
if closeRTSPSServer && p.rtspsServer != nil {
if p.metrics != nil {
p.metrics.setRTSPSServer(nil)
}
p.rtspsServer.close()
p.rtspsServer = nil
}
if closeRTSPServer && p.rtspServer != nil {
if p.metrics != nil {
p.metrics.setRTSPServer(nil)
}
p.rtspServer.close()
p.rtspServer = nil
}
if closePathManager && p.pathManager != nil {
if p.metrics != nil {
p.metrics.setPathManager(nil)
}
p.pathManager.close()
p.pathManager = nil
}

15
internal/core/hls_manager.go

@ -45,7 +45,6 @@ type hlsManager struct { @@ -45,7 +45,6 @@ type hlsManager struct {
directory string
writeQueueSize int
pathManager *pathManager
metrics *metrics
parent hlsManagerParent
ctx context.Context
@ -81,7 +80,6 @@ func newHLSManager( @@ -81,7 +80,6 @@ func newHLSManager(
readTimeout conf.StringDuration,
writeQueueSize int,
pathManager *pathManager,
metrics *metrics,
parent hlsManagerParent,
) (*hlsManager, error) {
ctx, ctxCancel := context.WithCancel(context.Background())
@ -98,7 +96,6 @@ func newHLSManager( @@ -98,7 +96,6 @@ func newHLSManager(
writeQueueSize: writeQueueSize,
pathManager: pathManager,
parent: parent,
metrics: metrics,
ctx: ctx,
ctxCancel: ctxCancel,
muxers: make(map[string]*hlsMuxer),
@ -129,12 +126,6 @@ func newHLSManager( @@ -129,12 +126,6 @@ func newHLSManager(
m.Log(logger.Info, "listener opened on "+address)
m.pathManager.setHLSManager(m)
if m.metrics != nil {
m.metrics.setHLSManager(m)
}
m.wg.Add(1)
go m.run()
@ -223,12 +214,6 @@ outer: @@ -223,12 +214,6 @@ outer:
m.ctxCancel()
m.httpServer.close()
m.pathManager.setHLSManager(nil)
if m.metrics != nil {
m.metrics.setHLSManager(nil)
}
}
func (m *hlsManager) createMuxer(pathName string, remoteAddr string) *hlsMuxer {

22
internal/core/metrics.go

@ -237,50 +237,50 @@ func (m *metrics) onMetrics(ctx *gin.Context) { @@ -237,50 +237,50 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
io.WriteString(ctx.Writer, out) //nolint:errcheck
}
// pathManagerSet is called by pathManager.
func (m *metrics) pathManagerSet(s apiPathManager) {
// setPathManager is called by core.
func (m *metrics) setPathManager(s apiPathManager) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.pathManager = s
}
// setHLSManager is called by hlsManager.
// setHLSManager is called by core.
func (m *metrics) setHLSManager(s apiHLSManager) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.hlsManager = s
}
// setRTSPServer is called by rtspServer (plain).
// setRTSPServer is called by core.
func (m *metrics) setRTSPServer(s apiRTSPServer) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.rtspServer = s
}
// setRTSPSServer is called by rtspServer (tls).
// setRTSPSServer is called by core.
func (m *metrics) setRTSPSServer(s apiRTSPServer) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.rtspsServer = s
}
// rtmpServerSet is called by rtmpServer.
func (m *metrics) rtmpServerSet(s apiRTMPServer) {
// setRTMPServer is called by core.
func (m *metrics) setRTMPServer(s apiRTMPServer) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.rtmpServer = s
}
// srtServerSet is called by srtServer.
func (m *metrics) srtServerSet(s apiSRTServer) {
// setSRTServer is called by core.
func (m *metrics) setSRTServer(s apiSRTServer) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.srtServer = s
}
// webRTCManagerSet is called by webRTCManager.
func (m *metrics) webRTCManagerSet(s apiWebRTCManager) {
// setWebRTCManager is called by core.
func (m *metrics) setWebRTCManager(s apiWebRTCManager) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.webRTCManager = s

27
internal/core/metrics_test.go

@ -16,7 +16,7 @@ import ( @@ -16,7 +16,7 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
"github.com/datarhei/gosrt"
srt "github.com/datarhei/gosrt"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
@ -33,7 +33,8 @@ func TestMetrics(t *testing.T) { @@ -33,7 +33,8 @@ func TestMetrics(t *testing.T) {
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
p, ok := newInstance("hlsAlwaysRemux: yes\n" +
p, ok := newInstance("api: yes\n" +
"hlsAlwaysRemux: yes\n" +
"metrics: yes\n" +
"webrtcServerCert: " + serverCertFpath + "\n" +
"webrtcServerKey: " + serverKeyFpath + "\n" +
@ -47,6 +48,7 @@ func TestMetrics(t *testing.T) { @@ -47,6 +48,7 @@ func TestMetrics(t *testing.T) {
hc := &http.Client{Transport: &http.Transport{}}
t.Run("initial", func(t *testing.T) {
bo := httpPullFile(t, hc, "http://localhost:9998/metrics")
require.Equal(t, `paths 0
@ -74,7 +76,9 @@ webrtc_sessions 0 @@ -74,7 +76,9 @@ webrtc_sessions 0
webrtc_sessions_bytes_received 0
webrtc_sessions_bytes_sent 0
`, string(bo))
})
t.Run("with data", func(t *testing.T) {
terminate := make(chan struct{})
var wg sync.WaitGroup
wg.Add(5)
@ -197,7 +201,7 @@ webrtc_sessions_bytes_sent 0 @@ -197,7 +201,7 @@ webrtc_sessions_bytes_sent 0
time.Sleep(500 * time.Millisecond)
bo = httpPullFile(t, hc, "http://localhost:9998/metrics")
bo := httpPullFile(t, hc, "http://localhost:9998/metrics")
require.Regexp(t,
`^paths\{name=".*?",state="ready"\} 1`+"\n"+
@ -251,4 +255,21 @@ webrtc_sessions_bytes_sent 0 @@ -251,4 +255,21 @@ webrtc_sessions_bytes_sent 0
close(terminate)
wg.Wait()
})
t.Run("servers deleted", func(t *testing.T) {
httpRequest(t, hc, http.MethodPatch, "http://localhost:9997/v3/config/global/patch", map[string]interface{}{
"rtsp": false,
"rtmp": false,
"srt": false,
"hls": false,
"webrtc": false,
}, nil)
time.Sleep(500 * time.Millisecond)
bo := httpPullFile(t, hc, "http://localhost:9998/metrics")
require.Equal(t, "paths 0\n", string(bo))
})
}

11
internal/core/path_manager.go

@ -78,7 +78,6 @@ type pathManager struct { @@ -78,7 +78,6 @@ type pathManager struct {
udpMaxPayloadSize int
pathConfs map[string]*conf.Path
externalCmdPool *externalcmd.Pool
metrics *metrics
parent pathManagerParent
ctx context.Context
@ -112,7 +111,6 @@ func newPathManager( @@ -112,7 +111,6 @@ func newPathManager(
udpMaxPayloadSize int,
pathConfs map[string]*conf.Path,
externalCmdPool *externalcmd.Pool,
metrics *metrics,
parent pathManagerParent,
) *pathManager {
ctx, ctxCancel := context.WithCancel(context.Background())
@ -127,7 +125,6 @@ func newPathManager( @@ -127,7 +125,6 @@ func newPathManager(
udpMaxPayloadSize: udpMaxPayloadSize,
pathConfs: pathConfs,
externalCmdPool: externalCmdPool,
metrics: metrics,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
@ -152,10 +149,6 @@ func newPathManager( @@ -152,10 +149,6 @@ func newPathManager(
}
}
if pm.metrics != nil {
pm.metrics.pathManagerSet(pm)
}
pm.Log(logger.Debug, "path manager created")
pm.wg.Add(1)
@ -220,10 +213,6 @@ outer: @@ -220,10 +213,6 @@ outer:
}
pm.ctxCancel()
if pm.metrics != nil {
pm.metrics.pathManagerSet(nil)
}
}
func (pm *pathManager) doReloadConf(newPaths map[string]*conf.Path) {

11
internal/core/rtmp_server.go

@ -59,7 +59,6 @@ type rtmpServer struct { @@ -59,7 +59,6 @@ type rtmpServer struct {
runOnConnectRestart bool
runOnDisconnect string
externalCmdPool *externalcmd.Pool
metrics *metrics
pathManager *pathManager
parent rtmpServerParent
@ -91,7 +90,6 @@ func newRTMPServer( @@ -91,7 +90,6 @@ func newRTMPServer(
runOnConnectRestart bool,
runOnDisconnect string,
externalCmdPool *externalcmd.Pool,
metrics *metrics,
pathManager *pathManager,
parent rtmpServerParent,
) (*rtmpServer, error) {
@ -124,7 +122,6 @@ func newRTMPServer( @@ -124,7 +122,6 @@ func newRTMPServer(
runOnDisconnect: runOnDisconnect,
isTLS: isTLS,
externalCmdPool: externalCmdPool,
metrics: metrics,
pathManager: pathManager,
parent: parent,
ctx: ctx,
@ -141,10 +138,6 @@ func newRTMPServer( @@ -141,10 +138,6 @@ func newRTMPServer(
s.Log(logger.Info, "listener opened on %s", address)
if s.metrics != nil {
s.metrics.rtmpServerSet(s)
}
newRTMPListener(
s.ln,
&s.wg,
@ -247,10 +240,6 @@ outer: @@ -247,10 +240,6 @@ outer:
s.ctxCancel()
s.ln.Close()
if s.metrics != nil {
s.metrics.rtmpServerSet(s)
}
}
func (s *rtmpServer) findConnByUUID(uuid uuid.UUID) *rtmpConn {

19
internal/core/rtsp_server.go

@ -51,7 +51,6 @@ type rtspServer struct { @@ -51,7 +51,6 @@ type rtspServer struct {
runOnConnectRestart bool
runOnDisconnect string
externalCmdPool *externalcmd.Pool
metrics *metrics
pathManager *pathManager
parent rtspServerParent
@ -86,7 +85,6 @@ func newRTSPServer( @@ -86,7 +85,6 @@ func newRTSPServer(
runOnConnectRestart bool,
runOnDisconnect string,
externalCmdPool *externalcmd.Pool,
metrics *metrics,
pathManager *pathManager,
parent rtspServerParent,
) (*rtspServer, error) {
@ -102,7 +100,6 @@ func newRTSPServer( @@ -102,7 +100,6 @@ func newRTSPServer(
runOnConnectRestart: runOnConnectRestart,
runOnDisconnect: runOnDisconnect,
externalCmdPool: externalCmdPool,
metrics: metrics,
pathManager: pathManager,
parent: parent,
ctx: ctx,
@ -146,14 +143,6 @@ func newRTSPServer( @@ -146,14 +143,6 @@ func newRTSPServer(
s.Log(logger.Info, "listener opened on %s", printAddresses(s.srv))
if metrics != nil {
if !isTLS {
metrics.setRTSPServer(s)
} else {
metrics.setRTSPSServer(s)
}
}
s.wg.Add(1)
go s.run()
@ -205,14 +194,6 @@ outer: @@ -205,14 +194,6 @@ outer:
}
s.ctxCancel()
if s.metrics != nil {
if !s.isTLS {
s.metrics.setRTSPServer(nil)
} else {
s.metrics.setRTSPSServer(nil)
}
}
}
// OnConnOpen implements gortsplib.ServerHandlerOnConnOpen.

9
internal/core/srt_server.go

@ -7,7 +7,7 @@ import ( @@ -7,7 +7,7 @@ import (
"sync"
"time"
"github.com/datarhei/gosrt"
srt "github.com/datarhei/gosrt"
"github.com/google/uuid"
"github.com/bluenviron/mediamtx/internal/conf"
@ -67,7 +67,6 @@ type srtServer struct { @@ -67,7 +67,6 @@ type srtServer struct {
runOnConnectRestart bool
runOnDisconnect string
externalCmdPool *externalcmd.Pool
metrics *metrics
pathManager *pathManager
parent srtServerParent
@ -97,7 +96,6 @@ func newSRTServer( @@ -97,7 +96,6 @@ func newSRTServer(
runOnConnectRestart bool,
runOnDisconnect string,
externalCmdPool *externalcmd.Pool,
metrics *metrics,
pathManager *pathManager,
parent srtServerParent,
) (*srtServer, error) {
@ -122,7 +120,6 @@ func newSRTServer( @@ -122,7 +120,6 @@ func newSRTServer(
runOnConnectRestart: runOnConnectRestart,
runOnDisconnect: runOnDisconnect,
externalCmdPool: externalCmdPool,
metrics: metrics,
pathManager: pathManager,
parent: parent,
ctx: ctx,
@ -139,10 +136,6 @@ func newSRTServer( @@ -139,10 +136,6 @@ func newSRTServer(
s.Log(logger.Info, "listener opened on "+address+" (UDP)")
if s.metrics != nil {
s.metrics.srtServerSet(s)
}
newSRTListener(
s.ln,
&s.wg,

5
internal/core/webrtc_manager.go

@ -180,7 +180,6 @@ type webRTCManager struct { @@ -180,7 +180,6 @@ type webRTCManager struct {
ICEServers []conf.WebRTCICEServer
ExternalCmdPool *externalcmd.Pool
PathManager *pathManager
Metrics *metrics
Parent webRTCManagerParent
ctx context.Context
@ -284,10 +283,6 @@ func (m *webRTCManager) initialize() error { @@ -284,10 +283,6 @@ func (m *webRTCManager) initialize() error {
}
m.Log(logger.Info, str)
if m.Metrics != nil {
m.Metrics.webRTCManagerSet(m)
}
go m.run()
return nil

Loading…
Cancel
Save