From b71e12444755c27cd7bc427a5ff8e51bf15e5cd8 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 3 Dec 2023 16:28:40 +0100 Subject: [PATCH] fix crash when requesting metrics and RTMP, SRT or WebRTC servers are not present anymore --- internal/core/core.go | 68 +++++- internal/core/hls_manager.go | 15 -- internal/core/metrics.go | 22 +- internal/core/metrics_test.go | 383 +++++++++++++++++--------------- internal/core/path_manager.go | 11 - internal/core/rtmp_server.go | 11 - internal/core/rtsp_server.go | 19 -- internal/core/srt_server.go | 9 +- internal/core/webrtc_manager.go | 5 - 9 files changed, 274 insertions(+), 269 deletions(-) diff --git a/internal/core/core.go b/internal/core/core.go index 242f73eb..33c0712f 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -313,9 +313,12 @@ func (p *Core) createResources(initial bool) error { p.conf.UDPMaxPayloadSize, p.conf.Paths, p.externalCmdPool, - p.metrics, p, ) + + if p.metrics != nil { + p.metrics.setPathManager(p.pathManager) + } } if p.conf.RTSP && @@ -347,13 +350,16 @@ func (p *Core) createResources(initial bool) error { p.conf.RunOnConnectRestart, p.conf.RunOnDisconnect, p.externalCmdPool, - p.metrics, p.pathManager, p, ) if err != nil { return err } + + if p.metrics != nil { + p.metrics.setRTSPServer(p.rtspServer) + } } if p.conf.RTSP && @@ -382,13 +388,16 @@ func (p *Core) createResources(initial bool) error { p.conf.RunOnConnectRestart, p.conf.RunOnDisconnect, p.externalCmdPool, - p.metrics, p.pathManager, p, ) if err != nil { return err } + + if p.metrics != nil { + p.metrics.setRTSPSServer(p.rtspsServer) + } } if p.conf.RTMP && @@ -408,13 +417,16 @@ func (p *Core) createResources(initial bool) error { p.conf.RunOnConnectRestart, p.conf.RunOnDisconnect, p.externalCmdPool, - p.metrics, p.pathManager, p, ) if err != nil { return err } + + if p.metrics != nil { + p.metrics.setRTMPServer(p.rtmpServer) + } } if p.conf.RTMP && @@ -434,7 +446,6 @@ func (p *Core) createResources(initial bool) error { p.conf.RunOnConnectRestart, p.conf.RunOnDisconnect, p.externalCmdPool, - p.metrics, p.pathManager, p, ) @@ -463,12 +474,17 @@ func (p *Core) createResources(initial bool) error { p.conf.ReadTimeout, p.conf.WriteQueueSize, p.pathManager, - p.metrics, p, ) if err != nil { return err } + + p.pathManager.setHLSManager(p.hlsManager) + + if p.metrics != nil { + p.metrics.setHLSManager(p.hlsManager) + } } if p.conf.WebRTC && @@ -490,7 +506,6 @@ func (p *Core) createResources(initial bool) error { ICEServers: p.conf.WebRTCICEServers2, ExternalCmdPool: p.externalCmdPool, PathManager: p.pathManager, - Metrics: p.metrics, Parent: p, } err = p.webRTCManager.initialize() @@ -498,6 +513,10 @@ func (p *Core) createResources(initial bool) error { p.webRTCManager = nil return err } + + if p.metrics != nil { + p.metrics.setWebRTCManager(p.webRTCManager) + } } if p.conf.SRT && @@ -513,13 +532,16 @@ func (p *Core) createResources(initial bool) error { p.conf.RunOnConnectRestart, p.conf.RunOnDisconnect, p.externalCmdPool, - p.metrics, p.pathManager, p, ) if err != nil { return err } + + if p.metrics != nil { + p.metrics.setSRTServer(p.srtServer) + } } if p.conf.API && @@ -747,16 +769,30 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { } if closeSRTServer && p.srtServer != nil { + if p.metrics != nil { + p.metrics.setSRTServer(nil) + } + p.srtServer.close() p.srtServer = nil } if closeWebRTCManager && p.webRTCManager != nil { + if p.metrics != nil { + p.metrics.setWebRTCManager(nil) + } + p.webRTCManager.close() p.webRTCManager = nil } if closeHLSManager && p.hlsManager != nil { + if p.metrics != nil { + p.metrics.setHLSManager(nil) + } + + p.pathManager.setHLSManager(nil) + p.hlsManager.close() p.hlsManager = nil } @@ -767,21 +803,37 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { } if closeRTMPServer && p.rtmpServer != nil { + if p.metrics != nil { + p.metrics.setRTMPServer(nil) + } + p.rtmpServer.close() p.rtmpServer = nil } if closeRTSPSServer && p.rtspsServer != nil { + if p.metrics != nil { + p.metrics.setRTSPSServer(nil) + } + p.rtspsServer.close() p.rtspsServer = nil } if closeRTSPServer && p.rtspServer != nil { + if p.metrics != nil { + p.metrics.setRTSPServer(nil) + } + p.rtspServer.close() p.rtspServer = nil } if closePathManager && p.pathManager != nil { + if p.metrics != nil { + p.metrics.setPathManager(nil) + } + p.pathManager.close() p.pathManager = nil } diff --git a/internal/core/hls_manager.go b/internal/core/hls_manager.go index b71cf81c..7e6915f9 100644 --- a/internal/core/hls_manager.go +++ b/internal/core/hls_manager.go @@ -45,7 +45,6 @@ type hlsManager struct { directory string writeQueueSize int pathManager *pathManager - metrics *metrics parent hlsManagerParent ctx context.Context @@ -81,7 +80,6 @@ func newHLSManager( readTimeout conf.StringDuration, writeQueueSize int, pathManager *pathManager, - metrics *metrics, parent hlsManagerParent, ) (*hlsManager, error) { ctx, ctxCancel := context.WithCancel(context.Background()) @@ -98,7 +96,6 @@ func newHLSManager( writeQueueSize: writeQueueSize, pathManager: pathManager, parent: parent, - metrics: metrics, ctx: ctx, ctxCancel: ctxCancel, muxers: make(map[string]*hlsMuxer), @@ -129,12 +126,6 @@ func newHLSManager( m.Log(logger.Info, "listener opened on "+address) - m.pathManager.setHLSManager(m) - - if m.metrics != nil { - m.metrics.setHLSManager(m) - } - m.wg.Add(1) go m.run() @@ -223,12 +214,6 @@ outer: m.ctxCancel() m.httpServer.close() - - m.pathManager.setHLSManager(nil) - - if m.metrics != nil { - m.metrics.setHLSManager(nil) - } } func (m *hlsManager) createMuxer(pathName string, remoteAddr string) *hlsMuxer { diff --git a/internal/core/metrics.go b/internal/core/metrics.go index bb511309..67c59e89 100644 --- a/internal/core/metrics.go +++ b/internal/core/metrics.go @@ -237,50 +237,50 @@ func (m *metrics) onMetrics(ctx *gin.Context) { io.WriteString(ctx.Writer, out) //nolint:errcheck } -// pathManagerSet is called by pathManager. -func (m *metrics) pathManagerSet(s apiPathManager) { +// setPathManager is called by core. +func (m *metrics) setPathManager(s apiPathManager) { m.mutex.Lock() defer m.mutex.Unlock() m.pathManager = s } -// setHLSManager is called by hlsManager. +// setHLSManager is called by core. func (m *metrics) setHLSManager(s apiHLSManager) { m.mutex.Lock() defer m.mutex.Unlock() m.hlsManager = s } -// setRTSPServer is called by rtspServer (plain). +// setRTSPServer is called by core. func (m *metrics) setRTSPServer(s apiRTSPServer) { m.mutex.Lock() defer m.mutex.Unlock() m.rtspServer = s } -// setRTSPSServer is called by rtspServer (tls). +// setRTSPSServer is called by core. func (m *metrics) setRTSPSServer(s apiRTSPServer) { m.mutex.Lock() defer m.mutex.Unlock() m.rtspsServer = s } -// rtmpServerSet is called by rtmpServer. -func (m *metrics) rtmpServerSet(s apiRTMPServer) { +// setRTMPServer is called by core. +func (m *metrics) setRTMPServer(s apiRTMPServer) { m.mutex.Lock() defer m.mutex.Unlock() m.rtmpServer = s } -// srtServerSet is called by srtServer. -func (m *metrics) srtServerSet(s apiSRTServer) { +// setSRTServer is called by core. +func (m *metrics) setSRTServer(s apiSRTServer) { m.mutex.Lock() defer m.mutex.Unlock() m.srtServer = s } -// webRTCManagerSet is called by webRTCManager. -func (m *metrics) webRTCManagerSet(s apiWebRTCManager) { +// setWebRTCManager is called by core. +func (m *metrics) setWebRTCManager(s apiWebRTCManager) { m.mutex.Lock() defer m.mutex.Unlock() m.webRTCManager = s diff --git a/internal/core/metrics_test.go b/internal/core/metrics_test.go index 0770e416..a1d0a936 100644 --- a/internal/core/metrics_test.go +++ b/internal/core/metrics_test.go @@ -16,7 +16,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" - "github.com/datarhei/gosrt" + srt "github.com/datarhei/gosrt" "github.com/pion/rtp" "github.com/stretchr/testify/require" @@ -33,7 +33,8 @@ func TestMetrics(t *testing.T) { require.NoError(t, err) defer os.Remove(serverKeyFpath) - p, ok := newInstance("hlsAlwaysRemux: yes\n" + + p, ok := newInstance("api: yes\n" + + "hlsAlwaysRemux: yes\n" + "metrics: yes\n" + "webrtcServerCert: " + serverCertFpath + "\n" + "webrtcServerKey: " + serverKeyFpath + "\n" + @@ -47,9 +48,10 @@ func TestMetrics(t *testing.T) { hc := &http.Client{Transport: &http.Transport{}} - bo := httpPullFile(t, hc, "http://localhost:9998/metrics") + t.Run("initial", func(t *testing.T) { + bo := httpPullFile(t, hc, "http://localhost:9998/metrics") - require.Equal(t, `paths 0 + require.Equal(t, `paths 0 hls_muxers 0 hls_muxers_bytes_sent 0 rtsp_conns 0 @@ -74,181 +76,200 @@ webrtc_sessions 0 webrtc_sessions_bytes_received 0 webrtc_sessions_bytes_sent 0 `, string(bo)) - - terminate := make(chan struct{}) - var wg sync.WaitGroup - wg.Add(5) - - go func() { - defer wg.Done() - source := gortsplib.Client{} - err := source.StartRecording("rtsp://localhost:8554/rtsp_path", - &description.Session{Medias: []*description.Media{{ - Type: description.MediaTypeVideo, - Formats: []format.Format{testFormatH264}, - }}}) - require.NoError(t, err) - defer source.Close() - <-terminate - }() - - go func() { - defer wg.Done() - source2 := gortsplib.Client{TLSConfig: &tls.Config{InsecureSkipVerify: true}} - err := source2.StartRecording("rtsps://localhost:8322/rtsps_path", - &description.Session{Medias: []*description.Media{{ - Type: description.MediaTypeVideo, - Formats: []format.Format{testFormatH264}, - }}}) - require.NoError(t, err) - defer source2.Close() - <-terminate - }() - - go func() { - defer wg.Done() - u, err := url.Parse("rtmp://localhost:1935/rtmp_path") - require.NoError(t, err) - - nconn, err := net.Dial("tcp", u.Host) - require.NoError(t, err) - defer nconn.Close() - - conn, err := rtmp.NewClientConn(nconn, u, true) - require.NoError(t, err) - - _, err = rtmp.NewWriter(conn, testFormatH264, nil) - require.NoError(t, err) - <-terminate - }() - - go func() { - defer wg.Done() - - su, err := url.Parse("http://localhost:8889/webrtc_path/whip") - require.NoError(t, err) - - s := &webrtc.WHIPClient{ - HTTPClient: &http.Client{Transport: &http.Transport{}}, - URL: su, - } - - tracks, err := s.Publish(context.Background(), testMediaH264.Formats[0], nil) - require.NoError(t, err) - defer checkClose(t, s.Close) - - err = tracks[0].WriteRTP(&rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 123, - Timestamp: 45343, - SSRC: 563423, - }, - Payload: []byte{1}, - }) - require.NoError(t, err) - <-terminate - }() - - go func() { - defer wg.Done() - - srtConf := srt.DefaultConfig() - address, err := srtConf.UnmarshalURL("srt://localhost:8890?streamid=publish:srt_path") - require.NoError(t, err) - - err = srtConf.Validate() - require.NoError(t, err) - - publisher, err := srt.Dial("srt", address, srtConf) - require.NoError(t, err) - defer publisher.Close() - - track := &mpegts.Track{ - Codec: &mpegts.CodecH264{}, - } - - bw := bufio.NewWriter(publisher) - w := mpegts.NewWriter(bw, []*mpegts.Track{track}) - require.NoError(t, err) - - err = w.WriteH26x(track, 0, 0, true, [][]byte{ - { // SPS - 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02, - 0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, - 0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, - 0x20, - }, - { // PPS - 0x08, 0x06, 0x07, 0x08, - }, - { // IDR - 0x05, 1, - }, - }) - require.NoError(t, err) - - err = bw.Flush() - require.NoError(t, err) - <-terminate - }() - - time.Sleep(500 * time.Millisecond) - - bo = httpPullFile(t, hc, "http://localhost:9998/metrics") - - require.Regexp(t, - `^paths\{name=".*?",state="ready"\} 1`+"\n"+ - `paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+ - `paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+ - `paths\{name=".*?",state="ready"\} 1`+"\n"+ - `paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+ - `paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+ - `paths\{name=".*?",state="ready"\} 1`+"\n"+ - `paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+ - `paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+ - `paths\{name=".*?",state="ready"\} 1`+"\n"+ - `paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+ - `paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+ - `paths\{name=".*?",state="ready"\} 1`+"\n"+ - `paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+ - `paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+ - `hls_muxers\{name=".*?"\} 1`+"\n"+ - `hls_muxers_bytes_sent\{name=".*?"\} [0-9]+`+"\n"+ - `hls_muxers\{name=".*?"\} 1`+"\n"+ - `hls_muxers_bytes_sent\{name=".*?"\} [0-9]+`+"\n"+ - `hls_muxers\{name=".*?"\} 1`+"\n"+ - `hls_muxers_bytes_sent\{name=".*?"\} 0`+"\n"+ - `hls_muxers\{name=".*?"\} 1`+"\n"+ - `hls_muxers_bytes_sent\{name=".*?"\} 0`+"\n"+ - `hls_muxers\{name=".*?"\} 1`+"\n"+ - `hls_muxers_bytes_sent\{name=".*?"\} 0`+"\n"+ - `rtsp_conns\{id=".*?"\} 1`+"\n"+ - `rtsp_conns_bytes_received\{id=".*?"\} [0-9]+`+"\n"+ - `rtsp_conns_bytes_sent\{id=".*?"\} [0-9]+`+"\n"+ - `rtsp_sessions\{id=".*?",state="publish"\} 1`+"\n"+ - `rtsp_sessions_bytes_received\{id=".*?",state="publish"\} 0`+"\n"+ - `rtsp_sessions_bytes_sent\{id=".*?",state="publish"\} [0-9]+`+"\n"+ - `rtsps_conns\{id=".*?"\} 1`+"\n"+ - `rtsps_conns_bytes_received\{id=".*?"\} [0-9]+`+"\n"+ - `rtsps_conns_bytes_sent\{id=".*?"\} [0-9]+`+"\n"+ - `rtsps_sessions\{id=".*?",state="publish"\} 1`+"\n"+ - `rtsps_sessions_bytes_received\{id=".*?",state="publish"\} 0`+"\n"+ - `rtsps_sessions_bytes_sent\{id=".*?",state="publish"\} [0-9]+`+"\n"+ - `rtmp_conns\{id=".*?",state="publish"\} 1`+"\n"+ - `rtmp_conns_bytes_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+ - `rtmp_conns_bytes_sent\{id=".*?",state="publish"\} [0-9]+`+"\n"+ - `srt_conns\{id=".*?",state="publish"\} 1`+"\n"+ - `srt_conns_bytes_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+ - `srt_conns_bytes_sent\{id=".*?",state="publish"\} 0`+"\n"+ - `webrtc_sessions\{id=".*?",state="publish"\} 1`+"\n"+ - `webrtc_sessions_bytes_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+ - `webrtc_sessions_bytes_sent\{id=".*?",state="publish"\} [0-9]+`+"\n"+ - "$", - string(bo)) - - close(terminate) - wg.Wait() + }) + + t.Run("with data", func(t *testing.T) { + terminate := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(5) + + go func() { + defer wg.Done() + source := gortsplib.Client{} + err := source.StartRecording("rtsp://localhost:8554/rtsp_path", + &description.Session{Medias: []*description.Media{{ + Type: description.MediaTypeVideo, + Formats: []format.Format{testFormatH264}, + }}}) + require.NoError(t, err) + defer source.Close() + <-terminate + }() + + go func() { + defer wg.Done() + source2 := gortsplib.Client{TLSConfig: &tls.Config{InsecureSkipVerify: true}} + err := source2.StartRecording("rtsps://localhost:8322/rtsps_path", + &description.Session{Medias: []*description.Media{{ + Type: description.MediaTypeVideo, + Formats: []format.Format{testFormatH264}, + }}}) + require.NoError(t, err) + defer source2.Close() + <-terminate + }() + + go func() { + defer wg.Done() + u, err := url.Parse("rtmp://localhost:1935/rtmp_path") + require.NoError(t, err) + + nconn, err := net.Dial("tcp", u.Host) + require.NoError(t, err) + defer nconn.Close() + + conn, err := rtmp.NewClientConn(nconn, u, true) + require.NoError(t, err) + + _, err = rtmp.NewWriter(conn, testFormatH264, nil) + require.NoError(t, err) + <-terminate + }() + + go func() { + defer wg.Done() + + su, err := url.Parse("http://localhost:8889/webrtc_path/whip") + require.NoError(t, err) + + s := &webrtc.WHIPClient{ + HTTPClient: &http.Client{Transport: &http.Transport{}}, + URL: su, + } + + tracks, err := s.Publish(context.Background(), testMediaH264.Formats[0], nil) + require.NoError(t, err) + defer checkClose(t, s.Close) + + err = tracks[0].WriteRTP(&rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 123, + Timestamp: 45343, + SSRC: 563423, + }, + Payload: []byte{1}, + }) + require.NoError(t, err) + <-terminate + }() + + go func() { + defer wg.Done() + + srtConf := srt.DefaultConfig() + address, err := srtConf.UnmarshalURL("srt://localhost:8890?streamid=publish:srt_path") + require.NoError(t, err) + + err = srtConf.Validate() + require.NoError(t, err) + + publisher, err := srt.Dial("srt", address, srtConf) + require.NoError(t, err) + defer publisher.Close() + + track := &mpegts.Track{ + Codec: &mpegts.CodecH264{}, + } + + bw := bufio.NewWriter(publisher) + w := mpegts.NewWriter(bw, []*mpegts.Track{track}) + require.NoError(t, err) + + err = w.WriteH26x(track, 0, 0, true, [][]byte{ + { // SPS + 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02, + 0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, + 0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, + 0x20, + }, + { // PPS + 0x08, 0x06, 0x07, 0x08, + }, + { // IDR + 0x05, 1, + }, + }) + require.NoError(t, err) + + err = bw.Flush() + require.NoError(t, err) + <-terminate + }() + + time.Sleep(500 * time.Millisecond) + + bo := httpPullFile(t, hc, "http://localhost:9998/metrics") + + require.Regexp(t, + `^paths\{name=".*?",state="ready"\} 1`+"\n"+ + `paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+ + `paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+ + `paths\{name=".*?",state="ready"\} 1`+"\n"+ + `paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+ + `paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+ + `paths\{name=".*?",state="ready"\} 1`+"\n"+ + `paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+ + `paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+ + `paths\{name=".*?",state="ready"\} 1`+"\n"+ + `paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+ + `paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+ + `paths\{name=".*?",state="ready"\} 1`+"\n"+ + `paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+ + `paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+ + `hls_muxers\{name=".*?"\} 1`+"\n"+ + `hls_muxers_bytes_sent\{name=".*?"\} [0-9]+`+"\n"+ + `hls_muxers\{name=".*?"\} 1`+"\n"+ + `hls_muxers_bytes_sent\{name=".*?"\} [0-9]+`+"\n"+ + `hls_muxers\{name=".*?"\} 1`+"\n"+ + `hls_muxers_bytes_sent\{name=".*?"\} 0`+"\n"+ + `hls_muxers\{name=".*?"\} 1`+"\n"+ + `hls_muxers_bytes_sent\{name=".*?"\} 0`+"\n"+ + `hls_muxers\{name=".*?"\} 1`+"\n"+ + `hls_muxers_bytes_sent\{name=".*?"\} 0`+"\n"+ + `rtsp_conns\{id=".*?"\} 1`+"\n"+ + `rtsp_conns_bytes_received\{id=".*?"\} [0-9]+`+"\n"+ + `rtsp_conns_bytes_sent\{id=".*?"\} [0-9]+`+"\n"+ + `rtsp_sessions\{id=".*?",state="publish"\} 1`+"\n"+ + `rtsp_sessions_bytes_received\{id=".*?",state="publish"\} 0`+"\n"+ + `rtsp_sessions_bytes_sent\{id=".*?",state="publish"\} [0-9]+`+"\n"+ + `rtsps_conns\{id=".*?"\} 1`+"\n"+ + `rtsps_conns_bytes_received\{id=".*?"\} [0-9]+`+"\n"+ + `rtsps_conns_bytes_sent\{id=".*?"\} [0-9]+`+"\n"+ + `rtsps_sessions\{id=".*?",state="publish"\} 1`+"\n"+ + `rtsps_sessions_bytes_received\{id=".*?",state="publish"\} 0`+"\n"+ + `rtsps_sessions_bytes_sent\{id=".*?",state="publish"\} [0-9]+`+"\n"+ + `rtmp_conns\{id=".*?",state="publish"\} 1`+"\n"+ + `rtmp_conns_bytes_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+ + `rtmp_conns_bytes_sent\{id=".*?",state="publish"\} [0-9]+`+"\n"+ + `srt_conns\{id=".*?",state="publish"\} 1`+"\n"+ + `srt_conns_bytes_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+ + `srt_conns_bytes_sent\{id=".*?",state="publish"\} 0`+"\n"+ + `webrtc_sessions\{id=".*?",state="publish"\} 1`+"\n"+ + `webrtc_sessions_bytes_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+ + `webrtc_sessions_bytes_sent\{id=".*?",state="publish"\} [0-9]+`+"\n"+ + "$", + string(bo)) + + close(terminate) + wg.Wait() + }) + + t.Run("servers deleted", func(t *testing.T) { + httpRequest(t, hc, http.MethodPatch, "http://localhost:9997/v3/config/global/patch", map[string]interface{}{ + "rtsp": false, + "rtmp": false, + "srt": false, + "hls": false, + "webrtc": false, + }, nil) + + time.Sleep(500 * time.Millisecond) + + bo := httpPullFile(t, hc, "http://localhost:9998/metrics") + + require.Equal(t, "paths 0\n", string(bo)) + }) } diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index f37d3bc9..f63d4e50 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -78,7 +78,6 @@ type pathManager struct { udpMaxPayloadSize int pathConfs map[string]*conf.Path externalCmdPool *externalcmd.Pool - metrics *metrics parent pathManagerParent ctx context.Context @@ -112,7 +111,6 @@ func newPathManager( udpMaxPayloadSize int, pathConfs map[string]*conf.Path, externalCmdPool *externalcmd.Pool, - metrics *metrics, parent pathManagerParent, ) *pathManager { ctx, ctxCancel := context.WithCancel(context.Background()) @@ -127,7 +125,6 @@ func newPathManager( udpMaxPayloadSize: udpMaxPayloadSize, pathConfs: pathConfs, externalCmdPool: externalCmdPool, - metrics: metrics, parent: parent, ctx: ctx, ctxCancel: ctxCancel, @@ -152,10 +149,6 @@ func newPathManager( } } - if pm.metrics != nil { - pm.metrics.pathManagerSet(pm) - } - pm.Log(logger.Debug, "path manager created") pm.wg.Add(1) @@ -220,10 +213,6 @@ outer: } pm.ctxCancel() - - if pm.metrics != nil { - pm.metrics.pathManagerSet(nil) - } } func (pm *pathManager) doReloadConf(newPaths map[string]*conf.Path) { diff --git a/internal/core/rtmp_server.go b/internal/core/rtmp_server.go index a1bc1a45..5733d2c4 100644 --- a/internal/core/rtmp_server.go +++ b/internal/core/rtmp_server.go @@ -59,7 +59,6 @@ type rtmpServer struct { runOnConnectRestart bool runOnDisconnect string externalCmdPool *externalcmd.Pool - metrics *metrics pathManager *pathManager parent rtmpServerParent @@ -91,7 +90,6 @@ func newRTMPServer( runOnConnectRestart bool, runOnDisconnect string, externalCmdPool *externalcmd.Pool, - metrics *metrics, pathManager *pathManager, parent rtmpServerParent, ) (*rtmpServer, error) { @@ -124,7 +122,6 @@ func newRTMPServer( runOnDisconnect: runOnDisconnect, isTLS: isTLS, externalCmdPool: externalCmdPool, - metrics: metrics, pathManager: pathManager, parent: parent, ctx: ctx, @@ -141,10 +138,6 @@ func newRTMPServer( s.Log(logger.Info, "listener opened on %s", address) - if s.metrics != nil { - s.metrics.rtmpServerSet(s) - } - newRTMPListener( s.ln, &s.wg, @@ -247,10 +240,6 @@ outer: s.ctxCancel() s.ln.Close() - - if s.metrics != nil { - s.metrics.rtmpServerSet(s) - } } func (s *rtmpServer) findConnByUUID(uuid uuid.UUID) *rtmpConn { diff --git a/internal/core/rtsp_server.go b/internal/core/rtsp_server.go index da82bbe5..96ba9444 100644 --- a/internal/core/rtsp_server.go +++ b/internal/core/rtsp_server.go @@ -51,7 +51,6 @@ type rtspServer struct { runOnConnectRestart bool runOnDisconnect string externalCmdPool *externalcmd.Pool - metrics *metrics pathManager *pathManager parent rtspServerParent @@ -86,7 +85,6 @@ func newRTSPServer( runOnConnectRestart bool, runOnDisconnect string, externalCmdPool *externalcmd.Pool, - metrics *metrics, pathManager *pathManager, parent rtspServerParent, ) (*rtspServer, error) { @@ -102,7 +100,6 @@ func newRTSPServer( runOnConnectRestart: runOnConnectRestart, runOnDisconnect: runOnDisconnect, externalCmdPool: externalCmdPool, - metrics: metrics, pathManager: pathManager, parent: parent, ctx: ctx, @@ -146,14 +143,6 @@ func newRTSPServer( s.Log(logger.Info, "listener opened on %s", printAddresses(s.srv)) - if metrics != nil { - if !isTLS { - metrics.setRTSPServer(s) - } else { - metrics.setRTSPSServer(s) - } - } - s.wg.Add(1) go s.run() @@ -205,14 +194,6 @@ outer: } s.ctxCancel() - - if s.metrics != nil { - if !s.isTLS { - s.metrics.setRTSPServer(nil) - } else { - s.metrics.setRTSPSServer(nil) - } - } } // OnConnOpen implements gortsplib.ServerHandlerOnConnOpen. diff --git a/internal/core/srt_server.go b/internal/core/srt_server.go index b8876323..96fa0721 100644 --- a/internal/core/srt_server.go +++ b/internal/core/srt_server.go @@ -7,7 +7,7 @@ import ( "sync" "time" - "github.com/datarhei/gosrt" + srt "github.com/datarhei/gosrt" "github.com/google/uuid" "github.com/bluenviron/mediamtx/internal/conf" @@ -67,7 +67,6 @@ type srtServer struct { runOnConnectRestart bool runOnDisconnect string externalCmdPool *externalcmd.Pool - metrics *metrics pathManager *pathManager parent srtServerParent @@ -97,7 +96,6 @@ func newSRTServer( runOnConnectRestart bool, runOnDisconnect string, externalCmdPool *externalcmd.Pool, - metrics *metrics, pathManager *pathManager, parent srtServerParent, ) (*srtServer, error) { @@ -122,7 +120,6 @@ func newSRTServer( runOnConnectRestart: runOnConnectRestart, runOnDisconnect: runOnDisconnect, externalCmdPool: externalCmdPool, - metrics: metrics, pathManager: pathManager, parent: parent, ctx: ctx, @@ -139,10 +136,6 @@ func newSRTServer( s.Log(logger.Info, "listener opened on "+address+" (UDP)") - if s.metrics != nil { - s.metrics.srtServerSet(s) - } - newSRTListener( s.ln, &s.wg, diff --git a/internal/core/webrtc_manager.go b/internal/core/webrtc_manager.go index 0e5dbaf3..5a4939c6 100644 --- a/internal/core/webrtc_manager.go +++ b/internal/core/webrtc_manager.go @@ -180,7 +180,6 @@ type webRTCManager struct { ICEServers []conf.WebRTCICEServer ExternalCmdPool *externalcmd.Pool PathManager *pathManager - Metrics *metrics Parent webRTCManagerParent ctx context.Context @@ -284,10 +283,6 @@ func (m *webRTCManager) initialize() error { } m.Log(logger.Info, str) - if m.Metrics != nil { - m.Metrics.webRTCManagerSet(m) - } - go m.run() return nil