From 5fb7f4e8468b08a23efdec9a5f520cd4daaff8ee Mon Sep 17 00:00:00 2001 From: Alessandro Ros <aler9.dev@gmail.com> Date: Wed, 30 Aug 2023 11:24:14 +0200 Subject: [PATCH] force all readers to use an asynchronous writer (#2265) needed by #2255 --- apidocs/openapi.yaml | 21 +- .../{core => asyncwriter}/async_writer.go | 36 +- internal/conf/conf.go | 11 +- internal/conf/path.go | 43 +- internal/core/hls_manager.go | 2 +- internal/core/hls_muxer.go | 192 +++---- internal/core/hls_source.go | 2 +- internal/core/limited_logger.go | 30 - internal/core/path.go | 512 +++++++++--------- internal/core/path_manager.go | 298 +++++----- internal/core/rtmp_conn.go | 205 ++++--- internal/core/rtsp_session.go | 4 +- internal/core/rtsp_source.go | 2 +- internal/core/source_static.go | 2 +- internal/core/srt_conn.go | 227 ++++---- internal/core/srt_server.go | 2 +- internal/core/srt_source.go | 2 +- internal/core/udp_source.go | 2 +- internal/core/webrtc_manager.go | 2 +- internal/core/webrtc_outgoing_track.go | 10 +- internal/core/webrtc_session.go | 15 +- internal/formatprocessor/processor.go | 2 +- internal/logger/limited_logger.go | 34 ++ internal/stream/stream.go | 15 +- internal/stream/stream_format.go | 48 +- mediamtx.yml | 57 +- 26 files changed, 900 insertions(+), 876 deletions(-) rename internal/{core => asyncwriter}/async_writer.go (54%) delete mode 100644 internal/core/limited_logger.go create mode 100644 internal/logger/limited_logger.go diff --git a/apidocs/openapi.yaml b/apidocs/openapi.yaml index 43d6682c..cd150c22 100644 --- a/apidocs/openapi.yaml +++ b/apidocs/openapi.yaml @@ -18,7 +18,7 @@ components: Conf: type: object properties: - # general + # General logLevel: type: string logDestinations: @@ -169,13 +169,13 @@ components: webrtcICETCPMuxAddress: type: string - # srt + # SRT srt: type: boolean srtAddress: type: string - # paths + # Paths paths: type: object additionalProperties: @@ -184,10 +184,9 @@ components: PathConf: type: object properties: + # General source: type: string - - # general sourceFingerprint: type: string sourceOnDemand: @@ -199,7 +198,7 @@ components: maxReaders: type: number - # authentication + # Authentication publishUser: type: string publishPass: @@ -217,13 +216,13 @@ components: items: type: string - # publisher + # Publisher overridePublisher: type: boolean fallback: type: string - # rtsp + # RTSP sourceProtocol: type: string sourceAnyPortEnable: @@ -233,11 +232,11 @@ components: rtspRangeStart: type: string - # redirect + # Redirect sourceRedirect: type: string - # raspberry pi camera + # Raspberry Pi Camera rpiCameraCamID: type: integer rpiCameraWidth: @@ -303,7 +302,7 @@ components: rpiCameraTextOverlay: type: string - # external commands + # External commands runOnInit: type: string runOnInitRestart: diff --git a/internal/core/async_writer.go b/internal/asyncwriter/async_writer.go similarity index 54% rename from internal/core/async_writer.go rename to internal/asyncwriter/async_writer.go index 905221b1..36761e36 100644 --- a/internal/core/async_writer.go +++ b/internal/asyncwriter/async_writer.go @@ -1,19 +1,16 @@ -package core +// Package asyncwriter contains an asynchronous writer. +package asyncwriter import ( "fmt" - "time" "github.com/bluenviron/gortsplib/v4/pkg/ringbuffer" "github.com/bluenviron/mediamtx/internal/logger" ) -const ( - minIntervalBetweenWarnings = 1 * time.Second -) - -type asyncWriter struct { +// Writer is an asynchronous writer. +type Writer struct { writeErrLogger logger.Writer buffer *ringbuffer.RingBuffer @@ -21,37 +18,41 @@ type asyncWriter struct { err chan error } -func newAsyncWriter( +// New allocates a Writer. +func New( queueSize int, parent logger.Writer, -) *asyncWriter { +) *Writer { buffer, _ := ringbuffer.New(uint64(queueSize)) - return &asyncWriter{ - writeErrLogger: newLimitedLogger(parent), + return &Writer{ + writeErrLogger: logger.NewLimitedLogger(parent), buffer: buffer, err: make(chan error), } } -func (w *asyncWriter) start() { +// Start starts the writer routine. +func (w *Writer) Start() { go w.run() } -func (w *asyncWriter) stop() { +// Stop stops the writer routine. +func (w *Writer) Stop() { w.buffer.Close() <-w.err } -func (w *asyncWriter) error() chan error { +// Error returns whenever there's an error. +func (w *Writer) Error() chan error { return w.err } -func (w *asyncWriter) run() { +func (w *Writer) run() { w.err <- w.runInner() } -func (w *asyncWriter) runInner() error { +func (w *Writer) runInner() error { for { cb, ok := w.buffer.Pull() if !ok { @@ -65,7 +66,8 @@ func (w *asyncWriter) runInner() error { } } -func (w *asyncWriter) push(cb func() error) { +// Push appends an element to the queue. +func (w *Writer) Push(cb func() error) { ok := w.buffer.Push(cb) if !ok { w.writeErrLogger.Log(logger.Warn, "write queue is full") diff --git a/internal/conf/conf.go b/internal/conf/conf.go index bfbaf2b4..b7b5fe65 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -89,7 +89,7 @@ func contains(list []headers.AuthMethod, item headers.AuthMethod) bool { // Conf is a configuration. type Conf struct { - // general + // General LogLevel LogLevel `json:"logLevel"` LogDestinations LogDestinations `json:"logDestinations"` LogFile string `json:"logFile"` @@ -169,7 +169,7 @@ type Conf struct { SRT bool `json:"srt"` SRTAddress string `json:"srtAddress"` - // paths + // Paths Paths map[string]*PathConf `json:"paths"` } @@ -218,7 +218,8 @@ func (conf Conf) Clone() *Conf { // Check checks the configuration for errors. func (conf *Conf) Check() error { - // general + // General + if conf.ReadBufferCount != 0 { conf.WriteQueueSize = conf.ReadBufferCount } @@ -240,6 +241,7 @@ func (conf *Conf) Check() error { } // RTSP + if conf.RTSPDisable { conf.RTSP = false } @@ -253,16 +255,19 @@ func (conf *Conf) Check() error { } // RTMP + if conf.RTMPDisable { conf.RTMP = false } // HLS + if conf.HLSDisable { conf.HLS = false } // WebRTC + if conf.WebRTCDisable { conf.WebRTC = false } diff --git a/internal/conf/path.go b/internal/conf/path.go index 61611894..b9dc93d6 100644 --- a/internal/conf/path.go +++ b/internal/conf/path.go @@ -42,17 +42,15 @@ func IsValidPathName(name string) error { type PathConf struct { Regexp *regexp.Regexp `json:"-"` - // source - Source string `json:"source"` - - // general + // General + Source string `json:"source"` SourceFingerprint string `json:"sourceFingerprint"` SourceOnDemand bool `json:"sourceOnDemand"` SourceOnDemandStartTimeout StringDuration `json:"sourceOnDemandStartTimeout"` SourceOnDemandCloseAfter StringDuration `json:"sourceOnDemandCloseAfter"` MaxReaders int `json:"maxReaders"` - // authentication + // Authentication PublishUser Credential `json:"publishUser"` PublishPass Credential `json:"publishPass"` PublishIPs IPsOrCIDRs `json:"publishIPs"` @@ -60,21 +58,21 @@ type PathConf struct { ReadPass Credential `json:"readPass"` ReadIPs IPsOrCIDRs `json:"readIPs"` - // publisher + // Publisher OverridePublisher bool `json:"overridePublisher"` DisablePublisherOverride bool `json:"disablePublisherOverride"` // deprecated Fallback string `json:"fallback"` - // rtsp + // RTSP SourceProtocol SourceProtocol `json:"sourceProtocol"` SourceAnyPortEnable bool `json:"sourceAnyPortEnable"` RtspRangeType RtspRangeType `json:"rtspRangeType"` RtspRangeStart string `json:"rtspRangeStart"` - // redirect + // Redirect SourceRedirect string `json:"sourceRedirect"` - // raspberry pi camera + // Raspberry Pi Camera RPICameraCamID int `json:"rpiCameraCamID"` RPICameraWidth int `json:"rpiCameraWidth"` RPICameraHeight int `json:"rpiCameraHeight"` @@ -108,7 +106,7 @@ type PathConf struct { RPICameraTextOverlayEnable bool `json:"rpiCameraTextOverlayEnable"` RPICameraTextOverlay string `json:"rpiCameraTextOverlay"` - // external commands + // External commands RunOnInit string `json:"runOnInit"` RunOnInitRestart bool `json:"runOnInitRestart"` RunOnDemand string `json:"runOnDemand"` @@ -140,6 +138,8 @@ func (pconf *PathConf) check(conf *Conf, name string) error { pconf.Regexp = pathRegexp } + // General + switch { case pconf.Source == "publisher": @@ -263,10 +263,11 @@ func (pconf *PathConf) check(conf *Conf, name string) error { } } + // Publisher + if pconf.DisablePublisherOverride { pconf.OverridePublisher = true } - if pconf.Fallback != "" { if strings.HasPrefix(pconf.Fallback, "/") { err := IsValidPathName(pconf.Fallback[1:]) @@ -281,26 +282,24 @@ func (pconf *PathConf) check(conf *Conf, name string) error { } } + // Authentication + if (pconf.PublishUser != "" && pconf.PublishPass == "") || (pconf.PublishUser == "" && pconf.PublishPass != "") { return fmt.Errorf("read username and password must be both filled") } - if pconf.PublishUser != "" && pconf.Source != "publisher" { return fmt.Errorf("'publishUser' is useless when source is not 'publisher', since " + "the stream is not provided by a publisher, but by a fixed source") } - if len(pconf.PublishIPs) > 0 && pconf.Source != "publisher" { return fmt.Errorf("'publishIPs' is useless when source is not 'publisher', since " + "the stream is not provided by a publisher, but by a fixed source") } - if (pconf.ReadUser != "" && pconf.ReadPass == "") || (pconf.ReadUser == "" && pconf.ReadPass != "") { return fmt.Errorf("read username and password must be both filled") } - if contains(conf.AuthMethods, headers.AuthDigest) { if strings.HasPrefix(string(pconf.PublishUser), "sha256:") || strings.HasPrefix(string(pconf.PublishPass), "sha256:") || @@ -309,7 +308,6 @@ func (pconf *PathConf) check(conf *Conf, name string) error { return fmt.Errorf("hashed credentials can't be used when the digest auth method is available") } } - if conf.ExternalAuthenticationURL != "" { if pconf.PublishUser != "" || len(pconf.PublishIPs) > 0 || @@ -319,10 +317,11 @@ func (pconf *PathConf) check(conf *Conf, name string) error { } } + // External commands + if pconf.RunOnInit != "" && pconf.Regexp != nil { return fmt.Errorf("a path with a regular expression does not support option 'runOnInit'; use another path") } - if pconf.RunOnDemand != "" && pconf.Source != "publisher" { return fmt.Errorf("'runOnDemand' can be used only when source is 'publisher'") } @@ -378,17 +377,17 @@ func (pconf PathConf) HasOnDemandPublisher() bool { // UnmarshalJSON implements json.Unmarshaler. It is used to set default values. func (pconf *PathConf) UnmarshalJSON(b []byte) error { - // source + // Source pconf.Source = "publisher" - // general + // General pconf.SourceOnDemandStartTimeout = 10 * StringDuration(time.Second) pconf.SourceOnDemandCloseAfter = 10 * StringDuration(time.Second) - // publisher + // Publisher pconf.OverridePublisher = true - // raspberry pi camera + // Raspberry Pi Camera pconf.RPICameraWidth = 1920 pconf.RPICameraHeight = 1080 pconf.RPICameraContrast = 1 @@ -401,7 +400,7 @@ func (pconf *PathConf) UnmarshalJSON(b []byte) error { pconf.RPICameraLevel = "4.1" pconf.RPICameraTextOverlay = "%Y-%m-%d %H:%M:%S - MediaMTX" - // external commands + // External commands pconf.RunOnDemandStartTimeout = 10 * StringDuration(time.Second) pconf.RunOnDemandCloseAfter = 10 * StringDuration(time.Second) diff --git a/internal/core/hls_manager.go b/internal/core/hls_manager.go index 0f687b1a..2f43ccae 100644 --- a/internal/core/hls_manager.go +++ b/internal/core/hls_manager.go @@ -142,7 +142,7 @@ func newHLSManager( // Log is the main logging function. func (m *hlsManager) Log(level logger.Level, format string, args ...interface{}) { - m.parent.Log(level, "[HLS] "+format, append([]interface{}{}, args...)...) + m.parent.Log(level, "[HLS] "+format, args...) } func (m *hlsManager) close() { diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index 7fc67ac8..304159c9 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -17,6 +17,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/gin-gonic/gin" + "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/stream" @@ -75,7 +76,7 @@ type hlsMuxer struct { ctxCancel func() created time.Time path *path - writer *asyncWriter + writer *asyncwriter.Writer lastRequestTime *int64 muxer *gohlslib.Muxer requests []*hlsMuxerHandleRequestReq @@ -207,7 +208,7 @@ func (m *hlsMuxer) run() { innerCtxCancel() if m.remoteAddr == "" { // created with "always remux" - m.Log(logger.Info, "ERR: %v", err) + m.Log(logger.Error, err.Error()) m.clearQueuedRequests() isReady = false isRecreating = true @@ -253,7 +254,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) defer m.path.removeReader(pathRemoveReaderReq{author: m}) - m.writer = newAsyncWriter(m.writeQueueSize, m) + m.writer = asyncwriter.New(m.writeQueueSize, m) var medias []*description.Media @@ -267,7 +268,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) medias = append(medias, audioMedia) } - defer res.stream.RemoveReader(m) + defer res.stream.RemoveReader(m.writer) if medias == nil { return fmt.Errorf( @@ -303,7 +304,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) m.Log(logger.Info, "is converting into HLS, %s", sourceMediaInfo(medias)) - m.writer.start() + m.writer.Start() closeCheckTicker := time.NewTicker(closeCheckPeriod) defer closeCheckTicker.Stop() @@ -314,16 +315,16 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) if m.remoteAddr != "" { t := time.Unix(0, atomic.LoadInt64(m.lastRequestTime)) if time.Since(t) >= closeAfterInactivity { - m.writer.stop() + m.writer.Stop() return fmt.Errorf("not used anymore") } } - case err := <-m.writer.error(): + case err := <-m.writer.Error(): return err case <-innerCtx.Done(): - m.writer.stop() + m.writer.Stop() return fmt.Errorf("terminated") } } @@ -334,22 +335,19 @@ func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*description.Media, videoMedia := stream.Desc().FindFormat(&videoFormatAV1) if videoFormatAV1 != nil { - stream.AddReader(m, videoMedia, videoFormatAV1, func(u unit.Unit) { - m.writer.push(func() error { - tunit := u.(*unit.AV1) + stream.AddReader(m.writer, videoMedia, videoFormatAV1, func(u unit.Unit) error { + tunit := u.(*unit.AV1) - if tunit.TU == nil { - return nil - } + if tunit.TU == nil { + return nil + } - pts := tunit.PTS - err := m.muxer.WriteAV1(tunit.NTP, pts, tunit.TU) - if err != nil { - return fmt.Errorf("muxer error: %v", err) - } + err := m.muxer.WriteAV1(tunit.NTP, tunit.PTS, tunit.TU) + if err != nil { + return fmt.Errorf("muxer error: %v", err) + } - return nil - }) + return nil }) return videoMedia, &gohlslib.Track{ @@ -361,22 +359,19 @@ func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*description.Media, videoMedia = stream.Desc().FindFormat(&videoFormatVP9) if videoFormatVP9 != nil { - stream.AddReader(m, videoMedia, videoFormatVP9, func(u unit.Unit) { - m.writer.push(func() error { - tunit := u.(*unit.VP9) + stream.AddReader(m.writer, videoMedia, videoFormatVP9, func(u unit.Unit) error { + tunit := u.(*unit.VP9) - if tunit.Frame == nil { - return nil - } + if tunit.Frame == nil { + return nil + } - pts := tunit.PTS - err := m.muxer.WriteVP9(tunit.NTP, pts, tunit.Frame) - if err != nil { - return fmt.Errorf("muxer error: %v", err) - } + err := m.muxer.WriteVP9(tunit.NTP, tunit.PTS, tunit.Frame) + if err != nil { + return fmt.Errorf("muxer error: %v", err) + } - return nil - }) + return nil }) return videoMedia, &gohlslib.Track{ @@ -388,22 +383,19 @@ func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*description.Media, videoMedia = stream.Desc().FindFormat(&videoFormatH265) if videoFormatH265 != nil { - stream.AddReader(m, videoMedia, videoFormatH265, func(u unit.Unit) { - m.writer.push(func() error { - tunit := u.(*unit.H265) + stream.AddReader(m.writer, videoMedia, videoFormatH265, func(u unit.Unit) error { + tunit := u.(*unit.H265) - if tunit.AU == nil { - return nil - } + if tunit.AU == nil { + return nil + } - pts := tunit.PTS - err := m.muxer.WriteH26x(tunit.NTP, pts, tunit.AU) - if err != nil { - return fmt.Errorf("muxer error: %v", err) - } + err := m.muxer.WriteH26x(tunit.NTP, tunit.PTS, tunit.AU) + if err != nil { + return fmt.Errorf("muxer error: %v", err) + } - return nil - }) + return nil }) vps, sps, pps := videoFormatH265.SafeParams() @@ -421,22 +413,19 @@ func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*description.Media, videoMedia = stream.Desc().FindFormat(&videoFormatH264) if videoFormatH264 != nil { - stream.AddReader(m, videoMedia, videoFormatH264, func(u unit.Unit) { - m.writer.push(func() error { - tunit := u.(*unit.H264) + stream.AddReader(m.writer, videoMedia, videoFormatH264, func(u unit.Unit) error { + tunit := u.(*unit.H264) - if tunit.AU == nil { - return nil - } + if tunit.AU == nil { + return nil + } - pts := tunit.PTS - err := m.muxer.WriteH26x(tunit.NTP, pts, tunit.AU) - if err != nil { - return fmt.Errorf("muxer error: %v", err) - } + err := m.muxer.WriteH26x(tunit.NTP, tunit.PTS, tunit.AU) + if err != nil { + return fmt.Errorf("muxer error: %v", err) + } - return nil - }) + return nil }) sps, pps := videoFormatH264.SafeParams() @@ -457,21 +446,18 @@ func (m *hlsMuxer) createAudioTrack(stream *stream.Stream) (*description.Media, audioMedia := stream.Desc().FindFormat(&audioFormatOpus) if audioMedia != nil { - stream.AddReader(m, audioMedia, audioFormatOpus, func(u unit.Unit) { - m.writer.push(func() error { - tunit := u.(*unit.Opus) - - pts := tunit.PTS - err := m.muxer.WriteOpus( - tunit.NTP, - pts, - tunit.Packets) - if err != nil { - return fmt.Errorf("muxer error: %v", err) - } + stream.AddReader(m.writer, audioMedia, audioFormatOpus, func(u unit.Unit) error { + tunit := u.(*unit.Opus) + + err := m.muxer.WriteOpus( + tunit.NTP, + tunit.PTS, + tunit.Packets) + if err != nil { + return fmt.Errorf("muxer error: %v", err) + } - return nil - }) + return nil }) return audioMedia, &gohlslib.Track{ @@ -490,25 +476,22 @@ func (m *hlsMuxer) createAudioTrack(stream *stream.Stream) (*description.Media, audioMedia = stream.Desc().FindFormat(&audioFormatMPEG4AudioGeneric) if audioMedia != nil { - stream.AddReader(m, audioMedia, audioFormatMPEG4AudioGeneric, func(u unit.Unit) { - m.writer.push(func() error { - tunit := u.(*unit.MPEG4AudioGeneric) + stream.AddReader(m.writer, audioMedia, audioFormatMPEG4AudioGeneric, func(u unit.Unit) error { + tunit := u.(*unit.MPEG4AudioGeneric) - if tunit.AUs == nil { - return nil - } + if tunit.AUs == nil { + return nil + } - pts := tunit.PTS - err := m.muxer.WriteMPEG4Audio( - tunit.NTP, - pts, - tunit.AUs) - if err != nil { - return fmt.Errorf("muxer error: %v", err) - } + err := m.muxer.WriteMPEG4Audio( + tunit.NTP, + tunit.PTS, + tunit.AUs) + if err != nil { + return fmt.Errorf("muxer error: %v", err) + } - return nil - }) + return nil }) return audioMedia, &gohlslib.Track{ @@ -525,25 +508,22 @@ func (m *hlsMuxer) createAudioTrack(stream *stream.Stream) (*description.Media, audioFormatMPEG4AudioLATM.Config != nil && len(audioFormatMPEG4AudioLATM.Config.Programs) == 1 && len(audioFormatMPEG4AudioLATM.Config.Programs[0].Layers) == 1 { - stream.AddReader(m, audioMedia, audioFormatMPEG4AudioLATM, func(u unit.Unit) { - m.writer.push(func() error { - tunit := u.(*unit.MPEG4AudioLATM) + stream.AddReader(m.writer, audioMedia, audioFormatMPEG4AudioLATM, func(u unit.Unit) error { + tunit := u.(*unit.MPEG4AudioLATM) - if tunit.AU == nil { - return nil - } + if tunit.AU == nil { + return nil + } - pts := tunit.PTS - err := m.muxer.WriteMPEG4Audio( - tunit.NTP, - pts, - [][]byte{tunit.AU}) - if err != nil { - return fmt.Errorf("muxer error: %v", err) - } + err := m.muxer.WriteMPEG4Audio( + tunit.NTP, + tunit.PTS, + [][]byte{tunit.AU}) + if err != nil { + return fmt.Errorf("muxer error: %v", err) + } - return nil - }) + return nil }) return audioMedia, &gohlslib.Track{ diff --git a/internal/core/hls_source.go b/internal/core/hls_source.go index 7ad25e48..37df2544 100644 --- a/internal/core/hls_source.go +++ b/internal/core/hls_source.go @@ -48,7 +48,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan } }() - decodeErrLogger := newLimitedLogger(s) + decodeErrLogger := logger.NewLimitedLogger(s) var c *gohlslib.Client c = &gohlslib.Client{ diff --git a/internal/core/limited_logger.go b/internal/core/limited_logger.go deleted file mode 100644 index d0743a5f..00000000 --- a/internal/core/limited_logger.go +++ /dev/null @@ -1,30 +0,0 @@ -package core - -import ( - "sync" - "time" - - "github.com/bluenviron/mediamtx/internal/logger" -) - -type limitedLogger struct { - w logger.Writer - mutex sync.Mutex - lastPrinted time.Time -} - -func newLimitedLogger(w logger.Writer) *limitedLogger { - return &limitedLogger{ - w: w, - } -} - -func (l *limitedLogger) Log(level logger.Level, format string, args ...interface{}) { - now := time.Now() - l.mutex.Lock() - if now.Sub(l.lastPrinted) >= minIntervalBetweenWarnings { - l.lastPrinted = now - l.w.Log(level, format, args...) - } - l.mutex.Unlock() -} diff --git a/internal/core/path.go b/internal/core/path.go index 7e2ce9de..226c911f 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -380,112 +380,85 @@ func (pa *path) runInner() error { for { select { case <-pa.onDemandStaticSourceReadyTimer.C: - for _, req := range pa.describeRequestsOnHold { - req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} - } - pa.describeRequestsOnHold = nil - - for _, req := range pa.readerAddRequestsOnHold { - req.res <- pathAddReaderRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} - } - pa.readerAddRequestsOnHold = nil - - pa.onDemandStaticSourceStop() + pa.doOnDemandStaticSourceReadyTimer() if pa.shouldClose() { return fmt.Errorf("not in use") } case <-pa.onDemandStaticSourceCloseTimer.C: - pa.setNotReady() - pa.onDemandStaticSourceStop() + pa.doOnDemandStaticSourceCloseTimer() if pa.shouldClose() { return fmt.Errorf("not in use") } case <-pa.onDemandPublisherReadyTimer.C: - for _, req := range pa.describeRequestsOnHold { - req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} - } - pa.describeRequestsOnHold = nil - - for _, req := range pa.readerAddRequestsOnHold { - req.res <- pathAddReaderRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} - } - pa.readerAddRequestsOnHold = nil - - pa.onDemandStopPublisher() + pa.doOnDemandPublisherReadyTimer() if pa.shouldClose() { return fmt.Errorf("not in use") } case <-pa.onDemandPublisherCloseTimer.C: - pa.onDemandStopPublisher() + pa.doOnDemandPublisherCloseTimer() if pa.shouldClose() { return fmt.Errorf("not in use") } case newConf := <-pa.chReloadConf: - if pa.conf.HasStaticSource() { - go pa.source.(*sourceStatic).reloadConf(newConf) - } - - pa.confMutex.Lock() - pa.conf = newConf - pa.confMutex.Unlock() + pa.doReloadConf(newConf) case req := <-pa.chSourceStaticSetReady: - pa.handleSourceStaticSetReady(req) + pa.doSourceStaticSetReady(req) case req := <-pa.chSourceStaticSetNotReady: - pa.handleSourceStaticSetNotReady(req) + pa.doSourceStaticSetNotReady(req) if pa.shouldClose() { return fmt.Errorf("not in use") } case req := <-pa.chDescribe: - pa.handleDescribe(req) + pa.doDescribe(req) if pa.shouldClose() { return fmt.Errorf("not in use") } case req := <-pa.chRemovePublisher: - pa.handleRemovePublisher(req) + pa.doRemovePublisher(req) if pa.shouldClose() { return fmt.Errorf("not in use") } case req := <-pa.chAddPublisher: - pa.handleAddPublisher(req) + pa.doAddPublisher(req) case req := <-pa.chStartPublisher: - pa.handleStartPublisher(req) + pa.doStartPublisher(req) case req := <-pa.chStopPublisher: - pa.handleStopPublisher(req) + pa.doStopPublisher(req) if pa.shouldClose() { return fmt.Errorf("not in use") } case req := <-pa.chAddReader: - pa.handleAddReader(req) + pa.doAddReader(req) if pa.shouldClose() { return fmt.Errorf("not in use") } case req := <-pa.chRemoveReader: - pa.handleRemoveReader(req) + pa.doRemoveReader(req) case req := <-pa.chAPIPathsGet: - pa.handleAPIPathsGet(req) + pa.doAPIPathsGet(req) case <-pa.ctx.Done(): return fmt.Errorf("terminated") @@ -493,167 +466,54 @@ func (pa *path) runInner() error { } } -func (pa *path) shouldClose() bool { - return pa.conf.Regexp != nil && - pa.source == nil && - len(pa.readers) == 0 && - len(pa.describeRequestsOnHold) == 0 && - len(pa.readerAddRequestsOnHold) == 0 -} - -func (pa *path) externalCmdEnv() externalcmd.Environment { - _, port, _ := net.SplitHostPort(pa.rtspAddress) - env := externalcmd.Environment{ - "MTX_PATH": pa.name, - "RTSP_PATH": pa.name, // deprecated - "RTSP_PORT": port, - } - - if len(pa.matches) > 1 { - for i, ma := range pa.matches[1:] { - env["G"+strconv.FormatInt(int64(i+1), 10)] = ma - } - } - - return env -} - -func (pa *path) onDemandStaticSourceStart() { - pa.source.(*sourceStatic).start() - - pa.onDemandStaticSourceReadyTimer.Stop() - pa.onDemandStaticSourceReadyTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandStartTimeout)) - - pa.onDemandStaticSourceState = pathOnDemandStateWaitingReady -} - -func (pa *path) onDemandStaticSourceScheduleClose() { - pa.onDemandStaticSourceCloseTimer.Stop() - pa.onDemandStaticSourceCloseTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandCloseAfter)) - - pa.onDemandStaticSourceState = pathOnDemandStateClosing -} - -func (pa *path) onDemandStaticSourceStop() { - if pa.onDemandStaticSourceState == pathOnDemandStateClosing { - pa.onDemandStaticSourceCloseTimer.Stop() - pa.onDemandStaticSourceCloseTimer = newEmptyTimer() - } - - pa.onDemandStaticSourceState = pathOnDemandStateInitial - - pa.source.(*sourceStatic).stop() -} - -func (pa *path) onDemandStartPublisher() { - pa.Log(logger.Info, "runOnDemand command started") - pa.onDemandCmd = externalcmd.NewCmd( - pa.externalCmdPool, - pa.conf.RunOnDemand, - pa.conf.RunOnDemandRestart, - pa.externalCmdEnv(), - func(err error) { - pa.Log(logger.Info, "runOnDemand command exited: %v", err) - }) - - pa.onDemandPublisherReadyTimer.Stop() - pa.onDemandPublisherReadyTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandStartTimeout)) - - pa.onDemandPublisherState = pathOnDemandStateWaitingReady -} - -func (pa *path) onDemandPublisherScheduleClose() { - pa.onDemandPublisherCloseTimer.Stop() - pa.onDemandPublisherCloseTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandCloseAfter)) - - pa.onDemandPublisherState = pathOnDemandStateClosing -} - -func (pa *path) onDemandStopPublisher() { - if pa.source != nil { - pa.source.(publisher).close() - pa.doPublisherRemove() +func (pa *path) doOnDemandStaticSourceReadyTimer() { + for _, req := range pa.describeRequestsOnHold { + req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} } + pa.describeRequestsOnHold = nil - if pa.onDemandPublisherState == pathOnDemandStateClosing { - pa.onDemandPublisherCloseTimer.Stop() - pa.onDemandPublisherCloseTimer = newEmptyTimer() + for _, req := range pa.readerAddRequestsOnHold { + req.res <- pathAddReaderRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} } + pa.readerAddRequestsOnHold = nil - pa.onDemandPublisherState = pathOnDemandStateInitial - - if pa.onDemandCmd != nil { - pa.onDemandCmd.Close() - pa.onDemandCmd = nil - pa.Log(logger.Info, "runOnDemand command stopped") - } + pa.onDemandStaticSourceStop() } -func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error { - stream, err := stream.New( - pa.udpMaxPayloadSize, - desc, - allocateEncoder, - pa.bytesReceived, - newLimitedLogger(pa.source), - ) - if err != nil { - return err - } - - pa.stream = stream - pa.readyTime = time.Now() - - if pa.conf.RunOnReady != "" { - pa.Log(logger.Info, "runOnReady command started") - pa.onReadyCmd = externalcmd.NewCmd( - pa.externalCmdPool, - pa.conf.RunOnReady, - pa.conf.RunOnReadyRestart, - pa.externalCmdEnv(), - func(err error) { - pa.Log(logger.Info, "runOnReady command exited: %v", err) - }) - } - - pa.parent.pathReady(pa) - - return nil +func (pa *path) doOnDemandStaticSourceCloseTimer() { + pa.setNotReady() + pa.onDemandStaticSourceStop() } -func (pa *path) setNotReady() { - pa.parent.pathNotReady(pa) - - for r := range pa.readers { - pa.doRemoveReader(r) - r.close() +func (pa *path) doOnDemandPublisherReadyTimer() { + for _, req := range pa.describeRequestsOnHold { + req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} } + pa.describeRequestsOnHold = nil - if pa.onReadyCmd != nil { - pa.onReadyCmd.Close() - pa.onReadyCmd = nil - pa.Log(logger.Info, "runOnReady command stopped") + for _, req := range pa.readerAddRequestsOnHold { + req.res <- pathAddReaderRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)} } + pa.readerAddRequestsOnHold = nil - if pa.stream != nil { - pa.stream.Close() - pa.stream = nil - } + pa.onDemandStopPublisher() } -func (pa *path) doRemoveReader(r reader) { - delete(pa.readers, r) +func (pa *path) doOnDemandPublisherCloseTimer() { + pa.onDemandStopPublisher() } -func (pa *path) doPublisherRemove() { - if pa.stream != nil { - pa.setNotReady() +func (pa *path) doReloadConf(newConf *conf.PathConf) { + if pa.conf.HasStaticSource() { + go pa.source.(*sourceStatic).reloadConf(newConf) } - pa.source = nil + pa.confMutex.Lock() + pa.conf = newConf + pa.confMutex.Unlock() } -func (pa *path) handleSourceStaticSetReady(req pathSourceStaticSetReadyReq) { +func (pa *path) doSourceStaticSetReady(req pathSourceStaticSetReadyReq) { err := pa.setReady(req.desc, req.generateRTPPackets) if err != nil { req.res <- pathSourceStaticSetReadyRes{err: err} @@ -674,7 +534,7 @@ func (pa *path) handleSourceStaticSetReady(req pathSourceStaticSetReadyReq) { pa.describeRequestsOnHold = nil for _, req := range pa.readerAddRequestsOnHold { - pa.handleAddReaderPost(req) + pa.addReaderPost(req) } pa.readerAddRequestsOnHold = nil } @@ -682,7 +542,7 @@ func (pa *path) handleSourceStaticSetReady(req pathSourceStaticSetReadyReq) { req.res <- pathSourceStaticSetReadyRes{stream: pa.stream} } -func (pa *path) handleSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) { +func (pa *path) doSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) { pa.setNotReady() // send response before calling onDemandStaticSourceStop() @@ -694,7 +554,7 @@ func (pa *path) handleSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq } } -func (pa *path) handleDescribe(req pathDescribeReq) { +func (pa *path) doDescribe(req pathDescribeReq) { if _, ok := pa.source.(*sourceRedirect); ok { req.res <- pathDescribeRes{ redirect: pa.conf.SourceRedirect, @@ -745,14 +605,14 @@ func (pa *path) handleDescribe(req pathDescribeReq) { req.res <- pathDescribeRes{err: errPathNoOnePublishing{pathName: pa.name}} } -func (pa *path) handleRemovePublisher(req pathRemovePublisherReq) { +func (pa *path) doRemovePublisher(req pathRemovePublisherReq) { if pa.source == req.author { - pa.doPublisherRemove() + pa.executeRemovePublisher() } close(req.res) } -func (pa *path) handleAddPublisher(req pathAddPublisherReq) { +func (pa *path) doAddPublisher(req pathAddPublisherReq) { if pa.conf.Source != "publisher" { req.res <- pathAddPublisherRes{ err: fmt.Errorf("can't publish to path '%s' since 'source' is not 'publisher'", pa.name), @@ -768,7 +628,7 @@ func (pa *path) handleAddPublisher(req pathAddPublisherReq) { pa.Log(logger.Info, "closing existing publisher") pa.source.(publisher).close() - pa.doPublisherRemove() + pa.executeRemovePublisher() } pa.source = req.author @@ -776,7 +636,7 @@ func (pa *path) handleAddPublisher(req pathAddPublisherReq) { req.res <- pathAddPublisherRes{path: pa} } -func (pa *path) handleStartPublisher(req pathStartPublisherReq) { +func (pa *path) doStartPublisher(req pathStartPublisherReq) { if pa.source != req.author { req.res <- pathStartPublisherRes{err: fmt.Errorf("publisher is not assigned to this path anymore")} return @@ -806,7 +666,7 @@ func (pa *path) handleStartPublisher(req pathStartPublisherReq) { pa.describeRequestsOnHold = nil for _, req := range pa.readerAddRequestsOnHold { - pa.handleAddReaderPost(req) + pa.addReaderPost(req) } pa.readerAddRequestsOnHold = nil } @@ -814,35 +674,16 @@ func (pa *path) handleStartPublisher(req pathStartPublisherReq) { req.res <- pathStartPublisherRes{stream: pa.stream} } -func (pa *path) handleStopPublisher(req pathStopPublisherReq) { +func (pa *path) doStopPublisher(req pathStopPublisherReq) { if req.author == pa.source && pa.stream != nil { pa.setNotReady() } close(req.res) } -func (pa *path) handleRemoveReader(req pathRemoveReaderReq) { - if _, ok := pa.readers[req.author]; ok { - pa.doRemoveReader(req.author) - } - close(req.res) - - if len(pa.readers) == 0 { - if pa.conf.HasOnDemandStaticSource() { - if pa.onDemandStaticSourceState == pathOnDemandStateReady { - pa.onDemandStaticSourceScheduleClose() - } - } else if pa.conf.HasOnDemandPublisher() { - if pa.onDemandPublisherState == pathOnDemandStateReady { - pa.onDemandPublisherScheduleClose() - } - } - } -} - -func (pa *path) handleAddReader(req pathAddReaderReq) { +func (pa *path) doAddReader(req pathAddReaderReq) { if pa.stream != nil { - pa.handleAddReaderPost(req) + pa.addReaderPost(req) return } @@ -865,45 +706,26 @@ func (pa *path) handleAddReader(req pathAddReaderReq) { req.res <- pathAddReaderRes{err: errPathNoOnePublishing{pathName: pa.name}} } -func (pa *path) handleAddReaderPost(req pathAddReaderReq) { +func (pa *path) doRemoveReader(req pathRemoveReaderReq) { if _, ok := pa.readers[req.author]; ok { - req.res <- pathAddReaderRes{ - path: pa, - stream: pa.stream, - } - return + pa.executeRemoveReader(req.author) } + close(req.res) - if pa.conf.MaxReaders != 0 && len(pa.readers) >= pa.conf.MaxReaders { - req.res <- pathAddReaderRes{ - err: fmt.Errorf("maximum reader count reached"), - } - return - } - - pa.readers[req.author] = struct{}{} - - if pa.conf.HasOnDemandStaticSource() { - if pa.onDemandStaticSourceState == pathOnDemandStateClosing { - pa.onDemandStaticSourceState = pathOnDemandStateReady - pa.onDemandStaticSourceCloseTimer.Stop() - pa.onDemandStaticSourceCloseTimer = newEmptyTimer() - } - } else if pa.conf.HasOnDemandPublisher() { - if pa.onDemandPublisherState == pathOnDemandStateClosing { - pa.onDemandPublisherState = pathOnDemandStateReady - pa.onDemandPublisherCloseTimer.Stop() - pa.onDemandPublisherCloseTimer = newEmptyTimer() + if len(pa.readers) == 0 { + if pa.conf.HasOnDemandStaticSource() { + if pa.onDemandStaticSourceState == pathOnDemandStateReady { + pa.onDemandStaticSourceScheduleClose() + } + } else if pa.conf.HasOnDemandPublisher() { + if pa.onDemandPublisherState == pathOnDemandStateReady { + pa.onDemandPublisherScheduleClose() + } } } - - req.res <- pathAddReaderRes{ - path: pa, - stream: pa.stream, - } } -func (pa *path) handleAPIPathsGet(req pathAPIPathsGetReq) { +func (pa *path) doAPIPathsGet(req pathAPIPathsGetReq) { req.res <- pathAPIPathsGetRes{ data: &apiPath{ Name: pa.name, @@ -942,6 +764,204 @@ func (pa *path) handleAPIPathsGet(req pathAPIPathsGetReq) { } } +func (pa *path) shouldClose() bool { + return pa.conf.Regexp != nil && + pa.source == nil && + len(pa.readers) == 0 && + len(pa.describeRequestsOnHold) == 0 && + len(pa.readerAddRequestsOnHold) == 0 +} + +func (pa *path) externalCmdEnv() externalcmd.Environment { + _, port, _ := net.SplitHostPort(pa.rtspAddress) + env := externalcmd.Environment{ + "MTX_PATH": pa.name, + "RTSP_PATH": pa.name, // deprecated + "RTSP_PORT": port, + } + + if len(pa.matches) > 1 { + for i, ma := range pa.matches[1:] { + env["G"+strconv.FormatInt(int64(i+1), 10)] = ma + } + } + + return env +} + +func (pa *path) onDemandStaticSourceStart() { + pa.source.(*sourceStatic).start() + + pa.onDemandStaticSourceReadyTimer.Stop() + pa.onDemandStaticSourceReadyTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandStartTimeout)) + + pa.onDemandStaticSourceState = pathOnDemandStateWaitingReady +} + +func (pa *path) onDemandStaticSourceScheduleClose() { + pa.onDemandStaticSourceCloseTimer.Stop() + pa.onDemandStaticSourceCloseTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandCloseAfter)) + + pa.onDemandStaticSourceState = pathOnDemandStateClosing +} + +func (pa *path) onDemandStaticSourceStop() { + if pa.onDemandStaticSourceState == pathOnDemandStateClosing { + pa.onDemandStaticSourceCloseTimer.Stop() + pa.onDemandStaticSourceCloseTimer = newEmptyTimer() + } + + pa.onDemandStaticSourceState = pathOnDemandStateInitial + + pa.source.(*sourceStatic).stop() +} + +func (pa *path) onDemandStartPublisher() { + pa.Log(logger.Info, "runOnDemand command started") + pa.onDemandCmd = externalcmd.NewCmd( + pa.externalCmdPool, + pa.conf.RunOnDemand, + pa.conf.RunOnDemandRestart, + pa.externalCmdEnv(), + func(err error) { + pa.Log(logger.Info, "runOnDemand command exited: %v", err) + }) + + pa.onDemandPublisherReadyTimer.Stop() + pa.onDemandPublisherReadyTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandStartTimeout)) + + pa.onDemandPublisherState = pathOnDemandStateWaitingReady +} + +func (pa *path) onDemandPublisherScheduleClose() { + pa.onDemandPublisherCloseTimer.Stop() + pa.onDemandPublisherCloseTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandCloseAfter)) + + pa.onDemandPublisherState = pathOnDemandStateClosing +} + +func (pa *path) onDemandStopPublisher() { + if pa.source != nil { + pa.source.(publisher).close() + pa.executeRemovePublisher() + } + + if pa.onDemandPublisherState == pathOnDemandStateClosing { + pa.onDemandPublisherCloseTimer.Stop() + pa.onDemandPublisherCloseTimer = newEmptyTimer() + } + + pa.onDemandPublisherState = pathOnDemandStateInitial + + if pa.onDemandCmd != nil { + pa.onDemandCmd.Close() + pa.onDemandCmd = nil + pa.Log(logger.Info, "runOnDemand command stopped") + } +} + +func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error { + var err error + pa.stream, err = stream.New( + pa.udpMaxPayloadSize, + desc, + allocateEncoder, + pa.bytesReceived, + logger.NewLimitedLogger(pa.source), + ) + if err != nil { + return err + } + + pa.readyTime = time.Now() + + if pa.conf.RunOnReady != "" { + pa.Log(logger.Info, "runOnReady command started") + pa.onReadyCmd = externalcmd.NewCmd( + pa.externalCmdPool, + pa.conf.RunOnReady, + pa.conf.RunOnReadyRestart, + pa.externalCmdEnv(), + func(err error) { + pa.Log(logger.Info, "runOnReady command exited: %v", err) + }) + } + + pa.parent.pathReady(pa) + + return nil +} + +func (pa *path) setNotReady() { + pa.parent.pathNotReady(pa) + + for r := range pa.readers { + pa.executeRemoveReader(r) + r.close() + } + + if pa.onReadyCmd != nil { + pa.onReadyCmd.Close() + pa.onReadyCmd = nil + pa.Log(logger.Info, "runOnReady command stopped") + } + + if pa.stream != nil { + pa.stream.Close() + pa.stream = nil + } +} + +func (pa *path) executeRemoveReader(r reader) { + delete(pa.readers, r) +} + +func (pa *path) executeRemovePublisher() { + if pa.stream != nil { + pa.setNotReady() + } + + pa.source = nil +} + +func (pa *path) addReaderPost(req pathAddReaderReq) { + if _, ok := pa.readers[req.author]; ok { + req.res <- pathAddReaderRes{ + path: pa, + stream: pa.stream, + } + return + } + + if pa.conf.MaxReaders != 0 && len(pa.readers) >= pa.conf.MaxReaders { + req.res <- pathAddReaderRes{ + err: fmt.Errorf("maximum reader count reached"), + } + return + } + + pa.readers[req.author] = struct{}{} + + if pa.conf.HasOnDemandStaticSource() { + if pa.onDemandStaticSourceState == pathOnDemandStateClosing { + pa.onDemandStaticSourceState = pathOnDemandStateReady + pa.onDemandStaticSourceCloseTimer.Stop() + pa.onDemandStaticSourceCloseTimer = newEmptyTimer() + } + } else if pa.conf.HasOnDemandPublisher() { + if pa.onDemandPublisherState == pathOnDemandStateClosing { + pa.onDemandPublisherState = pathOnDemandStateReady + pa.onDemandPublisherCloseTimer.Stop() + pa.onDemandPublisherCloseTimer = newEmptyTimer() + } + } + + req.res <- pathAddReaderRes{ + path: pa, + stream: pa.stream, + } +} + // reloadConf is called by pathManager. func (pa *path) reloadConf(newConf *conf.PathConf) { select { diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index 1cf39ce1..9694b0f3 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -85,6 +85,7 @@ type pathManager struct { // in chReloadConf chan map[string]*conf.PathConf + chSetHLSManager chan pathManagerHLSManager chClosePath chan *path chPathReady chan *path chPathNotReady chan *path @@ -92,7 +93,6 @@ type pathManager struct { chDescribe chan pathDescribeReq chAddReader chan pathAddReaderReq chAddPublisher chan pathAddPublisherReq - chSetHLSManager chan pathManagerHLSManager chAPIPathsList chan pathAPIPathsListReq chAPIPathsGet chan pathAPIPathsGetReq } @@ -129,6 +129,7 @@ func newPathManager( paths: make(map[string]*path), pathsByConf: make(map[string]map[*path]struct{}), chReloadConf: make(chan map[string]*conf.PathConf), + chSetHLSManager: make(chan pathManagerHLSManager), chClosePath: make(chan *path), chPathReady: make(chan *path), chPathNotReady: make(chan *path), @@ -136,7 +137,6 @@ func newPathManager( chDescribe: make(chan pathDescribeReq), chAddReader: make(chan pathAddReaderReq), chAddPublisher: make(chan pathAddPublisherReq), - chSetHLSManager: make(chan pathManagerHLSManager), chAPIPathsList: make(chan pathAPIPathsListReq), chAPIPathsGet: make(chan pathAPIPathsGetReq), } @@ -177,168 +177,212 @@ outer: for { select { case newPathConfs := <-pm.chReloadConf: - for confName, pathConf := range pm.pathConfs { - if newPathConf, ok := newPathConfs[confName]; ok { - // configuration has changed - if !newPathConf.Equal(pathConf) { - if pathConfCanBeUpdated(pathConf, newPathConf) { // paths associated with the configuration can be updated - for pa := range pm.pathsByConf[confName] { - go pa.reloadConf(newPathConf) - } - } else { // paths associated with the configuration must be recreated - for pa := range pm.pathsByConf[confName] { - pm.removePath(pa) - pa.close() - pa.wait() // avoid conflicts between sources - } - } - } - } else { - // configuration has been deleted, remove associated paths - for pa := range pm.pathsByConf[confName] { - pm.removePath(pa) - pa.close() - pa.wait() // avoid conflicts between sources - } - } - } - - pm.pathConfs = newPathConfs + pm.doReloadConf(newPathConfs) - // add new paths - for pathConfName, pathConf := range pm.pathConfs { - if _, ok := pm.paths[pathConfName]; !ok && pathConf.Regexp == nil { - pm.createPath(pathConfName, pathConf, pathConfName, nil) - } - } + case m := <-pm.chSetHLSManager: + pm.doSetHLSManager(m) case pa := <-pm.chClosePath: - if pmpa, ok := pm.paths[pa.name]; !ok || pmpa != pa { - continue - } - pm.removePath(pa) + pm.doClosePath(pa) case pa := <-pm.chPathReady: - if pm.hlsManager != nil { - pm.hlsManager.pathReady(pa) - } + pm.doPathReady(pa) case pa := <-pm.chPathNotReady: - if pm.hlsManager != nil { - pm.hlsManager.pathNotReady(pa) - } + pm.doPathNotReady(pa) case req := <-pm.chGetConfForPath: - _, pathConf, _, err := getConfForPath(pm.pathConfs, req.name) - if err != nil { - req.res <- pathGetConfForPathRes{err: err} - continue - } + pm.doGetConfForPath(req) - err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, - req.name, pathConf, req.publish, req.credentials) - if err != nil { - req.res <- pathGetConfForPathRes{err: err} - continue - } + case req := <-pm.chDescribe: + pm.doDescribe(req) - req.res <- pathGetConfForPathRes{conf: pathConf} + case req := <-pm.chAddReader: + pm.doAddReader(req) - case req := <-pm.chDescribe: - pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName) - if err != nil { - req.res <- pathDescribeRes{err: err} - continue - } + case req := <-pm.chAddPublisher: + pm.doAddPublisher(req) - err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, false, req.credentials) - if err != nil { - req.res <- pathDescribeRes{err: err} - continue - } + case req := <-pm.chAPIPathsList: + pm.doAPIPathsList(req) - // create path if it doesn't exist - if _, ok := pm.paths[req.pathName]; !ok { - pm.createPath(pathConfName, pathConf, req.pathName, pathMatches) - } + case req := <-pm.chAPIPathsGet: + pm.doAPIPathsGet(req) - req.res <- pathDescribeRes{path: pm.paths[req.pathName]} + case <-pm.ctx.Done(): + break outer + } + } - case req := <-pm.chAddReader: - pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName) - if err != nil { - req.res <- pathAddReaderRes{err: err} - continue - } + pm.ctxCancel() - if !req.skipAuth { - err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, false, req.credentials) - if err != nil { - req.res <- pathAddReaderRes{err: err} - continue + if pm.metrics != nil { + pm.metrics.pathManagerSet(nil) + } +} + +func (pm *pathManager) doReloadConf(newPathConfs map[string]*conf.PathConf) { + for confName, pathConf := range pm.pathConfs { + if newPathConf, ok := newPathConfs[confName]; ok { + // configuration has changed + if !newPathConf.Equal(pathConf) { + if pathConfCanBeUpdated(pathConf, newPathConf) { // paths associated with the configuration can be updated + for pa := range pm.pathsByConf[confName] { + go pa.reloadConf(newPathConf) + } + } else { // paths associated with the configuration must be recreated + for pa := range pm.pathsByConf[confName] { + pm.removePath(pa) + pa.close() + pa.wait() // avoid conflicts between sources + } } } - - // create path if it doesn't exist - if _, ok := pm.paths[req.pathName]; !ok { - pm.createPath(pathConfName, pathConf, req.pathName, pathMatches) + } else { + // configuration has been deleted, remove associated paths + for pa := range pm.pathsByConf[confName] { + pm.removePath(pa) + pa.close() + pa.wait() // avoid conflicts between sources } + } + } - req.res <- pathAddReaderRes{path: pm.paths[req.pathName]} + pm.pathConfs = newPathConfs - case req := <-pm.chAddPublisher: - pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName) - if err != nil { - req.res <- pathAddPublisherRes{err: err} - continue - } + // add new paths + for pathConfName, pathConf := range pm.pathConfs { + if _, ok := pm.paths[pathConfName]; !ok && pathConf.Regexp == nil { + pm.createPath(pathConfName, pathConf, pathConfName, nil) + } + } +} - if !req.skipAuth { - err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, true, req.credentials) - if err != nil { - req.res <- pathAddPublisherRes{err: err} - continue - } - } +func (pm *pathManager) doSetHLSManager(m pathManagerHLSManager) { + pm.hlsManager = m +} - // create path if it doesn't exist - if _, ok := pm.paths[req.pathName]; !ok { - pm.createPath(pathConfName, pathConf, req.pathName, pathMatches) - } +func (pm *pathManager) doClosePath(pa *path) { + if pmpa, ok := pm.paths[pa.name]; !ok || pmpa != pa { + return + } + pm.removePath(pa) +} - req.res <- pathAddPublisherRes{path: pm.paths[req.pathName]} +func (pm *pathManager) doPathReady(pa *path) { + if pm.hlsManager != nil { + pm.hlsManager.pathReady(pa) + } +} - case s := <-pm.chSetHLSManager: - pm.hlsManager = s +func (pm *pathManager) doPathNotReady(pa *path) { + if pm.hlsManager != nil { + pm.hlsManager.pathNotReady(pa) + } +} - case req := <-pm.chAPIPathsList: - paths := make(map[string]*path) +func (pm *pathManager) doGetConfForPath(req pathGetConfForPathReq) { + _, pathConf, _, err := getConfForPath(pm.pathConfs, req.name) + if err != nil { + req.res <- pathGetConfForPathRes{err: err} + return + } - for name, pa := range pm.paths { - paths[name] = pa - } + err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, + req.name, pathConf, req.publish, req.credentials) + if err != nil { + req.res <- pathGetConfForPathRes{err: err} + return + } - req.res <- pathAPIPathsListRes{paths: paths} + req.res <- pathGetConfForPathRes{conf: pathConf} +} - case req := <-pm.chAPIPathsGet: - path, ok := pm.paths[req.name] - if !ok { - req.res <- pathAPIPathsGetRes{err: errAPINotFound} - continue - } +func (pm *pathManager) doDescribe(req pathDescribeReq) { + pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName) + if err != nil { + req.res <- pathDescribeRes{err: err} + return + } - req.res <- pathAPIPathsGetRes{path: path} + err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, false, req.credentials) + if err != nil { + req.res <- pathDescribeRes{err: err} + return + } - case <-pm.ctx.Done(): - break outer + // create path if it doesn't exist + if _, ok := pm.paths[req.pathName]; !ok { + pm.createPath(pathConfName, pathConf, req.pathName, pathMatches) + } + + req.res <- pathDescribeRes{path: pm.paths[req.pathName]} +} + +func (pm *pathManager) doAddReader(req pathAddReaderReq) { + pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName) + if err != nil { + req.res <- pathAddReaderRes{err: err} + return + } + + if !req.skipAuth { + err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, false, req.credentials) + if err != nil { + req.res <- pathAddReaderRes{err: err} + return } } - pm.ctxCancel() + // create path if it doesn't exist + if _, ok := pm.paths[req.pathName]; !ok { + pm.createPath(pathConfName, pathConf, req.pathName, pathMatches) + } - if pm.metrics != nil { - pm.metrics.pathManagerSet(nil) + req.res <- pathAddReaderRes{path: pm.paths[req.pathName]} +} + +func (pm *pathManager) doAddPublisher(req pathAddPublisherReq) { + pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName) + if err != nil { + req.res <- pathAddPublisherRes{err: err} + return + } + + if !req.skipAuth { + err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, true, req.credentials) + if err != nil { + req.res <- pathAddPublisherRes{err: err} + return + } + } + + // create path if it doesn't exist + if _, ok := pm.paths[req.pathName]; !ok { + pm.createPath(pathConfName, pathConf, req.pathName, pathMatches) + } + + req.res <- pathAddPublisherRes{path: pm.paths[req.pathName]} +} + +func (pm *pathManager) doAPIPathsList(req pathAPIPathsListReq) { + paths := make(map[string]*path) + + for name, pa := range pm.paths { + paths[name] = pa } + + req.res <- pathAPIPathsListRes{paths: paths} +} + +func (pm *pathManager) doAPIPathsGet(req pathAPIPathsGetReq) { + path, ok := pm.paths[req.name] + if !ok { + req.res <- pathAPIPathsGetRes{err: errAPINotFound} + return + } + + req.res <- pathAPIPathsGetRes{path: path} } func (pm *pathManager) createPath( diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index 3519081a..3f230c7f 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -17,6 +17,7 @@ import ( "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" "github.com/google/uuid" + "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/logger" @@ -240,7 +241,7 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error { c.pathName = pathName c.mutex.Unlock() - writer := newAsyncWriter(c.writeQueueSize, c) + writer := asyncwriter.New(c.writeQueueSize, c) var medias []*description.Media var w *rtmp.Writer @@ -266,7 +267,7 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error { "the stream doesn't contain any supported codec, which are currently H264, MPEG-4 Audio, MPEG-1/2 Audio") } - defer res.stream.RemoveReader(c) + defer res.stream.RemoveReader(writer) c.Log(logger.Info, "is reading from path '%s', %s", res.path.name, sourceMediaInfo(medias)) @@ -298,14 +299,14 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error { // disable read deadline c.nconn.SetReadDeadline(time.Time{}) - writer.start() + writer.Start() select { case <-c.ctx.Done(): - writer.stop() + writer.Stop() return fmt.Errorf("terminated") - case err := <-writer.error(): + case err := <-writer.Error(): return err } } @@ -313,7 +314,7 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error { func (c *rtmpConn) setupVideo( w **rtmp.Writer, stream *stream.Stream, - writer *asyncWriter, + writer *asyncwriter.Writer, ) (*description.Media, format.Format) { var videoFormatH264 *format.H264 videoMedia := stream.Desc().FindFormat(&videoFormatH264) @@ -321,60 +322,56 @@ func (c *rtmpConn) setupVideo( if videoFormatH264 != nil { var videoDTSExtractor *h264.DTSExtractor - stream.AddReader(c, videoMedia, videoFormatH264, func(u unit.Unit) { - writer.push(func() error { - tunit := u.(*unit.H264) + stream.AddReader(writer, videoMedia, videoFormatH264, func(u unit.Unit) error { + tunit := u.(*unit.H264) - if tunit.AU == nil { - return nil + if tunit.AU == nil { + return nil + } + + idrPresent := false + nonIDRPresent := false + + for _, nalu := range tunit.AU { + typ := h264.NALUType(nalu[0] & 0x1F) + switch typ { + case h264.NALUTypeIDR: + idrPresent = true + + case h264.NALUTypeNonIDR: + nonIDRPresent = true } + } - pts := tunit.PTS + var dts time.Duration - idrPresent := false - nonIDRPresent := false + // wait until we receive an IDR + if videoDTSExtractor == nil { + if !idrPresent { + return nil + } - for _, nalu := range tunit.AU { - typ := h264.NALUType(nalu[0] & 0x1F) - switch typ { - case h264.NALUTypeIDR: - idrPresent = true + videoDTSExtractor = h264.NewDTSExtractor() - case h264.NALUTypeNonIDR: - nonIDRPresent = true - } + var err error + dts, err = videoDTSExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err + } + } else { + if !idrPresent && !nonIDRPresent { + return nil } - var dts time.Duration - - // wait until we receive an IDR - if videoDTSExtractor == nil { - if !idrPresent { - return nil - } - - videoDTSExtractor = h264.NewDTSExtractor() - - var err error - dts, err = videoDTSExtractor.Extract(tunit.AU, pts) - if err != nil { - return err - } - } else { - if !idrPresent && !nonIDRPresent { - return nil - } - - var err error - dts, err = videoDTSExtractor.Extract(tunit.AU, pts) - if err != nil { - return err - } + var err error + dts, err = videoDTSExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err } + } - c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - return (*w).WriteH264(pts, dts, idrPresent, tunit.AU) - }) + c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + return (*w).WriteH264(tunit.PTS, dts, idrPresent, tunit.AU) }) return videoMedia, videoFormatH264 @@ -386,36 +383,32 @@ func (c *rtmpConn) setupVideo( func (c *rtmpConn) setupAudio( w **rtmp.Writer, stream *stream.Stream, - writer *asyncWriter, + writer *asyncwriter.Writer, ) (*description.Media, format.Format) { var audioFormatMPEG4Generic *format.MPEG4AudioGeneric audioMedia := stream.Desc().FindFormat(&audioFormatMPEG4Generic) if audioMedia != nil { - stream.AddReader(c, audioMedia, audioFormatMPEG4Generic, func(u unit.Unit) { - writer.push(func() error { - tunit := u.(*unit.MPEG4AudioGeneric) + stream.AddReader(writer, audioMedia, audioFormatMPEG4Generic, func(u unit.Unit) error { + tunit := u.(*unit.MPEG4AudioGeneric) - if tunit.AUs == nil { - return nil - } + if tunit.AUs == nil { + return nil + } - pts := tunit.PTS - - for i, au := range tunit.AUs { - c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err := (*w).WriteMPEG4Audio( - pts+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit* - time.Second/time.Duration(audioFormatMPEG4Generic.ClockRate()), - au, - ) - if err != nil { - return err - } + for i, au := range tunit.AUs { + c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err := (*w).WriteMPEG4Audio( + tunit.PTS+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit* + time.Second/time.Duration(audioFormatMPEG4Generic.ClockRate()), + au, + ) + if err != nil { + return err } + } - return nil - }) + return nil }) return audioMedia, audioFormatMPEG4Generic @@ -428,19 +421,15 @@ func (c *rtmpConn) setupAudio( audioFormatMPEG4AudioLATM.Config != nil && len(audioFormatMPEG4AudioLATM.Config.Programs) == 1 && len(audioFormatMPEG4AudioLATM.Config.Programs[0].Layers) == 1 { - stream.AddReader(c, audioMedia, audioFormatMPEG4AudioLATM, func(u unit.Unit) { - writer.push(func() error { - tunit := u.(*unit.MPEG4AudioLATM) - - if tunit.AU == nil { - return nil - } + stream.AddReader(writer, audioMedia, audioFormatMPEG4AudioLATM, func(u unit.Unit) error { + tunit := u.(*unit.MPEG4AudioLATM) - pts := tunit.PTS + if tunit.AU == nil { + return nil + } - c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - return (*w).WriteMPEG4Audio(pts, tunit.AU) - }) + c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + return (*w).WriteMPEG4Audio(tunit.PTS, tunit.AU) }) return audioMedia, audioFormatMPEG4AudioLATM @@ -450,35 +439,33 @@ func (c *rtmpConn) setupAudio( audioMedia = stream.Desc().FindFormat(&audioFormatMPEG1) if audioMedia != nil { - stream.AddReader(c, audioMedia, audioFormatMPEG1, func(u unit.Unit) { - writer.push(func() error { - tunit := u.(*unit.MPEG1Audio) - - pts := tunit.PTS - - for _, frame := range tunit.Frames { - var h mpeg1audio.FrameHeader - err := h.Unmarshal(frame) - if err != nil { - return err - } - - if !(!h.MPEG2 && h.Layer == 3) { - return fmt.Errorf("RTMP only supports MPEG-1 layer 3 audio") - } - - c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = (*w).WriteMPEG1Audio(pts, &h, frame) - if err != nil { - return err - } - - pts += time.Duration(h.SampleCount()) * - time.Second / time.Duration(h.SampleRate) + stream.AddReader(writer, audioMedia, audioFormatMPEG1, func(u unit.Unit) error { + tunit := u.(*unit.MPEG1Audio) + + pts := tunit.PTS + + for _, frame := range tunit.Frames { + var h mpeg1audio.FrameHeader + err := h.Unmarshal(frame) + if err != nil { + return err } - return nil - }) + if !(!h.MPEG2 && h.Layer == 3) { + return fmt.Errorf("RTMP only supports MPEG-1 layer 3 audio") + } + + c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err = (*w).WriteMPEG1Audio(pts, &h, frame) + if err != nil { + return err + } + + pts += time.Duration(h.SampleCount()) * + time.Second / time.Duration(h.SampleRate) + } + + return nil }) return audioMedia, audioFormatMPEG1 diff --git a/internal/core/rtsp_session.go b/internal/core/rtsp_session.go index da43bdb9..744a7a69 100644 --- a/internal/core/rtsp_session.go +++ b/internal/core/rtsp_session.go @@ -74,8 +74,8 @@ func newRTSPSession( created: time.Now(), } - s.decodeErrLogger = newLimitedLogger(s) - s.writeErrLogger = newLimitedLogger(s) + s.decodeErrLogger = logger.NewLimitedLogger(s) + s.writeErrLogger = logger.NewLimitedLogger(s) s.Log(logger.Info, "created by %v", s.author.NetConn().RemoteAddr()) diff --git a/internal/core/rtsp_source.go b/internal/core/rtsp_source.go index ef00174c..c9b11bda 100644 --- a/internal/core/rtsp_source.go +++ b/internal/core/rtsp_source.go @@ -94,7 +94,7 @@ func (s *rtspSource) Log(level logger.Level, format string, args ...interface{}) func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan *conf.PathConf) error { s.Log(logger.Debug, "connecting") - decodeErrLogger := newLimitedLogger(s) + decodeErrLogger := logger.NewLimitedLogger(s) c := &gortsplib.Client{ Transport: cnf.SourceProtocol.Transport, diff --git a/internal/core/source_static.go b/internal/core/source_static.go index 0079c242..8de1da01 100644 --- a/internal/core/source_static.go +++ b/internal/core/source_static.go @@ -167,7 +167,7 @@ func (s *sourceStatic) run() { select { case err := <-implErr: innerCtxCancel() - s.impl.Log(logger.Info, "ERR: %v", err) + s.impl.Log(logger.Error, err.Error()) recreating = true recreateTimer = time.NewTimer(sourceStaticRetryPause) diff --git a/internal/core/srt_conn.go b/internal/core/srt_conn.go index 184b45c9..abd2d4ce 100644 --- a/internal/core/srt_conn.go +++ b/internal/core/srt_conn.go @@ -18,6 +18,7 @@ import ( "github.com/datarhei/gosrt" "github.com/google/uuid" + "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/logger" @@ -234,7 +235,7 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error { return err } - decodeErrLogger := newLimitedLogger(c) + decodeErrLogger := logger.NewLimitedLogger(c) r.OnDecodeError(func(err error) { decodeErrLogger.Log(logger.Warn, err.Error()) @@ -421,7 +422,7 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass c.conn = sconn c.mutex.Unlock() - writer := newAsyncWriter(c.writeQueueSize, c) + writer := asyncwriter.New(c.writeQueueSize, c) var w *mpegts.Writer var tracks []*mpegts.Track @@ -443,75 +444,67 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass case *format.H265: //nolint:dupl track := addTrack(medi, &mpegts.CodecH265{}) - randomAccessReceived := false - dtsExtractor := h265.NewDTSExtractor() + var dtsExtractor *h265.DTSExtractor - res.stream.AddReader(c, medi, forma, func(u unit.Unit) { - writer.push(func() error { - tunit := u.(*unit.H265) - if tunit.AU == nil { - return nil - } + res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { + tunit := u.(*unit.H265) + if tunit.AU == nil { + return nil + } - randomAccess := h265.IsRandomAccess(tunit.AU) - - if !randomAccessReceived { - if !randomAccess { - return nil - } - randomAccessReceived = true - } - - pts := tunit.PTS - dts, err := dtsExtractor.Extract(tunit.AU, pts) - if err != nil { - return err - } + randomAccess := h265.IsRandomAccess(tunit.AU) - sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = w.WriteH26x(track, durationGoToMPEGTS(pts), durationGoToMPEGTS(dts), randomAccess, tunit.AU) - if err != nil { - return err + if dtsExtractor == nil { + if !randomAccess { + return nil } - return bw.Flush() - }) + dtsExtractor = h265.NewDTSExtractor() + } + + dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err + } + + sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err = w.WriteH26x(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU) + if err != nil { + return err + } + return bw.Flush() }) case *format.H264: //nolint:dupl track := addTrack(medi, &mpegts.CodecH264{}) - firstIDRReceived := false - dtsExtractor := h264.NewDTSExtractor() + var dtsExtractor *h264.DTSExtractor - res.stream.AddReader(c, medi, forma, func(u unit.Unit) { - writer.push(func() error { - tunit := u.(*unit.H264) - if tunit.AU == nil { - return nil - } + res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { + tunit := u.(*unit.H264) + if tunit.AU == nil { + return nil + } - idrPresent := h264.IDRPresent(tunit.AU) + idrPresent := h264.IDRPresent(tunit.AU) - if !firstIDRReceived { - if !idrPresent { - return nil - } - firstIDRReceived = true - } - - pts := tunit.PTS - dts, err := dtsExtractor.Extract(tunit.AU, pts) - if err != nil { - return err - } - - sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = w.WriteH26x(track, durationGoToMPEGTS(pts), durationGoToMPEGTS(dts), idrPresent, tunit.AU) - if err != nil { - return err + if dtsExtractor == nil { + if !idrPresent { + return nil } - return bw.Flush() - }) + dtsExtractor = h264.NewDTSExtractor() + } + + dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err + } + + sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err = w.WriteH26x(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), idrPresent, tunit.AU) + if err != nil { + return err + } + return bw.Flush() }) case *format.MPEG4AudioGeneric: @@ -519,22 +512,18 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass Config: *forma.Config, }) - res.stream.AddReader(c, medi, forma, func(u unit.Unit) { - writer.push(func() error { - tunit := u.(*unit.MPEG4AudioGeneric) - if tunit.AUs == nil { - return nil - } - - pts := tunit.PTS - - sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = w.WriteMPEG4Audio(track, durationGoToMPEGTS(pts), tunit.AUs) - if err != nil { - return err - } - return bw.Flush() - }) + res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG4AudioGeneric) + if tunit.AUs == nil { + return nil + } + + sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err = w.WriteMPEG4Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.AUs) + if err != nil { + return err + } + return bw.Flush() }) case *format.MPEG4AudioLATM: @@ -545,22 +534,18 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass Config: *forma.Config.Programs[0].Layers[0].AudioSpecificConfig, }) - res.stream.AddReader(c, medi, forma, func(u unit.Unit) { - writer.push(func() error { - tunit := u.(*unit.MPEG4AudioLATM) - if tunit.AU == nil { - return nil - } - - pts := tunit.PTS - - sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = w.WriteMPEG4Audio(track, durationGoToMPEGTS(pts), [][]byte{tunit.AU}) - if err != nil { - return err - } - return bw.Flush() - }) + res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG4AudioLATM) + if tunit.AU == nil { + return nil + } + + sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err = w.WriteMPEG4Audio(track, durationGoToMPEGTS(tunit.PTS), [][]byte{tunit.AU}) + if err != nil { + return err + } + return bw.Flush() }) } @@ -574,43 +559,35 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass }(), }) - res.stream.AddReader(c, medi, forma, func(u unit.Unit) { - writer.push(func() error { - tunit := u.(*unit.Opus) - if tunit.Packets == nil { - return nil - } - - pts := tunit.PTS - - sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = w.WriteOpus(track, durationGoToMPEGTS(pts), tunit.Packets) - if err != nil { - return err - } - return bw.Flush() - }) + res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { + tunit := u.(*unit.Opus) + if tunit.Packets == nil { + return nil + } + + sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err = w.WriteOpus(track, durationGoToMPEGTS(tunit.PTS), tunit.Packets) + if err != nil { + return err + } + return bw.Flush() }) case *format.MPEG1Audio: track := addTrack(medi, &mpegts.CodecMPEG1Audio{}) - res.stream.AddReader(c, medi, forma, func(u unit.Unit) { - writer.push(func() error { - tunit := u.(*unit.MPEG1Audio) - if tunit.Frames == nil { - return nil - } - - pts := tunit.PTS - - sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = w.WriteMPEG1Audio(track, durationGoToMPEGTS(pts), tunit.Frames) - if err != nil { - return err - } - return bw.Flush() - }) + res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG1Audio) + if tunit.Frames == nil { + return nil + } + + sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err = w.WriteMPEG1Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.Frames) + if err != nil { + return err + } + return bw.Flush() }) } } @@ -647,14 +624,14 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass // disable read deadline sconn.SetReadDeadline(time.Time{}) - writer.start() + writer.Start() select { case <-c.ctx.Done(): - writer.stop() + writer.Stop() return true, fmt.Errorf("terminated") - case err := <-writer.error(): + case err := <-writer.Error(): return true, err } } diff --git a/internal/core/srt_server.go b/internal/core/srt_server.go index ce10860b..bfc09d8b 100644 --- a/internal/core/srt_server.go +++ b/internal/core/srt_server.go @@ -137,7 +137,7 @@ func newSRTServer( // Log is the main logging function. func (s *srtServer) Log(level logger.Level, format string, args ...interface{}) { - s.parent.Log(level, "[SRT] "+format, append([]interface{}{}, args...)...) + s.parent.Log(level, "[SRT] "+format, args...) } func (s *srtServer) close() { diff --git a/internal/core/srt_source.go b/internal/core/srt_source.go index 5dd9b806..f6292a53 100644 --- a/internal/core/srt_source.go +++ b/internal/core/srt_source.go @@ -91,7 +91,7 @@ func (s *srtSource) runReader(sconn srt.Conn) error { return err } - decodeErrLogger := newLimitedLogger(s) + decodeErrLogger := logger.NewLimitedLogger(s) r.OnDecodeError(func(err error) { decodeErrLogger.Log(logger.Warn, err.Error()) diff --git a/internal/core/udp_source.go b/internal/core/udp_source.go index efb8d787..8840f1ad 100644 --- a/internal/core/udp_source.go +++ b/internal/core/udp_source.go @@ -140,7 +140,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error { return err } - decodeErrLogger := newLimitedLogger(s) + decodeErrLogger := logger.NewLimitedLogger(s) r.OnDecodeError(func(err error) { decodeErrLogger.Log(logger.Warn, err.Error()) diff --git a/internal/core/webrtc_manager.go b/internal/core/webrtc_manager.go index a9166233..bf325e25 100644 --- a/internal/core/webrtc_manager.go +++ b/internal/core/webrtc_manager.go @@ -416,7 +416,7 @@ func newWebRTCManager( // Log is the main logging function. func (m *webRTCManager) Log(level logger.Level, format string, args ...interface{}) { - m.parent.Log(level, "[WebRTC] "+format, append([]interface{}{}, args...)...) + m.parent.Log(level, "[WebRTC] "+format, args...) } func (m *webRTCManager) close() { diff --git a/internal/core/webrtc_outgoing_track.go b/internal/core/webrtc_outgoing_track.go index ed7cf973..4e1d5a6a 100644 --- a/internal/core/webrtc_outgoing_track.go +++ b/internal/core/webrtc_outgoing_track.go @@ -12,6 +12,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp9" "github.com/pion/webrtc/v3" + "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/unit" ) @@ -348,9 +349,8 @@ func newWebRTCOutgoingTrackAudio(desc *description.Session) (*webRTCOutgoingTrac } func (t *webRTCOutgoingTrack) start( - r reader, stream *stream.Stream, - writer *asyncWriter, + writer *asyncwriter.Writer, ) { // read incoming RTCP packets to make interceptors work go func() { @@ -363,9 +363,7 @@ func (t *webRTCOutgoingTrack) start( } }() - stream.AddReader(r, t.media, t.format, func(u unit.Unit) { - writer.push(func() error { - return t.cb(u) - }) + stream.AddReader(writer, t.media, t.format, func(u unit.Unit) error { + return t.cb(u) }) } diff --git a/internal/core/webrtc_session.go b/internal/core/webrtc_session.go index 217a449b..ab988b34 100644 --- a/internal/core/webrtc_session.go +++ b/internal/core/webrtc_session.go @@ -16,6 +16,7 @@ import ( "github.com/pion/sdp/v3" "github.com/pion/webrtc/v3" + "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/webrtcpc" ) @@ -511,29 +512,29 @@ func (s *webRTCSession) runRead() (int, error) { s.pc = pc s.mutex.Unlock() - writer := newAsyncWriter(s.writeQueueSize, s) + writer := asyncwriter.New(s.writeQueueSize, s) for _, track := range tracks { - track.start(s, res.stream, writer) + track.start(res.stream, writer) } - defer res.stream.RemoveReader(s) + defer res.stream.RemoveReader(writer) s.Log(logger.Info, "is reading from path '%s', %s", res.path.name, sourceMediaInfo(webrtcMediasOfOutgoingTracks(tracks))) - writer.start() + writer.Start() select { case <-pc.Disconnected(): - writer.stop() + writer.Stop() return 0, fmt.Errorf("peer connection closed") - case err := <-writer.error(): + case err := <-writer.Error(): return 0, err case <-s.ctx.Done(): - writer.stop() + writer.Stop() return 0, fmt.Errorf("terminated") } } diff --git a/internal/formatprocessor/processor.go b/internal/formatprocessor/processor.go index 7b8bc71b..6125bd6d 100644 --- a/internal/formatprocessor/processor.go +++ b/internal/formatprocessor/processor.go @@ -1,4 +1,4 @@ -// Package formatprocessor contains code to cleanup and normalize streams. +// Package formatprocessor cleans and normalizes streams. package formatprocessor import ( diff --git a/internal/logger/limited_logger.go b/internal/logger/limited_logger.go new file mode 100644 index 00000000..445bef23 --- /dev/null +++ b/internal/logger/limited_logger.go @@ -0,0 +1,34 @@ +package logger + +import ( + "sync" + "time" +) + +const ( + minIntervalBetweenWarnings = 1 * time.Second +) + +type limitedLogger struct { + w Writer + mutex sync.Mutex + lastPrinted time.Time +} + +// NewLimitedLogger is a wrapper around a Writer that limits printed messages. +func NewLimitedLogger(w Writer) Writer { + return &limitedLogger{ + w: w, + } +} + +// Log is the main logging function. +func (l *limitedLogger) Log(level Level, format string, args ...interface{}) { + now := time.Now() + l.mutex.Lock() + if now.Sub(l.lastPrinted) >= minIntervalBetweenWarnings { + l.lastPrinted = now + l.w.Log(level, format, args...) + } + l.mutex.Unlock() +} diff --git a/internal/stream/stream.go b/internal/stream/stream.go index 298955f8..91ee4c29 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -10,12 +10,15 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/pion/rtp" + "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/unit" ) +type readerFunc func(unit.Unit) error + // Stream is a media stream. -// It stores tracks, readers and allow to write data to readers. +// It stores tracks, readers and allows to write data to readers. type Stream struct { desc *description.Session bytesReceived *uint64 @@ -62,7 +65,7 @@ func (s *Stream) Close() { } } -// Desc returns description of the stream. +// Desc returns the description of the stream. func (s *Stream) Desc() *description.Session { return s.desc } @@ -90,7 +93,7 @@ func (s *Stream) RTSPSStream(server *gortsplib.Server) *gortsplib.ServerStream { } // AddReader adds a reader. -func (s *Stream) AddReader(r interface{}, medi *description.Media, forma format.Format, cb func(unit.Unit)) { +func (s *Stream) AddReader(r *asyncwriter.Writer, medi *description.Media, forma format.Format, cb readerFunc) { s.mutex.Lock() defer s.mutex.Unlock() @@ -100,7 +103,7 @@ func (s *Stream) AddReader(r interface{}, medi *description.Media, forma format. } // RemoveReader removes a reader. -func (s *Stream) RemoveReader(r interface{}) { +func (s *Stream) RemoveReader(r *asyncwriter.Writer) { s.mutex.Lock() defer s.mutex.Unlock() @@ -112,14 +115,14 @@ func (s *Stream) RemoveReader(r interface{}) { } // WriteUnit writes a Unit. -func (s *Stream) WriteUnit(medi *description.Media, forma format.Format, data unit.Unit) { +func (s *Stream) WriteUnit(medi *description.Media, forma format.Format, u unit.Unit) { sm := s.smedias[medi] sf := sm.formats[forma] s.mutex.RLock() defer s.mutex.RUnlock() - sf.writeUnit(s, medi, data) + sf.writeUnit(s, medi, u) } // WriteRTPPacket writes a RTP packet. diff --git a/internal/stream/stream_format.go b/internal/stream/stream_format.go index 835e5650..0dc19e91 100644 --- a/internal/stream/stream_format.go +++ b/internal/stream/stream_format.go @@ -8,15 +8,24 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/pion/rtp" + "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/formatprocessor" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/unit" ) +func unitSize(u unit.Unit) uint64 { + n := uint64(0) + for _, pkt := range u.GetRTPPackets() { + n += uint64(pkt.MarshalSize()) + } + return n +} + type streamFormat struct { decodeErrLogger logger.Writer proc formatprocessor.Processor - nonRTSPReaders map[interface{}]func(unit.Unit) + readers map[*asyncwriter.Writer]readerFunc } func newStreamFormat( @@ -33,50 +42,47 @@ func newStreamFormat( sf := &streamFormat{ decodeErrLogger: decodeErrLogger, proc: proc, - nonRTSPReaders: make(map[interface{}]func(unit.Unit)), + readers: make(map[*asyncwriter.Writer]readerFunc), } return sf, nil } -func (sf *streamFormat) addReader(r interface{}, cb func(unit.Unit)) { - sf.nonRTSPReaders[r] = cb +func (sf *streamFormat) addReader(r *asyncwriter.Writer, cb readerFunc) { + sf.readers[r] = cb } -func (sf *streamFormat) removeReader(r interface{}) { - delete(sf.nonRTSPReaders, r) +func (sf *streamFormat) removeReader(r *asyncwriter.Writer) { + delete(sf.readers, r) } -func (sf *streamFormat) writeUnit(s *Stream, medi *description.Media, data unit.Unit) { - hasNonRTSPReaders := len(sf.nonRTSPReaders) > 0 +func (sf *streamFormat) writeUnit(s *Stream, medi *description.Media, u unit.Unit) { + hasNonRTSPReaders := len(sf.readers) > 0 - err := sf.proc.Process(data, hasNonRTSPReaders) + err := sf.proc.Process(u, hasNonRTSPReaders) if err != nil { sf.decodeErrLogger.Log(logger.Warn, err.Error()) return } - n := uint64(0) - for _, pkt := range data.GetRTPPackets() { - n += uint64(pkt.MarshalSize()) - } - atomic.AddUint64(s.bytesReceived, n) + atomic.AddUint64(s.bytesReceived, unitSize(u)) if s.rtspStream != nil { - for _, pkt := range data.GetRTPPackets() { - s.rtspStream.WritePacketRTPWithNTP(medi, pkt, data.GetNTP()) //nolint:errcheck + for _, pkt := range u.GetRTPPackets() { + s.rtspStream.WritePacketRTPWithNTP(medi, pkt, u.GetNTP()) //nolint:errcheck } } if s.rtspsStream != nil { - for _, pkt := range data.GetRTPPackets() { - s.rtspsStream.WritePacketRTPWithNTP(medi, pkt, data.GetNTP()) //nolint:errcheck + for _, pkt := range u.GetRTPPackets() { + s.rtspsStream.WritePacketRTPWithNTP(medi, pkt, u.GetNTP()) //nolint:errcheck } } - // forward decoded frames to non-RTSP readers - for _, cb := range sf.nonRTSPReaders { - cb(data) + for writer, cb := range sf.readers { + writer.Push(func() error { + return cb(u) + }) } } diff --git a/mediamtx.yml b/mediamtx.yml index 91c98422..8df4eab5 100644 --- a/mediamtx.yml +++ b/mediamtx.yml @@ -1,6 +1,6 @@ ############################################### -# General parameters +# General settings # Sets the verbosity of the program; available values are "error", "warn", "info", "debug". logLevel: info @@ -62,9 +62,9 @@ runOnConnect: runOnConnectRestart: no ############################################### -# RTSP parameters +# RTSP settings -# Enable support for the RTSP protocol. +# Allow publishing and reading streams with the RTSP protocol. rtsp: yes # List of enabled RTSP transport protocols. # UDP is the most performant, but doesn't work when there's a NAT/firewall between @@ -102,9 +102,9 @@ serverCert: server.crt authMethods: [basic] ############################################### -# RTMP parameters +# RTMP settings -# Enable support for the RTMP protocol. +# Allow publishing and reading streams with the RTMP protocol. rtmp: yes # Address of the RTMP listener. This is needed only when encryption is "no" or "optional". rtmpAddress: :1935 @@ -122,9 +122,9 @@ rtmpServerKey: server.key rtmpServerCert: server.crt ############################################### -# HLS parameters +# HLS settings -# Enable support for the HLS protocol. +# Allow reading streams with the HLS protocol. hls: yes # Address of the HLS listener. hlsAddress: :8888 @@ -178,9 +178,9 @@ hlsTrustedProxies: [] hlsDirectory: '' ############################################### -# WebRTC parameters +# WebRTC settings -# Enable support for the WebRTC protocol. +# Allow publishing and reading streams with the WebRTC protocol. webrtc: yes # Address of the WebRTC listener. webrtcAddress: :8889 @@ -222,20 +222,20 @@ webrtcICEUDPMuxAddress: # Address of a ICE TCP listener in format host:port. # If filled, ICE traffic will pass through a single TCP port, # allowing the deployment of the server inside a container or behind a NAT. -# Setting this parameter forces usage of the TCP protocol, which is not +# Using this setting forces usage of the TCP protocol, which is not # optimal for WebRTC. webrtcICETCPMuxAddress: ############################################### -# SRT parameters +# SRT settings -# Enables support for the SRT protocol. +# Allow publishing and reading streams with the SRT protocol. srt: yes # Address of the SRT listener. srtAddress: :8890 ############################################### -# Path parameters +# Path settings # These settings are path-dependent, and the map key is the name of the path. # It's possible to use regular expressions by using a tilde as prefix, @@ -245,25 +245,24 @@ srtAddress: :8890 # another entry. paths: all: + ############################################### + # General path settings + # Source of the stream. This can be: - # * publisher -> the stream is published by a RTSP, RTMP, WebRTC or SRT client + # * publisher -> the stream is provided by a RTSP, RTMP, WebRTC or SRT client # * rtsp://existing-url -> the stream is pulled from another RTSP server / camera # * rtsps://existing-url -> the stream is pulled from another RTSP server / camera with RTSPS # * rtmp://existing-url -> the stream is pulled from another RTMP server / camera # * rtmps://existing-url -> the stream is pulled from another RTMP server / camera with RTMPS - # * http://existing-url/stream.m3u8 -> the stream is pulled from another HLS server - # * https://existing-url/stream.m3u8 -> the stream is pulled from another HLS server with HTTPS + # * http://existing-url/stream.m3u8 -> the stream is pulled from another HLS server / camera + # * https://existing-url/stream.m3u8 -> the stream is pulled from another HLS server / camera with HTTPS # * udp://ip:port -> the stream is pulled with UDP, by listening on the specified IP and port - # * srt://existing-url -> the stream is pulled from another SRT server - # * whep://existing-url -> the stream is pulled from another WebRTC server - # * wheps://existing-url -> the stream is pulled from another WebRTC server with HTTPS + # * srt://existing-url -> the stream is pulled from another SRT server / camera + # * whep://existing-url -> the stream is pulled from another WebRTC server / camera + # * wheps://existing-url -> the stream is pulled from another WebRTC server / camera with HTTPS # * redirect -> the stream is provided by another path or server # * rpiCamera -> the stream is provided by a Raspberry Pi Camera source: publisher - - ############################################### - # General path parameters - # If the source is a URL, and the source certificate is self-signed # or invalid, you can provide the fingerprint of the certificate in order to # validate it anyway. It can be obtained by running: @@ -283,7 +282,7 @@ paths: maxReaders: 0 ############################################### - # Authentication path parameters + # Authentication path settings # Username required to publish. # SHA256-hashed values can be inserted with the "sha256:" prefix. @@ -304,7 +303,7 @@ paths: readIPs: [] ############################################### - # Publisher path parameters (when source is "publisher") + # Publisher path settings (when source is "publisher") # allow another client to disconnect the current publisher and publish in its place. overridePublisher: yes @@ -313,7 +312,7 @@ paths: fallback: ############################################### - # RTSP path parameters (when source is a RTSP or a RTSPS URL) + # RTSP path settings (when source is a RTSP or a RTSPS URL) # protocol used to pull the stream. available values are "automatic", "udp", "multicast", "tcp". sourceProtocol: automatic @@ -333,13 +332,13 @@ paths: rtspRangeStart: ############################################### - # Redirect path parameters (when source is "redirect") + # Redirect path settings (when source is "redirect") # RTSP URL which clients will be redirected to. sourceRedirect: ############################################### - # Raspberry Pi Camera path parameters (when source is "rpiCamera") + # Raspberry Pi Camera path settings (when source is "rpiCamera") # ID of the camera rpiCameraCamID: 0 @@ -421,7 +420,7 @@ paths: rpiCameraTextOverlay: '%Y-%m-%d %H:%M:%S - MediaMTX' ############################################### - # external commands path parameters + # External commands path settings # Command to run when this path is initialized. # This can be used to publish a stream and keep it always opened.