From b72f3577c8c5dc1f3f21cd043eeadde0e9ccb7f5 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Sat, 26 Aug 2023 19:45:10 +0200 Subject: [PATCH] print warning when the write queue is full (#2251) --- internal/core/async_writer.go | 83 ++++++++++++++++++++++++++ internal/core/hls_muxer.go | 46 ++++---------- internal/core/rtmp_conn.go | 40 ++++++------- internal/core/rtsp_server.go | 6 ++ internal/core/rtsp_session.go | 31 +++++++--- internal/core/srt_conn.go | 36 +++++------ internal/core/webrtc_outgoing_track.go | 16 +---- internal/core/webrtc_session.go | 22 ++----- 8 files changed, 165 insertions(+), 115 deletions(-) create mode 100644 internal/core/async_writer.go diff --git a/internal/core/async_writer.go b/internal/core/async_writer.go new file mode 100644 index 00000000..ee6fd150 --- /dev/null +++ b/internal/core/async_writer.go @@ -0,0 +1,83 @@ +package core + +import ( + "fmt" + "sync" + "time" + + "github.com/bluenviron/gortsplib/v4/pkg/ringbuffer" + + "github.com/bluenviron/mediamtx/internal/logger" +) + +const ( + minIntervalBetweenWarnings = 1 * time.Second +) + +type asyncWriter struct { + parent logger.Writer + + buffer *ringbuffer.RingBuffer + prevWarnPrinted time.Time + prevWarnPrintedMutex sync.Mutex + + // out + err chan error +} + +func newAsyncWriter( + queueSize int, + parent logger.Writer, +) *asyncWriter { + buffer, _ := ringbuffer.New(uint64(queueSize)) + + return &asyncWriter{ + parent: parent, + buffer: buffer, + err: make(chan error), + } +} + +func (w *asyncWriter) start() { + go w.run() +} + +func (w *asyncWriter) stop() { + w.buffer.Close() + <-w.err +} + +func (w *asyncWriter) error() chan error { + return w.err +} + +func (w *asyncWriter) run() { + w.err <- w.runInner() +} + +func (w *asyncWriter) runInner() error { + for { + cb, ok := w.buffer.Pull() + if !ok { + return fmt.Errorf("terminated") + } + + err := cb.(func() error)() + if err != nil { + return err + } + } +} + +func (w *asyncWriter) push(cb func() error) { + ok := w.buffer.Push(cb) + if !ok { + now := time.Now() + w.prevWarnPrintedMutex.Lock() + if now.Sub(w.prevWarnPrinted) >= minIntervalBetweenWarnings { + w.prevWarnPrinted = now + w.parent.Log(logger.Warn, "write queue is full") + } + w.prevWarnPrintedMutex.Unlock() + } +} diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index cf495d1d..7fc67ac8 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -15,7 +15,6 @@ import ( "github.com/bluenviron/gohlslib/pkg/codecs" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/gortsplib/v4/pkg/ringbuffer" "github.com/gin-gonic/gin" "github.com/bluenviron/mediamtx/internal/conf" @@ -76,7 +75,7 @@ type hlsMuxer struct { ctxCancel func() created time.Time path *path - ringBuffer *ringbuffer.RingBuffer + writer *asyncWriter lastRequestTime *int64 muxer *gohlslib.Muxer requests []*hlsMuxerHandleRequestReq @@ -254,7 +253,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) defer m.path.removeReader(pathRemoveReaderReq{author: m}) - m.ringBuffer, _ = ringbuffer.New(uint64(m.writeQueueSize)) + m.writer = newAsyncWriter(m.writeQueueSize, m) var medias []*description.Media @@ -304,10 +303,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) m.Log(logger.Info, "is converting into HLS, %s", sourceMediaInfo(medias)) - writerDone := make(chan error) - go func() { - writerDone <- m.runWriter() - }() + m.writer.start() closeCheckTicker := time.NewTicker(closeCheckPeriod) defer closeCheckTicker.Stop() @@ -318,18 +314,16 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) if m.remoteAddr != "" { t := time.Unix(0, atomic.LoadInt64(m.lastRequestTime)) if time.Since(t) >= closeAfterInactivity { - m.ringBuffer.Close() - <-writerDone + m.writer.stop() return fmt.Errorf("not used anymore") } } - case err := <-writerDone: + case err := <-m.writer.error(): return err case <-innerCtx.Done(): - m.ringBuffer.Close() - <-writerDone + m.writer.stop() return fmt.Errorf("terminated") } } @@ -341,7 +335,7 @@ func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*description.Media, if videoFormatAV1 != nil { stream.AddReader(m, videoMedia, videoFormatAV1, func(u unit.Unit) { - m.ringBuffer.Push(func() error { + m.writer.push(func() error { tunit := u.(*unit.AV1) if tunit.TU == nil { @@ -368,7 +362,7 @@ func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*description.Media, if videoFormatVP9 != nil { stream.AddReader(m, videoMedia, videoFormatVP9, func(u unit.Unit) { - m.ringBuffer.Push(func() error { + m.writer.push(func() error { tunit := u.(*unit.VP9) if tunit.Frame == nil { @@ -395,7 +389,7 @@ func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*description.Media, if videoFormatH265 != nil { stream.AddReader(m, videoMedia, videoFormatH265, func(u unit.Unit) { - m.ringBuffer.Push(func() error { + m.writer.push(func() error { tunit := u.(*unit.H265) if tunit.AU == nil { @@ -428,7 +422,7 @@ func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*description.Media, if videoFormatH264 != nil { stream.AddReader(m, videoMedia, videoFormatH264, func(u unit.Unit) { - m.ringBuffer.Push(func() error { + m.writer.push(func() error { tunit := u.(*unit.H264) if tunit.AU == nil { @@ -464,7 +458,7 @@ func (m *hlsMuxer) createAudioTrack(stream *stream.Stream) (*description.Media, if audioMedia != nil { stream.AddReader(m, audioMedia, audioFormatOpus, func(u unit.Unit) { - m.ringBuffer.Push(func() error { + m.writer.push(func() error { tunit := u.(*unit.Opus) pts := tunit.PTS @@ -497,7 +491,7 @@ func (m *hlsMuxer) createAudioTrack(stream *stream.Stream) (*description.Media, if audioMedia != nil { stream.AddReader(m, audioMedia, audioFormatMPEG4AudioGeneric, func(u unit.Unit) { - m.ringBuffer.Push(func() error { + m.writer.push(func() error { tunit := u.(*unit.MPEG4AudioGeneric) if tunit.AUs == nil { @@ -532,7 +526,7 @@ func (m *hlsMuxer) createAudioTrack(stream *stream.Stream) (*description.Media, len(audioFormatMPEG4AudioLATM.Config.Programs) == 1 && len(audioFormatMPEG4AudioLATM.Config.Programs[0].Layers) == 1 { stream.AddReader(m, audioMedia, audioFormatMPEG4AudioLATM, func(u unit.Unit) { - m.ringBuffer.Push(func() error { + m.writer.push(func() error { tunit := u.(*unit.MPEG4AudioLATM) if tunit.AU == nil { @@ -562,20 +556,6 @@ func (m *hlsMuxer) createAudioTrack(stream *stream.Stream) (*description.Media, return nil, nil } -func (m *hlsMuxer) runWriter() error { - for { - item, ok := m.ringBuffer.Pull() - if !ok { - return fmt.Errorf("terminated") - } - - err := item.(func() error)() - if err != nil { - return err - } - } -} - func (m *hlsMuxer) handleRequest(ctx *gin.Context) { atomic.StoreInt64(m.lastRequestTime, time.Now().UnixNano()) diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index 9c9fd3fe..3519081a 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -12,7 +12,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/gortsplib/v4/pkg/ringbuffer" "github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg1audio" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" @@ -241,11 +240,7 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error { c.pathName = pathName c.mutex.Unlock() - ringBuffer, _ := ringbuffer.New(uint64(c.writeQueueSize)) - go func() { - <-c.ctx.Done() - ringBuffer.Close() - }() + writer := newAsyncWriter(c.writeQueueSize, c) var medias []*description.Media var w *rtmp.Writer @@ -253,7 +248,7 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error { videoMedia, videoFormat := c.setupVideo( &w, res.stream, - ringBuffer) + writer) if videoMedia != nil { medias = append(medias, videoMedia) } @@ -261,7 +256,7 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error { audioMedia, audioFormat := c.setupAudio( &w, res.stream, - ringBuffer) + writer) if audioFormat != nil { medias = append(medias, audioMedia) } @@ -303,23 +298,22 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error { // disable read deadline c.nconn.SetReadDeadline(time.Time{}) - for { - item, ok := ringBuffer.Pull() - if !ok { - return fmt.Errorf("terminated") - } + writer.start() - err := item.(func() error)() - if err != nil { - return err - } + select { + case <-c.ctx.Done(): + writer.stop() + return fmt.Errorf("terminated") + + case err := <-writer.error(): + return err } } func (c *rtmpConn) setupVideo( w **rtmp.Writer, stream *stream.Stream, - ringBuffer *ringbuffer.RingBuffer, + writer *asyncWriter, ) (*description.Media, format.Format) { var videoFormatH264 *format.H264 videoMedia := stream.Desc().FindFormat(&videoFormatH264) @@ -328,7 +322,7 @@ func (c *rtmpConn) setupVideo( var videoDTSExtractor *h264.DTSExtractor stream.AddReader(c, videoMedia, videoFormatH264, func(u unit.Unit) { - ringBuffer.Push(func() error { + writer.push(func() error { tunit := u.(*unit.H264) if tunit.AU == nil { @@ -392,14 +386,14 @@ func (c *rtmpConn) setupVideo( func (c *rtmpConn) setupAudio( w **rtmp.Writer, stream *stream.Stream, - ringBuffer *ringbuffer.RingBuffer, + writer *asyncWriter, ) (*description.Media, format.Format) { var audioFormatMPEG4Generic *format.MPEG4AudioGeneric audioMedia := stream.Desc().FindFormat(&audioFormatMPEG4Generic) if audioMedia != nil { stream.AddReader(c, audioMedia, audioFormatMPEG4Generic, func(u unit.Unit) { - ringBuffer.Push(func() error { + writer.push(func() error { tunit := u.(*unit.MPEG4AudioGeneric) if tunit.AUs == nil { @@ -435,7 +429,7 @@ func (c *rtmpConn) setupAudio( len(audioFormatMPEG4AudioLATM.Config.Programs) == 1 && len(audioFormatMPEG4AudioLATM.Config.Programs[0].Layers) == 1 { stream.AddReader(c, audioMedia, audioFormatMPEG4AudioLATM, func(u unit.Unit) { - ringBuffer.Push(func() error { + writer.push(func() error { tunit := u.(*unit.MPEG4AudioLATM) if tunit.AU == nil { @@ -457,7 +451,7 @@ func (c *rtmpConn) setupAudio( if audioMedia != nil { stream.AddReader(c, audioMedia, audioFormatMPEG1, func(u unit.Unit) { - ringBuffer.Push(func() error { + writer.push(func() error { tunit := u.(*unit.MPEG1Audio) pts := tunit.PTS diff --git a/internal/core/rtsp_server.go b/internal/core/rtsp_server.go index 51155855..bb14a687 100644 --- a/internal/core/rtsp_server.go +++ b/internal/core/rtsp_server.go @@ -330,6 +330,12 @@ func (s *rtspServer) OnDecodeError(ctx *gortsplib.ServerHandlerOnDecodeErrorCtx) se.onDecodeError(ctx) } +// OnDecodeError implements gortsplib.ServerHandlerOnStreamWriteError. +func (s *rtspServer) OnStreamWriteError(ctx *gortsplib.ServerHandlerOnStreamWriteErrorCtx) { + se := ctx.Session.UserData().(*rtspSession) + se.onStreamWriteError(ctx) +} + func (s *rtspServer) findConnByUUID(uuid uuid.UUID) *rtspConn { for _, c := range s.conns { if c.uuid == uuid { diff --git a/internal/core/rtsp_session.go b/internal/core/rtsp_session.go index e2cfcf0c..d04e194a 100644 --- a/internal/core/rtsp_session.go +++ b/internal/core/rtsp_session.go @@ -40,15 +40,17 @@ type rtspSession struct { pathManager rtspSessionPathManager parent rtspSessionParent - uuid uuid.UUID - created time.Time - path *path - stream *stream.Stream - onReadCmd *externalcmd.Cmd // read - mutex sync.Mutex - state gortsplib.ServerSessionState - transport *gortsplib.Transport - pathName string + uuid uuid.UUID + created time.Time + path *path + stream *stream.Stream + onReadCmd *externalcmd.Cmd // read + mutex sync.Mutex + state gortsplib.ServerSessionState + transport *gortsplib.Transport + pathName string + prevWarnPrinted time.Time + prevWarnPrintedMutex sync.Mutex } func newRTSPSession( @@ -408,6 +410,17 @@ func (s *rtspSession) onDecodeError(ctx *gortsplib.ServerHandlerOnDecodeErrorCtx s.Log(logger.Warn, ctx.Error.Error()) } +// onStreamWriteError is called by rtspServer. +func (s *rtspSession) onStreamWriteError(ctx *gortsplib.ServerHandlerOnStreamWriteErrorCtx) { + now := time.Now() + s.prevWarnPrintedMutex.Lock() + if now.Sub(s.prevWarnPrinted) >= minIntervalBetweenWarnings { + s.prevWarnPrinted = now + s.Log(logger.Warn, ctx.Error.Error()) + } + s.prevWarnPrintedMutex.Unlock() +} + func (s *rtspSession) apiItem() *apiRTSPSession { s.mutex.Lock() defer s.mutex.Unlock() diff --git a/internal/core/srt_conn.go b/internal/core/srt_conn.go index 3d18a6b7..ddb9ab31 100644 --- a/internal/core/srt_conn.go +++ b/internal/core/srt_conn.go @@ -12,7 +12,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/gortsplib/v4/pkg/ringbuffer" "github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/bluenviron/mediacommon/pkg/codecs/h265" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" @@ -416,11 +415,7 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass c.conn = sconn c.mutex.Unlock() - ringBuffer, _ := ringbuffer.New(uint64(c.writeQueueSize)) - go func() { - <-c.ctx.Done() - ringBuffer.Close() - }() + writer := newAsyncWriter(c.writeQueueSize, c) var w *mpegts.Writer var tracks []*mpegts.Track @@ -446,7 +441,7 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass dtsExtractor := h265.NewDTSExtractor() res.stream.AddReader(c, medi, forma, func(u unit.Unit) { - ringBuffer.Push(func() error { + writer.push(func() error { tunit := u.(*unit.H265) if tunit.AU == nil { return nil @@ -483,7 +478,7 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass dtsExtractor := h264.NewDTSExtractor() res.stream.AddReader(c, medi, forma, func(u unit.Unit) { - ringBuffer.Push(func() error { + writer.push(func() error { tunit := u.(*unit.H264) if tunit.AU == nil { return nil @@ -519,7 +514,7 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass }) res.stream.AddReader(c, medi, forma, func(u unit.Unit) { - ringBuffer.Push(func() error { + writer.push(func() error { tunit := u.(*unit.MPEG4AudioGeneric) if tunit.AUs == nil { return nil @@ -545,7 +540,7 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass }) res.stream.AddReader(c, medi, forma, func(u unit.Unit) { - ringBuffer.Push(func() error { + writer.push(func() error { tunit := u.(*unit.MPEG4AudioLATM) if tunit.AU == nil { return nil @@ -574,7 +569,7 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass }) res.stream.AddReader(c, medi, forma, func(u unit.Unit) { - ringBuffer.Push(func() error { + writer.push(func() error { tunit := u.(*unit.Opus) if tunit.Packets == nil { return nil @@ -595,7 +590,7 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass track := addTrack(medi, &mpegts.CodecMPEG1Audio{}) res.stream.AddReader(c, medi, forma, func(u unit.Unit) { - ringBuffer.Push(func() error { + writer.push(func() error { tunit := u.(*unit.MPEG1Audio) if tunit.Frames == nil { return nil @@ -646,16 +641,15 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass // disable read deadline sconn.SetReadDeadline(time.Time{}) - for { - item, ok := ringBuffer.Pull() - if !ok { - return true, fmt.Errorf("terminated") - } + writer.start() - err := item.(func() error)() - if err != nil { - return true, err - } + select { + case <-c.ctx.Done(): + writer.stop() + return true, fmt.Errorf("terminated") + + case err := <-writer.error(): + return true, err } } diff --git a/internal/core/webrtc_outgoing_track.go b/internal/core/webrtc_outgoing_track.go index aeb7ebdc..ed7cf973 100644 --- a/internal/core/webrtc_outgoing_track.go +++ b/internal/core/webrtc_outgoing_track.go @@ -1,7 +1,6 @@ package core import ( - "context" "fmt" "time" @@ -11,7 +10,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format/rtph264" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp8" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp9" - "github.com/bluenviron/gortsplib/v4/pkg/ringbuffer" "github.com/pion/webrtc/v3" "github.com/bluenviron/mediamtx/internal/stream" @@ -350,11 +348,9 @@ func newWebRTCOutgoingTrackAudio(desc *description.Session) (*webRTCOutgoingTrac } func (t *webRTCOutgoingTrack) start( - ctx context.Context, r reader, stream *stream.Stream, - ringBuffer *ringbuffer.RingBuffer, - writeError chan error, + writer *asyncWriter, ) { // read incoming RTCP packets to make interceptors work go func() { @@ -368,14 +364,8 @@ func (t *webRTCOutgoingTrack) start( }() stream.AddReader(r, t.media, t.format, func(u unit.Unit) { - ringBuffer.Push(func() { - err := t.cb(u) - if err != nil { - select { - case writeError <- err: - case <-ctx.Done(): - } - } + writer.push(func() error { + return t.cb(u) }) }) } diff --git a/internal/core/webrtc_session.go b/internal/core/webrtc_session.go index 4aafc46e..217a449b 100644 --- a/internal/core/webrtc_session.go +++ b/internal/core/webrtc_session.go @@ -11,7 +11,6 @@ import ( "time" "github.com/bluenviron/gortsplib/v4/pkg/description" - "github.com/bluenviron/gortsplib/v4/pkg/ringbuffer" "github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/google/uuid" "github.com/pion/sdp/v3" @@ -512,13 +511,10 @@ func (s *webRTCSession) runRead() (int, error) { s.pc = pc s.mutex.Unlock() - ringBuffer, _ := ringbuffer.New(uint64(s.writeQueueSize)) - defer ringBuffer.Close() - - writeError := make(chan error) + writer := newAsyncWriter(s.writeQueueSize, s) for _, track := range tracks { - track.start(s.ctx, s, res.stream, ringBuffer, writeError) + track.start(s, res.stream, writer) } defer res.stream.RemoveReader(s) @@ -526,24 +522,18 @@ func (s *webRTCSession) runRead() (int, error) { s.Log(logger.Info, "is reading from path '%s', %s", res.path.name, sourceMediaInfo(webrtcMediasOfOutgoingTracks(tracks))) - go func() { - for { - item, ok := ringBuffer.Pull() - if !ok { - return - } - item.(func())() - } - }() + writer.start() select { case <-pc.Disconnected(): + writer.stop() return 0, fmt.Errorf("peer connection closed") - case err := <-writeError: + case err := <-writer.error(): return 0, err case <-s.ctx.Done(): + writer.stop() return 0, fmt.Errorf("terminated") } }