Browse Source

print warning when the write queue is full (#2251)

pull/2252/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
b72f3577c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 83
      internal/core/async_writer.go
  2. 46
      internal/core/hls_muxer.go
  3. 36
      internal/core/rtmp_conn.go
  4. 6
      internal/core/rtsp_server.go
  5. 13
      internal/core/rtsp_session.go
  6. 32
      internal/core/srt_conn.go
  7. 16
      internal/core/webrtc_outgoing_track.go
  8. 22
      internal/core/webrtc_session.go

83
internal/core/async_writer.go

@ -0,0 +1,83 @@
package core
import (
"fmt"
"sync"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/ringbuffer"
"github.com/bluenviron/mediamtx/internal/logger"
)
const (
minIntervalBetweenWarnings = 1 * time.Second
)
type asyncWriter struct {
parent logger.Writer
buffer *ringbuffer.RingBuffer
prevWarnPrinted time.Time
prevWarnPrintedMutex sync.Mutex
// out
err chan error
}
func newAsyncWriter(
queueSize int,
parent logger.Writer,
) *asyncWriter {
buffer, _ := ringbuffer.New(uint64(queueSize))
return &asyncWriter{
parent: parent,
buffer: buffer,
err: make(chan error),
}
}
func (w *asyncWriter) start() {
go w.run()
}
func (w *asyncWriter) stop() {
w.buffer.Close()
<-w.err
}
func (w *asyncWriter) error() chan error {
return w.err
}
func (w *asyncWriter) run() {
w.err <- w.runInner()
}
func (w *asyncWriter) runInner() error {
for {
cb, ok := w.buffer.Pull()
if !ok {
return fmt.Errorf("terminated")
}
err := cb.(func() error)()
if err != nil {
return err
}
}
}
func (w *asyncWriter) push(cb func() error) {
ok := w.buffer.Push(cb)
if !ok {
now := time.Now()
w.prevWarnPrintedMutex.Lock()
if now.Sub(w.prevWarnPrinted) >= minIntervalBetweenWarnings {
w.prevWarnPrinted = now
w.parent.Log(logger.Warn, "write queue is full")
}
w.prevWarnPrintedMutex.Unlock()
}
}

46
internal/core/hls_muxer.go

@ -15,7 +15,6 @@ import (
"github.com/bluenviron/gohlslib/pkg/codecs" "github.com/bluenviron/gohlslib/pkg/codecs"
"github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/ringbuffer"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
@ -76,7 +75,7 @@ type hlsMuxer struct {
ctxCancel func() ctxCancel func()
created time.Time created time.Time
path *path path *path
ringBuffer *ringbuffer.RingBuffer writer *asyncWriter
lastRequestTime *int64 lastRequestTime *int64
muxer *gohlslib.Muxer muxer *gohlslib.Muxer
requests []*hlsMuxerHandleRequestReq requests []*hlsMuxerHandleRequestReq
@ -254,7 +253,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
defer m.path.removeReader(pathRemoveReaderReq{author: m}) defer m.path.removeReader(pathRemoveReaderReq{author: m})
m.ringBuffer, _ = ringbuffer.New(uint64(m.writeQueueSize)) m.writer = newAsyncWriter(m.writeQueueSize, m)
var medias []*description.Media var medias []*description.Media
@ -304,10 +303,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
m.Log(logger.Info, "is converting into HLS, %s", m.Log(logger.Info, "is converting into HLS, %s",
sourceMediaInfo(medias)) sourceMediaInfo(medias))
writerDone := make(chan error) m.writer.start()
go func() {
writerDone <- m.runWriter()
}()
closeCheckTicker := time.NewTicker(closeCheckPeriod) closeCheckTicker := time.NewTicker(closeCheckPeriod)
defer closeCheckTicker.Stop() defer closeCheckTicker.Stop()
@ -318,18 +314,16 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
if m.remoteAddr != "" { if m.remoteAddr != "" {
t := time.Unix(0, atomic.LoadInt64(m.lastRequestTime)) t := time.Unix(0, atomic.LoadInt64(m.lastRequestTime))
if time.Since(t) >= closeAfterInactivity { if time.Since(t) >= closeAfterInactivity {
m.ringBuffer.Close() m.writer.stop()
<-writerDone
return fmt.Errorf("not used anymore") return fmt.Errorf("not used anymore")
} }
} }
case err := <-writerDone: case err := <-m.writer.error():
return err return err
case <-innerCtx.Done(): case <-innerCtx.Done():
m.ringBuffer.Close() m.writer.stop()
<-writerDone
return fmt.Errorf("terminated") return fmt.Errorf("terminated")
} }
} }
@ -341,7 +335,7 @@ func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*description.Media,
if videoFormatAV1 != nil { if videoFormatAV1 != nil {
stream.AddReader(m, videoMedia, videoFormatAV1, func(u unit.Unit) { stream.AddReader(m, videoMedia, videoFormatAV1, func(u unit.Unit) {
m.ringBuffer.Push(func() error { m.writer.push(func() error {
tunit := u.(*unit.AV1) tunit := u.(*unit.AV1)
if tunit.TU == nil { if tunit.TU == nil {
@ -368,7 +362,7 @@ func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*description.Media,
if videoFormatVP9 != nil { if videoFormatVP9 != nil {
stream.AddReader(m, videoMedia, videoFormatVP9, func(u unit.Unit) { stream.AddReader(m, videoMedia, videoFormatVP9, func(u unit.Unit) {
m.ringBuffer.Push(func() error { m.writer.push(func() error {
tunit := u.(*unit.VP9) tunit := u.(*unit.VP9)
if tunit.Frame == nil { if tunit.Frame == nil {
@ -395,7 +389,7 @@ func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*description.Media,
if videoFormatH265 != nil { if videoFormatH265 != nil {
stream.AddReader(m, videoMedia, videoFormatH265, func(u unit.Unit) { stream.AddReader(m, videoMedia, videoFormatH265, func(u unit.Unit) {
m.ringBuffer.Push(func() error { m.writer.push(func() error {
tunit := u.(*unit.H265) tunit := u.(*unit.H265)
if tunit.AU == nil { if tunit.AU == nil {
@ -428,7 +422,7 @@ func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*description.Media,
if videoFormatH264 != nil { if videoFormatH264 != nil {
stream.AddReader(m, videoMedia, videoFormatH264, func(u unit.Unit) { stream.AddReader(m, videoMedia, videoFormatH264, func(u unit.Unit) {
m.ringBuffer.Push(func() error { m.writer.push(func() error {
tunit := u.(*unit.H264) tunit := u.(*unit.H264)
if tunit.AU == nil { if tunit.AU == nil {
@ -464,7 +458,7 @@ func (m *hlsMuxer) createAudioTrack(stream *stream.Stream) (*description.Media,
if audioMedia != nil { if audioMedia != nil {
stream.AddReader(m, audioMedia, audioFormatOpus, func(u unit.Unit) { stream.AddReader(m, audioMedia, audioFormatOpus, func(u unit.Unit) {
m.ringBuffer.Push(func() error { m.writer.push(func() error {
tunit := u.(*unit.Opus) tunit := u.(*unit.Opus)
pts := tunit.PTS pts := tunit.PTS
@ -497,7 +491,7 @@ func (m *hlsMuxer) createAudioTrack(stream *stream.Stream) (*description.Media,
if audioMedia != nil { if audioMedia != nil {
stream.AddReader(m, audioMedia, audioFormatMPEG4AudioGeneric, func(u unit.Unit) { stream.AddReader(m, audioMedia, audioFormatMPEG4AudioGeneric, func(u unit.Unit) {
m.ringBuffer.Push(func() error { m.writer.push(func() error {
tunit := u.(*unit.MPEG4AudioGeneric) tunit := u.(*unit.MPEG4AudioGeneric)
if tunit.AUs == nil { if tunit.AUs == nil {
@ -532,7 +526,7 @@ func (m *hlsMuxer) createAudioTrack(stream *stream.Stream) (*description.Media,
len(audioFormatMPEG4AudioLATM.Config.Programs) == 1 && len(audioFormatMPEG4AudioLATM.Config.Programs) == 1 &&
len(audioFormatMPEG4AudioLATM.Config.Programs[0].Layers) == 1 { len(audioFormatMPEG4AudioLATM.Config.Programs[0].Layers) == 1 {
stream.AddReader(m, audioMedia, audioFormatMPEG4AudioLATM, func(u unit.Unit) { stream.AddReader(m, audioMedia, audioFormatMPEG4AudioLATM, func(u unit.Unit) {
m.ringBuffer.Push(func() error { m.writer.push(func() error {
tunit := u.(*unit.MPEG4AudioLATM) tunit := u.(*unit.MPEG4AudioLATM)
if tunit.AU == nil { if tunit.AU == nil {
@ -562,20 +556,6 @@ func (m *hlsMuxer) createAudioTrack(stream *stream.Stream) (*description.Media,
return nil, nil return nil, nil
} }
func (m *hlsMuxer) runWriter() error {
for {
item, ok := m.ringBuffer.Pull()
if !ok {
return fmt.Errorf("terminated")
}
err := item.(func() error)()
if err != nil {
return err
}
}
}
func (m *hlsMuxer) handleRequest(ctx *gin.Context) { func (m *hlsMuxer) handleRequest(ctx *gin.Context) {
atomic.StoreInt64(m.lastRequestTime, time.Now().UnixNano()) atomic.StoreInt64(m.lastRequestTime, time.Now().UnixNano())

36
internal/core/rtmp_conn.go

@ -12,7 +12,6 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/ringbuffer"
"github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg1audio" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg1audio"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
@ -241,11 +240,7 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error {
c.pathName = pathName c.pathName = pathName
c.mutex.Unlock() c.mutex.Unlock()
ringBuffer, _ := ringbuffer.New(uint64(c.writeQueueSize)) writer := newAsyncWriter(c.writeQueueSize, c)
go func() {
<-c.ctx.Done()
ringBuffer.Close()
}()
var medias []*description.Media var medias []*description.Media
var w *rtmp.Writer var w *rtmp.Writer
@ -253,7 +248,7 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error {
videoMedia, videoFormat := c.setupVideo( videoMedia, videoFormat := c.setupVideo(
&w, &w,
res.stream, res.stream,
ringBuffer) writer)
if videoMedia != nil { if videoMedia != nil {
medias = append(medias, videoMedia) medias = append(medias, videoMedia)
} }
@ -261,7 +256,7 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error {
audioMedia, audioFormat := c.setupAudio( audioMedia, audioFormat := c.setupAudio(
&w, &w,
res.stream, res.stream,
ringBuffer) writer)
if audioFormat != nil { if audioFormat != nil {
medias = append(medias, audioMedia) medias = append(medias, audioMedia)
} }
@ -303,23 +298,22 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error {
// disable read deadline // disable read deadline
c.nconn.SetReadDeadline(time.Time{}) c.nconn.SetReadDeadline(time.Time{})
for { writer.start()
item, ok := ringBuffer.Pull()
if !ok { select {
case <-c.ctx.Done():
writer.stop()
return fmt.Errorf("terminated") return fmt.Errorf("terminated")
}
err := item.(func() error)() case err := <-writer.error():
if err != nil {
return err return err
} }
}
} }
func (c *rtmpConn) setupVideo( func (c *rtmpConn) setupVideo(
w **rtmp.Writer, w **rtmp.Writer,
stream *stream.Stream, stream *stream.Stream,
ringBuffer *ringbuffer.RingBuffer, writer *asyncWriter,
) (*description.Media, format.Format) { ) (*description.Media, format.Format) {
var videoFormatH264 *format.H264 var videoFormatH264 *format.H264
videoMedia := stream.Desc().FindFormat(&videoFormatH264) videoMedia := stream.Desc().FindFormat(&videoFormatH264)
@ -328,7 +322,7 @@ func (c *rtmpConn) setupVideo(
var videoDTSExtractor *h264.DTSExtractor var videoDTSExtractor *h264.DTSExtractor
stream.AddReader(c, videoMedia, videoFormatH264, func(u unit.Unit) { stream.AddReader(c, videoMedia, videoFormatH264, func(u unit.Unit) {
ringBuffer.Push(func() error { writer.push(func() error {
tunit := u.(*unit.H264) tunit := u.(*unit.H264)
if tunit.AU == nil { if tunit.AU == nil {
@ -392,14 +386,14 @@ func (c *rtmpConn) setupVideo(
func (c *rtmpConn) setupAudio( func (c *rtmpConn) setupAudio(
w **rtmp.Writer, w **rtmp.Writer,
stream *stream.Stream, stream *stream.Stream,
ringBuffer *ringbuffer.RingBuffer, writer *asyncWriter,
) (*description.Media, format.Format) { ) (*description.Media, format.Format) {
var audioFormatMPEG4Generic *format.MPEG4AudioGeneric var audioFormatMPEG4Generic *format.MPEG4AudioGeneric
audioMedia := stream.Desc().FindFormat(&audioFormatMPEG4Generic) audioMedia := stream.Desc().FindFormat(&audioFormatMPEG4Generic)
if audioMedia != nil { if audioMedia != nil {
stream.AddReader(c, audioMedia, audioFormatMPEG4Generic, func(u unit.Unit) { stream.AddReader(c, audioMedia, audioFormatMPEG4Generic, func(u unit.Unit) {
ringBuffer.Push(func() error { writer.push(func() error {
tunit := u.(*unit.MPEG4AudioGeneric) tunit := u.(*unit.MPEG4AudioGeneric)
if tunit.AUs == nil { if tunit.AUs == nil {
@ -435,7 +429,7 @@ func (c *rtmpConn) setupAudio(
len(audioFormatMPEG4AudioLATM.Config.Programs) == 1 && len(audioFormatMPEG4AudioLATM.Config.Programs) == 1 &&
len(audioFormatMPEG4AudioLATM.Config.Programs[0].Layers) == 1 { len(audioFormatMPEG4AudioLATM.Config.Programs[0].Layers) == 1 {
stream.AddReader(c, audioMedia, audioFormatMPEG4AudioLATM, func(u unit.Unit) { stream.AddReader(c, audioMedia, audioFormatMPEG4AudioLATM, func(u unit.Unit) {
ringBuffer.Push(func() error { writer.push(func() error {
tunit := u.(*unit.MPEG4AudioLATM) tunit := u.(*unit.MPEG4AudioLATM)
if tunit.AU == nil { if tunit.AU == nil {
@ -457,7 +451,7 @@ func (c *rtmpConn) setupAudio(
if audioMedia != nil { if audioMedia != nil {
stream.AddReader(c, audioMedia, audioFormatMPEG1, func(u unit.Unit) { stream.AddReader(c, audioMedia, audioFormatMPEG1, func(u unit.Unit) {
ringBuffer.Push(func() error { writer.push(func() error {
tunit := u.(*unit.MPEG1Audio) tunit := u.(*unit.MPEG1Audio)
pts := tunit.PTS pts := tunit.PTS

6
internal/core/rtsp_server.go

@ -330,6 +330,12 @@ func (s *rtspServer) OnDecodeError(ctx *gortsplib.ServerHandlerOnDecodeErrorCtx)
se.onDecodeError(ctx) se.onDecodeError(ctx)
} }
// OnDecodeError implements gortsplib.ServerHandlerOnStreamWriteError.
func (s *rtspServer) OnStreamWriteError(ctx *gortsplib.ServerHandlerOnStreamWriteErrorCtx) {
se := ctx.Session.UserData().(*rtspSession)
se.onStreamWriteError(ctx)
}
func (s *rtspServer) findConnByUUID(uuid uuid.UUID) *rtspConn { func (s *rtspServer) findConnByUUID(uuid uuid.UUID) *rtspConn {
for _, c := range s.conns { for _, c := range s.conns {
if c.uuid == uuid { if c.uuid == uuid {

13
internal/core/rtsp_session.go

@ -49,6 +49,8 @@ type rtspSession struct {
state gortsplib.ServerSessionState state gortsplib.ServerSessionState
transport *gortsplib.Transport transport *gortsplib.Transport
pathName string pathName string
prevWarnPrinted time.Time
prevWarnPrintedMutex sync.Mutex
} }
func newRTSPSession( func newRTSPSession(
@ -408,6 +410,17 @@ func (s *rtspSession) onDecodeError(ctx *gortsplib.ServerHandlerOnDecodeErrorCtx
s.Log(logger.Warn, ctx.Error.Error()) s.Log(logger.Warn, ctx.Error.Error())
} }
// onStreamWriteError is called by rtspServer.
func (s *rtspSession) onStreamWriteError(ctx *gortsplib.ServerHandlerOnStreamWriteErrorCtx) {
now := time.Now()
s.prevWarnPrintedMutex.Lock()
if now.Sub(s.prevWarnPrinted) >= minIntervalBetweenWarnings {
s.prevWarnPrinted = now
s.Log(logger.Warn, ctx.Error.Error())
}
s.prevWarnPrintedMutex.Unlock()
}
func (s *rtspSession) apiItem() *apiRTSPSession { func (s *rtspSession) apiItem() *apiRTSPSession {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()

32
internal/core/srt_conn.go

@ -12,7 +12,6 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/ringbuffer"
"github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/codecs/h265" "github.com/bluenviron/mediacommon/pkg/codecs/h265"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts" "github.com/bluenviron/mediacommon/pkg/formats/mpegts"
@ -416,11 +415,7 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
c.conn = sconn c.conn = sconn
c.mutex.Unlock() c.mutex.Unlock()
ringBuffer, _ := ringbuffer.New(uint64(c.writeQueueSize)) writer := newAsyncWriter(c.writeQueueSize, c)
go func() {
<-c.ctx.Done()
ringBuffer.Close()
}()
var w *mpegts.Writer var w *mpegts.Writer
var tracks []*mpegts.Track var tracks []*mpegts.Track
@ -446,7 +441,7 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
dtsExtractor := h265.NewDTSExtractor() dtsExtractor := h265.NewDTSExtractor()
res.stream.AddReader(c, medi, forma, func(u unit.Unit) { res.stream.AddReader(c, medi, forma, func(u unit.Unit) {
ringBuffer.Push(func() error { writer.push(func() error {
tunit := u.(*unit.H265) tunit := u.(*unit.H265)
if tunit.AU == nil { if tunit.AU == nil {
return nil return nil
@ -483,7 +478,7 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
dtsExtractor := h264.NewDTSExtractor() dtsExtractor := h264.NewDTSExtractor()
res.stream.AddReader(c, medi, forma, func(u unit.Unit) { res.stream.AddReader(c, medi, forma, func(u unit.Unit) {
ringBuffer.Push(func() error { writer.push(func() error {
tunit := u.(*unit.H264) tunit := u.(*unit.H264)
if tunit.AU == nil { if tunit.AU == nil {
return nil return nil
@ -519,7 +514,7 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
}) })
res.stream.AddReader(c, medi, forma, func(u unit.Unit) { res.stream.AddReader(c, medi, forma, func(u unit.Unit) {
ringBuffer.Push(func() error { writer.push(func() error {
tunit := u.(*unit.MPEG4AudioGeneric) tunit := u.(*unit.MPEG4AudioGeneric)
if tunit.AUs == nil { if tunit.AUs == nil {
return nil return nil
@ -545,7 +540,7 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
}) })
res.stream.AddReader(c, medi, forma, func(u unit.Unit) { res.stream.AddReader(c, medi, forma, func(u unit.Unit) {
ringBuffer.Push(func() error { writer.push(func() error {
tunit := u.(*unit.MPEG4AudioLATM) tunit := u.(*unit.MPEG4AudioLATM)
if tunit.AU == nil { if tunit.AU == nil {
return nil return nil
@ -574,7 +569,7 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
}) })
res.stream.AddReader(c, medi, forma, func(u unit.Unit) { res.stream.AddReader(c, medi, forma, func(u unit.Unit) {
ringBuffer.Push(func() error { writer.push(func() error {
tunit := u.(*unit.Opus) tunit := u.(*unit.Opus)
if tunit.Packets == nil { if tunit.Packets == nil {
return nil return nil
@ -595,7 +590,7 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
track := addTrack(medi, &mpegts.CodecMPEG1Audio{}) track := addTrack(medi, &mpegts.CodecMPEG1Audio{})
res.stream.AddReader(c, medi, forma, func(u unit.Unit) { res.stream.AddReader(c, medi, forma, func(u unit.Unit) {
ringBuffer.Push(func() error { writer.push(func() error {
tunit := u.(*unit.MPEG1Audio) tunit := u.(*unit.MPEG1Audio)
if tunit.Frames == nil { if tunit.Frames == nil {
return nil return nil
@ -646,17 +641,16 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
// disable read deadline // disable read deadline
sconn.SetReadDeadline(time.Time{}) sconn.SetReadDeadline(time.Time{})
for { writer.start()
item, ok := ringBuffer.Pull()
if !ok { select {
case <-c.ctx.Done():
writer.stop()
return true, fmt.Errorf("terminated") return true, fmt.Errorf("terminated")
}
err := item.(func() error)() case err := <-writer.error():
if err != nil {
return true, err return true, err
} }
}
} }
func (c *srtConn) exchangeRequestWithConn(req srtNewConnReq) (srt.Conn, error) { func (c *srtConn) exchangeRequestWithConn(req srtNewConnReq) (srt.Conn, error) {

16
internal/core/webrtc_outgoing_track.go

@ -1,7 +1,6 @@
package core package core
import ( import (
"context"
"fmt" "fmt"
"time" "time"
@ -11,7 +10,6 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/format/rtph264" "github.com/bluenviron/gortsplib/v4/pkg/format/rtph264"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp8" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp8"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp9" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp9"
"github.com/bluenviron/gortsplib/v4/pkg/ringbuffer"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/stream"
@ -350,11 +348,9 @@ func newWebRTCOutgoingTrackAudio(desc *description.Session) (*webRTCOutgoingTrac
} }
func (t *webRTCOutgoingTrack) start( func (t *webRTCOutgoingTrack) start(
ctx context.Context,
r reader, r reader,
stream *stream.Stream, stream *stream.Stream,
ringBuffer *ringbuffer.RingBuffer, writer *asyncWriter,
writeError chan error,
) { ) {
// read incoming RTCP packets to make interceptors work // read incoming RTCP packets to make interceptors work
go func() { go func() {
@ -368,14 +364,8 @@ func (t *webRTCOutgoingTrack) start(
}() }()
stream.AddReader(r, t.media, t.format, func(u unit.Unit) { stream.AddReader(r, t.media, t.format, func(u unit.Unit) {
ringBuffer.Push(func() { writer.push(func() error {
err := t.cb(u) return t.cb(u)
if err != nil {
select {
case writeError <- err:
case <-ctx.Done():
}
}
}) })
}) })
} }

22
internal/core/webrtc_session.go

@ -11,7 +11,6 @@ import (
"time" "time"
"github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/ringbuffer"
"github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/bluenviron/gortsplib/v4/pkg/rtptime"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/pion/sdp/v3" "github.com/pion/sdp/v3"
@ -512,13 +511,10 @@ func (s *webRTCSession) runRead() (int, error) {
s.pc = pc s.pc = pc
s.mutex.Unlock() s.mutex.Unlock()
ringBuffer, _ := ringbuffer.New(uint64(s.writeQueueSize)) writer := newAsyncWriter(s.writeQueueSize, s)
defer ringBuffer.Close()
writeError := make(chan error)
for _, track := range tracks { for _, track := range tracks {
track.start(s.ctx, s, res.stream, ringBuffer, writeError) track.start(s, res.stream, writer)
} }
defer res.stream.RemoveReader(s) defer res.stream.RemoveReader(s)
@ -526,24 +522,18 @@ func (s *webRTCSession) runRead() (int, error) {
s.Log(logger.Info, "is reading from path '%s', %s", s.Log(logger.Info, "is reading from path '%s', %s",
res.path.name, sourceMediaInfo(webrtcMediasOfOutgoingTracks(tracks))) res.path.name, sourceMediaInfo(webrtcMediasOfOutgoingTracks(tracks)))
go func() { writer.start()
for {
item, ok := ringBuffer.Pull()
if !ok {
return
}
item.(func())()
}
}()
select { select {
case <-pc.Disconnected(): case <-pc.Disconnected():
writer.stop()
return 0, fmt.Errorf("peer connection closed") return 0, fmt.Errorf("peer connection closed")
case err := <-writeError: case err := <-writer.error():
return 0, err return 0, err
case <-s.ctx.Done(): case <-s.ctx.Done():
writer.stop()
return 0, fmt.Errorf("terminated") return 0, fmt.Errorf("terminated")
} }
} }

Loading…
Cancel
Save