From 73ddb21e63fc908d191d57378d0759d68f044427 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Sat, 16 Sep 2023 17:27:07 +0200 Subject: [PATCH] implement native recording (#1399) (#2255) * implement native recording (#1399) * support saving VP9 tracks * support saving MPEG-1 audio tracks * switch segment when codec parameters change * allow to disable recording on a path basis * allow disabling recording cleaner * support recording MPEG-1/2/4 video tracks * add microseconds to file names * add tests --- README.md | 27 +- apidocs/openapi.yaml | 16 + go.mod | 4 +- go.sum | 4 +- internal/conf/conf.go | 21 + internal/conf/path.go | 1 + internal/core/core.go | 31 ++ internal/core/path.go | 66 ++- internal/core/path_manager.go | 16 + internal/core/srt_conn.go | 24 +- internal/core/webrtc_outgoing_track.go | 15 +- internal/record/agent.go | 738 +++++++++++++++++++++++++ internal/record/agent_test.go | 151 +++++ internal/record/cleaner.go | 144 +++++ internal/record/cleaner_test.go | 46 ++ internal/record/part.go | 74 +++ internal/record/record.go | 2 + internal/record/record_path.go | 138 +++++ internal/record/segment.go | 116 ++++ internal/record/track.go | 57 ++ internal/stream/stream.go | 21 +- mediamtx.yml | 24 + 22 files changed, 1685 insertions(+), 51 deletions(-) create mode 100644 internal/record/agent.go create mode 100644 internal/record/agent_test.go create mode 100644 internal/record/cleaner.go create mode 100644 internal/record/cleaner_test.go create mode 100644 internal/record/part.go create mode 100644 internal/record/record.go create mode 100644 internal/record/record_path.go create mode 100644 internal/record/segment.go create mode 100644 internal/record/track.go diff --git a/README.md b/README.md index 9b93088e..3c1a00d8 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,7 @@ And can be read from the server with: * Read live streams from the server * Streams are automatically converted from a protocol to another. For instance, it's possible to publish a stream with RTSP and read it with HLS * Serve multiple streams at once in separate paths +* Record streams to disk * Authenticate users; use internal or external authentication * Redirect readers to other RTSP servers (load balancing) * Query and control the server through the API @@ -106,7 +107,7 @@ _rtsp-simple-server_ has been rebranded as _MediaMTX_. The reason is pretty obvi * [Authentication](#authentication) * [Encrypt the configuration](#encrypt-the-configuration) * [Remuxing, re-encoding, compression](#remuxing-re-encoding-compression) - * [Save streams to disk](#save-streams-to-disk) + * [Record streams to disk](#record-streams-to-disk) * [Forward streams to another server](#forward-streams-to-another-server) * [On-demand publishing](#on-demand-publishing) * [Start on boot](#start-on-boot) @@ -1136,21 +1137,25 @@ paths: runOnReadyRestart: yes ``` -### Save streams to disk +### Record streams to disk -To save available streams to disk, use _FFmpeg_ inside the `runOnReady` parameter: +To save available streams to disk, set the `record` and the `recordPath` parameter in the configuration file: ```yml -paths: - all: - runOnReady: > - ffmpeg -i rtsp://localhost:$RTSP_PORT/$MTX_PATH - -c copy - -f segment -strftime 1 -segment_time 60 -segment_format mpegts saved_%Y-%m-%d_%H-%M-%S.ts - runOnReadyRestart: yes +# Record streams to disk. +record: yes +# Path of recording segments. +# Extension is added automatically. +# Available variables are %path (path name), %Y %m %d %H %M %S %f (time in strftime format) +recordPath: ./recordings/%path/%Y-%m-%d_%H-%M-%S-%f ``` -In the configuration above, streams are saved in MPEG-TS format, that is resilient to system crashes. +All available recording parameters are listed in the [sample configuration file](/mediamtx.yml). + +Currently the server supports recording tracks encoded with the following codecs: + +* Video: AV1, VP9, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video +* Audio: Opus, MPEG-4 Audio (AAC), MPEG-1 Audio (MP3) ### Forward streams to another server diff --git a/apidocs/openapi.yaml b/apidocs/openapi.yaml index 0603b1db..b9815b21 100644 --- a/apidocs/openapi.yaml +++ b/apidocs/openapi.yaml @@ -175,6 +175,20 @@ components: srtAddress: type: string + # Record + record: + type: boolean + recordPath: + type: string + recordFormat: + type: string + recordPartDuration: + type: string + recordSegmentDuration: + type: string + recordDeleteAfter: + type: string + # Paths paths: type: object @@ -197,6 +211,8 @@ components: type: string maxReaders: type: integer + record: + type: boolean # Authentication publishUser: diff --git a/go.mod b/go.mod index 0120e45c..3c520bfe 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,10 @@ require ( code.cloudfoundry.org/bytefmt v0.0.0 github.com/abema/go-mp4 v0.13.0 github.com/alecthomas/kong v0.8.0 + github.com/aler9/writerseeker v1.1.0 github.com/bluenviron/gohlslib v1.0.2 github.com/bluenviron/gortsplib/v4 v4.0.2-0.20230916090332-99773e19afc9 - github.com/bluenviron/mediacommon v1.2.0 + github.com/bluenviron/mediacommon v1.2.1-0.20230916111859-5042498c7260 github.com/datarhei/gosrt v0.5.4 github.com/fsnotify/fsnotify v1.6.0 github.com/gin-gonic/gin v1.9.1 @@ -30,7 +31,6 @@ require ( ) require ( - github.com/aler9/writerseeker v1.1.0 // indirect github.com/asticode/go-astikit v0.30.0 // indirect github.com/asticode/go-astits v1.13.0 // indirect github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c // indirect diff --git a/go.sum b/go.sum index 96b0adbf..a12f1fb7 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,8 @@ github.com/bluenviron/gohlslib v1.0.2 h1:LDA/CubL525e9rLWw+G/9GbFS6iXwozmOg8KJBT github.com/bluenviron/gohlslib v1.0.2/go.mod h1:oam0wsI2XqcHLTG6NM8HRvxAQsa3hIA0MLRiTOE7CB8= github.com/bluenviron/gortsplib/v4 v4.0.2-0.20230916090332-99773e19afc9 h1:NIJRhT/AYhPNe1GdWqAhDsYOTo6/hvAz5pEe1Ss+NuE= github.com/bluenviron/gortsplib/v4 v4.0.2-0.20230916090332-99773e19afc9/go.mod h1:aAHfFXD3LE19h86jUGJK7PpM1uwp9VFVReuH89ZlpuE= -github.com/bluenviron/mediacommon v1.2.0 h1:5tz92r2S4gPSiTlycepjXFZCgwGfVL2htCeVsoBac+U= -github.com/bluenviron/mediacommon v1.2.0/go.mod h1:/vlOVSebDwzdRtQONOKLua0fOSJg1tUDHpP+h9a0uqM= +github.com/bluenviron/mediacommon v1.2.1-0.20230916111859-5042498c7260 h1:s0LwQH/+cV2DdCcmqNXoIwpeoT94xnd2UtuvJEkjhiQ= +github.com/bluenviron/mediacommon v1.2.1-0.20230916111859-5042498c7260/go.mod h1:/vlOVSebDwzdRtQONOKLua0fOSJg1tUDHpP+h9a0uqM= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= diff --git a/internal/conf/conf.go b/internal/conf/conf.go index b7b5fe65..2970a127 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -169,6 +169,14 @@ type Conf struct { SRT bool `json:"srt"` SRTAddress string `json:"srtAddress"` + // Record + Record bool `json:"record"` + RecordPath string `json:"recordPath"` + RecordFormat string `json:"recordFormat"` + RecordPartDuration StringDuration `json:"recordPartDuration"` + RecordSegmentDuration StringDuration `json:"recordSegmentDuration"` + RecordDeleteAfter StringDuration `json:"recordDeleteAfter"` + // Paths Paths map[string]*PathConf `json:"paths"` } @@ -294,6 +302,12 @@ func (conf *Conf) Check() error { } } + // Record + + if conf.RecordFormat != "fmp4" { + return fmt.Errorf("unsupported record format '%s'", conf.RecordFormat) + } + // do not add automatically "all", since user may want to // initialize all paths through API or hot reloading. if conf.Paths == nil { @@ -379,6 +393,13 @@ func (conf *Conf) UnmarshalJSON(b []byte) error { conf.SRT = true conf.SRTAddress = ":8890" + // Record + conf.RecordPath = "./recordings/%path/%Y-%m-%d_%H-%M-%S" + conf.RecordFormat = "fmp4" + conf.RecordPartDuration = 100 * StringDuration(time.Millisecond) + conf.RecordSegmentDuration = 3600 * StringDuration(time.Second) + conf.RecordDeleteAfter = 24 * 3600 * StringDuration(time.Second) + type alias Conf d := json.NewDecoder(bytes.NewReader(b)) d.DisallowUnknownFields() diff --git a/internal/conf/path.go b/internal/conf/path.go index b9dc93d6..52d6e013 100644 --- a/internal/conf/path.go +++ b/internal/conf/path.go @@ -49,6 +49,7 @@ type PathConf struct { SourceOnDemandStartTimeout StringDuration `json:"sourceOnDemandStartTimeout"` SourceOnDemandCloseAfter StringDuration `json:"sourceOnDemandCloseAfter"` MaxReaders int `json:"maxReaders"` + Record bool `json:"record"` // Authentication PublishUser Credential `json:"publishUser"` diff --git a/internal/core/core.go b/internal/core/core.go index ed8c1a45..7894739f 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -7,6 +7,7 @@ import ( "os" "os/signal" "reflect" + "time" "github.com/alecthomas/kong" "github.com/bluenviron/gortsplib/v4" @@ -16,6 +17,7 @@ import ( "github.com/bluenviron/mediamtx/internal/confwatcher" "github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/record" "github.com/bluenviron/mediamtx/internal/rlimit" ) @@ -37,6 +39,7 @@ type Core struct { externalCmdPool *externalcmd.Pool metrics *metrics pprof *pprof + recordCleaner *record.Cleaner pathManager *pathManager rtspServer *rtspServer rtspsServer *rtspServer @@ -237,6 +240,16 @@ func (p *Core) createResources(initial bool) error { } } + if p.conf.Record && + p.conf.RecordDeleteAfter != 0 && + p.recordCleaner == nil { + p.recordCleaner = record.NewCleaner( + p.conf.RecordPath, + time.Duration(p.conf.RecordDeleteAfter), + p, + ) + } + if p.pathManager == nil { p.pathManager = newPathManager( p.conf.ExternalAuthenticationURL, @@ -246,6 +259,10 @@ func (p *Core) createResources(initial bool) error { p.conf.WriteTimeout, p.conf.WriteQueueSize, p.conf.UDPMaxPayloadSize, + p.conf.Record, + p.conf.RecordPath, + p.conf.RecordPartDuration, + p.conf.RecordSegmentDuration, p.conf.Paths, p.externalCmdPool, p.metrics, @@ -491,6 +508,11 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { newConf.ReadTimeout != p.conf.ReadTimeout || closeLogger + closeRecorderCleaner := newConf == nil || + newConf.Record != p.conf.Record || + newConf.RecordPath != p.conf.RecordPath || + newConf.RecordDeleteAfter != p.conf.RecordDeleteAfter + closePathManager := newConf == nil || newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL || newConf.RTSPAddress != p.conf.RTSPAddress || @@ -499,6 +521,10 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { newConf.WriteTimeout != p.conf.WriteTimeout || newConf.WriteQueueSize != p.conf.WriteQueueSize || newConf.UDPMaxPayloadSize != p.conf.UDPMaxPayloadSize || + newConf.Record != p.conf.Record || + newConf.RecordPath != p.conf.RecordPath || + newConf.RecordPartDuration != p.conf.RecordPartDuration || + newConf.RecordSegmentDuration != p.conf.RecordSegmentDuration || closeMetrics || closeLogger if !closePathManager && !reflect.DeepEqual(newConf.Paths, p.conf.Paths) { @@ -692,6 +718,11 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { p.pathManager = nil } + if closeRecorderCleaner && p.recordCleaner != nil { + p.recordCleaner.Close() + p.recordCleaner = nil + } + if closePPROF && p.pprof != nil { p.pprof.close() p.pprof = nil diff --git a/internal/core/path.go b/internal/core/path.go index 77e85c1f..f6e1e0ac 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -7,7 +7,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" "github.com/bluenviron/gortsplib/v4/pkg/description" @@ -16,6 +15,7 @@ import ( "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/record" "github.com/bluenviron/mediamtx/internal/stream" ) @@ -171,26 +171,30 @@ type pathAPIPathsGetReq struct { } type path struct { - rtspAddress string - readTimeout conf.StringDuration - writeTimeout conf.StringDuration - writeQueueSize int - udpMaxPayloadSize int - confName string - conf *conf.PathConf - name string - matches []string - wg *sync.WaitGroup - externalCmdPool *externalcmd.Pool - parent pathParent + rtspAddress string + readTimeout conf.StringDuration + writeTimeout conf.StringDuration + writeQueueSize int + udpMaxPayloadSize int + record bool + recordPath string + recordPartDuration conf.StringDuration + recordSegmentDuration conf.StringDuration + confName string + conf *conf.PathConf + name string + matches []string + wg *sync.WaitGroup + externalCmdPool *externalcmd.Pool + parent pathParent ctx context.Context ctxCancel func() confMutex sync.RWMutex source source stream *stream.Stream + recordAgent *record.Agent readyTime time.Time - bytesReceived *uint64 readers map[reader]struct{} describeRequestsOnHold []pathDescribeReq readerAddRequestsOnHold []pathAddReaderReq @@ -227,6 +231,10 @@ func newPath( writeTimeout conf.StringDuration, writeQueueSize int, udpMaxPayloadSize int, + record bool, + recordPath string, + recordPartDuration conf.StringDuration, + recordSegmentDuration conf.StringDuration, confName string, cnf *conf.PathConf, name string, @@ -243,6 +251,10 @@ func newPath( writeTimeout: writeTimeout, writeQueueSize: writeQueueSize, udpMaxPayloadSize: udpMaxPayloadSize, + record: record, + recordPath: recordPath, + recordPartDuration: recordPartDuration, + recordSegmentDuration: recordSegmentDuration, confName: confName, conf: cnf, name: name, @@ -252,7 +264,6 @@ func newPath( parent: parent, ctx: ctx, ctxCancel: ctxCancel, - bytesReceived: new(uint64), readers: make(map[reader]struct{}), onDemandStaticSourceReadyTimer: newEmptyTimer(), onDemandStaticSourceCloseTimer: newEmptyTimer(), @@ -754,7 +765,12 @@ func (pa *path) doAPIPathsGet(req pathAPIPathsGetReq) { } return mediasDescription(pa.stream.Desc().Medias) }(), - BytesReceived: atomic.LoadUint64(pa.bytesReceived), + BytesReceived: func() uint64 { + if pa.stream == nil { + return 0 + } + return pa.stream.BytesReceived() + }(), Readers: func() []interface{} { ret := []interface{}{} for r := range pa.readers { @@ -868,13 +884,24 @@ func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error pa.udpMaxPayloadSize, desc, allocateEncoder, - pa.bytesReceived, logger.NewLimitedLogger(pa.source), ) if err != nil { return err } + if pa.record && pa.conf.Record { + pa.recordAgent = record.NewAgent( + pa.writeQueueSize, + pa.recordPath, + time.Duration(pa.recordPartDuration), + time.Duration(pa.recordSegmentDuration), + pa.name, + pa.stream, + pa, + ) + } + pa.readyTime = time.Now() if pa.conf.RunOnReady != "" { @@ -908,6 +935,11 @@ func (pa *path) setNotReady() { pa.Log(logger.Info, "runOnReady command stopped") } + if pa.recordAgent != nil { + pa.recordAgent.Close() + pa.recordAgent = nil + } + if pa.stream != nil { pa.stream.Close() pa.stream = nil diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index 9694b0f3..c7eb87bf 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -71,6 +71,10 @@ type pathManager struct { writeTimeout conf.StringDuration writeQueueSize int udpMaxPayloadSize int + record bool + recordPath string + recordPartDuration conf.StringDuration + recordSegmentDuration conf.StringDuration pathConfs map[string]*conf.PathConf externalCmdPool *externalcmd.Pool metrics *metrics @@ -105,6 +109,10 @@ func newPathManager( writeTimeout conf.StringDuration, writeQueueSize int, udpMaxPayloadSize int, + record bool, + recordPath string, + recordPartDuration conf.StringDuration, + recordSegmentDuration conf.StringDuration, pathConfs map[string]*conf.PathConf, externalCmdPool *externalcmd.Pool, metrics *metrics, @@ -120,6 +128,10 @@ func newPathManager( writeTimeout: writeTimeout, writeQueueSize: writeQueueSize, udpMaxPayloadSize: udpMaxPayloadSize, + record: record, + recordPath: recordPath, + recordPartDuration: recordPartDuration, + recordSegmentDuration: recordSegmentDuration, pathConfs: pathConfs, externalCmdPool: externalCmdPool, metrics: metrics, @@ -398,6 +410,10 @@ func (pm *pathManager) createPath( pm.writeTimeout, pm.writeQueueSize, pm.udpMaxPayloadSize, + pm.record, + pm.recordPath, + pm.recordPartDuration, + pm.recordSegmentDuration, pathConfName, pathConf, name, diff --git a/internal/core/srt_conn.go b/internal/core/srt_conn.go index e4315bf9..17a41a39 100644 --- a/internal/core/srt_conn.go +++ b/internal/core/srt_conn.go @@ -548,14 +548,24 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass case *format.MPEG4Video: track := addTrack(medi, &mpegts.CodecMPEG4Video{}) + firstReceived := false + var lastPTS time.Duration + res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { tunit := u.(*unit.MPEG4Video) if tunit.Frame == nil { return nil } + if !firstReceived { + firstReceived = true + } else if tunit.PTS < lastPTS { + return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)") + } + lastPTS = tunit.PTS + sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = w.WriteMPEGxVideo(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) + err = w.WriteMPEG4Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) if err != nil { return err } @@ -565,14 +575,24 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass case *format.MPEG1Video: track := addTrack(medi, &mpegts.CodecMPEG1Video{}) + firstReceived := false + var lastPTS time.Duration + res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { tunit := u.(*unit.MPEG1Video) if tunit.Frame == nil { return nil } + if !firstReceived { + firstReceived = true + } else if tunit.PTS < lastPTS { + return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)") + } + lastPTS = tunit.PTS + sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = w.WriteMPEGxVideo(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) + err = w.WriteMPEG1Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) if err != nil { return err } diff --git a/internal/core/webrtc_outgoing_track.go b/internal/core/webrtc_outgoing_track.go index 745f383d..3b783d43 100644 --- a/internal/core/webrtc_outgoing_track.go +++ b/internal/core/webrtc_outgoing_track.go @@ -204,8 +204,8 @@ func newWebRTCOutgoingTrackVideo(desc *description.Session) (*webRTCOutgoingTrac return nil, err } + firstReceived := false var lastPTS time.Duration - firstNALUReceived := false return &webRTCOutgoingTrack{ media: videoMedia, @@ -218,15 +218,12 @@ func newWebRTCOutgoingTrackVideo(desc *description.Session) (*webRTCOutgoingTrac return nil } - if !firstNALUReceived { - firstNALUReceived = true - lastPTS = tunit.PTS - } else { - if tunit.PTS < lastPTS { - return fmt.Errorf("WebRTC doesn't support H264 streams with B-frames") - } - lastPTS = tunit.PTS + if !firstReceived { + firstReceived = true + } else if tunit.PTS < lastPTS { + return fmt.Errorf("WebRTC doesn't support H264 streams with B-frames") } + lastPTS = tunit.PTS packets, err := encoder.Encode(tunit.AU) if err != nil { diff --git a/internal/record/agent.go b/internal/record/agent.go new file mode 100644 index 00000000..b68bc230 --- /dev/null +++ b/internal/record/agent.go @@ -0,0 +1,738 @@ +package record + +import ( + "bytes" + "context" + "fmt" + "path/filepath" + "strings" + "time" + + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/pkg/codecs/av1" + "github.com/bluenviron/mediacommon/pkg/codecs/h264" + "github.com/bluenviron/mediacommon/pkg/codecs/h265" + "github.com/bluenviron/mediacommon/pkg/codecs/mpeg1audio" + "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" + "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4video" + "github.com/bluenviron/mediacommon/pkg/codecs/opus" + "github.com/bluenviron/mediacommon/pkg/codecs/vp9" + "github.com/bluenviron/mediacommon/pkg/formats/fmp4" + + "github.com/bluenviron/mediamtx/internal/asyncwriter" + "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/stream" + "github.com/bluenviron/mediamtx/internal/unit" +) + +func durationGoToMp4(v time.Duration, timeScale uint32) uint64 { + timeScale64 := uint64(timeScale) + secs := v / time.Second + dec := v % time.Second + return uint64(secs)*timeScale64 + uint64(dec)*timeScale64/uint64(time.Second) +} + +func mpeg1audioChannelCount(cm mpeg1audio.ChannelMode) int { + switch cm { + case mpeg1audio.ChannelModeStereo, + mpeg1audio.ChannelModeJointStereo, + mpeg1audio.ChannelModeDualChannel: + return 2 + + default: + return 1 + } +} + +type sample struct { + *fmp4.PartSample + dts time.Duration +} + +// Agent saves streams on disk. +type Agent struct { + path string + partDuration time.Duration + segmentDuration time.Duration + stream *stream.Stream + parent logger.Writer + + ctx context.Context + ctxCancel func() + writer *asyncwriter.Writer + tracks []*track + hasVideo bool + currentSegment *segment + + done chan struct{} +} + +// NewAgent allocates a nAgent. +func NewAgent( + writeQueueSize int, + recordPath string, + partDuration time.Duration, + segmentDuration time.Duration, + pathName string, + stream *stream.Stream, + parent logger.Writer, +) *Agent { + recordPath, _ = filepath.Abs(recordPath) + recordPath = strings.ReplaceAll(recordPath, "%path", pathName) + recordPath += ".mp4" + + ctx, ctxCancel := context.WithCancel(context.Background()) + + r := &Agent{ + path: recordPath, + partDuration: partDuration, + segmentDuration: segmentDuration, + stream: stream, + parent: parent, + ctx: ctx, + ctxCancel: ctxCancel, + done: make(chan struct{}), + } + + r.writer = asyncwriter.New(writeQueueSize, r) + + nextID := 1 + + addTrack := func(codec fmp4.Codec) *track { + initTrack := &fmp4.InitTrack{ + TimeScale: 90000, + Codec: codec, + } + initTrack.ID = nextID + nextID++ + + track := newTrack(r, initTrack) + r.tracks = append(r.tracks, track) + + return track + } + + for _, media := range stream.Desc().Medias { + for _, forma := range media.Formats { + switch forma := forma.(type) { + case *format.AV1: + codec := &fmp4.CodecAV1{ + SequenceHeader: []byte{ + 8, 0, 0, 0, 66, 167, 191, 228, 96, 13, 0, 64, + }, + } + track := addTrack(codec) + + firstReceived := false + + stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.AV1) + if tunit.TU == nil { + return nil + } + + randomAccess := false + + for _, obu := range tunit.TU { + var h av1.OBUHeader + err := h.Unmarshal(obu) + if err != nil { + return err + } + + if h.Type == av1.OBUTypeSequenceHeader { + if !bytes.Equal(codec.SequenceHeader, obu) { + codec.SequenceHeader = obu + r.updateCodecs() + } + randomAccess = true + } + } + + if !firstReceived { + if !randomAccess { + return nil + } + firstReceived = true + } + + sampl, err := fmp4.NewPartSampleAV1( + randomAccess, + tunit.TU) + if err != nil { + return err + } + + return track.record(&sample{ + PartSample: sampl, + dts: tunit.PTS, + }) + }) + + case *format.VP9: + codec := &fmp4.CodecVP9{ + Width: 1280, + Height: 720, + Profile: 1, + BitDepth: 8, + ChromaSubsampling: 1, + ColorRange: false, + } + track := addTrack(codec) + + firstReceived := false + + stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.VP9) + if tunit.Frame == nil { + return nil + } + + var h vp9.Header + err := h.Unmarshal(tunit.Frame) + if err != nil { + return err + } + + randomAccess := false + + if h.FrameType == vp9.FrameTypeKeyFrame { + randomAccess = true + + if w := h.Width(); codec.Width != w { + codec.Width = w + r.updateCodecs() + } + if h := h.Width(); codec.Height != h { + codec.Height = h + r.updateCodecs() + } + if codec.Profile != h.Profile { + codec.Profile = h.Profile + r.updateCodecs() + } + if codec.BitDepth != h.ColorConfig.BitDepth { + codec.BitDepth = h.ColorConfig.BitDepth + r.updateCodecs() + } + if c := h.ChromaSubsampling(); codec.ChromaSubsampling != c { + codec.ChromaSubsampling = c + r.updateCodecs() + } + if codec.ColorRange != h.ColorConfig.ColorRange { + codec.ColorRange = h.ColorConfig.ColorRange + r.updateCodecs() + } + } + + if !firstReceived { + if !randomAccess { + return nil + } + firstReceived = true + } + + return track.record(&sample{ + PartSample: &fmp4.PartSample{ + IsNonSyncSample: !randomAccess, + Payload: tunit.Frame, + }, + dts: tunit.PTS, + }) + }) + + case *format.VP8: + // TODO + + case *format.H265: + vps, sps, pps := forma.SafeParams() + + if vps == nil || sps == nil || pps == nil { + vps = []byte{ + 0x40, 0x01, 0x0c, 0x01, 0xff, 0xff, 0x02, 0x20, + 0x00, 0x00, 0x03, 0x00, 0xb0, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x03, 0x00, 0x7b, 0x18, 0xb0, 0x24, + } + + sps = []byte{ + 0x42, 0x01, 0x01, 0x02, 0x20, 0x00, 0x00, 0x03, + 0x00, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, + 0x00, 0x7b, 0xa0, 0x07, 0x82, 0x00, 0x88, 0x7d, + 0xb6, 0x71, 0x8b, 0x92, 0x44, 0x80, 0x53, 0x88, + 0x88, 0x92, 0xcf, 0x24, 0xa6, 0x92, 0x72, 0xc9, + 0x12, 0x49, 0x22, 0xdc, 0x91, 0xaa, 0x48, 0xfc, + 0xa2, 0x23, 0xff, 0x00, 0x01, 0x00, 0x01, 0x6a, + 0x02, 0x02, 0x02, 0x01, + } + + pps = []byte{ + 0x44, 0x01, 0xc0, 0x25, 0x2f, 0x05, 0x32, 0x40, + } + } + + codec := &fmp4.CodecH265{ + VPS: vps, + SPS: sps, + PPS: pps, + } + track := addTrack(codec) + + var dtsExtractor *h265.DTSExtractor + + stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.H265) + if tunit.AU == nil { + return nil + } + + randomAccess := false + + for _, nalu := range tunit.AU { + typ := h265.NALUType((nalu[0] >> 1) & 0b111111) + + switch typ { + case h265.NALUType_VPS_NUT: + if !bytes.Equal(codec.VPS, nalu) { + codec.VPS = nalu + r.updateCodecs() + } + + case h265.NALUType_SPS_NUT: + if !bytes.Equal(codec.SPS, nalu) { + codec.SPS = nalu + r.updateCodecs() + } + + case h265.NALUType_PPS_NUT: + if !bytes.Equal(codec.PPS, nalu) { + codec.PPS = nalu + r.updateCodecs() + } + + case h265.NALUType_IDR_W_RADL, h265.NALUType_IDR_N_LP, h265.NALUType_CRA_NUT: + randomAccess = true + } + } + + if dtsExtractor == nil { + if !randomAccess { + return nil + } + dtsExtractor = h265.NewDTSExtractor() + } + + dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err + } + + sampl, err := fmp4.NewPartSampleH26x( + int32(durationGoToMp4(tunit.PTS-dts, 90000)), + randomAccess, + tunit.AU) + if err != nil { + return err + } + + return track.record(&sample{ + PartSample: sampl, + dts: dts, + }) + }) + + case *format.H264: + sps, pps := forma.SafeParams() + + if sps == nil || pps == nil { + sps = []byte{ + 0x67, 0x42, 0xc0, 0x1f, 0xd9, 0x00, 0xf0, 0x11, + 0x7e, 0xf0, 0x11, 0x00, 0x00, 0x03, 0x00, 0x01, + 0x00, 0x00, 0x03, 0x00, 0x30, 0x8f, 0x18, 0x32, + 0x48, + } + + pps = []byte{ + 0x68, 0xcb, 0x8c, 0xb2, + } + } + + codec := &fmp4.CodecH264{ + SPS: sps, + PPS: pps, + } + track := addTrack(codec) + + var dtsExtractor *h264.DTSExtractor + + stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.H264) + if tunit.AU == nil { + return nil + } + + randomAccess := false + + for _, nalu := range tunit.AU { + typ := h264.NALUType(nalu[0] & 0x1F) + switch typ { + case h264.NALUTypeSPS: + if !bytes.Equal(codec.SPS, nalu) { + codec.SPS = nalu + r.updateCodecs() + } + + case h264.NALUTypePPS: + if !bytes.Equal(codec.PPS, nalu) { + codec.PPS = nalu + r.updateCodecs() + } + + case h264.NALUTypeIDR: + randomAccess = true + } + } + + if dtsExtractor == nil { + if !randomAccess { + return nil + } + dtsExtractor = h264.NewDTSExtractor() + } + + dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err + } + + sampl, err := fmp4.NewPartSampleH26x( + int32(durationGoToMp4(tunit.PTS-dts, 90000)), + randomAccess, + tunit.AU) + if err != nil { + return err + } + + return track.record(&sample{ + PartSample: sampl, + dts: dts, + }) + }) + + case *format.MPEG4Video: + config := forma.SafeParams() + + if config == nil { + config = []byte{ + 0x00, 0x00, 0x01, 0xb0, 0x01, 0x00, 0x00, 0x01, + 0xb5, 0x89, 0x13, 0x00, 0x00, 0x01, 0x00, 0x00, + 0x00, 0x01, 0x20, 0x00, 0xc4, 0x8d, 0x88, 0x00, + 0xf5, 0x3c, 0x04, 0x87, 0x14, 0x63, 0x00, 0x00, + 0x01, 0xb2, 0x4c, 0x61, 0x76, 0x63, 0x35, 0x38, + 0x2e, 0x31, 0x33, 0x34, 0x2e, 0x31, 0x30, 0x30, + } + } + + codec := &fmp4.CodecMPEG4Video{ + Config: config, + } + track := addTrack(codec) + + firstReceived := false + var lastPTS time.Duration + + stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG4Video) + if tunit.Frame == nil { + return nil + } + + randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) + + if bytes.HasPrefix(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.VisualObjectSequenceStartCode)}) { + end := bytes.Index(tunit.Frame[4:], []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) + if end >= 0 { + config := tunit.Frame[:end+4] + + if !bytes.Equal(codec.Config, config) { + codec.Config = config + r.updateCodecs() + } + } + } + + if !firstReceived { + if !randomAccess { + return nil + } + firstReceived = true + } else if tunit.PTS < lastPTS { + return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)") + } + lastPTS = tunit.PTS + + return track.record(&sample{ + PartSample: &fmp4.PartSample{ + Payload: tunit.Frame, + IsNonSyncSample: !randomAccess, + }, + dts: tunit.PTS, + }) + }) + + case *format.MPEG1Video: + codec := &fmp4.CodecMPEG1Video{ + Config: []byte{ + 0x00, 0x00, 0x01, 0xb3, 0x78, 0x04, 0x38, 0x35, + 0xff, 0xff, 0xe0, 0x18, 0x00, 0x00, 0x01, 0xb5, + 0x14, 0x4a, 0x00, 0x01, 0x00, 0x00, + }, + } + track := addTrack(codec) + + firstReceived := false + var lastPTS time.Duration + + stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG1Video) + if tunit.Frame == nil { + return nil + } + + randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, 0xB8}) + + if bytes.HasPrefix(tunit.Frame, []byte{0, 0, 1, 0xB3}) { + end := bytes.Index(tunit.Frame[4:], []byte{0, 0, 1, 0xB8}) + if end >= 0 { + config := tunit.Frame[:end+4] + + if !bytes.Equal(codec.Config, config) { + codec.Config = config + r.updateCodecs() + } + } + } + + if !firstReceived { + if !randomAccess { + return nil + } + firstReceived = true + } else if tunit.PTS < lastPTS { + return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)") + } + lastPTS = tunit.PTS + + return track.record(&sample{ + PartSample: &fmp4.PartSample{ + Payload: tunit.Frame, + IsNonSyncSample: !randomAccess, + }, + dts: tunit.PTS, + }) + }) + + case *format.MJPEG: + // TODO + + case *format.Opus: + codec := &fmp4.CodecOpus{ + ChannelCount: func() int { + if forma.IsStereo { + return 2 + } + return 1 + }(), + } + track := addTrack(codec) + + stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.Opus) + if tunit.Packets == nil { + return nil + } + + pts := tunit.PTS + + for _, packet := range tunit.Packets { + err := track.record(&sample{ + PartSample: &fmp4.PartSample{ + Payload: packet, + }, + dts: pts, + }) + if err != nil { + return err + } + + pts += opus.PacketDuration(packet) + } + + return nil + }) + + case *format.MPEG4AudioGeneric: + codec := &fmp4.CodecMPEG4Audio{ + Config: *forma.Config, + } + track := addTrack(codec) + + sampleRate := time.Duration(forma.Config.SampleRate) + + stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG4AudioGeneric) + if tunit.AUs == nil { + return nil + } + + for i, au := range tunit.AUs { + auPTS := tunit.PTS + time.Duration(i)*mpeg4audio.SamplesPerAccessUnit* + time.Second/sampleRate + + err := track.record(&sample{ + PartSample: &fmp4.PartSample{ + Payload: au, + }, + dts: auPTS, + }) + if err != nil { + return err + } + } + + return nil + }) + + case *format.MPEG4AudioLATM: + codec := &fmp4.CodecMPEG4Audio{ + Config: *forma.Config.Programs[0].Layers[0].AudioSpecificConfig, + } + track := addTrack(codec) + + stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG4AudioLATM) + if tunit.AU == nil { + return nil + } + + return track.record(&sample{ + PartSample: &fmp4.PartSample{ + Payload: tunit.AU, + }, + dts: tunit.PTS, + }) + }) + + case *format.MPEG1Audio: + codec := &fmp4.CodecMPEG1Audio{ + SampleRate: 32000, + ChannelCount: 2, + } + track := addTrack(codec) + + stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.MPEG1Audio) + if tunit.Frames == nil { + return nil + } + + pts := tunit.PTS + + for _, frame := range tunit.Frames { + var h mpeg1audio.FrameHeader + err := h.Unmarshal(frame) + if err != nil { + return err + } + + if codec.SampleRate != h.SampleRate { + codec.SampleRate = h.SampleRate + r.updateCodecs() + } + if c := mpeg1audioChannelCount(h.ChannelMode); codec.ChannelCount != c { + codec.ChannelCount = c + r.updateCodecs() + } + + err = track.record(&sample{ + PartSample: &fmp4.PartSample{ + Payload: frame, + }, + dts: pts, + }) + if err != nil { + return err + } + + pts += time.Duration(h.SampleCount()) * + time.Second / time.Duration(h.SampleRate) + } + + return nil + }) + + case *format.G722: + // TODO + + case *format.G711: + // TODO + + case *format.LPCM: + // TODO + } + } + } + + r.Log(logger.Info, "recording %d %s", + len(r.tracks), + func() string { + if len(r.tracks) == 1 { + return "track" + } + return "tracks" + }()) + + go r.run() + + return r +} + +// Close closes the Agent. +func (r *Agent) Close() { + r.ctxCancel() + <-r.done +} + +// Log is the main logging function. +func (r *Agent) Log(level logger.Level, format string, args ...interface{}) { + r.parent.Log(level, "[record] "+format, args...) +} + +func (r *Agent) run() { + close(r.done) + + r.writer.Start() + + select { + case err := <-r.writer.Error(): + r.Log(logger.Error, err.Error()) + r.stream.RemoveReader(r.writer) + + case <-r.ctx.Done(): + r.stream.RemoveReader(r.writer) + r.writer.Stop() + } + + if r.currentSegment != nil { + r.currentSegment.close() //nolint:errcheck + } +} + +func (r *Agent) updateCodecs() { + // if codec parameters have been updated, + // and current segment has already written codec parameters on disk, + // close current segment. + if r.currentSegment != nil && r.currentSegment.f != nil { + r.currentSegment.close() //nolint:errcheck + r.currentSegment = nil + } +} diff --git a/internal/record/agent_test.go b/internal/record/agent_test.go new file mode 100644 index 00000000..a41d001f --- /dev/null +++ b/internal/record/agent_test.go @@ -0,0 +1,151 @@ +package record + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/pkg/codecs/h265" + "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" + "github.com/bluenviron/mediamtx/internal/logger" + "github.com/stretchr/testify/require" + + "github.com/bluenviron/mediamtx/internal/stream" + "github.com/bluenviron/mediamtx/internal/unit" +) + +type nilLogger struct{} + +func (nilLogger) Log(_ logger.Level, _ string, _ ...interface{}) { +} + +func TestAgent(t *testing.T) { + n := 0 + timeNow = func() time.Time { + n++ + if n >= 2 { + return time.Date(2008, 0o5, 20, 22, 15, 25, 125000, time.UTC) + } + return time.Date(2009, 0o5, 20, 22, 15, 25, 427000, time.UTC) + } + + desc := &description.Session{Medias: []*description.Media{ + { + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.H265{ + PayloadTyp: 96, + }}, + }, + { + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.H264{ + PayloadTyp: 96, + PacketizationMode: 1, + }}, + }, + { + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.MPEG4Audio{ + PayloadTyp: 96, + Config: &mpeg4audio.Config{ + Type: 2, + SampleRate: 44100, + ChannelCount: 2, + }, + SizeLength: 13, + IndexLength: 3, + IndexDeltaLength: 3, + }}, + }, + }} + + stream, err := stream.New( + 1460, + desc, + true, + &nilLogger{}, + ) + require.NoError(t, err) + defer stream.Close() + + dir, err := os.MkdirTemp("", "mediamtx-agent") + require.NoError(t, err) + defer os.RemoveAll(dir) + + recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f") + + a := NewAgent( + 1024, + recordPath, + 100*time.Millisecond, + 1*time.Second, + "mypath", + stream, + &nilLogger{}, + ) + + for i := 0; i < 3; i++ { + stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H265{ + Base: unit.Base{ + PTS: (50 + time.Duration(i)) * time.Second, + }, + AU: [][]byte{ + { // VPS + 0x40, 0x01, 0x0c, 0x01, 0xff, 0xff, 0x02, 0x20, + 0x00, 0x00, 0x03, 0x00, 0xb0, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x03, 0x00, 0x7b, 0x18, 0xb0, 0x24, + }, + { // SPS + 0x42, 0x01, 0x01, 0x02, 0x20, 0x00, 0x00, 0x03, + 0x00, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, + 0x00, 0x7b, 0xa0, 0x07, 0x82, 0x00, 0x88, 0x7d, + 0xb6, 0x71, 0x8b, 0x92, 0x44, 0x80, 0x53, 0x88, + 0x88, 0x92, 0xcf, 0x24, 0xa6, 0x92, 0x72, 0xc9, + 0x12, 0x49, 0x22, 0xdc, 0x91, 0xaa, 0x48, 0xfc, + 0xa2, 0x23, 0xff, 0x00, 0x01, 0x00, 0x01, 0x6a, + 0x02, 0x02, 0x02, 0x01, + }, + { // PPS + 0x44, 0x01, 0xc0, 0x25, 0x2f, 0x05, 0x32, 0x40, + }, + {byte(h265.NALUType_CRA_NUT) << 1, 0}, // IDR + }, + }) + + stream.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.H264{ + Base: unit.Base{ + PTS: (50 + time.Duration(i)) * time.Second, + }, + AU: [][]byte{ + { // SPS + 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02, + 0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, + 0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20, + }, + { // PPS + 0x08, 0x06, 0x07, 0x08, + }, + {5}, // IDR + }, + }) + + stream.WriteUnit(desc.Medias[2], desc.Medias[2].Formats[0], &unit.MPEG4AudioGeneric{ + Base: unit.Base{ + PTS: (50 + time.Duration(i)) * time.Second, + }, + AUs: [][]byte{{1, 2, 3, 4}}, + }) + } + + time.Sleep(500 * time.Millisecond) + a.Close() + + _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000125.mp4")) + require.NoError(t, err) + + _, err = os.Stat(filepath.Join(dir, "mypath", "2009-05-20_22-15-25-000427.mp4")) + require.NoError(t, err) +} diff --git a/internal/record/cleaner.go b/internal/record/cleaner.go new file mode 100644 index 00000000..841325ab --- /dev/null +++ b/internal/record/cleaner.go @@ -0,0 +1,144 @@ +package record + +import ( + "context" + "io/fs" + "os" + "path/filepath" + "strings" + "time" + + "github.com/bluenviron/mediamtx/internal/logger" +) + +func commonPath(v string) string { + common := "" + remaining := v + + for { + i := strings.IndexAny(remaining, "\\/") + if i < 0 { + break + } + + var part string + part, remaining = remaining[:i+1], remaining[i+1:] + + if strings.Contains(part, "%") { + break + } + + common += part + } + + if len(common) > 0 { + common = common[:len(common)-1] + } + + return common +} + +// Cleaner removes expired recordings from disk. +type Cleaner struct { + ctx context.Context + ctxCancel func() + path string + deleteAfter time.Duration + parent logger.Writer + + done chan struct{} +} + +// NewCleaner allocates a Cleaner. +func NewCleaner( + recordPath string, + deleteAfter time.Duration, + parent logger.Writer, +) *Cleaner { + recordPath, _ = filepath.Abs(recordPath) + recordPath += ".mp4" + + ctx, ctxCancel := context.WithCancel(context.Background()) + + c := &Cleaner{ + ctx: ctx, + ctxCancel: ctxCancel, + path: recordPath, + deleteAfter: deleteAfter, + parent: parent, + done: make(chan struct{}), + } + + go c.run() + + return c +} + +// Close closes the Cleaner. +func (c *Cleaner) Close() { + c.ctxCancel() + <-c.done +} + +// Log is the main logging function. +func (c *Cleaner) Log(level logger.Level, format string, args ...interface{}) { + c.parent.Log(level, "[record cleaner]"+format, args...) +} + +func (c *Cleaner) run() { + defer close(c.done) + + interval := 30 * 60 * time.Second + if interval > (c.deleteAfter / 2) { + interval = c.deleteAfter / 2 + } + + c.doRun() //nolint:errcheck + + for { + select { + case <-time.After(interval): + c.doRun() //nolint:errcheck + + case <-c.ctx.Done(): + return + } + } +} + +func (c *Cleaner) doRun() error { + commonPath := commonPath(c.path) + now := timeNow() + + filepath.Walk(commonPath, func(path string, info fs.FileInfo, err error) error { //nolint:errcheck + if err != nil { + return err + } + + if !info.IsDir() { + params := decodeRecordPath(c.path, path) + if params != nil { + if now.Sub(params.time) > c.deleteAfter { + c.Log(logger.Debug, "removing %s", path) + os.Remove(path) + } + } + } + + return nil + }) + + filepath.Walk(commonPath, func(path string, info fs.FileInfo, err error) error { //nolint:errcheck + if err != nil { + return err + } + + if info.IsDir() { + os.Remove(path) + } + + return nil + }) + + return nil +} diff --git a/internal/record/cleaner_test.go b/internal/record/cleaner_test.go new file mode 100644 index 00000000..133769b6 --- /dev/null +++ b/internal/record/cleaner_test.go @@ -0,0 +1,46 @@ +package record + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestCleaner(t *testing.T) { + timeNow = func() time.Time { + return time.Date(2009, 0o5, 20, 22, 15, 25, 427000, time.UTC) + } + + dir, err := os.MkdirTemp("", "mediamtx-cleaner") + require.NoError(t, err) + defer os.RemoveAll(dir) + + recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f") + + err = os.Mkdir(filepath.Join(dir, "mypath"), 0o755) + require.NoError(t, err) + + err = os.WriteFile(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000125.mp4"), []byte{1}, 0o644) + require.NoError(t, err) + + err = os.WriteFile(filepath.Join(dir, "mypath", "2009-05-20_22-15-25-000427.mp4"), []byte{1}, 0o644) + require.NoError(t, err) + + c := NewCleaner( + recordPath, + 10*time.Second, + nilLogger{}, + ) + defer c.Close() + + time.Sleep(500 * time.Millisecond) + + _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000125.mp4")) + require.Error(t, err) + + _, err = os.Stat(filepath.Join(dir, "mypath", "2009-05-20_22-15-25-000427.mp4")) + require.NoError(t, err) +} diff --git a/internal/record/part.go b/internal/record/part.go new file mode 100644 index 00000000..e8d69c6e --- /dev/null +++ b/internal/record/part.go @@ -0,0 +1,74 @@ +package record + +import ( + "io" + "time" + + "github.com/aler9/writerseeker" + "github.com/bluenviron/mediacommon/pkg/formats/fmp4" +) + +func writePart(f io.Writer, partTracks map[*track]*fmp4.PartTrack) error { + fmp4PartTracks := make([]*fmp4.PartTrack, len(partTracks)) + i := 0 + for _, partTrack := range partTracks { + fmp4PartTracks[i] = partTrack + i++ + } + + part := &fmp4.Part{ + Tracks: fmp4PartTracks, + } + + var ws writerseeker.WriterSeeker + err := part.Marshal(&ws) + if err != nil { + return err + } + + _, err = f.Write(ws.Bytes()) + return err +} + +type part struct { + s *segment + startDTS time.Duration + + partTracks map[*track]*fmp4.PartTrack + endDTS time.Duration +} + +func newPart( + s *segment, + startDTS time.Duration, +) *part { + return &part{ + s: s, + startDTS: startDTS, + partTracks: make(map[*track]*fmp4.PartTrack), + } +} + +func (p *part) close() error { + return writePart(p.s.f, p.partTracks) +} + +func (p *part) record(track *track, sample *sample) error { + partTrack, ok := p.partTracks[track] + if !ok { + partTrack = &fmp4.PartTrack{ + ID: track.initTrack.ID, + BaseTime: durationGoToMp4(sample.dts-p.s.startDTS, track.initTrack.TimeScale), + } + p.partTracks[track] = partTrack + } + + partTrack.Samples = append(partTrack.Samples, sample.PartSample) + p.endDTS = sample.dts + + return nil +} + +func (p *part) duration() time.Duration { + return p.endDTS - p.startDTS +} diff --git a/internal/record/record.go b/internal/record/record.go new file mode 100644 index 00000000..9d5b89ab --- /dev/null +++ b/internal/record/record.go @@ -0,0 +1,2 @@ +// Package record contains the recording system. +package record diff --git a/internal/record/record_path.go b/internal/record/record_path.go new file mode 100644 index 00000000..4f164391 --- /dev/null +++ b/internal/record/record_path.go @@ -0,0 +1,138 @@ +package record + +import ( + "regexp" + "strconv" + "strings" + "time" +) + +func leadingZeros(v int, size int) string { + out := strconv.FormatInt(int64(v), 10) + if len(out) >= size { + return out + } + + out2 := "" + for i := 0; i < (size - len(out)); i++ { + out2 += "0" + } + + return out2 + out +} + +type recordPathParams struct { + path string + time time.Time +} + +func decodeRecordPath(format string, v string) *recordPathParams { + re := format + re = strings.ReplaceAll(re, "\\", "\\\\") + re = strings.ReplaceAll(re, "%path", "(.*?)") + re = strings.ReplaceAll(re, "%Y", "([0-9]{4})") + re = strings.ReplaceAll(re, "%m", "([0-9]{2})") + re = strings.ReplaceAll(re, "%d", "([0-9]{2})") + re = strings.ReplaceAll(re, "%H", "([0-9]{2})") + re = strings.ReplaceAll(re, "%M", "([0-9]{2})") + re = strings.ReplaceAll(re, "%S", "([0-9]{2})") + re = strings.ReplaceAll(re, "%f", "([0-9]{6})") + r := regexp.MustCompile(re) + + var groupMapping []string + cur := format + for { + i := strings.Index(cur, "%") + if i < 0 { + break + } + + cur = cur[i:] + + for _, va := range []string{ + "%path", + "%Y", + "%m", + "%d", + "%H", + "%M", + "%S", + "%f", + } { + if strings.HasPrefix(cur, va) { + groupMapping = append(groupMapping, va) + } + } + + cur = cur[1:] + } + + matches := r.FindStringSubmatch(v) + if matches == nil { + return nil + } + + values := make(map[string]string) + + for i, match := range matches[1:] { + values[groupMapping[i]] = match + } + + var year int + var month time.Month = 1 + day := 1 + var hour int + var minute int + var second int + var micros int + + for k, v := range values { + switch k { + case "%Y": + tmp, _ := strconv.ParseInt(v, 10, 64) + year = int(tmp) + + case "%m": + tmp, _ := strconv.ParseInt(v, 10, 64) + month = time.Month(int(tmp)) + + case "%d": + tmp, _ := strconv.ParseInt(v, 10, 64) + day = int(tmp) + + case "%H": + tmp, _ := strconv.ParseInt(v, 10, 64) + hour = int(tmp) + + case "%M": + tmp, _ := strconv.ParseInt(v, 10, 64) + minute = int(tmp) + + case "%S": + tmp, _ := strconv.ParseInt(v, 10, 64) + second = int(tmp) + + case "%f": + tmp, _ := strconv.ParseInt(v, 10, 64) + micros = int(tmp) + } + } + + t := time.Date(year, month, day, hour, minute, second, micros*1000, time.Local) + + return &recordPathParams{ + path: values["%path"], + time: t, + } +} + +func encodeRecordPath(params *recordPathParams, v string) string { + v = strings.ReplaceAll(v, "%Y", strconv.FormatInt(int64(params.time.Year()), 10)) + v = strings.ReplaceAll(v, "%m", leadingZeros(int(params.time.Month()), 2)) + v = strings.ReplaceAll(v, "%d", leadingZeros(params.time.Day(), 2)) + v = strings.ReplaceAll(v, "%H", leadingZeros(params.time.Hour(), 2)) + v = strings.ReplaceAll(v, "%M", leadingZeros(params.time.Minute(), 2)) + v = strings.ReplaceAll(v, "%S", leadingZeros(params.time.Second(), 2)) + v = strings.ReplaceAll(v, "%f", leadingZeros(params.time.Nanosecond()/1000, 6)) + return v +} diff --git a/internal/record/segment.go b/internal/record/segment.go new file mode 100644 index 00000000..dfb958a4 --- /dev/null +++ b/internal/record/segment.go @@ -0,0 +1,116 @@ +package record + +import ( + "io" + "os" + "path/filepath" + "time" + + "github.com/aler9/writerseeker" + "github.com/bluenviron/mediacommon/pkg/formats/fmp4" + + "github.com/bluenviron/mediamtx/internal/logger" +) + +var timeNow = time.Now + +func writeInit(f io.Writer, tracks []*track) error { + fmp4Tracks := make([]*fmp4.InitTrack, len(tracks)) + for i, track := range tracks { + fmp4Tracks[i] = track.initTrack + } + + init := fmp4.Init{ + Tracks: fmp4Tracks, + } + + var ws writerseeker.WriterSeeker + err := init.Marshal(&ws) + if err != nil { + return err + } + + _, err = f.Write(ws.Bytes()) + return err +} + +type segment struct { + r *Agent + startDTS time.Duration + + fpath string + f *os.File + curPart *part +} + +func newSegment( + r *Agent, + startDTS time.Duration, +) *segment { + return &segment{ + r: r, + startDTS: startDTS, + } +} + +func (s *segment) close() error { + if s.curPart != nil { + err := s.flush() + + if s.f != nil { + s.r.Log(logger.Debug, "closing segment %s", s.fpath) + + err2 := s.f.Close() + if err == nil { + err = err2 + } + } + + return err + } + + return nil +} + +func (s *segment) record(track *track, sample *sample) error { + if s.curPart == nil { + s.curPart = newPart(s, sample.dts) + } else if s.curPart.duration() >= s.r.partDuration { + err := s.flush() + if err != nil { + s.curPart = nil + return err + } + + s.curPart = newPart(s, sample.dts) + } + + return s.curPart.record(track, sample) +} + +func (s *segment) flush() error { + if s.f == nil { + s.fpath = encodeRecordPath(&recordPathParams{time: timeNow()}, s.r.path) + s.r.Log(logger.Debug, "opening segment %s", s.fpath) + + err := os.MkdirAll(filepath.Dir(s.fpath), 0o755) + if err != nil { + return err + } + + f, err := os.Create(s.fpath) + if err != nil { + return err + } + + err = writeInit(f, s.r.tracks) + if err != nil { + f.Close() + return err + } + + s.f = f + } + + return s.curPart.close() +} diff --git a/internal/record/track.go b/internal/record/track.go new file mode 100644 index 00000000..22fce751 --- /dev/null +++ b/internal/record/track.go @@ -0,0 +1,57 @@ +package record + +import ( + "github.com/bluenviron/mediacommon/pkg/formats/fmp4" +) + +type track struct { + r *Agent + initTrack *fmp4.InitTrack + + nextSample *sample +} + +func newTrack( + r *Agent, + initTrack *fmp4.InitTrack, +) *track { + return &track{ + r: r, + initTrack: initTrack, + } +} + +func (t *track) record(sample *sample) error { + // wait the first video sample before setting hasVideo + if t.initTrack.Codec.IsVideo() { + t.r.hasVideo = true + } + + if t.r.currentSegment == nil { + t.r.currentSegment = newSegment(t.r, sample.dts) + } + + sample, t.nextSample = t.nextSample, sample + if sample == nil { + return nil + } + sample.Duration = uint32(durationGoToMp4(t.nextSample.dts-sample.dts, t.initTrack.TimeScale)) + + err := t.r.currentSegment.record(t, sample) + if err != nil { + return err + } + + if (!t.r.hasVideo || t.initTrack.Codec.IsVideo()) && + !t.nextSample.IsNonSyncSample && + (t.nextSample.dts-t.r.currentSegment.startDTS) >= t.r.segmentDuration { + err := t.r.currentSegment.close() + if err != nil { + return err + } + + t.r.currentSegment = newSegment(t.r, t.nextSample.dts) + } + + return nil +} diff --git a/internal/stream/stream.go b/internal/stream/stream.go index 91ee4c29..ce01d3e4 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -3,6 +3,7 @@ package stream import ( "sync" + "sync/atomic" "time" "github.com/bluenviron/gortsplib/v4" @@ -20,13 +21,13 @@ type readerFunc func(unit.Unit) error // Stream is a media stream. // It stores tracks, readers and allows to write data to readers. type Stream struct { - desc *description.Session - bytesReceived *uint64 + desc *description.Session - smedias map[*description.Media]*streamMedia - mutex sync.RWMutex - rtspStream *gortsplib.ServerStream - rtspsStream *gortsplib.ServerStream + bytesReceived *uint64 + smedias map[*description.Media]*streamMedia + mutex sync.RWMutex + rtspStream *gortsplib.ServerStream + rtspsStream *gortsplib.ServerStream } // New allocates a Stream. @@ -34,12 +35,11 @@ func New( udpMaxPayloadSize int, desc *description.Session, generateRTPPackets bool, - bytesReceived *uint64, decodeErrLogger logger.Writer, ) (*Stream, error) { s := &Stream{ - bytesReceived: bytesReceived, desc: desc, + bytesReceived: new(uint64), } s.smedias = make(map[*description.Media]*streamMedia) @@ -70,6 +70,11 @@ func (s *Stream) Desc() *description.Session { return s.desc } +// BytesReceived returns received bytes. +func (s *Stream) BytesReceived() uint64 { + return atomic.LoadUint64(s.bytesReceived) +} + // RTSPStream returns the RTSP stream. func (s *Stream) RTSPStream(server *gortsplib.Server) *gortsplib.ServerStream { s.mutex.Lock() diff --git a/mediamtx.yml b/mediamtx.yml index 8df4eab5..68b941a7 100644 --- a/mediamtx.yml +++ b/mediamtx.yml @@ -234,6 +234,28 @@ srt: yes # Address of the SRT listener. srtAddress: :8890 +############################################### +# Recording settings + +# Record streams to disk. +record: no +# Path of recording segments. +# Extension is added automatically. +# Available variables are %path (path name), %Y %m %d %H %M %S %f (time in strftime format) +recordPath: ./recordings/%path/%Y-%m-%d_%H-%M-%S-%f +# Format of recorded segments. +# Currently the only available format is fmp4 (fragmented MP4). +recordFormat: fmp4 +# fMP4 files are concatenation of small MP4 files (parts), each with this duration. +# When a system failure occurs, the last part gets lost. +# Therefore, the part duration is equal to the RPO (recovery point objective). +recordPartDuration: 100ms +# Minimum duration of each segment. +recordSegmentDuration: 1h +# Delete segments after this timespan. +# Set to 0 to disable automatic deletion. +recordDeleteAfter: 24h + ############################################### # Path settings @@ -280,6 +302,8 @@ paths: sourceOnDemandCloseAfter: 10s # Maximum number of readers. Zero means no limit. maxReaders: 0 + # Record streams to disk (if global recording is enabled). + record: yes ############################################### # Authentication path settings