Browse Source

force all readers to use an asynchronous writer (#2265)

needed by #2255
pull/2266/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
5fb7f4e846
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 21
      apidocs/openapi.yaml
  2. 36
      internal/asyncwriter/async_writer.go
  3. 11
      internal/conf/conf.go
  4. 43
      internal/conf/path.go
  5. 2
      internal/core/hls_manager.go
  6. 192
      internal/core/hls_muxer.go
  7. 2
      internal/core/hls_source.go
  8. 30
      internal/core/limited_logger.go
  9. 512
      internal/core/path.go
  10. 298
      internal/core/path_manager.go
  11. 205
      internal/core/rtmp_conn.go
  12. 4
      internal/core/rtsp_session.go
  13. 2
      internal/core/rtsp_source.go
  14. 2
      internal/core/source_static.go
  15. 227
      internal/core/srt_conn.go
  16. 2
      internal/core/srt_server.go
  17. 2
      internal/core/srt_source.go
  18. 2
      internal/core/udp_source.go
  19. 2
      internal/core/webrtc_manager.go
  20. 10
      internal/core/webrtc_outgoing_track.go
  21. 15
      internal/core/webrtc_session.go
  22. 2
      internal/formatprocessor/processor.go
  23. 34
      internal/logger/limited_logger.go
  24. 15
      internal/stream/stream.go
  25. 48
      internal/stream/stream_format.go
  26. 57
      mediamtx.yml

21
apidocs/openapi.yaml

@ -18,7 +18,7 @@ components:
Conf: Conf:
type: object type: object
properties: properties:
# general # General
logLevel: logLevel:
type: string type: string
logDestinations: logDestinations:
@ -169,13 +169,13 @@ components:
webrtcICETCPMuxAddress: webrtcICETCPMuxAddress:
type: string type: string
# srt # SRT
srt: srt:
type: boolean type: boolean
srtAddress: srtAddress:
type: string type: string
# paths # Paths
paths: paths:
type: object type: object
additionalProperties: additionalProperties:
@ -184,10 +184,9 @@ components:
PathConf: PathConf:
type: object type: object
properties: properties:
# General
source: source:
type: string type: string
# general
sourceFingerprint: sourceFingerprint:
type: string type: string
sourceOnDemand: sourceOnDemand:
@ -199,7 +198,7 @@ components:
maxReaders: maxReaders:
type: number type: number
# authentication # Authentication
publishUser: publishUser:
type: string type: string
publishPass: publishPass:
@ -217,13 +216,13 @@ components:
items: items:
type: string type: string
# publisher # Publisher
overridePublisher: overridePublisher:
type: boolean type: boolean
fallback: fallback:
type: string type: string
# rtsp # RTSP
sourceProtocol: sourceProtocol:
type: string type: string
sourceAnyPortEnable: sourceAnyPortEnable:
@ -233,11 +232,11 @@ components:
rtspRangeStart: rtspRangeStart:
type: string type: string
# redirect # Redirect
sourceRedirect: sourceRedirect:
type: string type: string
# raspberry pi camera # Raspberry Pi Camera
rpiCameraCamID: rpiCameraCamID:
type: integer type: integer
rpiCameraWidth: rpiCameraWidth:
@ -303,7 +302,7 @@ components:
rpiCameraTextOverlay: rpiCameraTextOverlay:
type: string type: string
# external commands # External commands
runOnInit: runOnInit:
type: string type: string
runOnInitRestart: runOnInitRestart:

36
internal/core/async_writer.go → internal/asyncwriter/async_writer.go

@ -1,19 +1,16 @@
package core // Package asyncwriter contains an asynchronous writer.
package asyncwriter
import ( import (
"fmt" "fmt"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/ringbuffer" "github.com/bluenviron/gortsplib/v4/pkg/ringbuffer"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
) )
const ( // Writer is an asynchronous writer.
minIntervalBetweenWarnings = 1 * time.Second type Writer struct {
)
type asyncWriter struct {
writeErrLogger logger.Writer writeErrLogger logger.Writer
buffer *ringbuffer.RingBuffer buffer *ringbuffer.RingBuffer
@ -21,37 +18,41 @@ type asyncWriter struct {
err chan error err chan error
} }
func newAsyncWriter( // New allocates a Writer.
func New(
queueSize int, queueSize int,
parent logger.Writer, parent logger.Writer,
) *asyncWriter { ) *Writer {
buffer, _ := ringbuffer.New(uint64(queueSize)) buffer, _ := ringbuffer.New(uint64(queueSize))
return &asyncWriter{ return &Writer{
writeErrLogger: newLimitedLogger(parent), writeErrLogger: logger.NewLimitedLogger(parent),
buffer: buffer, buffer: buffer,
err: make(chan error), err: make(chan error),
} }
} }
func (w *asyncWriter) start() { // Start starts the writer routine.
func (w *Writer) Start() {
go w.run() go w.run()
} }
func (w *asyncWriter) stop() { // Stop stops the writer routine.
func (w *Writer) Stop() {
w.buffer.Close() w.buffer.Close()
<-w.err <-w.err
} }
func (w *asyncWriter) error() chan error { // Error returns whenever there's an error.
func (w *Writer) Error() chan error {
return w.err return w.err
} }
func (w *asyncWriter) run() { func (w *Writer) run() {
w.err <- w.runInner() w.err <- w.runInner()
} }
func (w *asyncWriter) runInner() error { func (w *Writer) runInner() error {
for { for {
cb, ok := w.buffer.Pull() cb, ok := w.buffer.Pull()
if !ok { if !ok {
@ -65,7 +66,8 @@ func (w *asyncWriter) runInner() error {
} }
} }
func (w *asyncWriter) push(cb func() error) { // Push appends an element to the queue.
func (w *Writer) Push(cb func() error) {
ok := w.buffer.Push(cb) ok := w.buffer.Push(cb)
if !ok { if !ok {
w.writeErrLogger.Log(logger.Warn, "write queue is full") w.writeErrLogger.Log(logger.Warn, "write queue is full")

11
internal/conf/conf.go

@ -89,7 +89,7 @@ func contains(list []headers.AuthMethod, item headers.AuthMethod) bool {
// Conf is a configuration. // Conf is a configuration.
type Conf struct { type Conf struct {
// general // General
LogLevel LogLevel `json:"logLevel"` LogLevel LogLevel `json:"logLevel"`
LogDestinations LogDestinations `json:"logDestinations"` LogDestinations LogDestinations `json:"logDestinations"`
LogFile string `json:"logFile"` LogFile string `json:"logFile"`
@ -169,7 +169,7 @@ type Conf struct {
SRT bool `json:"srt"` SRT bool `json:"srt"`
SRTAddress string `json:"srtAddress"` SRTAddress string `json:"srtAddress"`
// paths // Paths
Paths map[string]*PathConf `json:"paths"` Paths map[string]*PathConf `json:"paths"`
} }
@ -218,7 +218,8 @@ func (conf Conf) Clone() *Conf {
// Check checks the configuration for errors. // Check checks the configuration for errors.
func (conf *Conf) Check() error { func (conf *Conf) Check() error {
// general // General
if conf.ReadBufferCount != 0 { if conf.ReadBufferCount != 0 {
conf.WriteQueueSize = conf.ReadBufferCount conf.WriteQueueSize = conf.ReadBufferCount
} }
@ -240,6 +241,7 @@ func (conf *Conf) Check() error {
} }
// RTSP // RTSP
if conf.RTSPDisable { if conf.RTSPDisable {
conf.RTSP = false conf.RTSP = false
} }
@ -253,16 +255,19 @@ func (conf *Conf) Check() error {
} }
// RTMP // RTMP
if conf.RTMPDisable { if conf.RTMPDisable {
conf.RTMP = false conf.RTMP = false
} }
// HLS // HLS
if conf.HLSDisable { if conf.HLSDisable {
conf.HLS = false conf.HLS = false
} }
// WebRTC // WebRTC
if conf.WebRTCDisable { if conf.WebRTCDisable {
conf.WebRTC = false conf.WebRTC = false
} }

43
internal/conf/path.go

@ -42,17 +42,15 @@ func IsValidPathName(name string) error {
type PathConf struct { type PathConf struct {
Regexp *regexp.Regexp `json:"-"` Regexp *regexp.Regexp `json:"-"`
// source // General
Source string `json:"source"` Source string `json:"source"`
// general
SourceFingerprint string `json:"sourceFingerprint"` SourceFingerprint string `json:"sourceFingerprint"`
SourceOnDemand bool `json:"sourceOnDemand"` SourceOnDemand bool `json:"sourceOnDemand"`
SourceOnDemandStartTimeout StringDuration `json:"sourceOnDemandStartTimeout"` SourceOnDemandStartTimeout StringDuration `json:"sourceOnDemandStartTimeout"`
SourceOnDemandCloseAfter StringDuration `json:"sourceOnDemandCloseAfter"` SourceOnDemandCloseAfter StringDuration `json:"sourceOnDemandCloseAfter"`
MaxReaders int `json:"maxReaders"` MaxReaders int `json:"maxReaders"`
// authentication // Authentication
PublishUser Credential `json:"publishUser"` PublishUser Credential `json:"publishUser"`
PublishPass Credential `json:"publishPass"` PublishPass Credential `json:"publishPass"`
PublishIPs IPsOrCIDRs `json:"publishIPs"` PublishIPs IPsOrCIDRs `json:"publishIPs"`
@ -60,21 +58,21 @@ type PathConf struct {
ReadPass Credential `json:"readPass"` ReadPass Credential `json:"readPass"`
ReadIPs IPsOrCIDRs `json:"readIPs"` ReadIPs IPsOrCIDRs `json:"readIPs"`
// publisher // Publisher
OverridePublisher bool `json:"overridePublisher"` OverridePublisher bool `json:"overridePublisher"`
DisablePublisherOverride bool `json:"disablePublisherOverride"` // deprecated DisablePublisherOverride bool `json:"disablePublisherOverride"` // deprecated
Fallback string `json:"fallback"` Fallback string `json:"fallback"`
// rtsp // RTSP
SourceProtocol SourceProtocol `json:"sourceProtocol"` SourceProtocol SourceProtocol `json:"sourceProtocol"`
SourceAnyPortEnable bool `json:"sourceAnyPortEnable"` SourceAnyPortEnable bool `json:"sourceAnyPortEnable"`
RtspRangeType RtspRangeType `json:"rtspRangeType"` RtspRangeType RtspRangeType `json:"rtspRangeType"`
RtspRangeStart string `json:"rtspRangeStart"` RtspRangeStart string `json:"rtspRangeStart"`
// redirect // Redirect
SourceRedirect string `json:"sourceRedirect"` SourceRedirect string `json:"sourceRedirect"`
// raspberry pi camera // Raspberry Pi Camera
RPICameraCamID int `json:"rpiCameraCamID"` RPICameraCamID int `json:"rpiCameraCamID"`
RPICameraWidth int `json:"rpiCameraWidth"` RPICameraWidth int `json:"rpiCameraWidth"`
RPICameraHeight int `json:"rpiCameraHeight"` RPICameraHeight int `json:"rpiCameraHeight"`
@ -108,7 +106,7 @@ type PathConf struct {
RPICameraTextOverlayEnable bool `json:"rpiCameraTextOverlayEnable"` RPICameraTextOverlayEnable bool `json:"rpiCameraTextOverlayEnable"`
RPICameraTextOverlay string `json:"rpiCameraTextOverlay"` RPICameraTextOverlay string `json:"rpiCameraTextOverlay"`
// external commands // External commands
RunOnInit string `json:"runOnInit"` RunOnInit string `json:"runOnInit"`
RunOnInitRestart bool `json:"runOnInitRestart"` RunOnInitRestart bool `json:"runOnInitRestart"`
RunOnDemand string `json:"runOnDemand"` RunOnDemand string `json:"runOnDemand"`
@ -140,6 +138,8 @@ func (pconf *PathConf) check(conf *Conf, name string) error {
pconf.Regexp = pathRegexp pconf.Regexp = pathRegexp
} }
// General
switch { switch {
case pconf.Source == "publisher": case pconf.Source == "publisher":
@ -263,10 +263,11 @@ func (pconf *PathConf) check(conf *Conf, name string) error {
} }
} }
// Publisher
if pconf.DisablePublisherOverride { if pconf.DisablePublisherOverride {
pconf.OverridePublisher = true pconf.OverridePublisher = true
} }
if pconf.Fallback != "" { if pconf.Fallback != "" {
if strings.HasPrefix(pconf.Fallback, "/") { if strings.HasPrefix(pconf.Fallback, "/") {
err := IsValidPathName(pconf.Fallback[1:]) err := IsValidPathName(pconf.Fallback[1:])
@ -281,26 +282,24 @@ func (pconf *PathConf) check(conf *Conf, name string) error {
} }
} }
// Authentication
if (pconf.PublishUser != "" && pconf.PublishPass == "") || if (pconf.PublishUser != "" && pconf.PublishPass == "") ||
(pconf.PublishUser == "" && pconf.PublishPass != "") { (pconf.PublishUser == "" && pconf.PublishPass != "") {
return fmt.Errorf("read username and password must be both filled") return fmt.Errorf("read username and password must be both filled")
} }
if pconf.PublishUser != "" && pconf.Source != "publisher" { if pconf.PublishUser != "" && pconf.Source != "publisher" {
return fmt.Errorf("'publishUser' is useless when source is not 'publisher', since " + return fmt.Errorf("'publishUser' is useless when source is not 'publisher', since " +
"the stream is not provided by a publisher, but by a fixed source") "the stream is not provided by a publisher, but by a fixed source")
} }
if len(pconf.PublishIPs) > 0 && pconf.Source != "publisher" { if len(pconf.PublishIPs) > 0 && pconf.Source != "publisher" {
return fmt.Errorf("'publishIPs' is useless when source is not 'publisher', since " + return fmt.Errorf("'publishIPs' is useless when source is not 'publisher', since " +
"the stream is not provided by a publisher, but by a fixed source") "the stream is not provided by a publisher, but by a fixed source")
} }
if (pconf.ReadUser != "" && pconf.ReadPass == "") || if (pconf.ReadUser != "" && pconf.ReadPass == "") ||
(pconf.ReadUser == "" && pconf.ReadPass != "") { (pconf.ReadUser == "" && pconf.ReadPass != "") {
return fmt.Errorf("read username and password must be both filled") return fmt.Errorf("read username and password must be both filled")
} }
if contains(conf.AuthMethods, headers.AuthDigest) { if contains(conf.AuthMethods, headers.AuthDigest) {
if strings.HasPrefix(string(pconf.PublishUser), "sha256:") || if strings.HasPrefix(string(pconf.PublishUser), "sha256:") ||
strings.HasPrefix(string(pconf.PublishPass), "sha256:") || strings.HasPrefix(string(pconf.PublishPass), "sha256:") ||
@ -309,7 +308,6 @@ func (pconf *PathConf) check(conf *Conf, name string) error {
return fmt.Errorf("hashed credentials can't be used when the digest auth method is available") return fmt.Errorf("hashed credentials can't be used when the digest auth method is available")
} }
} }
if conf.ExternalAuthenticationURL != "" { if conf.ExternalAuthenticationURL != "" {
if pconf.PublishUser != "" || if pconf.PublishUser != "" ||
len(pconf.PublishIPs) > 0 || len(pconf.PublishIPs) > 0 ||
@ -319,10 +317,11 @@ func (pconf *PathConf) check(conf *Conf, name string) error {
} }
} }
// External commands
if pconf.RunOnInit != "" && pconf.Regexp != nil { if pconf.RunOnInit != "" && pconf.Regexp != nil {
return fmt.Errorf("a path with a regular expression does not support option 'runOnInit'; use another path") return fmt.Errorf("a path with a regular expression does not support option 'runOnInit'; use another path")
} }
if pconf.RunOnDemand != "" && pconf.Source != "publisher" { if pconf.RunOnDemand != "" && pconf.Source != "publisher" {
return fmt.Errorf("'runOnDemand' can be used only when source is 'publisher'") return fmt.Errorf("'runOnDemand' can be used only when source is 'publisher'")
} }
@ -378,17 +377,17 @@ func (pconf PathConf) HasOnDemandPublisher() bool {
// UnmarshalJSON implements json.Unmarshaler. It is used to set default values. // UnmarshalJSON implements json.Unmarshaler. It is used to set default values.
func (pconf *PathConf) UnmarshalJSON(b []byte) error { func (pconf *PathConf) UnmarshalJSON(b []byte) error {
// source // Source
pconf.Source = "publisher" pconf.Source = "publisher"
// general // General
pconf.SourceOnDemandStartTimeout = 10 * StringDuration(time.Second) pconf.SourceOnDemandStartTimeout = 10 * StringDuration(time.Second)
pconf.SourceOnDemandCloseAfter = 10 * StringDuration(time.Second) pconf.SourceOnDemandCloseAfter = 10 * StringDuration(time.Second)
// publisher // Publisher
pconf.OverridePublisher = true pconf.OverridePublisher = true
// raspberry pi camera // Raspberry Pi Camera
pconf.RPICameraWidth = 1920 pconf.RPICameraWidth = 1920
pconf.RPICameraHeight = 1080 pconf.RPICameraHeight = 1080
pconf.RPICameraContrast = 1 pconf.RPICameraContrast = 1
@ -401,7 +400,7 @@ func (pconf *PathConf) UnmarshalJSON(b []byte) error {
pconf.RPICameraLevel = "4.1" pconf.RPICameraLevel = "4.1"
pconf.RPICameraTextOverlay = "%Y-%m-%d %H:%M:%S - MediaMTX" pconf.RPICameraTextOverlay = "%Y-%m-%d %H:%M:%S - MediaMTX"
// external commands // External commands
pconf.RunOnDemandStartTimeout = 10 * StringDuration(time.Second) pconf.RunOnDemandStartTimeout = 10 * StringDuration(time.Second)
pconf.RunOnDemandCloseAfter = 10 * StringDuration(time.Second) pconf.RunOnDemandCloseAfter = 10 * StringDuration(time.Second)

2
internal/core/hls_manager.go

@ -142,7 +142,7 @@ func newHLSManager(
// Log is the main logging function. // Log is the main logging function.
func (m *hlsManager) Log(level logger.Level, format string, args ...interface{}) { func (m *hlsManager) Log(level logger.Level, format string, args ...interface{}) {
m.parent.Log(level, "[HLS] "+format, append([]interface{}{}, args...)...) m.parent.Log(level, "[HLS] "+format, args...)
} }
func (m *hlsManager) close() { func (m *hlsManager) close() {

192
internal/core/hls_muxer.go

@ -17,6 +17,7 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/stream"
@ -75,7 +76,7 @@ type hlsMuxer struct {
ctxCancel func() ctxCancel func()
created time.Time created time.Time
path *path path *path
writer *asyncWriter writer *asyncwriter.Writer
lastRequestTime *int64 lastRequestTime *int64
muxer *gohlslib.Muxer muxer *gohlslib.Muxer
requests []*hlsMuxerHandleRequestReq requests []*hlsMuxerHandleRequestReq
@ -207,7 +208,7 @@ func (m *hlsMuxer) run() {
innerCtxCancel() innerCtxCancel()
if m.remoteAddr == "" { // created with "always remux" if m.remoteAddr == "" { // created with "always remux"
m.Log(logger.Info, "ERR: %v", err) m.Log(logger.Error, err.Error())
m.clearQueuedRequests() m.clearQueuedRequests()
isReady = false isReady = false
isRecreating = true isRecreating = true
@ -253,7 +254,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
defer m.path.removeReader(pathRemoveReaderReq{author: m}) defer m.path.removeReader(pathRemoveReaderReq{author: m})
m.writer = newAsyncWriter(m.writeQueueSize, m) m.writer = asyncwriter.New(m.writeQueueSize, m)
var medias []*description.Media var medias []*description.Media
@ -267,7 +268,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
medias = append(medias, audioMedia) medias = append(medias, audioMedia)
} }
defer res.stream.RemoveReader(m) defer res.stream.RemoveReader(m.writer)
if medias == nil { if medias == nil {
return fmt.Errorf( return fmt.Errorf(
@ -303,7 +304,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
m.Log(logger.Info, "is converting into HLS, %s", m.Log(logger.Info, "is converting into HLS, %s",
sourceMediaInfo(medias)) sourceMediaInfo(medias))
m.writer.start() m.writer.Start()
closeCheckTicker := time.NewTicker(closeCheckPeriod) closeCheckTicker := time.NewTicker(closeCheckPeriod)
defer closeCheckTicker.Stop() defer closeCheckTicker.Stop()
@ -314,16 +315,16 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
if m.remoteAddr != "" { if m.remoteAddr != "" {
t := time.Unix(0, atomic.LoadInt64(m.lastRequestTime)) t := time.Unix(0, atomic.LoadInt64(m.lastRequestTime))
if time.Since(t) >= closeAfterInactivity { if time.Since(t) >= closeAfterInactivity {
m.writer.stop() m.writer.Stop()
return fmt.Errorf("not used anymore") return fmt.Errorf("not used anymore")
} }
} }
case err := <-m.writer.error(): case err := <-m.writer.Error():
return err return err
case <-innerCtx.Done(): case <-innerCtx.Done():
m.writer.stop() m.writer.Stop()
return fmt.Errorf("terminated") return fmt.Errorf("terminated")
} }
} }
@ -334,22 +335,19 @@ func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*description.Media,
videoMedia := stream.Desc().FindFormat(&videoFormatAV1) videoMedia := stream.Desc().FindFormat(&videoFormatAV1)
if videoFormatAV1 != nil { if videoFormatAV1 != nil {
stream.AddReader(m, videoMedia, videoFormatAV1, func(u unit.Unit) { stream.AddReader(m.writer, videoMedia, videoFormatAV1, func(u unit.Unit) error {
m.writer.push(func() error { tunit := u.(*unit.AV1)
tunit := u.(*unit.AV1)
if tunit.TU == nil { if tunit.TU == nil {
return nil return nil
} }
pts := tunit.PTS err := m.muxer.WriteAV1(tunit.NTP, tunit.PTS, tunit.TU)
err := m.muxer.WriteAV1(tunit.NTP, pts, tunit.TU) if err != nil {
if err != nil { return fmt.Errorf("muxer error: %v", err)
return fmt.Errorf("muxer error: %v", err) }
}
return nil return nil
})
}) })
return videoMedia, &gohlslib.Track{ return videoMedia, &gohlslib.Track{
@ -361,22 +359,19 @@ func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*description.Media,
videoMedia = stream.Desc().FindFormat(&videoFormatVP9) videoMedia = stream.Desc().FindFormat(&videoFormatVP9)
if videoFormatVP9 != nil { if videoFormatVP9 != nil {
stream.AddReader(m, videoMedia, videoFormatVP9, func(u unit.Unit) { stream.AddReader(m.writer, videoMedia, videoFormatVP9, func(u unit.Unit) error {
m.writer.push(func() error { tunit := u.(*unit.VP9)
tunit := u.(*unit.VP9)
if tunit.Frame == nil { if tunit.Frame == nil {
return nil return nil
} }
pts := tunit.PTS err := m.muxer.WriteVP9(tunit.NTP, tunit.PTS, tunit.Frame)
err := m.muxer.WriteVP9(tunit.NTP, pts, tunit.Frame) if err != nil {
if err != nil { return fmt.Errorf("muxer error: %v", err)
return fmt.Errorf("muxer error: %v", err) }
}
return nil return nil
})
}) })
return videoMedia, &gohlslib.Track{ return videoMedia, &gohlslib.Track{
@ -388,22 +383,19 @@ func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*description.Media,
videoMedia = stream.Desc().FindFormat(&videoFormatH265) videoMedia = stream.Desc().FindFormat(&videoFormatH265)
if videoFormatH265 != nil { if videoFormatH265 != nil {
stream.AddReader(m, videoMedia, videoFormatH265, func(u unit.Unit) { stream.AddReader(m.writer, videoMedia, videoFormatH265, func(u unit.Unit) error {
m.writer.push(func() error { tunit := u.(*unit.H265)
tunit := u.(*unit.H265)
if tunit.AU == nil { if tunit.AU == nil {
return nil return nil
} }
pts := tunit.PTS err := m.muxer.WriteH26x(tunit.NTP, tunit.PTS, tunit.AU)
err := m.muxer.WriteH26x(tunit.NTP, pts, tunit.AU) if err != nil {
if err != nil { return fmt.Errorf("muxer error: %v", err)
return fmt.Errorf("muxer error: %v", err) }
}
return nil return nil
})
}) })
vps, sps, pps := videoFormatH265.SafeParams() vps, sps, pps := videoFormatH265.SafeParams()
@ -421,22 +413,19 @@ func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*description.Media,
videoMedia = stream.Desc().FindFormat(&videoFormatH264) videoMedia = stream.Desc().FindFormat(&videoFormatH264)
if videoFormatH264 != nil { if videoFormatH264 != nil {
stream.AddReader(m, videoMedia, videoFormatH264, func(u unit.Unit) { stream.AddReader(m.writer, videoMedia, videoFormatH264, func(u unit.Unit) error {
m.writer.push(func() error { tunit := u.(*unit.H264)
tunit := u.(*unit.H264)
if tunit.AU == nil { if tunit.AU == nil {
return nil return nil
} }
pts := tunit.PTS err := m.muxer.WriteH26x(tunit.NTP, tunit.PTS, tunit.AU)
err := m.muxer.WriteH26x(tunit.NTP, pts, tunit.AU) if err != nil {
if err != nil { return fmt.Errorf("muxer error: %v", err)
return fmt.Errorf("muxer error: %v", err) }
}
return nil return nil
})
}) })
sps, pps := videoFormatH264.SafeParams() sps, pps := videoFormatH264.SafeParams()
@ -457,21 +446,18 @@ func (m *hlsMuxer) createAudioTrack(stream *stream.Stream) (*description.Media,
audioMedia := stream.Desc().FindFormat(&audioFormatOpus) audioMedia := stream.Desc().FindFormat(&audioFormatOpus)
if audioMedia != nil { if audioMedia != nil {
stream.AddReader(m, audioMedia, audioFormatOpus, func(u unit.Unit) { stream.AddReader(m.writer, audioMedia, audioFormatOpus, func(u unit.Unit) error {
m.writer.push(func() error { tunit := u.(*unit.Opus)
tunit := u.(*unit.Opus)
err := m.muxer.WriteOpus(
pts := tunit.PTS tunit.NTP,
err := m.muxer.WriteOpus( tunit.PTS,
tunit.NTP, tunit.Packets)
pts, if err != nil {
tunit.Packets) return fmt.Errorf("muxer error: %v", err)
if err != nil { }
return fmt.Errorf("muxer error: %v", err)
}
return nil return nil
})
}) })
return audioMedia, &gohlslib.Track{ return audioMedia, &gohlslib.Track{
@ -490,25 +476,22 @@ func (m *hlsMuxer) createAudioTrack(stream *stream.Stream) (*description.Media,
audioMedia = stream.Desc().FindFormat(&audioFormatMPEG4AudioGeneric) audioMedia = stream.Desc().FindFormat(&audioFormatMPEG4AudioGeneric)
if audioMedia != nil { if audioMedia != nil {
stream.AddReader(m, audioMedia, audioFormatMPEG4AudioGeneric, func(u unit.Unit) { stream.AddReader(m.writer, audioMedia, audioFormatMPEG4AudioGeneric, func(u unit.Unit) error {
m.writer.push(func() error { tunit := u.(*unit.MPEG4AudioGeneric)
tunit := u.(*unit.MPEG4AudioGeneric)
if tunit.AUs == nil { if tunit.AUs == nil {
return nil return nil
} }
pts := tunit.PTS err := m.muxer.WriteMPEG4Audio(
err := m.muxer.WriteMPEG4Audio( tunit.NTP,
tunit.NTP, tunit.PTS,
pts, tunit.AUs)
tunit.AUs) if err != nil {
if err != nil { return fmt.Errorf("muxer error: %v", err)
return fmt.Errorf("muxer error: %v", err) }
}
return nil return nil
})
}) })
return audioMedia, &gohlslib.Track{ return audioMedia, &gohlslib.Track{
@ -525,25 +508,22 @@ func (m *hlsMuxer) createAudioTrack(stream *stream.Stream) (*description.Media,
audioFormatMPEG4AudioLATM.Config != nil && audioFormatMPEG4AudioLATM.Config != nil &&
len(audioFormatMPEG4AudioLATM.Config.Programs) == 1 && len(audioFormatMPEG4AudioLATM.Config.Programs) == 1 &&
len(audioFormatMPEG4AudioLATM.Config.Programs[0].Layers) == 1 { len(audioFormatMPEG4AudioLATM.Config.Programs[0].Layers) == 1 {
stream.AddReader(m, audioMedia, audioFormatMPEG4AudioLATM, func(u unit.Unit) { stream.AddReader(m.writer, audioMedia, audioFormatMPEG4AudioLATM, func(u unit.Unit) error {
m.writer.push(func() error { tunit := u.(*unit.MPEG4AudioLATM)
tunit := u.(*unit.MPEG4AudioLATM)
if tunit.AU == nil { if tunit.AU == nil {
return nil return nil
} }
pts := tunit.PTS err := m.muxer.WriteMPEG4Audio(
err := m.muxer.WriteMPEG4Audio( tunit.NTP,
tunit.NTP, tunit.PTS,
pts, [][]byte{tunit.AU})
[][]byte{tunit.AU}) if err != nil {
if err != nil { return fmt.Errorf("muxer error: %v", err)
return fmt.Errorf("muxer error: %v", err) }
}
return nil return nil
})
}) })
return audioMedia, &gohlslib.Track{ return audioMedia, &gohlslib.Track{

2
internal/core/hls_source.go

@ -48,7 +48,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
} }
}() }()
decodeErrLogger := newLimitedLogger(s) decodeErrLogger := logger.NewLimitedLogger(s)
var c *gohlslib.Client var c *gohlslib.Client
c = &gohlslib.Client{ c = &gohlslib.Client{

30
internal/core/limited_logger.go

@ -1,30 +0,0 @@
package core
import (
"sync"
"time"
"github.com/bluenviron/mediamtx/internal/logger"
)
type limitedLogger struct {
w logger.Writer
mutex sync.Mutex
lastPrinted time.Time
}
func newLimitedLogger(w logger.Writer) *limitedLogger {
return &limitedLogger{
w: w,
}
}
func (l *limitedLogger) Log(level logger.Level, format string, args ...interface{}) {
now := time.Now()
l.mutex.Lock()
if now.Sub(l.lastPrinted) >= minIntervalBetweenWarnings {
l.lastPrinted = now
l.w.Log(level, format, args...)
}
l.mutex.Unlock()
}

512
internal/core/path.go

@ -380,112 +380,85 @@ func (pa *path) runInner() error {
for { for {
select { select {
case <-pa.onDemandStaticSourceReadyTimer.C: case <-pa.onDemandStaticSourceReadyTimer.C:
for _, req := range pa.describeRequestsOnHold { pa.doOnDemandStaticSourceReadyTimer()
req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.describeRequestsOnHold = nil
for _, req := range pa.readerAddRequestsOnHold {
req.res <- pathAddReaderRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.readerAddRequestsOnHold = nil
pa.onDemandStaticSourceStop()
if pa.shouldClose() { if pa.shouldClose() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case <-pa.onDemandStaticSourceCloseTimer.C: case <-pa.onDemandStaticSourceCloseTimer.C:
pa.setNotReady() pa.doOnDemandStaticSourceCloseTimer()
pa.onDemandStaticSourceStop()
if pa.shouldClose() { if pa.shouldClose() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case <-pa.onDemandPublisherReadyTimer.C: case <-pa.onDemandPublisherReadyTimer.C:
for _, req := range pa.describeRequestsOnHold { pa.doOnDemandPublisherReadyTimer()
req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.describeRequestsOnHold = nil
for _, req := range pa.readerAddRequestsOnHold {
req.res <- pathAddReaderRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.readerAddRequestsOnHold = nil
pa.onDemandStopPublisher()
if pa.shouldClose() { if pa.shouldClose() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case <-pa.onDemandPublisherCloseTimer.C: case <-pa.onDemandPublisherCloseTimer.C:
pa.onDemandStopPublisher() pa.doOnDemandPublisherCloseTimer()
if pa.shouldClose() { if pa.shouldClose() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case newConf := <-pa.chReloadConf: case newConf := <-pa.chReloadConf:
if pa.conf.HasStaticSource() { pa.doReloadConf(newConf)
go pa.source.(*sourceStatic).reloadConf(newConf)
}
pa.confMutex.Lock()
pa.conf = newConf
pa.confMutex.Unlock()
case req := <-pa.chSourceStaticSetReady: case req := <-pa.chSourceStaticSetReady:
pa.handleSourceStaticSetReady(req) pa.doSourceStaticSetReady(req)
case req := <-pa.chSourceStaticSetNotReady: case req := <-pa.chSourceStaticSetNotReady:
pa.handleSourceStaticSetNotReady(req) pa.doSourceStaticSetNotReady(req)
if pa.shouldClose() { if pa.shouldClose() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case req := <-pa.chDescribe: case req := <-pa.chDescribe:
pa.handleDescribe(req) pa.doDescribe(req)
if pa.shouldClose() { if pa.shouldClose() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case req := <-pa.chRemovePublisher: case req := <-pa.chRemovePublisher:
pa.handleRemovePublisher(req) pa.doRemovePublisher(req)
if pa.shouldClose() { if pa.shouldClose() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case req := <-pa.chAddPublisher: case req := <-pa.chAddPublisher:
pa.handleAddPublisher(req) pa.doAddPublisher(req)
case req := <-pa.chStartPublisher: case req := <-pa.chStartPublisher:
pa.handleStartPublisher(req) pa.doStartPublisher(req)
case req := <-pa.chStopPublisher: case req := <-pa.chStopPublisher:
pa.handleStopPublisher(req) pa.doStopPublisher(req)
if pa.shouldClose() { if pa.shouldClose() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case req := <-pa.chAddReader: case req := <-pa.chAddReader:
pa.handleAddReader(req) pa.doAddReader(req)
if pa.shouldClose() { if pa.shouldClose() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
} }
case req := <-pa.chRemoveReader: case req := <-pa.chRemoveReader:
pa.handleRemoveReader(req) pa.doRemoveReader(req)
case req := <-pa.chAPIPathsGet: case req := <-pa.chAPIPathsGet:
pa.handleAPIPathsGet(req) pa.doAPIPathsGet(req)
case <-pa.ctx.Done(): case <-pa.ctx.Done():
return fmt.Errorf("terminated") return fmt.Errorf("terminated")
@ -493,167 +466,54 @@ func (pa *path) runInner() error {
} }
} }
func (pa *path) shouldClose() bool { func (pa *path) doOnDemandStaticSourceReadyTimer() {
return pa.conf.Regexp != nil && for _, req := range pa.describeRequestsOnHold {
pa.source == nil && req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
len(pa.readers) == 0 &&
len(pa.describeRequestsOnHold) == 0 &&
len(pa.readerAddRequestsOnHold) == 0
}
func (pa *path) externalCmdEnv() externalcmd.Environment {
_, port, _ := net.SplitHostPort(pa.rtspAddress)
env := externalcmd.Environment{
"MTX_PATH": pa.name,
"RTSP_PATH": pa.name, // deprecated
"RTSP_PORT": port,
}
if len(pa.matches) > 1 {
for i, ma := range pa.matches[1:] {
env["G"+strconv.FormatInt(int64(i+1), 10)] = ma
}
}
return env
}
func (pa *path) onDemandStaticSourceStart() {
pa.source.(*sourceStatic).start()
pa.onDemandStaticSourceReadyTimer.Stop()
pa.onDemandStaticSourceReadyTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandStartTimeout))
pa.onDemandStaticSourceState = pathOnDemandStateWaitingReady
}
func (pa *path) onDemandStaticSourceScheduleClose() {
pa.onDemandStaticSourceCloseTimer.Stop()
pa.onDemandStaticSourceCloseTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandCloseAfter))
pa.onDemandStaticSourceState = pathOnDemandStateClosing
}
func (pa *path) onDemandStaticSourceStop() {
if pa.onDemandStaticSourceState == pathOnDemandStateClosing {
pa.onDemandStaticSourceCloseTimer.Stop()
pa.onDemandStaticSourceCloseTimer = newEmptyTimer()
}
pa.onDemandStaticSourceState = pathOnDemandStateInitial
pa.source.(*sourceStatic).stop()
}
func (pa *path) onDemandStartPublisher() {
pa.Log(logger.Info, "runOnDemand command started")
pa.onDemandCmd = externalcmd.NewCmd(
pa.externalCmdPool,
pa.conf.RunOnDemand,
pa.conf.RunOnDemandRestart,
pa.externalCmdEnv(),
func(err error) {
pa.Log(logger.Info, "runOnDemand command exited: %v", err)
})
pa.onDemandPublisherReadyTimer.Stop()
pa.onDemandPublisherReadyTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandStartTimeout))
pa.onDemandPublisherState = pathOnDemandStateWaitingReady
}
func (pa *path) onDemandPublisherScheduleClose() {
pa.onDemandPublisherCloseTimer.Stop()
pa.onDemandPublisherCloseTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandCloseAfter))
pa.onDemandPublisherState = pathOnDemandStateClosing
}
func (pa *path) onDemandStopPublisher() {
if pa.source != nil {
pa.source.(publisher).close()
pa.doPublisherRemove()
} }
pa.describeRequestsOnHold = nil
if pa.onDemandPublisherState == pathOnDemandStateClosing { for _, req := range pa.readerAddRequestsOnHold {
pa.onDemandPublisherCloseTimer.Stop() req.res <- pathAddReaderRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
pa.onDemandPublisherCloseTimer = newEmptyTimer()
} }
pa.readerAddRequestsOnHold = nil
pa.onDemandPublisherState = pathOnDemandStateInitial pa.onDemandStaticSourceStop()
if pa.onDemandCmd != nil {
pa.onDemandCmd.Close()
pa.onDemandCmd = nil
pa.Log(logger.Info, "runOnDemand command stopped")
}
} }
func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error { func (pa *path) doOnDemandStaticSourceCloseTimer() {
stream, err := stream.New( pa.setNotReady()
pa.udpMaxPayloadSize, pa.onDemandStaticSourceStop()
desc,
allocateEncoder,
pa.bytesReceived,
newLimitedLogger(pa.source),
)
if err != nil {
return err
}
pa.stream = stream
pa.readyTime = time.Now()
if pa.conf.RunOnReady != "" {
pa.Log(logger.Info, "runOnReady command started")
pa.onReadyCmd = externalcmd.NewCmd(
pa.externalCmdPool,
pa.conf.RunOnReady,
pa.conf.RunOnReadyRestart,
pa.externalCmdEnv(),
func(err error) {
pa.Log(logger.Info, "runOnReady command exited: %v", err)
})
}
pa.parent.pathReady(pa)
return nil
} }
func (pa *path) setNotReady() { func (pa *path) doOnDemandPublisherReadyTimer() {
pa.parent.pathNotReady(pa) for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
for r := range pa.readers {
pa.doRemoveReader(r)
r.close()
} }
pa.describeRequestsOnHold = nil
if pa.onReadyCmd != nil { for _, req := range pa.readerAddRequestsOnHold {
pa.onReadyCmd.Close() req.res <- pathAddReaderRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
pa.onReadyCmd = nil
pa.Log(logger.Info, "runOnReady command stopped")
} }
pa.readerAddRequestsOnHold = nil
if pa.stream != nil { pa.onDemandStopPublisher()
pa.stream.Close()
pa.stream = nil
}
} }
func (pa *path) doRemoveReader(r reader) { func (pa *path) doOnDemandPublisherCloseTimer() {
delete(pa.readers, r) pa.onDemandStopPublisher()
} }
func (pa *path) doPublisherRemove() { func (pa *path) doReloadConf(newConf *conf.PathConf) {
if pa.stream != nil { if pa.conf.HasStaticSource() {
pa.setNotReady() go pa.source.(*sourceStatic).reloadConf(newConf)
} }
pa.source = nil pa.confMutex.Lock()
pa.conf = newConf
pa.confMutex.Unlock()
} }
func (pa *path) handleSourceStaticSetReady(req pathSourceStaticSetReadyReq) { func (pa *path) doSourceStaticSetReady(req pathSourceStaticSetReadyReq) {
err := pa.setReady(req.desc, req.generateRTPPackets) err := pa.setReady(req.desc, req.generateRTPPackets)
if err != nil { if err != nil {
req.res <- pathSourceStaticSetReadyRes{err: err} req.res <- pathSourceStaticSetReadyRes{err: err}
@ -674,7 +534,7 @@ func (pa *path) handleSourceStaticSetReady(req pathSourceStaticSetReadyReq) {
pa.describeRequestsOnHold = nil pa.describeRequestsOnHold = nil
for _, req := range pa.readerAddRequestsOnHold { for _, req := range pa.readerAddRequestsOnHold {
pa.handleAddReaderPost(req) pa.addReaderPost(req)
} }
pa.readerAddRequestsOnHold = nil pa.readerAddRequestsOnHold = nil
} }
@ -682,7 +542,7 @@ func (pa *path) handleSourceStaticSetReady(req pathSourceStaticSetReadyReq) {
req.res <- pathSourceStaticSetReadyRes{stream: pa.stream} req.res <- pathSourceStaticSetReadyRes{stream: pa.stream}
} }
func (pa *path) handleSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) { func (pa *path) doSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) {
pa.setNotReady() pa.setNotReady()
// send response before calling onDemandStaticSourceStop() // send response before calling onDemandStaticSourceStop()
@ -694,7 +554,7 @@ func (pa *path) handleSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq
} }
} }
func (pa *path) handleDescribe(req pathDescribeReq) { func (pa *path) doDescribe(req pathDescribeReq) {
if _, ok := pa.source.(*sourceRedirect); ok { if _, ok := pa.source.(*sourceRedirect); ok {
req.res <- pathDescribeRes{ req.res <- pathDescribeRes{
redirect: pa.conf.SourceRedirect, redirect: pa.conf.SourceRedirect,
@ -745,14 +605,14 @@ func (pa *path) handleDescribe(req pathDescribeReq) {
req.res <- pathDescribeRes{err: errPathNoOnePublishing{pathName: pa.name}} req.res <- pathDescribeRes{err: errPathNoOnePublishing{pathName: pa.name}}
} }
func (pa *path) handleRemovePublisher(req pathRemovePublisherReq) { func (pa *path) doRemovePublisher(req pathRemovePublisherReq) {
if pa.source == req.author { if pa.source == req.author {
pa.doPublisherRemove() pa.executeRemovePublisher()
} }
close(req.res) close(req.res)
} }
func (pa *path) handleAddPublisher(req pathAddPublisherReq) { func (pa *path) doAddPublisher(req pathAddPublisherReq) {
if pa.conf.Source != "publisher" { if pa.conf.Source != "publisher" {
req.res <- pathAddPublisherRes{ req.res <- pathAddPublisherRes{
err: fmt.Errorf("can't publish to path '%s' since 'source' is not 'publisher'", pa.name), err: fmt.Errorf("can't publish to path '%s' since 'source' is not 'publisher'", pa.name),
@ -768,7 +628,7 @@ func (pa *path) handleAddPublisher(req pathAddPublisherReq) {
pa.Log(logger.Info, "closing existing publisher") pa.Log(logger.Info, "closing existing publisher")
pa.source.(publisher).close() pa.source.(publisher).close()
pa.doPublisherRemove() pa.executeRemovePublisher()
} }
pa.source = req.author pa.source = req.author
@ -776,7 +636,7 @@ func (pa *path) handleAddPublisher(req pathAddPublisherReq) {
req.res <- pathAddPublisherRes{path: pa} req.res <- pathAddPublisherRes{path: pa}
} }
func (pa *path) handleStartPublisher(req pathStartPublisherReq) { func (pa *path) doStartPublisher(req pathStartPublisherReq) {
if pa.source != req.author { if pa.source != req.author {
req.res <- pathStartPublisherRes{err: fmt.Errorf("publisher is not assigned to this path anymore")} req.res <- pathStartPublisherRes{err: fmt.Errorf("publisher is not assigned to this path anymore")}
return return
@ -806,7 +666,7 @@ func (pa *path) handleStartPublisher(req pathStartPublisherReq) {
pa.describeRequestsOnHold = nil pa.describeRequestsOnHold = nil
for _, req := range pa.readerAddRequestsOnHold { for _, req := range pa.readerAddRequestsOnHold {
pa.handleAddReaderPost(req) pa.addReaderPost(req)
} }
pa.readerAddRequestsOnHold = nil pa.readerAddRequestsOnHold = nil
} }
@ -814,35 +674,16 @@ func (pa *path) handleStartPublisher(req pathStartPublisherReq) {
req.res <- pathStartPublisherRes{stream: pa.stream} req.res <- pathStartPublisherRes{stream: pa.stream}
} }
func (pa *path) handleStopPublisher(req pathStopPublisherReq) { func (pa *path) doStopPublisher(req pathStopPublisherReq) {
if req.author == pa.source && pa.stream != nil { if req.author == pa.source && pa.stream != nil {
pa.setNotReady() pa.setNotReady()
} }
close(req.res) close(req.res)
} }
func (pa *path) handleRemoveReader(req pathRemoveReaderReq) { func (pa *path) doAddReader(req pathAddReaderReq) {
if _, ok := pa.readers[req.author]; ok {
pa.doRemoveReader(req.author)
}
close(req.res)
if len(pa.readers) == 0 {
if pa.conf.HasOnDemandStaticSource() {
if pa.onDemandStaticSourceState == pathOnDemandStateReady {
pa.onDemandStaticSourceScheduleClose()
}
} else if pa.conf.HasOnDemandPublisher() {
if pa.onDemandPublisherState == pathOnDemandStateReady {
pa.onDemandPublisherScheduleClose()
}
}
}
}
func (pa *path) handleAddReader(req pathAddReaderReq) {
if pa.stream != nil { if pa.stream != nil {
pa.handleAddReaderPost(req) pa.addReaderPost(req)
return return
} }
@ -865,45 +706,26 @@ func (pa *path) handleAddReader(req pathAddReaderReq) {
req.res <- pathAddReaderRes{err: errPathNoOnePublishing{pathName: pa.name}} req.res <- pathAddReaderRes{err: errPathNoOnePublishing{pathName: pa.name}}
} }
func (pa *path) handleAddReaderPost(req pathAddReaderReq) { func (pa *path) doRemoveReader(req pathRemoveReaderReq) {
if _, ok := pa.readers[req.author]; ok { if _, ok := pa.readers[req.author]; ok {
req.res <- pathAddReaderRes{ pa.executeRemoveReader(req.author)
path: pa,
stream: pa.stream,
}
return
} }
close(req.res)
if pa.conf.MaxReaders != 0 && len(pa.readers) >= pa.conf.MaxReaders { if len(pa.readers) == 0 {
req.res <- pathAddReaderRes{ if pa.conf.HasOnDemandStaticSource() {
err: fmt.Errorf("maximum reader count reached"), if pa.onDemandStaticSourceState == pathOnDemandStateReady {
} pa.onDemandStaticSourceScheduleClose()
return }
} } else if pa.conf.HasOnDemandPublisher() {
if pa.onDemandPublisherState == pathOnDemandStateReady {
pa.readers[req.author] = struct{}{} pa.onDemandPublisherScheduleClose()
}
if pa.conf.HasOnDemandStaticSource() {
if pa.onDemandStaticSourceState == pathOnDemandStateClosing {
pa.onDemandStaticSourceState = pathOnDemandStateReady
pa.onDemandStaticSourceCloseTimer.Stop()
pa.onDemandStaticSourceCloseTimer = newEmptyTimer()
}
} else if pa.conf.HasOnDemandPublisher() {
if pa.onDemandPublisherState == pathOnDemandStateClosing {
pa.onDemandPublisherState = pathOnDemandStateReady
pa.onDemandPublisherCloseTimer.Stop()
pa.onDemandPublisherCloseTimer = newEmptyTimer()
} }
} }
req.res <- pathAddReaderRes{
path: pa,
stream: pa.stream,
}
} }
func (pa *path) handleAPIPathsGet(req pathAPIPathsGetReq) { func (pa *path) doAPIPathsGet(req pathAPIPathsGetReq) {
req.res <- pathAPIPathsGetRes{ req.res <- pathAPIPathsGetRes{
data: &apiPath{ data: &apiPath{
Name: pa.name, Name: pa.name,
@ -942,6 +764,204 @@ func (pa *path) handleAPIPathsGet(req pathAPIPathsGetReq) {
} }
} }
func (pa *path) shouldClose() bool {
return pa.conf.Regexp != nil &&
pa.source == nil &&
len(pa.readers) == 0 &&
len(pa.describeRequestsOnHold) == 0 &&
len(pa.readerAddRequestsOnHold) == 0
}
func (pa *path) externalCmdEnv() externalcmd.Environment {
_, port, _ := net.SplitHostPort(pa.rtspAddress)
env := externalcmd.Environment{
"MTX_PATH": pa.name,
"RTSP_PATH": pa.name, // deprecated
"RTSP_PORT": port,
}
if len(pa.matches) > 1 {
for i, ma := range pa.matches[1:] {
env["G"+strconv.FormatInt(int64(i+1), 10)] = ma
}
}
return env
}
func (pa *path) onDemandStaticSourceStart() {
pa.source.(*sourceStatic).start()
pa.onDemandStaticSourceReadyTimer.Stop()
pa.onDemandStaticSourceReadyTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandStartTimeout))
pa.onDemandStaticSourceState = pathOnDemandStateWaitingReady
}
func (pa *path) onDemandStaticSourceScheduleClose() {
pa.onDemandStaticSourceCloseTimer.Stop()
pa.onDemandStaticSourceCloseTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandCloseAfter))
pa.onDemandStaticSourceState = pathOnDemandStateClosing
}
func (pa *path) onDemandStaticSourceStop() {
if pa.onDemandStaticSourceState == pathOnDemandStateClosing {
pa.onDemandStaticSourceCloseTimer.Stop()
pa.onDemandStaticSourceCloseTimer = newEmptyTimer()
}
pa.onDemandStaticSourceState = pathOnDemandStateInitial
pa.source.(*sourceStatic).stop()
}
func (pa *path) onDemandStartPublisher() {
pa.Log(logger.Info, "runOnDemand command started")
pa.onDemandCmd = externalcmd.NewCmd(
pa.externalCmdPool,
pa.conf.RunOnDemand,
pa.conf.RunOnDemandRestart,
pa.externalCmdEnv(),
func(err error) {
pa.Log(logger.Info, "runOnDemand command exited: %v", err)
})
pa.onDemandPublisherReadyTimer.Stop()
pa.onDemandPublisherReadyTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandStartTimeout))
pa.onDemandPublisherState = pathOnDemandStateWaitingReady
}
func (pa *path) onDemandPublisherScheduleClose() {
pa.onDemandPublisherCloseTimer.Stop()
pa.onDemandPublisherCloseTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandCloseAfter))
pa.onDemandPublisherState = pathOnDemandStateClosing
}
func (pa *path) onDemandStopPublisher() {
if pa.source != nil {
pa.source.(publisher).close()
pa.executeRemovePublisher()
}
if pa.onDemandPublisherState == pathOnDemandStateClosing {
pa.onDemandPublisherCloseTimer.Stop()
pa.onDemandPublisherCloseTimer = newEmptyTimer()
}
pa.onDemandPublisherState = pathOnDemandStateInitial
if pa.onDemandCmd != nil {
pa.onDemandCmd.Close()
pa.onDemandCmd = nil
pa.Log(logger.Info, "runOnDemand command stopped")
}
}
func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error {
var err error
pa.stream, err = stream.New(
pa.udpMaxPayloadSize,
desc,
allocateEncoder,
pa.bytesReceived,
logger.NewLimitedLogger(pa.source),
)
if err != nil {
return err
}
pa.readyTime = time.Now()
if pa.conf.RunOnReady != "" {
pa.Log(logger.Info, "runOnReady command started")
pa.onReadyCmd = externalcmd.NewCmd(
pa.externalCmdPool,
pa.conf.RunOnReady,
pa.conf.RunOnReadyRestart,
pa.externalCmdEnv(),
func(err error) {
pa.Log(logger.Info, "runOnReady command exited: %v", err)
})
}
pa.parent.pathReady(pa)
return nil
}
func (pa *path) setNotReady() {
pa.parent.pathNotReady(pa)
for r := range pa.readers {
pa.executeRemoveReader(r)
r.close()
}
if pa.onReadyCmd != nil {
pa.onReadyCmd.Close()
pa.onReadyCmd = nil
pa.Log(logger.Info, "runOnReady command stopped")
}
if pa.stream != nil {
pa.stream.Close()
pa.stream = nil
}
}
func (pa *path) executeRemoveReader(r reader) {
delete(pa.readers, r)
}
func (pa *path) executeRemovePublisher() {
if pa.stream != nil {
pa.setNotReady()
}
pa.source = nil
}
func (pa *path) addReaderPost(req pathAddReaderReq) {
if _, ok := pa.readers[req.author]; ok {
req.res <- pathAddReaderRes{
path: pa,
stream: pa.stream,
}
return
}
if pa.conf.MaxReaders != 0 && len(pa.readers) >= pa.conf.MaxReaders {
req.res <- pathAddReaderRes{
err: fmt.Errorf("maximum reader count reached"),
}
return
}
pa.readers[req.author] = struct{}{}
if pa.conf.HasOnDemandStaticSource() {
if pa.onDemandStaticSourceState == pathOnDemandStateClosing {
pa.onDemandStaticSourceState = pathOnDemandStateReady
pa.onDemandStaticSourceCloseTimer.Stop()
pa.onDemandStaticSourceCloseTimer = newEmptyTimer()
}
} else if pa.conf.HasOnDemandPublisher() {
if pa.onDemandPublisherState == pathOnDemandStateClosing {
pa.onDemandPublisherState = pathOnDemandStateReady
pa.onDemandPublisherCloseTimer.Stop()
pa.onDemandPublisherCloseTimer = newEmptyTimer()
}
}
req.res <- pathAddReaderRes{
path: pa,
stream: pa.stream,
}
}
// reloadConf is called by pathManager. // reloadConf is called by pathManager.
func (pa *path) reloadConf(newConf *conf.PathConf) { func (pa *path) reloadConf(newConf *conf.PathConf) {
select { select {

298
internal/core/path_manager.go

@ -85,6 +85,7 @@ type pathManager struct {
// in // in
chReloadConf chan map[string]*conf.PathConf chReloadConf chan map[string]*conf.PathConf
chSetHLSManager chan pathManagerHLSManager
chClosePath chan *path chClosePath chan *path
chPathReady chan *path chPathReady chan *path
chPathNotReady chan *path chPathNotReady chan *path
@ -92,7 +93,6 @@ type pathManager struct {
chDescribe chan pathDescribeReq chDescribe chan pathDescribeReq
chAddReader chan pathAddReaderReq chAddReader chan pathAddReaderReq
chAddPublisher chan pathAddPublisherReq chAddPublisher chan pathAddPublisherReq
chSetHLSManager chan pathManagerHLSManager
chAPIPathsList chan pathAPIPathsListReq chAPIPathsList chan pathAPIPathsListReq
chAPIPathsGet chan pathAPIPathsGetReq chAPIPathsGet chan pathAPIPathsGetReq
} }
@ -129,6 +129,7 @@ func newPathManager(
paths: make(map[string]*path), paths: make(map[string]*path),
pathsByConf: make(map[string]map[*path]struct{}), pathsByConf: make(map[string]map[*path]struct{}),
chReloadConf: make(chan map[string]*conf.PathConf), chReloadConf: make(chan map[string]*conf.PathConf),
chSetHLSManager: make(chan pathManagerHLSManager),
chClosePath: make(chan *path), chClosePath: make(chan *path),
chPathReady: make(chan *path), chPathReady: make(chan *path),
chPathNotReady: make(chan *path), chPathNotReady: make(chan *path),
@ -136,7 +137,6 @@ func newPathManager(
chDescribe: make(chan pathDescribeReq), chDescribe: make(chan pathDescribeReq),
chAddReader: make(chan pathAddReaderReq), chAddReader: make(chan pathAddReaderReq),
chAddPublisher: make(chan pathAddPublisherReq), chAddPublisher: make(chan pathAddPublisherReq),
chSetHLSManager: make(chan pathManagerHLSManager),
chAPIPathsList: make(chan pathAPIPathsListReq), chAPIPathsList: make(chan pathAPIPathsListReq),
chAPIPathsGet: make(chan pathAPIPathsGetReq), chAPIPathsGet: make(chan pathAPIPathsGetReq),
} }
@ -177,168 +177,212 @@ outer:
for { for {
select { select {
case newPathConfs := <-pm.chReloadConf: case newPathConfs := <-pm.chReloadConf:
for confName, pathConf := range pm.pathConfs { pm.doReloadConf(newPathConfs)
if newPathConf, ok := newPathConfs[confName]; ok {
// configuration has changed
if !newPathConf.Equal(pathConf) {
if pathConfCanBeUpdated(pathConf, newPathConf) { // paths associated with the configuration can be updated
for pa := range pm.pathsByConf[confName] {
go pa.reloadConf(newPathConf)
}
} else { // paths associated with the configuration must be recreated
for pa := range pm.pathsByConf[confName] {
pm.removePath(pa)
pa.close()
pa.wait() // avoid conflicts between sources
}
}
}
} else {
// configuration has been deleted, remove associated paths
for pa := range pm.pathsByConf[confName] {
pm.removePath(pa)
pa.close()
pa.wait() // avoid conflicts between sources
}
}
}
pm.pathConfs = newPathConfs
// add new paths case m := <-pm.chSetHLSManager:
for pathConfName, pathConf := range pm.pathConfs { pm.doSetHLSManager(m)
if _, ok := pm.paths[pathConfName]; !ok && pathConf.Regexp == nil {
pm.createPath(pathConfName, pathConf, pathConfName, nil)
}
}
case pa := <-pm.chClosePath: case pa := <-pm.chClosePath:
if pmpa, ok := pm.paths[pa.name]; !ok || pmpa != pa { pm.doClosePath(pa)
continue
}
pm.removePath(pa)
case pa := <-pm.chPathReady: case pa := <-pm.chPathReady:
if pm.hlsManager != nil { pm.doPathReady(pa)
pm.hlsManager.pathReady(pa)
}
case pa := <-pm.chPathNotReady: case pa := <-pm.chPathNotReady:
if pm.hlsManager != nil { pm.doPathNotReady(pa)
pm.hlsManager.pathNotReady(pa)
}
case req := <-pm.chGetConfForPath: case req := <-pm.chGetConfForPath:
_, pathConf, _, err := getConfForPath(pm.pathConfs, req.name) pm.doGetConfForPath(req)
if err != nil {
req.res <- pathGetConfForPathRes{err: err}
continue
}
err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, case req := <-pm.chDescribe:
req.name, pathConf, req.publish, req.credentials) pm.doDescribe(req)
if err != nil {
req.res <- pathGetConfForPathRes{err: err}
continue
}
req.res <- pathGetConfForPathRes{conf: pathConf} case req := <-pm.chAddReader:
pm.doAddReader(req)
case req := <-pm.chDescribe: case req := <-pm.chAddPublisher:
pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName) pm.doAddPublisher(req)
if err != nil {
req.res <- pathDescribeRes{err: err}
continue
}
err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, false, req.credentials) case req := <-pm.chAPIPathsList:
if err != nil { pm.doAPIPathsList(req)
req.res <- pathDescribeRes{err: err}
continue
}
// create path if it doesn't exist case req := <-pm.chAPIPathsGet:
if _, ok := pm.paths[req.pathName]; !ok { pm.doAPIPathsGet(req)
pm.createPath(pathConfName, pathConf, req.pathName, pathMatches)
}
req.res <- pathDescribeRes{path: pm.paths[req.pathName]} case <-pm.ctx.Done():
break outer
}
}
case req := <-pm.chAddReader: pm.ctxCancel()
pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName)
if err != nil {
req.res <- pathAddReaderRes{err: err}
continue
}
if !req.skipAuth { if pm.metrics != nil {
err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, false, req.credentials) pm.metrics.pathManagerSet(nil)
if err != nil { }
req.res <- pathAddReaderRes{err: err} }
continue
func (pm *pathManager) doReloadConf(newPathConfs map[string]*conf.PathConf) {
for confName, pathConf := range pm.pathConfs {
if newPathConf, ok := newPathConfs[confName]; ok {
// configuration has changed
if !newPathConf.Equal(pathConf) {
if pathConfCanBeUpdated(pathConf, newPathConf) { // paths associated with the configuration can be updated
for pa := range pm.pathsByConf[confName] {
go pa.reloadConf(newPathConf)
}
} else { // paths associated with the configuration must be recreated
for pa := range pm.pathsByConf[confName] {
pm.removePath(pa)
pa.close()
pa.wait() // avoid conflicts between sources
}
} }
} }
} else {
// create path if it doesn't exist // configuration has been deleted, remove associated paths
if _, ok := pm.paths[req.pathName]; !ok { for pa := range pm.pathsByConf[confName] {
pm.createPath(pathConfName, pathConf, req.pathName, pathMatches) pm.removePath(pa)
pa.close()
pa.wait() // avoid conflicts between sources
} }
}
}
req.res <- pathAddReaderRes{path: pm.paths[req.pathName]} pm.pathConfs = newPathConfs
case req := <-pm.chAddPublisher: // add new paths
pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName) for pathConfName, pathConf := range pm.pathConfs {
if err != nil { if _, ok := pm.paths[pathConfName]; !ok && pathConf.Regexp == nil {
req.res <- pathAddPublisherRes{err: err} pm.createPath(pathConfName, pathConf, pathConfName, nil)
continue }
} }
}
if !req.skipAuth { func (pm *pathManager) doSetHLSManager(m pathManagerHLSManager) {
err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, true, req.credentials) pm.hlsManager = m
if err != nil { }
req.res <- pathAddPublisherRes{err: err}
continue
}
}
// create path if it doesn't exist func (pm *pathManager) doClosePath(pa *path) {
if _, ok := pm.paths[req.pathName]; !ok { if pmpa, ok := pm.paths[pa.name]; !ok || pmpa != pa {
pm.createPath(pathConfName, pathConf, req.pathName, pathMatches) return
} }
pm.removePath(pa)
}
req.res <- pathAddPublisherRes{path: pm.paths[req.pathName]} func (pm *pathManager) doPathReady(pa *path) {
if pm.hlsManager != nil {
pm.hlsManager.pathReady(pa)
}
}
case s := <-pm.chSetHLSManager: func (pm *pathManager) doPathNotReady(pa *path) {
pm.hlsManager = s if pm.hlsManager != nil {
pm.hlsManager.pathNotReady(pa)
}
}
case req := <-pm.chAPIPathsList: func (pm *pathManager) doGetConfForPath(req pathGetConfForPathReq) {
paths := make(map[string]*path) _, pathConf, _, err := getConfForPath(pm.pathConfs, req.name)
if err != nil {
req.res <- pathGetConfForPathRes{err: err}
return
}
for name, pa := range pm.paths { err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods,
paths[name] = pa req.name, pathConf, req.publish, req.credentials)
} if err != nil {
req.res <- pathGetConfForPathRes{err: err}
return
}
req.res <- pathAPIPathsListRes{paths: paths} req.res <- pathGetConfForPathRes{conf: pathConf}
}
case req := <-pm.chAPIPathsGet: func (pm *pathManager) doDescribe(req pathDescribeReq) {
path, ok := pm.paths[req.name] pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName)
if !ok { if err != nil {
req.res <- pathAPIPathsGetRes{err: errAPINotFound} req.res <- pathDescribeRes{err: err}
continue return
} }
req.res <- pathAPIPathsGetRes{path: path} err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, false, req.credentials)
if err != nil {
req.res <- pathDescribeRes{err: err}
return
}
case <-pm.ctx.Done(): // create path if it doesn't exist
break outer if _, ok := pm.paths[req.pathName]; !ok {
pm.createPath(pathConfName, pathConf, req.pathName, pathMatches)
}
req.res <- pathDescribeRes{path: pm.paths[req.pathName]}
}
func (pm *pathManager) doAddReader(req pathAddReaderReq) {
pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName)
if err != nil {
req.res <- pathAddReaderRes{err: err}
return
}
if !req.skipAuth {
err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, false, req.credentials)
if err != nil {
req.res <- pathAddReaderRes{err: err}
return
} }
} }
pm.ctxCancel() // create path if it doesn't exist
if _, ok := pm.paths[req.pathName]; !ok {
pm.createPath(pathConfName, pathConf, req.pathName, pathMatches)
}
if pm.metrics != nil { req.res <- pathAddReaderRes{path: pm.paths[req.pathName]}
pm.metrics.pathManagerSet(nil) }
func (pm *pathManager) doAddPublisher(req pathAddPublisherReq) {
pathConfName, pathConf, pathMatches, err := getConfForPath(pm.pathConfs, req.pathName)
if err != nil {
req.res <- pathAddPublisherRes{err: err}
return
}
if !req.skipAuth {
err = doAuthentication(pm.externalAuthenticationURL, pm.authMethods, req.pathName, pathConf, true, req.credentials)
if err != nil {
req.res <- pathAddPublisherRes{err: err}
return
}
}
// create path if it doesn't exist
if _, ok := pm.paths[req.pathName]; !ok {
pm.createPath(pathConfName, pathConf, req.pathName, pathMatches)
}
req.res <- pathAddPublisherRes{path: pm.paths[req.pathName]}
}
func (pm *pathManager) doAPIPathsList(req pathAPIPathsListReq) {
paths := make(map[string]*path)
for name, pa := range pm.paths {
paths[name] = pa
} }
req.res <- pathAPIPathsListRes{paths: paths}
}
func (pm *pathManager) doAPIPathsGet(req pathAPIPathsGetReq) {
path, ok := pm.paths[req.name]
if !ok {
req.res <- pathAPIPathsGetRes{err: errAPINotFound}
return
}
req.res <- pathAPIPathsGetRes{path: path}
} }
func (pm *pathManager) createPath( func (pm *pathManager) createPath(

205
internal/core/rtmp_conn.go

@ -17,6 +17,7 @@ import (
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
@ -240,7 +241,7 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error {
c.pathName = pathName c.pathName = pathName
c.mutex.Unlock() c.mutex.Unlock()
writer := newAsyncWriter(c.writeQueueSize, c) writer := asyncwriter.New(c.writeQueueSize, c)
var medias []*description.Media var medias []*description.Media
var w *rtmp.Writer var w *rtmp.Writer
@ -266,7 +267,7 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error {
"the stream doesn't contain any supported codec, which are currently H264, MPEG-4 Audio, MPEG-1/2 Audio") "the stream doesn't contain any supported codec, which are currently H264, MPEG-4 Audio, MPEG-1/2 Audio")
} }
defer res.stream.RemoveReader(c) defer res.stream.RemoveReader(writer)
c.Log(logger.Info, "is reading from path '%s', %s", c.Log(logger.Info, "is reading from path '%s', %s",
res.path.name, sourceMediaInfo(medias)) res.path.name, sourceMediaInfo(medias))
@ -298,14 +299,14 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error {
// disable read deadline // disable read deadline
c.nconn.SetReadDeadline(time.Time{}) c.nconn.SetReadDeadline(time.Time{})
writer.start() writer.Start()
select { select {
case <-c.ctx.Done(): case <-c.ctx.Done():
writer.stop() writer.Stop()
return fmt.Errorf("terminated") return fmt.Errorf("terminated")
case err := <-writer.error(): case err := <-writer.Error():
return err return err
} }
} }
@ -313,7 +314,7 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error {
func (c *rtmpConn) setupVideo( func (c *rtmpConn) setupVideo(
w **rtmp.Writer, w **rtmp.Writer,
stream *stream.Stream, stream *stream.Stream,
writer *asyncWriter, writer *asyncwriter.Writer,
) (*description.Media, format.Format) { ) (*description.Media, format.Format) {
var videoFormatH264 *format.H264 var videoFormatH264 *format.H264
videoMedia := stream.Desc().FindFormat(&videoFormatH264) videoMedia := stream.Desc().FindFormat(&videoFormatH264)
@ -321,60 +322,56 @@ func (c *rtmpConn) setupVideo(
if videoFormatH264 != nil { if videoFormatH264 != nil {
var videoDTSExtractor *h264.DTSExtractor var videoDTSExtractor *h264.DTSExtractor
stream.AddReader(c, videoMedia, videoFormatH264, func(u unit.Unit) { stream.AddReader(writer, videoMedia, videoFormatH264, func(u unit.Unit) error {
writer.push(func() error { tunit := u.(*unit.H264)
tunit := u.(*unit.H264)
if tunit.AU == nil { if tunit.AU == nil {
return nil return nil
}
idrPresent := false
nonIDRPresent := false
for _, nalu := range tunit.AU {
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeIDR:
idrPresent = true
case h264.NALUTypeNonIDR:
nonIDRPresent = true
} }
}
pts := tunit.PTS var dts time.Duration
idrPresent := false // wait until we receive an IDR
nonIDRPresent := false if videoDTSExtractor == nil {
if !idrPresent {
return nil
}
for _, nalu := range tunit.AU { videoDTSExtractor = h264.NewDTSExtractor()
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeIDR:
idrPresent = true
case h264.NALUTypeNonIDR: var err error
nonIDRPresent = true dts, err = videoDTSExtractor.Extract(tunit.AU, tunit.PTS)
} if err != nil {
return err
}
} else {
if !idrPresent && !nonIDRPresent {
return nil
} }
var dts time.Duration var err error
dts, err = videoDTSExtractor.Extract(tunit.AU, tunit.PTS)
// wait until we receive an IDR if err != nil {
if videoDTSExtractor == nil { return err
if !idrPresent {
return nil
}
videoDTSExtractor = h264.NewDTSExtractor()
var err error
dts, err = videoDTSExtractor.Extract(tunit.AU, pts)
if err != nil {
return err
}
} else {
if !idrPresent && !nonIDRPresent {
return nil
}
var err error
dts, err = videoDTSExtractor.Extract(tunit.AU, pts)
if err != nil {
return err
}
} }
}
c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
return (*w).WriteH264(pts, dts, idrPresent, tunit.AU) return (*w).WriteH264(tunit.PTS, dts, idrPresent, tunit.AU)
})
}) })
return videoMedia, videoFormatH264 return videoMedia, videoFormatH264
@ -386,36 +383,32 @@ func (c *rtmpConn) setupVideo(
func (c *rtmpConn) setupAudio( func (c *rtmpConn) setupAudio(
w **rtmp.Writer, w **rtmp.Writer,
stream *stream.Stream, stream *stream.Stream,
writer *asyncWriter, writer *asyncwriter.Writer,
) (*description.Media, format.Format) { ) (*description.Media, format.Format) {
var audioFormatMPEG4Generic *format.MPEG4AudioGeneric var audioFormatMPEG4Generic *format.MPEG4AudioGeneric
audioMedia := stream.Desc().FindFormat(&audioFormatMPEG4Generic) audioMedia := stream.Desc().FindFormat(&audioFormatMPEG4Generic)
if audioMedia != nil { if audioMedia != nil {
stream.AddReader(c, audioMedia, audioFormatMPEG4Generic, func(u unit.Unit) { stream.AddReader(writer, audioMedia, audioFormatMPEG4Generic, func(u unit.Unit) error {
writer.push(func() error { tunit := u.(*unit.MPEG4AudioGeneric)
tunit := u.(*unit.MPEG4AudioGeneric)
if tunit.AUs == nil { if tunit.AUs == nil {
return nil return nil
} }
pts := tunit.PTS for i, au := range tunit.AUs {
c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
for i, au := range tunit.AUs { err := (*w).WriteMPEG4Audio(
c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) tunit.PTS+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*
err := (*w).WriteMPEG4Audio( time.Second/time.Duration(audioFormatMPEG4Generic.ClockRate()),
pts+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit* au,
time.Second/time.Duration(audioFormatMPEG4Generic.ClockRate()), )
au, if err != nil {
) return err
if err != nil {
return err
}
} }
}
return nil return nil
})
}) })
return audioMedia, audioFormatMPEG4Generic return audioMedia, audioFormatMPEG4Generic
@ -428,19 +421,15 @@ func (c *rtmpConn) setupAudio(
audioFormatMPEG4AudioLATM.Config != nil && audioFormatMPEG4AudioLATM.Config != nil &&
len(audioFormatMPEG4AudioLATM.Config.Programs) == 1 && len(audioFormatMPEG4AudioLATM.Config.Programs) == 1 &&
len(audioFormatMPEG4AudioLATM.Config.Programs[0].Layers) == 1 { len(audioFormatMPEG4AudioLATM.Config.Programs[0].Layers) == 1 {
stream.AddReader(c, audioMedia, audioFormatMPEG4AudioLATM, func(u unit.Unit) { stream.AddReader(writer, audioMedia, audioFormatMPEG4AudioLATM, func(u unit.Unit) error {
writer.push(func() error { tunit := u.(*unit.MPEG4AudioLATM)
tunit := u.(*unit.MPEG4AudioLATM)
if tunit.AU == nil {
return nil
}
pts := tunit.PTS if tunit.AU == nil {
return nil
}
c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
return (*w).WriteMPEG4Audio(pts, tunit.AU) return (*w).WriteMPEG4Audio(tunit.PTS, tunit.AU)
})
}) })
return audioMedia, audioFormatMPEG4AudioLATM return audioMedia, audioFormatMPEG4AudioLATM
@ -450,35 +439,33 @@ func (c *rtmpConn) setupAudio(
audioMedia = stream.Desc().FindFormat(&audioFormatMPEG1) audioMedia = stream.Desc().FindFormat(&audioFormatMPEG1)
if audioMedia != nil { if audioMedia != nil {
stream.AddReader(c, audioMedia, audioFormatMPEG1, func(u unit.Unit) { stream.AddReader(writer, audioMedia, audioFormatMPEG1, func(u unit.Unit) error {
writer.push(func() error { tunit := u.(*unit.MPEG1Audio)
tunit := u.(*unit.MPEG1Audio)
pts := tunit.PTS
pts := tunit.PTS
for _, frame := range tunit.Frames {
for _, frame := range tunit.Frames { var h mpeg1audio.FrameHeader
var h mpeg1audio.FrameHeader err := h.Unmarshal(frame)
err := h.Unmarshal(frame) if err != nil {
if err != nil { return err
return err
}
if !(!h.MPEG2 && h.Layer == 3) {
return fmt.Errorf("RTMP only supports MPEG-1 layer 3 audio")
}
c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err = (*w).WriteMPEG1Audio(pts, &h, frame)
if err != nil {
return err
}
pts += time.Duration(h.SampleCount()) *
time.Second / time.Duration(h.SampleRate)
} }
return nil if !(!h.MPEG2 && h.Layer == 3) {
}) return fmt.Errorf("RTMP only supports MPEG-1 layer 3 audio")
}
c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err = (*w).WriteMPEG1Audio(pts, &h, frame)
if err != nil {
return err
}
pts += time.Duration(h.SampleCount()) *
time.Second / time.Duration(h.SampleRate)
}
return nil
}) })
return audioMedia, audioFormatMPEG1 return audioMedia, audioFormatMPEG1

4
internal/core/rtsp_session.go

@ -74,8 +74,8 @@ func newRTSPSession(
created: time.Now(), created: time.Now(),
} }
s.decodeErrLogger = newLimitedLogger(s) s.decodeErrLogger = logger.NewLimitedLogger(s)
s.writeErrLogger = newLimitedLogger(s) s.writeErrLogger = logger.NewLimitedLogger(s)
s.Log(logger.Info, "created by %v", s.author.NetConn().RemoteAddr()) s.Log(logger.Info, "created by %v", s.author.NetConn().RemoteAddr())

2
internal/core/rtsp_source.go

@ -94,7 +94,7 @@ func (s *rtspSource) Log(level logger.Level, format string, args ...interface{})
func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan *conf.PathConf) error { func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan *conf.PathConf) error {
s.Log(logger.Debug, "connecting") s.Log(logger.Debug, "connecting")
decodeErrLogger := newLimitedLogger(s) decodeErrLogger := logger.NewLimitedLogger(s)
c := &gortsplib.Client{ c := &gortsplib.Client{
Transport: cnf.SourceProtocol.Transport, Transport: cnf.SourceProtocol.Transport,

2
internal/core/source_static.go

@ -167,7 +167,7 @@ func (s *sourceStatic) run() {
select { select {
case err := <-implErr: case err := <-implErr:
innerCtxCancel() innerCtxCancel()
s.impl.Log(logger.Info, "ERR: %v", err) s.impl.Log(logger.Error, err.Error())
recreating = true recreating = true
recreateTimer = time.NewTimer(sourceStaticRetryPause) recreateTimer = time.NewTimer(sourceStaticRetryPause)

227
internal/core/srt_conn.go

@ -18,6 +18,7 @@ import (
"github.com/datarhei/gosrt" "github.com/datarhei/gosrt"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
@ -234,7 +235,7 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error {
return err return err
} }
decodeErrLogger := newLimitedLogger(c) decodeErrLogger := logger.NewLimitedLogger(c)
r.OnDecodeError(func(err error) { r.OnDecodeError(func(err error) {
decodeErrLogger.Log(logger.Warn, err.Error()) decodeErrLogger.Log(logger.Warn, err.Error())
@ -421,7 +422,7 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
c.conn = sconn c.conn = sconn
c.mutex.Unlock() c.mutex.Unlock()
writer := newAsyncWriter(c.writeQueueSize, c) writer := asyncwriter.New(c.writeQueueSize, c)
var w *mpegts.Writer var w *mpegts.Writer
var tracks []*mpegts.Track var tracks []*mpegts.Track
@ -443,75 +444,67 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
case *format.H265: //nolint:dupl case *format.H265: //nolint:dupl
track := addTrack(medi, &mpegts.CodecH265{}) track := addTrack(medi, &mpegts.CodecH265{})
randomAccessReceived := false var dtsExtractor *h265.DTSExtractor
dtsExtractor := h265.NewDTSExtractor()
res.stream.AddReader(c, medi, forma, func(u unit.Unit) { res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
writer.push(func() error { tunit := u.(*unit.H265)
tunit := u.(*unit.H265) if tunit.AU == nil {
if tunit.AU == nil { return nil
return nil }
}
randomAccess := h265.IsRandomAccess(tunit.AU) randomAccess := h265.IsRandomAccess(tunit.AU)
if !randomAccessReceived {
if !randomAccess {
return nil
}
randomAccessReceived = true
}
pts := tunit.PTS
dts, err := dtsExtractor.Extract(tunit.AU, pts)
if err != nil {
return err
}
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) if dtsExtractor == nil {
err = w.WriteH26x(track, durationGoToMPEGTS(pts), durationGoToMPEGTS(dts), randomAccess, tunit.AU) if !randomAccess {
if err != nil { return nil
return err
} }
return bw.Flush() dtsExtractor = h265.NewDTSExtractor()
}) }
dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS)
if err != nil {
return err
}
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err = w.WriteH26x(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU)
if err != nil {
return err
}
return bw.Flush()
}) })
case *format.H264: //nolint:dupl case *format.H264: //nolint:dupl
track := addTrack(medi, &mpegts.CodecH264{}) track := addTrack(medi, &mpegts.CodecH264{})
firstIDRReceived := false var dtsExtractor *h264.DTSExtractor
dtsExtractor := h264.NewDTSExtractor()
res.stream.AddReader(c, medi, forma, func(u unit.Unit) { res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
writer.push(func() error { tunit := u.(*unit.H264)
tunit := u.(*unit.H264) if tunit.AU == nil {
if tunit.AU == nil { return nil
return nil }
}
idrPresent := h264.IDRPresent(tunit.AU) idrPresent := h264.IDRPresent(tunit.AU)
if !firstIDRReceived { if dtsExtractor == nil {
if !idrPresent { if !idrPresent {
return nil return nil
}
firstIDRReceived = true
}
pts := tunit.PTS
dts, err := dtsExtractor.Extract(tunit.AU, pts)
if err != nil {
return err
}
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err = w.WriteH26x(track, durationGoToMPEGTS(pts), durationGoToMPEGTS(dts), idrPresent, tunit.AU)
if err != nil {
return err
} }
return bw.Flush() dtsExtractor = h264.NewDTSExtractor()
}) }
dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS)
if err != nil {
return err
}
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err = w.WriteH26x(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), idrPresent, tunit.AU)
if err != nil {
return err
}
return bw.Flush()
}) })
case *format.MPEG4AudioGeneric: case *format.MPEG4AudioGeneric:
@ -519,22 +512,18 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
Config: *forma.Config, Config: *forma.Config,
}) })
res.stream.AddReader(c, medi, forma, func(u unit.Unit) { res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
writer.push(func() error { tunit := u.(*unit.MPEG4AudioGeneric)
tunit := u.(*unit.MPEG4AudioGeneric) if tunit.AUs == nil {
if tunit.AUs == nil { return nil
return nil }
}
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
pts := tunit.PTS err = w.WriteMPEG4Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.AUs)
if err != nil {
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) return err
err = w.WriteMPEG4Audio(track, durationGoToMPEGTS(pts), tunit.AUs) }
if err != nil { return bw.Flush()
return err
}
return bw.Flush()
})
}) })
case *format.MPEG4AudioLATM: case *format.MPEG4AudioLATM:
@ -545,22 +534,18 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
Config: *forma.Config.Programs[0].Layers[0].AudioSpecificConfig, Config: *forma.Config.Programs[0].Layers[0].AudioSpecificConfig,
}) })
res.stream.AddReader(c, medi, forma, func(u unit.Unit) { res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
writer.push(func() error { tunit := u.(*unit.MPEG4AudioLATM)
tunit := u.(*unit.MPEG4AudioLATM) if tunit.AU == nil {
if tunit.AU == nil { return nil
return nil }
}
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
pts := tunit.PTS err = w.WriteMPEG4Audio(track, durationGoToMPEGTS(tunit.PTS), [][]byte{tunit.AU})
if err != nil {
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) return err
err = w.WriteMPEG4Audio(track, durationGoToMPEGTS(pts), [][]byte{tunit.AU}) }
if err != nil { return bw.Flush()
return err
}
return bw.Flush()
})
}) })
} }
@ -574,43 +559,35 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
}(), }(),
}) })
res.stream.AddReader(c, medi, forma, func(u unit.Unit) { res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
writer.push(func() error { tunit := u.(*unit.Opus)
tunit := u.(*unit.Opus) if tunit.Packets == nil {
if tunit.Packets == nil { return nil
return nil }
}
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
pts := tunit.PTS err = w.WriteOpus(track, durationGoToMPEGTS(tunit.PTS), tunit.Packets)
if err != nil {
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) return err
err = w.WriteOpus(track, durationGoToMPEGTS(pts), tunit.Packets) }
if err != nil { return bw.Flush()
return err
}
return bw.Flush()
})
}) })
case *format.MPEG1Audio: case *format.MPEG1Audio:
track := addTrack(medi, &mpegts.CodecMPEG1Audio{}) track := addTrack(medi, &mpegts.CodecMPEG1Audio{})
res.stream.AddReader(c, medi, forma, func(u unit.Unit) { res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
writer.push(func() error { tunit := u.(*unit.MPEG1Audio)
tunit := u.(*unit.MPEG1Audio) if tunit.Frames == nil {
if tunit.Frames == nil { return nil
return nil }
}
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
pts := tunit.PTS err = w.WriteMPEG1Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.Frames)
if err != nil {
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) return err
err = w.WriteMPEG1Audio(track, durationGoToMPEGTS(pts), tunit.Frames) }
if err != nil { return bw.Flush()
return err
}
return bw.Flush()
})
}) })
} }
} }
@ -647,14 +624,14 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
// disable read deadline // disable read deadline
sconn.SetReadDeadline(time.Time{}) sconn.SetReadDeadline(time.Time{})
writer.start() writer.Start()
select { select {
case <-c.ctx.Done(): case <-c.ctx.Done():
writer.stop() writer.Stop()
return true, fmt.Errorf("terminated") return true, fmt.Errorf("terminated")
case err := <-writer.error(): case err := <-writer.Error():
return true, err return true, err
} }
} }

2
internal/core/srt_server.go

@ -137,7 +137,7 @@ func newSRTServer(
// Log is the main logging function. // Log is the main logging function.
func (s *srtServer) Log(level logger.Level, format string, args ...interface{}) { func (s *srtServer) Log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[SRT] "+format, append([]interface{}{}, args...)...) s.parent.Log(level, "[SRT] "+format, args...)
} }
func (s *srtServer) close() { func (s *srtServer) close() {

2
internal/core/srt_source.go

@ -91,7 +91,7 @@ func (s *srtSource) runReader(sconn srt.Conn) error {
return err return err
} }
decodeErrLogger := newLimitedLogger(s) decodeErrLogger := logger.NewLimitedLogger(s)
r.OnDecodeError(func(err error) { r.OnDecodeError(func(err error) {
decodeErrLogger.Log(logger.Warn, err.Error()) decodeErrLogger.Log(logger.Warn, err.Error())

2
internal/core/udp_source.go

@ -140,7 +140,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error {
return err return err
} }
decodeErrLogger := newLimitedLogger(s) decodeErrLogger := logger.NewLimitedLogger(s)
r.OnDecodeError(func(err error) { r.OnDecodeError(func(err error) {
decodeErrLogger.Log(logger.Warn, err.Error()) decodeErrLogger.Log(logger.Warn, err.Error())

2
internal/core/webrtc_manager.go

@ -416,7 +416,7 @@ func newWebRTCManager(
// Log is the main logging function. // Log is the main logging function.
func (m *webRTCManager) Log(level logger.Level, format string, args ...interface{}) { func (m *webRTCManager) Log(level logger.Level, format string, args ...interface{}) {
m.parent.Log(level, "[WebRTC] "+format, append([]interface{}{}, args...)...) m.parent.Log(level, "[WebRTC] "+format, args...)
} }
func (m *webRTCManager) close() { func (m *webRTCManager) close() {

10
internal/core/webrtc_outgoing_track.go

@ -12,6 +12,7 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp9" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp9"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/unit" "github.com/bluenviron/mediamtx/internal/unit"
) )
@ -348,9 +349,8 @@ func newWebRTCOutgoingTrackAudio(desc *description.Session) (*webRTCOutgoingTrac
} }
func (t *webRTCOutgoingTrack) start( func (t *webRTCOutgoingTrack) start(
r reader,
stream *stream.Stream, stream *stream.Stream,
writer *asyncWriter, writer *asyncwriter.Writer,
) { ) {
// read incoming RTCP packets to make interceptors work // read incoming RTCP packets to make interceptors work
go func() { go func() {
@ -363,9 +363,7 @@ func (t *webRTCOutgoingTrack) start(
} }
}() }()
stream.AddReader(r, t.media, t.format, func(u unit.Unit) { stream.AddReader(writer, t.media, t.format, func(u unit.Unit) error {
writer.push(func() error { return t.cb(u)
return t.cb(u)
})
}) })
} }

15
internal/core/webrtc_session.go

@ -16,6 +16,7 @@ import (
"github.com/pion/sdp/v3" "github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/webrtcpc" "github.com/bluenviron/mediamtx/internal/webrtcpc"
) )
@ -511,29 +512,29 @@ func (s *webRTCSession) runRead() (int, error) {
s.pc = pc s.pc = pc
s.mutex.Unlock() s.mutex.Unlock()
writer := newAsyncWriter(s.writeQueueSize, s) writer := asyncwriter.New(s.writeQueueSize, s)
for _, track := range tracks { for _, track := range tracks {
track.start(s, res.stream, writer) track.start(res.stream, writer)
} }
defer res.stream.RemoveReader(s) defer res.stream.RemoveReader(writer)
s.Log(logger.Info, "is reading from path '%s', %s", s.Log(logger.Info, "is reading from path '%s', %s",
res.path.name, sourceMediaInfo(webrtcMediasOfOutgoingTracks(tracks))) res.path.name, sourceMediaInfo(webrtcMediasOfOutgoingTracks(tracks)))
writer.start() writer.Start()
select { select {
case <-pc.Disconnected(): case <-pc.Disconnected():
writer.stop() writer.Stop()
return 0, fmt.Errorf("peer connection closed") return 0, fmt.Errorf("peer connection closed")
case err := <-writer.error(): case err := <-writer.Error():
return 0, err return 0, err
case <-s.ctx.Done(): case <-s.ctx.Done():
writer.stop() writer.Stop()
return 0, fmt.Errorf("terminated") return 0, fmt.Errorf("terminated")
} }
} }

2
internal/formatprocessor/processor.go

@ -1,4 +1,4 @@
// Package formatprocessor contains code to cleanup and normalize streams. // Package formatprocessor cleans and normalizes streams.
package formatprocessor package formatprocessor
import ( import (

34
internal/logger/limited_logger.go

@ -0,0 +1,34 @@
package logger
import (
"sync"
"time"
)
const (
minIntervalBetweenWarnings = 1 * time.Second
)
type limitedLogger struct {
w Writer
mutex sync.Mutex
lastPrinted time.Time
}
// NewLimitedLogger is a wrapper around a Writer that limits printed messages.
func NewLimitedLogger(w Writer) Writer {
return &limitedLogger{
w: w,
}
}
// Log is the main logging function.
func (l *limitedLogger) Log(level Level, format string, args ...interface{}) {
now := time.Now()
l.mutex.Lock()
if now.Sub(l.lastPrinted) >= minIntervalBetweenWarnings {
l.lastPrinted = now
l.w.Log(level, format, args...)
}
l.mutex.Unlock()
}

15
internal/stream/stream.go

@ -10,12 +10,15 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/pion/rtp" "github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/unit" "github.com/bluenviron/mediamtx/internal/unit"
) )
type readerFunc func(unit.Unit) error
// Stream is a media stream. // Stream is a media stream.
// It stores tracks, readers and allow to write data to readers. // It stores tracks, readers and allows to write data to readers.
type Stream struct { type Stream struct {
desc *description.Session desc *description.Session
bytesReceived *uint64 bytesReceived *uint64
@ -62,7 +65,7 @@ func (s *Stream) Close() {
} }
} }
// Desc returns description of the stream. // Desc returns the description of the stream.
func (s *Stream) Desc() *description.Session { func (s *Stream) Desc() *description.Session {
return s.desc return s.desc
} }
@ -90,7 +93,7 @@ func (s *Stream) RTSPSStream(server *gortsplib.Server) *gortsplib.ServerStream {
} }
// AddReader adds a reader. // AddReader adds a reader.
func (s *Stream) AddReader(r interface{}, medi *description.Media, forma format.Format, cb func(unit.Unit)) { func (s *Stream) AddReader(r *asyncwriter.Writer, medi *description.Media, forma format.Format, cb readerFunc) {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
@ -100,7 +103,7 @@ func (s *Stream) AddReader(r interface{}, medi *description.Media, forma format.
} }
// RemoveReader removes a reader. // RemoveReader removes a reader.
func (s *Stream) RemoveReader(r interface{}) { func (s *Stream) RemoveReader(r *asyncwriter.Writer) {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
@ -112,14 +115,14 @@ func (s *Stream) RemoveReader(r interface{}) {
} }
// WriteUnit writes a Unit. // WriteUnit writes a Unit.
func (s *Stream) WriteUnit(medi *description.Media, forma format.Format, data unit.Unit) { func (s *Stream) WriteUnit(medi *description.Media, forma format.Format, u unit.Unit) {
sm := s.smedias[medi] sm := s.smedias[medi]
sf := sm.formats[forma] sf := sm.formats[forma]
s.mutex.RLock() s.mutex.RLock()
defer s.mutex.RUnlock() defer s.mutex.RUnlock()
sf.writeUnit(s, medi, data) sf.writeUnit(s, medi, u)
} }
// WriteRTPPacket writes a RTP packet. // WriteRTPPacket writes a RTP packet.

48
internal/stream/stream_format.go

@ -8,15 +8,24 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/pion/rtp" "github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/formatprocessor" "github.com/bluenviron/mediamtx/internal/formatprocessor"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/unit" "github.com/bluenviron/mediamtx/internal/unit"
) )
func unitSize(u unit.Unit) uint64 {
n := uint64(0)
for _, pkt := range u.GetRTPPackets() {
n += uint64(pkt.MarshalSize())
}
return n
}
type streamFormat struct { type streamFormat struct {
decodeErrLogger logger.Writer decodeErrLogger logger.Writer
proc formatprocessor.Processor proc formatprocessor.Processor
nonRTSPReaders map[interface{}]func(unit.Unit) readers map[*asyncwriter.Writer]readerFunc
} }
func newStreamFormat( func newStreamFormat(
@ -33,50 +42,47 @@ func newStreamFormat(
sf := &streamFormat{ sf := &streamFormat{
decodeErrLogger: decodeErrLogger, decodeErrLogger: decodeErrLogger,
proc: proc, proc: proc,
nonRTSPReaders: make(map[interface{}]func(unit.Unit)), readers: make(map[*asyncwriter.Writer]readerFunc),
} }
return sf, nil return sf, nil
} }
func (sf *streamFormat) addReader(r interface{}, cb func(unit.Unit)) { func (sf *streamFormat) addReader(r *asyncwriter.Writer, cb readerFunc) {
sf.nonRTSPReaders[r] = cb sf.readers[r] = cb
} }
func (sf *streamFormat) removeReader(r interface{}) { func (sf *streamFormat) removeReader(r *asyncwriter.Writer) {
delete(sf.nonRTSPReaders, r) delete(sf.readers, r)
} }
func (sf *streamFormat) writeUnit(s *Stream, medi *description.Media, data unit.Unit) { func (sf *streamFormat) writeUnit(s *Stream, medi *description.Media, u unit.Unit) {
hasNonRTSPReaders := len(sf.nonRTSPReaders) > 0 hasNonRTSPReaders := len(sf.readers) > 0
err := sf.proc.Process(data, hasNonRTSPReaders) err := sf.proc.Process(u, hasNonRTSPReaders)
if err != nil { if err != nil {
sf.decodeErrLogger.Log(logger.Warn, err.Error()) sf.decodeErrLogger.Log(logger.Warn, err.Error())
return return
} }
n := uint64(0) atomic.AddUint64(s.bytesReceived, unitSize(u))
for _, pkt := range data.GetRTPPackets() {
n += uint64(pkt.MarshalSize())
}
atomic.AddUint64(s.bytesReceived, n)
if s.rtspStream != nil { if s.rtspStream != nil {
for _, pkt := range data.GetRTPPackets() { for _, pkt := range u.GetRTPPackets() {
s.rtspStream.WritePacketRTPWithNTP(medi, pkt, data.GetNTP()) //nolint:errcheck s.rtspStream.WritePacketRTPWithNTP(medi, pkt, u.GetNTP()) //nolint:errcheck
} }
} }
if s.rtspsStream != nil { if s.rtspsStream != nil {
for _, pkt := range data.GetRTPPackets() { for _, pkt := range u.GetRTPPackets() {
s.rtspsStream.WritePacketRTPWithNTP(medi, pkt, data.GetNTP()) //nolint:errcheck s.rtspsStream.WritePacketRTPWithNTP(medi, pkt, u.GetNTP()) //nolint:errcheck
} }
} }
// forward decoded frames to non-RTSP readers for writer, cb := range sf.readers {
for _, cb := range sf.nonRTSPReaders { writer.Push(func() error {
cb(data) return cb(u)
})
} }
} }

57
mediamtx.yml

@ -1,6 +1,6 @@
############################################### ###############################################
# General parameters # General settings
# Sets the verbosity of the program; available values are "error", "warn", "info", "debug". # Sets the verbosity of the program; available values are "error", "warn", "info", "debug".
logLevel: info logLevel: info
@ -62,9 +62,9 @@ runOnConnect:
runOnConnectRestart: no runOnConnectRestart: no
############################################### ###############################################
# RTSP parameters # RTSP settings
# Enable support for the RTSP protocol. # Allow publishing and reading streams with the RTSP protocol.
rtsp: yes rtsp: yes
# List of enabled RTSP transport protocols. # List of enabled RTSP transport protocols.
# UDP is the most performant, but doesn't work when there's a NAT/firewall between # UDP is the most performant, but doesn't work when there's a NAT/firewall between
@ -102,9 +102,9 @@ serverCert: server.crt
authMethods: [basic] authMethods: [basic]
############################################### ###############################################
# RTMP parameters # RTMP settings
# Enable support for the RTMP protocol. # Allow publishing and reading streams with the RTMP protocol.
rtmp: yes rtmp: yes
# Address of the RTMP listener. This is needed only when encryption is "no" or "optional". # Address of the RTMP listener. This is needed only when encryption is "no" or "optional".
rtmpAddress: :1935 rtmpAddress: :1935
@ -122,9 +122,9 @@ rtmpServerKey: server.key
rtmpServerCert: server.crt rtmpServerCert: server.crt
############################################### ###############################################
# HLS parameters # HLS settings
# Enable support for the HLS protocol. # Allow reading streams with the HLS protocol.
hls: yes hls: yes
# Address of the HLS listener. # Address of the HLS listener.
hlsAddress: :8888 hlsAddress: :8888
@ -178,9 +178,9 @@ hlsTrustedProxies: []
hlsDirectory: '' hlsDirectory: ''
############################################### ###############################################
# WebRTC parameters # WebRTC settings
# Enable support for the WebRTC protocol. # Allow publishing and reading streams with the WebRTC protocol.
webrtc: yes webrtc: yes
# Address of the WebRTC listener. # Address of the WebRTC listener.
webrtcAddress: :8889 webrtcAddress: :8889
@ -222,20 +222,20 @@ webrtcICEUDPMuxAddress:
# Address of a ICE TCP listener in format host:port. # Address of a ICE TCP listener in format host:port.
# If filled, ICE traffic will pass through a single TCP port, # If filled, ICE traffic will pass through a single TCP port,
# allowing the deployment of the server inside a container or behind a NAT. # allowing the deployment of the server inside a container or behind a NAT.
# Setting this parameter forces usage of the TCP protocol, which is not # Using this setting forces usage of the TCP protocol, which is not
# optimal for WebRTC. # optimal for WebRTC.
webrtcICETCPMuxAddress: webrtcICETCPMuxAddress:
############################################### ###############################################
# SRT parameters # SRT settings
# Enables support for the SRT protocol. # Allow publishing and reading streams with the SRT protocol.
srt: yes srt: yes
# Address of the SRT listener. # Address of the SRT listener.
srtAddress: :8890 srtAddress: :8890
############################################### ###############################################
# Path parameters # Path settings
# These settings are path-dependent, and the map key is the name of the path. # These settings are path-dependent, and the map key is the name of the path.
# It's possible to use regular expressions by using a tilde as prefix, # It's possible to use regular expressions by using a tilde as prefix,
@ -245,25 +245,24 @@ srtAddress: :8890
# another entry. # another entry.
paths: paths:
all: all:
###############################################
# General path settings
# Source of the stream. This can be: # Source of the stream. This can be:
# * publisher -> the stream is published by a RTSP, RTMP, WebRTC or SRT client # * publisher -> the stream is provided by a RTSP, RTMP, WebRTC or SRT client
# * rtsp://existing-url -> the stream is pulled from another RTSP server / camera # * rtsp://existing-url -> the stream is pulled from another RTSP server / camera
# * rtsps://existing-url -> the stream is pulled from another RTSP server / camera with RTSPS # * rtsps://existing-url -> the stream is pulled from another RTSP server / camera with RTSPS
# * rtmp://existing-url -> the stream is pulled from another RTMP server / camera # * rtmp://existing-url -> the stream is pulled from another RTMP server / camera
# * rtmps://existing-url -> the stream is pulled from another RTMP server / camera with RTMPS # * rtmps://existing-url -> the stream is pulled from another RTMP server / camera with RTMPS
# * http://existing-url/stream.m3u8 -> the stream is pulled from another HLS server # * http://existing-url/stream.m3u8 -> the stream is pulled from another HLS server / camera
# * https://existing-url/stream.m3u8 -> the stream is pulled from another HLS server with HTTPS # * https://existing-url/stream.m3u8 -> the stream is pulled from another HLS server / camera with HTTPS
# * udp://ip:port -> the stream is pulled with UDP, by listening on the specified IP and port # * udp://ip:port -> the stream is pulled with UDP, by listening on the specified IP and port
# * srt://existing-url -> the stream is pulled from another SRT server # * srt://existing-url -> the stream is pulled from another SRT server / camera
# * whep://existing-url -> the stream is pulled from another WebRTC server # * whep://existing-url -> the stream is pulled from another WebRTC server / camera
# * wheps://existing-url -> the stream is pulled from another WebRTC server with HTTPS # * wheps://existing-url -> the stream is pulled from another WebRTC server / camera with HTTPS
# * redirect -> the stream is provided by another path or server # * redirect -> the stream is provided by another path or server
# * rpiCamera -> the stream is provided by a Raspberry Pi Camera # * rpiCamera -> the stream is provided by a Raspberry Pi Camera
source: publisher source: publisher
###############################################
# General path parameters
# If the source is a URL, and the source certificate is self-signed # If the source is a URL, and the source certificate is self-signed
# or invalid, you can provide the fingerprint of the certificate in order to # or invalid, you can provide the fingerprint of the certificate in order to
# validate it anyway. It can be obtained by running: # validate it anyway. It can be obtained by running:
@ -283,7 +282,7 @@ paths:
maxReaders: 0 maxReaders: 0
############################################### ###############################################
# Authentication path parameters # Authentication path settings
# Username required to publish. # Username required to publish.
# SHA256-hashed values can be inserted with the "sha256:" prefix. # SHA256-hashed values can be inserted with the "sha256:" prefix.
@ -304,7 +303,7 @@ paths:
readIPs: [] readIPs: []
############################################### ###############################################
# Publisher path parameters (when source is "publisher") # Publisher path settings (when source is "publisher")
# allow another client to disconnect the current publisher and publish in its place. # allow another client to disconnect the current publisher and publish in its place.
overridePublisher: yes overridePublisher: yes
@ -313,7 +312,7 @@ paths:
fallback: fallback:
############################################### ###############################################
# RTSP path parameters (when source is a RTSP or a RTSPS URL) # RTSP path settings (when source is a RTSP or a RTSPS URL)
# protocol used to pull the stream. available values are "automatic", "udp", "multicast", "tcp". # protocol used to pull the stream. available values are "automatic", "udp", "multicast", "tcp".
sourceProtocol: automatic sourceProtocol: automatic
@ -333,13 +332,13 @@ paths:
rtspRangeStart: rtspRangeStart:
############################################### ###############################################
# Redirect path parameters (when source is "redirect") # Redirect path settings (when source is "redirect")
# RTSP URL which clients will be redirected to. # RTSP URL which clients will be redirected to.
sourceRedirect: sourceRedirect:
############################################### ###############################################
# Raspberry Pi Camera path parameters (when source is "rpiCamera") # Raspberry Pi Camera path settings (when source is "rpiCamera")
# ID of the camera # ID of the camera
rpiCameraCamID: 0 rpiCameraCamID: 0
@ -421,7 +420,7 @@ paths:
rpiCameraTextOverlay: '%Y-%m-%d %H:%M:%S - MediaMTX' rpiCameraTextOverlay: '%Y-%m-%d %H:%M:%S - MediaMTX'
############################################### ###############################################
# external commands path parameters # External commands path settings
# Command to run when this path is initialized. # Command to run when this path is initialized.
# This can be used to publish a stream and keep it always opened. # This can be used to publish a stream and keep it always opened.

Loading…
Cancel
Save