Browse Source

implement native recording (#1399) (#2255)

* implement native recording (#1399)

* support saving VP9 tracks

* support saving MPEG-1 audio tracks

* switch segment when codec parameters change

* allow to disable recording on a path basis

* allow disabling recording cleaner

* support recording MPEG-1/2/4 video tracks

* add microseconds to file names

* add tests
pull/2355/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
73ddb21e63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 27
      README.md
  2. 16
      apidocs/openapi.yaml
  3. 4
      go.mod
  4. 4
      go.sum
  5. 21
      internal/conf/conf.go
  6. 1
      internal/conf/path.go
  7. 31
      internal/core/core.go
  8. 42
      internal/core/path.go
  9. 16
      internal/core/path_manager.go
  10. 24
      internal/core/srt_conn.go
  11. 11
      internal/core/webrtc_outgoing_track.go
  12. 738
      internal/record/agent.go
  13. 151
      internal/record/agent_test.go
  14. 144
      internal/record/cleaner.go
  15. 46
      internal/record/cleaner_test.go
  16. 74
      internal/record/part.go
  17. 2
      internal/record/record.go
  18. 138
      internal/record/record_path.go
  19. 116
      internal/record/segment.go
  20. 57
      internal/record/track.go
  21. 11
      internal/stream/stream.go
  22. 24
      mediamtx.yml

27
README.md

@ -48,6 +48,7 @@ And can be read from the server with:
* Read live streams from the server * Read live streams from the server
* Streams are automatically converted from a protocol to another. For instance, it's possible to publish a stream with RTSP and read it with HLS * Streams are automatically converted from a protocol to another. For instance, it's possible to publish a stream with RTSP and read it with HLS
* Serve multiple streams at once in separate paths * Serve multiple streams at once in separate paths
* Record streams to disk
* Authenticate users; use internal or external authentication * Authenticate users; use internal or external authentication
* Redirect readers to other RTSP servers (load balancing) * Redirect readers to other RTSP servers (load balancing)
* Query and control the server through the API * Query and control the server through the API
@ -106,7 +107,7 @@ _rtsp-simple-server_ has been rebranded as _MediaMTX_. The reason is pretty obvi
* [Authentication](#authentication) * [Authentication](#authentication)
* [Encrypt the configuration](#encrypt-the-configuration) * [Encrypt the configuration](#encrypt-the-configuration)
* [Remuxing, re-encoding, compression](#remuxing-re-encoding-compression) * [Remuxing, re-encoding, compression](#remuxing-re-encoding-compression)
* [Save streams to disk](#save-streams-to-disk) * [Record streams to disk](#record-streams-to-disk)
* [Forward streams to another server](#forward-streams-to-another-server) * [Forward streams to another server](#forward-streams-to-another-server)
* [On-demand publishing](#on-demand-publishing) * [On-demand publishing](#on-demand-publishing)
* [Start on boot](#start-on-boot) * [Start on boot](#start-on-boot)
@ -1136,21 +1137,25 @@ paths:
runOnReadyRestart: yes runOnReadyRestart: yes
``` ```
### Save streams to disk ### Record streams to disk
To save available streams to disk, use _FFmpeg_ inside the `runOnReady` parameter: To save available streams to disk, set the `record` and the `recordPath` parameter in the configuration file:
```yml ```yml
paths: # Record streams to disk.
all: record: yes
runOnReady: > # Path of recording segments.
ffmpeg -i rtsp://localhost:$RTSP_PORT/$MTX_PATH # Extension is added automatically.
-c copy # Available variables are %path (path name), %Y %m %d %H %M %S %f (time in strftime format)
-f segment -strftime 1 -segment_time 60 -segment_format mpegts saved_%Y-%m-%d_%H-%M-%S.ts recordPath: ./recordings/%path/%Y-%m-%d_%H-%M-%S-%f
runOnReadyRestart: yes
``` ```
In the configuration above, streams are saved in MPEG-TS format, that is resilient to system crashes. All available recording parameters are listed in the [sample configuration file](/mediamtx.yml).
Currently the server supports recording tracks encoded with the following codecs:
* Video: AV1, VP9, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video
* Audio: Opus, MPEG-4 Audio (AAC), MPEG-1 Audio (MP3)
### Forward streams to another server ### Forward streams to another server

16
apidocs/openapi.yaml

@ -175,6 +175,20 @@ components:
srtAddress: srtAddress:
type: string type: string
# Record
record:
type: boolean
recordPath:
type: string
recordFormat:
type: string
recordPartDuration:
type: string
recordSegmentDuration:
type: string
recordDeleteAfter:
type: string
# Paths # Paths
paths: paths:
type: object type: object
@ -197,6 +211,8 @@ components:
type: string type: string
maxReaders: maxReaders:
type: integer type: integer
record:
type: boolean
# Authentication # Authentication
publishUser: publishUser:

4
go.mod

@ -6,9 +6,10 @@ require (
code.cloudfoundry.org/bytefmt v0.0.0 code.cloudfoundry.org/bytefmt v0.0.0
github.com/abema/go-mp4 v0.13.0 github.com/abema/go-mp4 v0.13.0
github.com/alecthomas/kong v0.8.0 github.com/alecthomas/kong v0.8.0
github.com/aler9/writerseeker v1.1.0
github.com/bluenviron/gohlslib v1.0.2 github.com/bluenviron/gohlslib v1.0.2
github.com/bluenviron/gortsplib/v4 v4.0.2-0.20230916090332-99773e19afc9 github.com/bluenviron/gortsplib/v4 v4.0.2-0.20230916090332-99773e19afc9
github.com/bluenviron/mediacommon v1.2.0 github.com/bluenviron/mediacommon v1.2.1-0.20230916111859-5042498c7260
github.com/datarhei/gosrt v0.5.4 github.com/datarhei/gosrt v0.5.4
github.com/fsnotify/fsnotify v1.6.0 github.com/fsnotify/fsnotify v1.6.0
github.com/gin-gonic/gin v1.9.1 github.com/gin-gonic/gin v1.9.1
@ -30,7 +31,6 @@ require (
) )
require ( require (
github.com/aler9/writerseeker v1.1.0 // indirect
github.com/asticode/go-astikit v0.30.0 // indirect github.com/asticode/go-astikit v0.30.0 // indirect
github.com/asticode/go-astits v1.13.0 // indirect github.com/asticode/go-astits v1.13.0 // indirect
github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c // indirect github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c // indirect

4
go.sum

@ -16,8 +16,8 @@ github.com/bluenviron/gohlslib v1.0.2 h1:LDA/CubL525e9rLWw+G/9GbFS6iXwozmOg8KJBT
github.com/bluenviron/gohlslib v1.0.2/go.mod h1:oam0wsI2XqcHLTG6NM8HRvxAQsa3hIA0MLRiTOE7CB8= github.com/bluenviron/gohlslib v1.0.2/go.mod h1:oam0wsI2XqcHLTG6NM8HRvxAQsa3hIA0MLRiTOE7CB8=
github.com/bluenviron/gortsplib/v4 v4.0.2-0.20230916090332-99773e19afc9 h1:NIJRhT/AYhPNe1GdWqAhDsYOTo6/hvAz5pEe1Ss+NuE= github.com/bluenviron/gortsplib/v4 v4.0.2-0.20230916090332-99773e19afc9 h1:NIJRhT/AYhPNe1GdWqAhDsYOTo6/hvAz5pEe1Ss+NuE=
github.com/bluenviron/gortsplib/v4 v4.0.2-0.20230916090332-99773e19afc9/go.mod h1:aAHfFXD3LE19h86jUGJK7PpM1uwp9VFVReuH89ZlpuE= github.com/bluenviron/gortsplib/v4 v4.0.2-0.20230916090332-99773e19afc9/go.mod h1:aAHfFXD3LE19h86jUGJK7PpM1uwp9VFVReuH89ZlpuE=
github.com/bluenviron/mediacommon v1.2.0 h1:5tz92r2S4gPSiTlycepjXFZCgwGfVL2htCeVsoBac+U= github.com/bluenviron/mediacommon v1.2.1-0.20230916111859-5042498c7260 h1:s0LwQH/+cV2DdCcmqNXoIwpeoT94xnd2UtuvJEkjhiQ=
github.com/bluenviron/mediacommon v1.2.0/go.mod h1:/vlOVSebDwzdRtQONOKLua0fOSJg1tUDHpP+h9a0uqM= github.com/bluenviron/mediacommon v1.2.1-0.20230916111859-5042498c7260/go.mod h1:/vlOVSebDwzdRtQONOKLua0fOSJg1tUDHpP+h9a0uqM=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s=
github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=

21
internal/conf/conf.go

@ -169,6 +169,14 @@ type Conf struct {
SRT bool `json:"srt"` SRT bool `json:"srt"`
SRTAddress string `json:"srtAddress"` SRTAddress string `json:"srtAddress"`
// Record
Record bool `json:"record"`
RecordPath string `json:"recordPath"`
RecordFormat string `json:"recordFormat"`
RecordPartDuration StringDuration `json:"recordPartDuration"`
RecordSegmentDuration StringDuration `json:"recordSegmentDuration"`
RecordDeleteAfter StringDuration `json:"recordDeleteAfter"`
// Paths // Paths
Paths map[string]*PathConf `json:"paths"` Paths map[string]*PathConf `json:"paths"`
} }
@ -294,6 +302,12 @@ func (conf *Conf) Check() error {
} }
} }
// Record
if conf.RecordFormat != "fmp4" {
return fmt.Errorf("unsupported record format '%s'", conf.RecordFormat)
}
// do not add automatically "all", since user may want to // do not add automatically "all", since user may want to
// initialize all paths through API or hot reloading. // initialize all paths through API or hot reloading.
if conf.Paths == nil { if conf.Paths == nil {
@ -379,6 +393,13 @@ func (conf *Conf) UnmarshalJSON(b []byte) error {
conf.SRT = true conf.SRT = true
conf.SRTAddress = ":8890" conf.SRTAddress = ":8890"
// Record
conf.RecordPath = "./recordings/%path/%Y-%m-%d_%H-%M-%S"
conf.RecordFormat = "fmp4"
conf.RecordPartDuration = 100 * StringDuration(time.Millisecond)
conf.RecordSegmentDuration = 3600 * StringDuration(time.Second)
conf.RecordDeleteAfter = 24 * 3600 * StringDuration(time.Second)
type alias Conf type alias Conf
d := json.NewDecoder(bytes.NewReader(b)) d := json.NewDecoder(bytes.NewReader(b))
d.DisallowUnknownFields() d.DisallowUnknownFields()

1
internal/conf/path.go

@ -49,6 +49,7 @@ type PathConf struct {
SourceOnDemandStartTimeout StringDuration `json:"sourceOnDemandStartTimeout"` SourceOnDemandStartTimeout StringDuration `json:"sourceOnDemandStartTimeout"`
SourceOnDemandCloseAfter StringDuration `json:"sourceOnDemandCloseAfter"` SourceOnDemandCloseAfter StringDuration `json:"sourceOnDemandCloseAfter"`
MaxReaders int `json:"maxReaders"` MaxReaders int `json:"maxReaders"`
Record bool `json:"record"`
// Authentication // Authentication
PublishUser Credential `json:"publishUser"` PublishUser Credential `json:"publishUser"`

31
internal/core/core.go

@ -7,6 +7,7 @@ import (
"os" "os"
"os/signal" "os/signal"
"reflect" "reflect"
"time"
"github.com/alecthomas/kong" "github.com/alecthomas/kong"
"github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4"
@ -16,6 +17,7 @@ import (
"github.com/bluenviron/mediamtx/internal/confwatcher" "github.com/bluenviron/mediamtx/internal/confwatcher"
"github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/record"
"github.com/bluenviron/mediamtx/internal/rlimit" "github.com/bluenviron/mediamtx/internal/rlimit"
) )
@ -37,6 +39,7 @@ type Core struct {
externalCmdPool *externalcmd.Pool externalCmdPool *externalcmd.Pool
metrics *metrics metrics *metrics
pprof *pprof pprof *pprof
recordCleaner *record.Cleaner
pathManager *pathManager pathManager *pathManager
rtspServer *rtspServer rtspServer *rtspServer
rtspsServer *rtspServer rtspsServer *rtspServer
@ -237,6 +240,16 @@ func (p *Core) createResources(initial bool) error {
} }
} }
if p.conf.Record &&
p.conf.RecordDeleteAfter != 0 &&
p.recordCleaner == nil {
p.recordCleaner = record.NewCleaner(
p.conf.RecordPath,
time.Duration(p.conf.RecordDeleteAfter),
p,
)
}
if p.pathManager == nil { if p.pathManager == nil {
p.pathManager = newPathManager( p.pathManager = newPathManager(
p.conf.ExternalAuthenticationURL, p.conf.ExternalAuthenticationURL,
@ -246,6 +259,10 @@ func (p *Core) createResources(initial bool) error {
p.conf.WriteTimeout, p.conf.WriteTimeout,
p.conf.WriteQueueSize, p.conf.WriteQueueSize,
p.conf.UDPMaxPayloadSize, p.conf.UDPMaxPayloadSize,
p.conf.Record,
p.conf.RecordPath,
p.conf.RecordPartDuration,
p.conf.RecordSegmentDuration,
p.conf.Paths, p.conf.Paths,
p.externalCmdPool, p.externalCmdPool,
p.metrics, p.metrics,
@ -491,6 +508,11 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
newConf.ReadTimeout != p.conf.ReadTimeout || newConf.ReadTimeout != p.conf.ReadTimeout ||
closeLogger closeLogger
closeRecorderCleaner := newConf == nil ||
newConf.Record != p.conf.Record ||
newConf.RecordPath != p.conf.RecordPath ||
newConf.RecordDeleteAfter != p.conf.RecordDeleteAfter
closePathManager := newConf == nil || closePathManager := newConf == nil ||
newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL || newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL ||
newConf.RTSPAddress != p.conf.RTSPAddress || newConf.RTSPAddress != p.conf.RTSPAddress ||
@ -499,6 +521,10 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
newConf.WriteTimeout != p.conf.WriteTimeout || newConf.WriteTimeout != p.conf.WriteTimeout ||
newConf.WriteQueueSize != p.conf.WriteQueueSize || newConf.WriteQueueSize != p.conf.WriteQueueSize ||
newConf.UDPMaxPayloadSize != p.conf.UDPMaxPayloadSize || newConf.UDPMaxPayloadSize != p.conf.UDPMaxPayloadSize ||
newConf.Record != p.conf.Record ||
newConf.RecordPath != p.conf.RecordPath ||
newConf.RecordPartDuration != p.conf.RecordPartDuration ||
newConf.RecordSegmentDuration != p.conf.RecordSegmentDuration ||
closeMetrics || closeMetrics ||
closeLogger closeLogger
if !closePathManager && !reflect.DeepEqual(newConf.Paths, p.conf.Paths) { if !closePathManager && !reflect.DeepEqual(newConf.Paths, p.conf.Paths) {
@ -692,6 +718,11 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
p.pathManager = nil p.pathManager = nil
} }
if closeRecorderCleaner && p.recordCleaner != nil {
p.recordCleaner.Close()
p.recordCleaner = nil
}
if closePPROF && p.pprof != nil { if closePPROF && p.pprof != nil {
p.pprof.close() p.pprof.close()
p.pprof = nil p.pprof = nil

42
internal/core/path.go

@ -7,7 +7,6 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/description"
@ -16,6 +15,7 @@ import (
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/record"
"github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/stream"
) )
@ -176,6 +176,10 @@ type path struct {
writeTimeout conf.StringDuration writeTimeout conf.StringDuration
writeQueueSize int writeQueueSize int
udpMaxPayloadSize int udpMaxPayloadSize int
record bool
recordPath string
recordPartDuration conf.StringDuration
recordSegmentDuration conf.StringDuration
confName string confName string
conf *conf.PathConf conf *conf.PathConf
name string name string
@ -189,8 +193,8 @@ type path struct {
confMutex sync.RWMutex confMutex sync.RWMutex
source source source source
stream *stream.Stream stream *stream.Stream
recordAgent *record.Agent
readyTime time.Time readyTime time.Time
bytesReceived *uint64
readers map[reader]struct{} readers map[reader]struct{}
describeRequestsOnHold []pathDescribeReq describeRequestsOnHold []pathDescribeReq
readerAddRequestsOnHold []pathAddReaderReq readerAddRequestsOnHold []pathAddReaderReq
@ -227,6 +231,10 @@ func newPath(
writeTimeout conf.StringDuration, writeTimeout conf.StringDuration,
writeQueueSize int, writeQueueSize int,
udpMaxPayloadSize int, udpMaxPayloadSize int,
record bool,
recordPath string,
recordPartDuration conf.StringDuration,
recordSegmentDuration conf.StringDuration,
confName string, confName string,
cnf *conf.PathConf, cnf *conf.PathConf,
name string, name string,
@ -243,6 +251,10 @@ func newPath(
writeTimeout: writeTimeout, writeTimeout: writeTimeout,
writeQueueSize: writeQueueSize, writeQueueSize: writeQueueSize,
udpMaxPayloadSize: udpMaxPayloadSize, udpMaxPayloadSize: udpMaxPayloadSize,
record: record,
recordPath: recordPath,
recordPartDuration: recordPartDuration,
recordSegmentDuration: recordSegmentDuration,
confName: confName, confName: confName,
conf: cnf, conf: cnf,
name: name, name: name,
@ -252,7 +264,6 @@ func newPath(
parent: parent, parent: parent,
ctx: ctx, ctx: ctx,
ctxCancel: ctxCancel, ctxCancel: ctxCancel,
bytesReceived: new(uint64),
readers: make(map[reader]struct{}), readers: make(map[reader]struct{}),
onDemandStaticSourceReadyTimer: newEmptyTimer(), onDemandStaticSourceReadyTimer: newEmptyTimer(),
onDemandStaticSourceCloseTimer: newEmptyTimer(), onDemandStaticSourceCloseTimer: newEmptyTimer(),
@ -754,7 +765,12 @@ func (pa *path) doAPIPathsGet(req pathAPIPathsGetReq) {
} }
return mediasDescription(pa.stream.Desc().Medias) return mediasDescription(pa.stream.Desc().Medias)
}(), }(),
BytesReceived: atomic.LoadUint64(pa.bytesReceived), BytesReceived: func() uint64 {
if pa.stream == nil {
return 0
}
return pa.stream.BytesReceived()
}(),
Readers: func() []interface{} { Readers: func() []interface{} {
ret := []interface{}{} ret := []interface{}{}
for r := range pa.readers { for r := range pa.readers {
@ -868,13 +884,24 @@ func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error
pa.udpMaxPayloadSize, pa.udpMaxPayloadSize,
desc, desc,
allocateEncoder, allocateEncoder,
pa.bytesReceived,
logger.NewLimitedLogger(pa.source), logger.NewLimitedLogger(pa.source),
) )
if err != nil { if err != nil {
return err return err
} }
if pa.record && pa.conf.Record {
pa.recordAgent = record.NewAgent(
pa.writeQueueSize,
pa.recordPath,
time.Duration(pa.recordPartDuration),
time.Duration(pa.recordSegmentDuration),
pa.name,
pa.stream,
pa,
)
}
pa.readyTime = time.Now() pa.readyTime = time.Now()
if pa.conf.RunOnReady != "" { if pa.conf.RunOnReady != "" {
@ -908,6 +935,11 @@ func (pa *path) setNotReady() {
pa.Log(logger.Info, "runOnReady command stopped") pa.Log(logger.Info, "runOnReady command stopped")
} }
if pa.recordAgent != nil {
pa.recordAgent.Close()
pa.recordAgent = nil
}
if pa.stream != nil { if pa.stream != nil {
pa.stream.Close() pa.stream.Close()
pa.stream = nil pa.stream = nil

16
internal/core/path_manager.go

@ -71,6 +71,10 @@ type pathManager struct {
writeTimeout conf.StringDuration writeTimeout conf.StringDuration
writeQueueSize int writeQueueSize int
udpMaxPayloadSize int udpMaxPayloadSize int
record bool
recordPath string
recordPartDuration conf.StringDuration
recordSegmentDuration conf.StringDuration
pathConfs map[string]*conf.PathConf pathConfs map[string]*conf.PathConf
externalCmdPool *externalcmd.Pool externalCmdPool *externalcmd.Pool
metrics *metrics metrics *metrics
@ -105,6 +109,10 @@ func newPathManager(
writeTimeout conf.StringDuration, writeTimeout conf.StringDuration,
writeQueueSize int, writeQueueSize int,
udpMaxPayloadSize int, udpMaxPayloadSize int,
record bool,
recordPath string,
recordPartDuration conf.StringDuration,
recordSegmentDuration conf.StringDuration,
pathConfs map[string]*conf.PathConf, pathConfs map[string]*conf.PathConf,
externalCmdPool *externalcmd.Pool, externalCmdPool *externalcmd.Pool,
metrics *metrics, metrics *metrics,
@ -120,6 +128,10 @@ func newPathManager(
writeTimeout: writeTimeout, writeTimeout: writeTimeout,
writeQueueSize: writeQueueSize, writeQueueSize: writeQueueSize,
udpMaxPayloadSize: udpMaxPayloadSize, udpMaxPayloadSize: udpMaxPayloadSize,
record: record,
recordPath: recordPath,
recordPartDuration: recordPartDuration,
recordSegmentDuration: recordSegmentDuration,
pathConfs: pathConfs, pathConfs: pathConfs,
externalCmdPool: externalCmdPool, externalCmdPool: externalCmdPool,
metrics: metrics, metrics: metrics,
@ -398,6 +410,10 @@ func (pm *pathManager) createPath(
pm.writeTimeout, pm.writeTimeout,
pm.writeQueueSize, pm.writeQueueSize,
pm.udpMaxPayloadSize, pm.udpMaxPayloadSize,
pm.record,
pm.recordPath,
pm.recordPartDuration,
pm.recordSegmentDuration,
pathConfName, pathConfName,
pathConf, pathConf,
name, name,

24
internal/core/srt_conn.go

@ -548,14 +548,24 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
case *format.MPEG4Video: case *format.MPEG4Video:
track := addTrack(medi, &mpegts.CodecMPEG4Video{}) track := addTrack(medi, &mpegts.CodecMPEG4Video{})
firstReceived := false
var lastPTS time.Duration
res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Video) tunit := u.(*unit.MPEG4Video)
if tunit.Frame == nil { if tunit.Frame == nil {
return nil return nil
} }
if !firstReceived {
firstReceived = true
} else if tunit.PTS < lastPTS {
return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)")
}
lastPTS = tunit.PTS
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err = w.WriteMPEGxVideo(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) err = w.WriteMPEG4Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame)
if err != nil { if err != nil {
return err return err
} }
@ -565,14 +575,24 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
case *format.MPEG1Video: case *format.MPEG1Video:
track := addTrack(medi, &mpegts.CodecMPEG1Video{}) track := addTrack(medi, &mpegts.CodecMPEG1Video{})
firstReceived := false
var lastPTS time.Duration
res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Video) tunit := u.(*unit.MPEG1Video)
if tunit.Frame == nil { if tunit.Frame == nil {
return nil return nil
} }
if !firstReceived {
firstReceived = true
} else if tunit.PTS < lastPTS {
return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)")
}
lastPTS = tunit.PTS
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err = w.WriteMPEGxVideo(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) err = w.WriteMPEG1Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame)
if err != nil { if err != nil {
return err return err
} }

11
internal/core/webrtc_outgoing_track.go

@ -204,8 +204,8 @@ func newWebRTCOutgoingTrackVideo(desc *description.Session) (*webRTCOutgoingTrac
return nil, err return nil, err
} }
firstReceived := false
var lastPTS time.Duration var lastPTS time.Duration
firstNALUReceived := false
return &webRTCOutgoingTrack{ return &webRTCOutgoingTrack{
media: videoMedia, media: videoMedia,
@ -218,15 +218,12 @@ func newWebRTCOutgoingTrackVideo(desc *description.Session) (*webRTCOutgoingTrac
return nil return nil
} }
if !firstNALUReceived { if !firstReceived {
firstNALUReceived = true firstReceived = true
lastPTS = tunit.PTS } else if tunit.PTS < lastPTS {
} else {
if tunit.PTS < lastPTS {
return fmt.Errorf("WebRTC doesn't support H264 streams with B-frames") return fmt.Errorf("WebRTC doesn't support H264 streams with B-frames")
} }
lastPTS = tunit.PTS lastPTS = tunit.PTS
}
packets, err := encoder.Encode(tunit.AU) packets, err := encoder.Encode(tunit.AU)
if err != nil { if err != nil {

738
internal/record/agent.go

@ -0,0 +1,738 @@
package record
import (
"bytes"
"context"
"fmt"
"path/filepath"
"strings"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/codecs/av1"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/codecs/h265"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg1audio"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4video"
"github.com/bluenviron/mediacommon/pkg/codecs/opus"
"github.com/bluenviron/mediacommon/pkg/codecs/vp9"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/unit"
)
func durationGoToMp4(v time.Duration, timeScale uint32) uint64 {
timeScale64 := uint64(timeScale)
secs := v / time.Second
dec := v % time.Second
return uint64(secs)*timeScale64 + uint64(dec)*timeScale64/uint64(time.Second)
}
func mpeg1audioChannelCount(cm mpeg1audio.ChannelMode) int {
switch cm {
case mpeg1audio.ChannelModeStereo,
mpeg1audio.ChannelModeJointStereo,
mpeg1audio.ChannelModeDualChannel:
return 2
default:
return 1
}
}
type sample struct {
*fmp4.PartSample
dts time.Duration
}
// Agent saves streams on disk.
type Agent struct {
path string
partDuration time.Duration
segmentDuration time.Duration
stream *stream.Stream
parent logger.Writer
ctx context.Context
ctxCancel func()
writer *asyncwriter.Writer
tracks []*track
hasVideo bool
currentSegment *segment
done chan struct{}
}
// NewAgent allocates a nAgent.
func NewAgent(
writeQueueSize int,
recordPath string,
partDuration time.Duration,
segmentDuration time.Duration,
pathName string,
stream *stream.Stream,
parent logger.Writer,
) *Agent {
recordPath, _ = filepath.Abs(recordPath)
recordPath = strings.ReplaceAll(recordPath, "%path", pathName)
recordPath += ".mp4"
ctx, ctxCancel := context.WithCancel(context.Background())
r := &Agent{
path: recordPath,
partDuration: partDuration,
segmentDuration: segmentDuration,
stream: stream,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
done: make(chan struct{}),
}
r.writer = asyncwriter.New(writeQueueSize, r)
nextID := 1
addTrack := func(codec fmp4.Codec) *track {
initTrack := &fmp4.InitTrack{
TimeScale: 90000,
Codec: codec,
}
initTrack.ID = nextID
nextID++
track := newTrack(r, initTrack)
r.tracks = append(r.tracks, track)
return track
}
for _, media := range stream.Desc().Medias {
for _, forma := range media.Formats {
switch forma := forma.(type) {
case *format.AV1:
codec := &fmp4.CodecAV1{
SequenceHeader: []byte{
8, 0, 0, 0, 66, 167, 191, 228, 96, 13, 0, 64,
},
}
track := addTrack(codec)
firstReceived := false
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.AV1)
if tunit.TU == nil {
return nil
}
randomAccess := false
for _, obu := range tunit.TU {
var h av1.OBUHeader
err := h.Unmarshal(obu)
if err != nil {
return err
}
if h.Type == av1.OBUTypeSequenceHeader {
if !bytes.Equal(codec.SequenceHeader, obu) {
codec.SequenceHeader = obu
r.updateCodecs()
}
randomAccess = true
}
}
if !firstReceived {
if !randomAccess {
return nil
}
firstReceived = true
}
sampl, err := fmp4.NewPartSampleAV1(
randomAccess,
tunit.TU)
if err != nil {
return err
}
return track.record(&sample{
PartSample: sampl,
dts: tunit.PTS,
})
})
case *format.VP9:
codec := &fmp4.CodecVP9{
Width: 1280,
Height: 720,
Profile: 1,
BitDepth: 8,
ChromaSubsampling: 1,
ColorRange: false,
}
track := addTrack(codec)
firstReceived := false
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.VP9)
if tunit.Frame == nil {
return nil
}
var h vp9.Header
err := h.Unmarshal(tunit.Frame)
if err != nil {
return err
}
randomAccess := false
if h.FrameType == vp9.FrameTypeKeyFrame {
randomAccess = true
if w := h.Width(); codec.Width != w {
codec.Width = w
r.updateCodecs()
}
if h := h.Width(); codec.Height != h {
codec.Height = h
r.updateCodecs()
}
if codec.Profile != h.Profile {
codec.Profile = h.Profile
r.updateCodecs()
}
if codec.BitDepth != h.ColorConfig.BitDepth {
codec.BitDepth = h.ColorConfig.BitDepth
r.updateCodecs()
}
if c := h.ChromaSubsampling(); codec.ChromaSubsampling != c {
codec.ChromaSubsampling = c
r.updateCodecs()
}
if codec.ColorRange != h.ColorConfig.ColorRange {
codec.ColorRange = h.ColorConfig.ColorRange
r.updateCodecs()
}
}
if !firstReceived {
if !randomAccess {
return nil
}
firstReceived = true
}
return track.record(&sample{
PartSample: &fmp4.PartSample{
IsNonSyncSample: !randomAccess,
Payload: tunit.Frame,
},
dts: tunit.PTS,
})
})
case *format.VP8:
// TODO
case *format.H265:
vps, sps, pps := forma.SafeParams()
if vps == nil || sps == nil || pps == nil {
vps = []byte{
0x40, 0x01, 0x0c, 0x01, 0xff, 0xff, 0x02, 0x20,
0x00, 0x00, 0x03, 0x00, 0xb0, 0x00, 0x00, 0x03,
0x00, 0x00, 0x03, 0x00, 0x7b, 0x18, 0xb0, 0x24,
}
sps = []byte{
0x42, 0x01, 0x01, 0x02, 0x20, 0x00, 0x00, 0x03,
0x00, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
0x00, 0x7b, 0xa0, 0x07, 0x82, 0x00, 0x88, 0x7d,
0xb6, 0x71, 0x8b, 0x92, 0x44, 0x80, 0x53, 0x88,
0x88, 0x92, 0xcf, 0x24, 0xa6, 0x92, 0x72, 0xc9,
0x12, 0x49, 0x22, 0xdc, 0x91, 0xaa, 0x48, 0xfc,
0xa2, 0x23, 0xff, 0x00, 0x01, 0x00, 0x01, 0x6a,
0x02, 0x02, 0x02, 0x01,
}
pps = []byte{
0x44, 0x01, 0xc0, 0x25, 0x2f, 0x05, 0x32, 0x40,
}
}
codec := &fmp4.CodecH265{
VPS: vps,
SPS: sps,
PPS: pps,
}
track := addTrack(codec)
var dtsExtractor *h265.DTSExtractor
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.H265)
if tunit.AU == nil {
return nil
}
randomAccess := false
for _, nalu := range tunit.AU {
typ := h265.NALUType((nalu[0] >> 1) & 0b111111)
switch typ {
case h265.NALUType_VPS_NUT:
if !bytes.Equal(codec.VPS, nalu) {
codec.VPS = nalu
r.updateCodecs()
}
case h265.NALUType_SPS_NUT:
if !bytes.Equal(codec.SPS, nalu) {
codec.SPS = nalu
r.updateCodecs()
}
case h265.NALUType_PPS_NUT:
if !bytes.Equal(codec.PPS, nalu) {
codec.PPS = nalu
r.updateCodecs()
}
case h265.NALUType_IDR_W_RADL, h265.NALUType_IDR_N_LP, h265.NALUType_CRA_NUT:
randomAccess = true
}
}
if dtsExtractor == nil {
if !randomAccess {
return nil
}
dtsExtractor = h265.NewDTSExtractor()
}
dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS)
if err != nil {
return err
}
sampl, err := fmp4.NewPartSampleH26x(
int32(durationGoToMp4(tunit.PTS-dts, 90000)),
randomAccess,
tunit.AU)
if err != nil {
return err
}
return track.record(&sample{
PartSample: sampl,
dts: dts,
})
})
case *format.H264:
sps, pps := forma.SafeParams()
if sps == nil || pps == nil {
sps = []byte{
0x67, 0x42, 0xc0, 0x1f, 0xd9, 0x00, 0xf0, 0x11,
0x7e, 0xf0, 0x11, 0x00, 0x00, 0x03, 0x00, 0x01,
0x00, 0x00, 0x03, 0x00, 0x30, 0x8f, 0x18, 0x32,
0x48,
}
pps = []byte{
0x68, 0xcb, 0x8c, 0xb2,
}
}
codec := &fmp4.CodecH264{
SPS: sps,
PPS: pps,
}
track := addTrack(codec)
var dtsExtractor *h264.DTSExtractor
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.H264)
if tunit.AU == nil {
return nil
}
randomAccess := false
for _, nalu := range tunit.AU {
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeSPS:
if !bytes.Equal(codec.SPS, nalu) {
codec.SPS = nalu
r.updateCodecs()
}
case h264.NALUTypePPS:
if !bytes.Equal(codec.PPS, nalu) {
codec.PPS = nalu
r.updateCodecs()
}
case h264.NALUTypeIDR:
randomAccess = true
}
}
if dtsExtractor == nil {
if !randomAccess {
return nil
}
dtsExtractor = h264.NewDTSExtractor()
}
dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS)
if err != nil {
return err
}
sampl, err := fmp4.NewPartSampleH26x(
int32(durationGoToMp4(tunit.PTS-dts, 90000)),
randomAccess,
tunit.AU)
if err != nil {
return err
}
return track.record(&sample{
PartSample: sampl,
dts: dts,
})
})
case *format.MPEG4Video:
config := forma.SafeParams()
if config == nil {
config = []byte{
0x00, 0x00, 0x01, 0xb0, 0x01, 0x00, 0x00, 0x01,
0xb5, 0x89, 0x13, 0x00, 0x00, 0x01, 0x00, 0x00,
0x00, 0x01, 0x20, 0x00, 0xc4, 0x8d, 0x88, 0x00,
0xf5, 0x3c, 0x04, 0x87, 0x14, 0x63, 0x00, 0x00,
0x01, 0xb2, 0x4c, 0x61, 0x76, 0x63, 0x35, 0x38,
0x2e, 0x31, 0x33, 0x34, 0x2e, 0x31, 0x30, 0x30,
}
}
codec := &fmp4.CodecMPEG4Video{
Config: config,
}
track := addTrack(codec)
firstReceived := false
var lastPTS time.Duration
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Video)
if tunit.Frame == nil {
return nil
}
randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)})
if bytes.HasPrefix(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.VisualObjectSequenceStartCode)}) {
end := bytes.Index(tunit.Frame[4:], []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)})
if end >= 0 {
config := tunit.Frame[:end+4]
if !bytes.Equal(codec.Config, config) {
codec.Config = config
r.updateCodecs()
}
}
}
if !firstReceived {
if !randomAccess {
return nil
}
firstReceived = true
} else if tunit.PTS < lastPTS {
return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)")
}
lastPTS = tunit.PTS
return track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: tunit.Frame,
IsNonSyncSample: !randomAccess,
},
dts: tunit.PTS,
})
})
case *format.MPEG1Video:
codec := &fmp4.CodecMPEG1Video{
Config: []byte{
0x00, 0x00, 0x01, 0xb3, 0x78, 0x04, 0x38, 0x35,
0xff, 0xff, 0xe0, 0x18, 0x00, 0x00, 0x01, 0xb5,
0x14, 0x4a, 0x00, 0x01, 0x00, 0x00,
},
}
track := addTrack(codec)
firstReceived := false
var lastPTS time.Duration
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Video)
if tunit.Frame == nil {
return nil
}
randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, 0xB8})
if bytes.HasPrefix(tunit.Frame, []byte{0, 0, 1, 0xB3}) {
end := bytes.Index(tunit.Frame[4:], []byte{0, 0, 1, 0xB8})
if end >= 0 {
config := tunit.Frame[:end+4]
if !bytes.Equal(codec.Config, config) {
codec.Config = config
r.updateCodecs()
}
}
}
if !firstReceived {
if !randomAccess {
return nil
}
firstReceived = true
} else if tunit.PTS < lastPTS {
return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)")
}
lastPTS = tunit.PTS
return track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: tunit.Frame,
IsNonSyncSample: !randomAccess,
},
dts: tunit.PTS,
})
})
case *format.MJPEG:
// TODO
case *format.Opus:
codec := &fmp4.CodecOpus{
ChannelCount: func() int {
if forma.IsStereo {
return 2
}
return 1
}(),
}
track := addTrack(codec)
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.Opus)
if tunit.Packets == nil {
return nil
}
pts := tunit.PTS
for _, packet := range tunit.Packets {
err := track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: packet,
},
dts: pts,
})
if err != nil {
return err
}
pts += opus.PacketDuration(packet)
}
return nil
})
case *format.MPEG4AudioGeneric:
codec := &fmp4.CodecMPEG4Audio{
Config: *forma.Config,
}
track := addTrack(codec)
sampleRate := time.Duration(forma.Config.SampleRate)
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4AudioGeneric)
if tunit.AUs == nil {
return nil
}
for i, au := range tunit.AUs {
auPTS := tunit.PTS + time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*
time.Second/sampleRate
err := track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: au,
},
dts: auPTS,
})
if err != nil {
return err
}
}
return nil
})
case *format.MPEG4AudioLATM:
codec := &fmp4.CodecMPEG4Audio{
Config: *forma.Config.Programs[0].Layers[0].AudioSpecificConfig,
}
track := addTrack(codec)
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4AudioLATM)
if tunit.AU == nil {
return nil
}
return track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: tunit.AU,
},
dts: tunit.PTS,
})
})
case *format.MPEG1Audio:
codec := &fmp4.CodecMPEG1Audio{
SampleRate: 32000,
ChannelCount: 2,
}
track := addTrack(codec)
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Audio)
if tunit.Frames == nil {
return nil
}
pts := tunit.PTS
for _, frame := range tunit.Frames {
var h mpeg1audio.FrameHeader
err := h.Unmarshal(frame)
if err != nil {
return err
}
if codec.SampleRate != h.SampleRate {
codec.SampleRate = h.SampleRate
r.updateCodecs()
}
if c := mpeg1audioChannelCount(h.ChannelMode); codec.ChannelCount != c {
codec.ChannelCount = c
r.updateCodecs()
}
err = track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: frame,
},
dts: pts,
})
if err != nil {
return err
}
pts += time.Duration(h.SampleCount()) *
time.Second / time.Duration(h.SampleRate)
}
return nil
})
case *format.G722:
// TODO
case *format.G711:
// TODO
case *format.LPCM:
// TODO
}
}
}
r.Log(logger.Info, "recording %d %s",
len(r.tracks),
func() string {
if len(r.tracks) == 1 {
return "track"
}
return "tracks"
}())
go r.run()
return r
}
// Close closes the Agent.
func (r *Agent) Close() {
r.ctxCancel()
<-r.done
}
// Log is the main logging function.
func (r *Agent) Log(level logger.Level, format string, args ...interface{}) {
r.parent.Log(level, "[record] "+format, args...)
}
func (r *Agent) run() {
close(r.done)
r.writer.Start()
select {
case err := <-r.writer.Error():
r.Log(logger.Error, err.Error())
r.stream.RemoveReader(r.writer)
case <-r.ctx.Done():
r.stream.RemoveReader(r.writer)
r.writer.Stop()
}
if r.currentSegment != nil {
r.currentSegment.close() //nolint:errcheck
}
}
func (r *Agent) updateCodecs() {
// if codec parameters have been updated,
// and current segment has already written codec parameters on disk,
// close current segment.
if r.currentSegment != nil && r.currentSegment.f != nil {
r.currentSegment.close() //nolint:errcheck
r.currentSegment = nil
}
}

151
internal/record/agent_test.go

@ -0,0 +1,151 @@
package record
import (
"os"
"path/filepath"
"testing"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/codecs/h265"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/stretchr/testify/require"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/unit"
)
type nilLogger struct{}
func (nilLogger) Log(_ logger.Level, _ string, _ ...interface{}) {
}
func TestAgent(t *testing.T) {
n := 0
timeNow = func() time.Time {
n++
if n >= 2 {
return time.Date(2008, 0o5, 20, 22, 15, 25, 125000, time.UTC)
}
return time.Date(2009, 0o5, 20, 22, 15, 25, 427000, time.UTC)
}
desc := &description.Session{Medias: []*description.Media{
{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H265{
PayloadTyp: 96,
}},
},
{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H264{
PayloadTyp: 96,
PacketizationMode: 1,
}},
},
{
Type: description.MediaTypeAudio,
Formats: []format.Format{&format.MPEG4Audio{
PayloadTyp: 96,
Config: &mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
},
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
}},
},
}}
stream, err := stream.New(
1460,
desc,
true,
&nilLogger{},
)
require.NoError(t, err)
defer stream.Close()
dir, err := os.MkdirTemp("", "mediamtx-agent")
require.NoError(t, err)
defer os.RemoveAll(dir)
recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")
a := NewAgent(
1024,
recordPath,
100*time.Millisecond,
1*time.Second,
"mypath",
stream,
&nilLogger{},
)
for i := 0; i < 3; i++ {
stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H265{
Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second,
},
AU: [][]byte{
{ // VPS
0x40, 0x01, 0x0c, 0x01, 0xff, 0xff, 0x02, 0x20,
0x00, 0x00, 0x03, 0x00, 0xb0, 0x00, 0x00, 0x03,
0x00, 0x00, 0x03, 0x00, 0x7b, 0x18, 0xb0, 0x24,
},
{ // SPS
0x42, 0x01, 0x01, 0x02, 0x20, 0x00, 0x00, 0x03,
0x00, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
0x00, 0x7b, 0xa0, 0x07, 0x82, 0x00, 0x88, 0x7d,
0xb6, 0x71, 0x8b, 0x92, 0x44, 0x80, 0x53, 0x88,
0x88, 0x92, 0xcf, 0x24, 0xa6, 0x92, 0x72, 0xc9,
0x12, 0x49, 0x22, 0xdc, 0x91, 0xaa, 0x48, 0xfc,
0xa2, 0x23, 0xff, 0x00, 0x01, 0x00, 0x01, 0x6a,
0x02, 0x02, 0x02, 0x01,
},
{ // PPS
0x44, 0x01, 0xc0, 0x25, 0x2f, 0x05, 0x32, 0x40,
},
{byte(h265.NALUType_CRA_NUT) << 1, 0}, // IDR
},
})
stream.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.H264{
Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second,
},
AU: [][]byte{
{ // SPS
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20,
},
{ // PPS
0x08, 0x06, 0x07, 0x08,
},
{5}, // IDR
},
})
stream.WriteUnit(desc.Medias[2], desc.Medias[2].Formats[0], &unit.MPEG4AudioGeneric{
Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second,
},
AUs: [][]byte{{1, 2, 3, 4}},
})
}
time.Sleep(500 * time.Millisecond)
a.Close()
_, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000125.mp4"))
require.NoError(t, err)
_, err = os.Stat(filepath.Join(dir, "mypath", "2009-05-20_22-15-25-000427.mp4"))
require.NoError(t, err)
}

144
internal/record/cleaner.go

@ -0,0 +1,144 @@
package record
import (
"context"
"io/fs"
"os"
"path/filepath"
"strings"
"time"
"github.com/bluenviron/mediamtx/internal/logger"
)
func commonPath(v string) string {
common := ""
remaining := v
for {
i := strings.IndexAny(remaining, "\\/")
if i < 0 {
break
}
var part string
part, remaining = remaining[:i+1], remaining[i+1:]
if strings.Contains(part, "%") {
break
}
common += part
}
if len(common) > 0 {
common = common[:len(common)-1]
}
return common
}
// Cleaner removes expired recordings from disk.
type Cleaner struct {
ctx context.Context
ctxCancel func()
path string
deleteAfter time.Duration
parent logger.Writer
done chan struct{}
}
// NewCleaner allocates a Cleaner.
func NewCleaner(
recordPath string,
deleteAfter time.Duration,
parent logger.Writer,
) *Cleaner {
recordPath, _ = filepath.Abs(recordPath)
recordPath += ".mp4"
ctx, ctxCancel := context.WithCancel(context.Background())
c := &Cleaner{
ctx: ctx,
ctxCancel: ctxCancel,
path: recordPath,
deleteAfter: deleteAfter,
parent: parent,
done: make(chan struct{}),
}
go c.run()
return c
}
// Close closes the Cleaner.
func (c *Cleaner) Close() {
c.ctxCancel()
<-c.done
}
// Log is the main logging function.
func (c *Cleaner) Log(level logger.Level, format string, args ...interface{}) {
c.parent.Log(level, "[record cleaner]"+format, args...)
}
func (c *Cleaner) run() {
defer close(c.done)
interval := 30 * 60 * time.Second
if interval > (c.deleteAfter / 2) {
interval = c.deleteAfter / 2
}
c.doRun() //nolint:errcheck
for {
select {
case <-time.After(interval):
c.doRun() //nolint:errcheck
case <-c.ctx.Done():
return
}
}
}
func (c *Cleaner) doRun() error {
commonPath := commonPath(c.path)
now := timeNow()
filepath.Walk(commonPath, func(path string, info fs.FileInfo, err error) error { //nolint:errcheck
if err != nil {
return err
}
if !info.IsDir() {
params := decodeRecordPath(c.path, path)
if params != nil {
if now.Sub(params.time) > c.deleteAfter {
c.Log(logger.Debug, "removing %s", path)
os.Remove(path)
}
}
}
return nil
})
filepath.Walk(commonPath, func(path string, info fs.FileInfo, err error) error { //nolint:errcheck
if err != nil {
return err
}
if info.IsDir() {
os.Remove(path)
}
return nil
})
return nil
}

46
internal/record/cleaner_test.go

@ -0,0 +1,46 @@
package record
import (
"os"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestCleaner(t *testing.T) {
timeNow = func() time.Time {
return time.Date(2009, 0o5, 20, 22, 15, 25, 427000, time.UTC)
}
dir, err := os.MkdirTemp("", "mediamtx-cleaner")
require.NoError(t, err)
defer os.RemoveAll(dir)
recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")
err = os.Mkdir(filepath.Join(dir, "mypath"), 0o755)
require.NoError(t, err)
err = os.WriteFile(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000125.mp4"), []byte{1}, 0o644)
require.NoError(t, err)
err = os.WriteFile(filepath.Join(dir, "mypath", "2009-05-20_22-15-25-000427.mp4"), []byte{1}, 0o644)
require.NoError(t, err)
c := NewCleaner(
recordPath,
10*time.Second,
nilLogger{},
)
defer c.Close()
time.Sleep(500 * time.Millisecond)
_, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000125.mp4"))
require.Error(t, err)
_, err = os.Stat(filepath.Join(dir, "mypath", "2009-05-20_22-15-25-000427.mp4"))
require.NoError(t, err)
}

74
internal/record/part.go

@ -0,0 +1,74 @@
package record
import (
"io"
"time"
"github.com/aler9/writerseeker"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
)
func writePart(f io.Writer, partTracks map[*track]*fmp4.PartTrack) error {
fmp4PartTracks := make([]*fmp4.PartTrack, len(partTracks))
i := 0
for _, partTrack := range partTracks {
fmp4PartTracks[i] = partTrack
i++
}
part := &fmp4.Part{
Tracks: fmp4PartTracks,
}
var ws writerseeker.WriterSeeker
err := part.Marshal(&ws)
if err != nil {
return err
}
_, err = f.Write(ws.Bytes())
return err
}
type part struct {
s *segment
startDTS time.Duration
partTracks map[*track]*fmp4.PartTrack
endDTS time.Duration
}
func newPart(
s *segment,
startDTS time.Duration,
) *part {
return &part{
s: s,
startDTS: startDTS,
partTracks: make(map[*track]*fmp4.PartTrack),
}
}
func (p *part) close() error {
return writePart(p.s.f, p.partTracks)
}
func (p *part) record(track *track, sample *sample) error {
partTrack, ok := p.partTracks[track]
if !ok {
partTrack = &fmp4.PartTrack{
ID: track.initTrack.ID,
BaseTime: durationGoToMp4(sample.dts-p.s.startDTS, track.initTrack.TimeScale),
}
p.partTracks[track] = partTrack
}
partTrack.Samples = append(partTrack.Samples, sample.PartSample)
p.endDTS = sample.dts
return nil
}
func (p *part) duration() time.Duration {
return p.endDTS - p.startDTS
}

2
internal/record/record.go

@ -0,0 +1,2 @@
// Package record contains the recording system.
package record

138
internal/record/record_path.go

@ -0,0 +1,138 @@
package record
import (
"regexp"
"strconv"
"strings"
"time"
)
func leadingZeros(v int, size int) string {
out := strconv.FormatInt(int64(v), 10)
if len(out) >= size {
return out
}
out2 := ""
for i := 0; i < (size - len(out)); i++ {
out2 += "0"
}
return out2 + out
}
type recordPathParams struct {
path string
time time.Time
}
func decodeRecordPath(format string, v string) *recordPathParams {
re := format
re = strings.ReplaceAll(re, "\\", "\\\\")
re = strings.ReplaceAll(re, "%path", "(.*?)")
re = strings.ReplaceAll(re, "%Y", "([0-9]{4})")
re = strings.ReplaceAll(re, "%m", "([0-9]{2})")
re = strings.ReplaceAll(re, "%d", "([0-9]{2})")
re = strings.ReplaceAll(re, "%H", "([0-9]{2})")
re = strings.ReplaceAll(re, "%M", "([0-9]{2})")
re = strings.ReplaceAll(re, "%S", "([0-9]{2})")
re = strings.ReplaceAll(re, "%f", "([0-9]{6})")
r := regexp.MustCompile(re)
var groupMapping []string
cur := format
for {
i := strings.Index(cur, "%")
if i < 0 {
break
}
cur = cur[i:]
for _, va := range []string{
"%path",
"%Y",
"%m",
"%d",
"%H",
"%M",
"%S",
"%f",
} {
if strings.HasPrefix(cur, va) {
groupMapping = append(groupMapping, va)
}
}
cur = cur[1:]
}
matches := r.FindStringSubmatch(v)
if matches == nil {
return nil
}
values := make(map[string]string)
for i, match := range matches[1:] {
values[groupMapping[i]] = match
}
var year int
var month time.Month = 1
day := 1
var hour int
var minute int
var second int
var micros int
for k, v := range values {
switch k {
case "%Y":
tmp, _ := strconv.ParseInt(v, 10, 64)
year = int(tmp)
case "%m":
tmp, _ := strconv.ParseInt(v, 10, 64)
month = time.Month(int(tmp))
case "%d":
tmp, _ := strconv.ParseInt(v, 10, 64)
day = int(tmp)
case "%H":
tmp, _ := strconv.ParseInt(v, 10, 64)
hour = int(tmp)
case "%M":
tmp, _ := strconv.ParseInt(v, 10, 64)
minute = int(tmp)
case "%S":
tmp, _ := strconv.ParseInt(v, 10, 64)
second = int(tmp)
case "%f":
tmp, _ := strconv.ParseInt(v, 10, 64)
micros = int(tmp)
}
}
t := time.Date(year, month, day, hour, minute, second, micros*1000, time.Local)
return &recordPathParams{
path: values["%path"],
time: t,
}
}
func encodeRecordPath(params *recordPathParams, v string) string {
v = strings.ReplaceAll(v, "%Y", strconv.FormatInt(int64(params.time.Year()), 10))
v = strings.ReplaceAll(v, "%m", leadingZeros(int(params.time.Month()), 2))
v = strings.ReplaceAll(v, "%d", leadingZeros(params.time.Day(), 2))
v = strings.ReplaceAll(v, "%H", leadingZeros(params.time.Hour(), 2))
v = strings.ReplaceAll(v, "%M", leadingZeros(params.time.Minute(), 2))
v = strings.ReplaceAll(v, "%S", leadingZeros(params.time.Second(), 2))
v = strings.ReplaceAll(v, "%f", leadingZeros(params.time.Nanosecond()/1000, 6))
return v
}

116
internal/record/segment.go

@ -0,0 +1,116 @@
package record
import (
"io"
"os"
"path/filepath"
"time"
"github.com/aler9/writerseeker"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediamtx/internal/logger"
)
var timeNow = time.Now
func writeInit(f io.Writer, tracks []*track) error {
fmp4Tracks := make([]*fmp4.InitTrack, len(tracks))
for i, track := range tracks {
fmp4Tracks[i] = track.initTrack
}
init := fmp4.Init{
Tracks: fmp4Tracks,
}
var ws writerseeker.WriterSeeker
err := init.Marshal(&ws)
if err != nil {
return err
}
_, err = f.Write(ws.Bytes())
return err
}
type segment struct {
r *Agent
startDTS time.Duration
fpath string
f *os.File
curPart *part
}
func newSegment(
r *Agent,
startDTS time.Duration,
) *segment {
return &segment{
r: r,
startDTS: startDTS,
}
}
func (s *segment) close() error {
if s.curPart != nil {
err := s.flush()
if s.f != nil {
s.r.Log(logger.Debug, "closing segment %s", s.fpath)
err2 := s.f.Close()
if err == nil {
err = err2
}
}
return err
}
return nil
}
func (s *segment) record(track *track, sample *sample) error {
if s.curPart == nil {
s.curPart = newPart(s, sample.dts)
} else if s.curPart.duration() >= s.r.partDuration {
err := s.flush()
if err != nil {
s.curPart = nil
return err
}
s.curPart = newPart(s, sample.dts)
}
return s.curPart.record(track, sample)
}
func (s *segment) flush() error {
if s.f == nil {
s.fpath = encodeRecordPath(&recordPathParams{time: timeNow()}, s.r.path)
s.r.Log(logger.Debug, "opening segment %s", s.fpath)
err := os.MkdirAll(filepath.Dir(s.fpath), 0o755)
if err != nil {
return err
}
f, err := os.Create(s.fpath)
if err != nil {
return err
}
err = writeInit(f, s.r.tracks)
if err != nil {
f.Close()
return err
}
s.f = f
}
return s.curPart.close()
}

57
internal/record/track.go

@ -0,0 +1,57 @@
package record
import (
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
)
type track struct {
r *Agent
initTrack *fmp4.InitTrack
nextSample *sample
}
func newTrack(
r *Agent,
initTrack *fmp4.InitTrack,
) *track {
return &track{
r: r,
initTrack: initTrack,
}
}
func (t *track) record(sample *sample) error {
// wait the first video sample before setting hasVideo
if t.initTrack.Codec.IsVideo() {
t.r.hasVideo = true
}
if t.r.currentSegment == nil {
t.r.currentSegment = newSegment(t.r, sample.dts)
}
sample, t.nextSample = t.nextSample, sample
if sample == nil {
return nil
}
sample.Duration = uint32(durationGoToMp4(t.nextSample.dts-sample.dts, t.initTrack.TimeScale))
err := t.r.currentSegment.record(t, sample)
if err != nil {
return err
}
if (!t.r.hasVideo || t.initTrack.Codec.IsVideo()) &&
!t.nextSample.IsNonSyncSample &&
(t.nextSample.dts-t.r.currentSegment.startDTS) >= t.r.segmentDuration {
err := t.r.currentSegment.close()
if err != nil {
return err
}
t.r.currentSegment = newSegment(t.r, t.nextSample.dts)
}
return nil
}

11
internal/stream/stream.go

@ -3,6 +3,7 @@ package stream
import ( import (
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4"
@ -21,8 +22,8 @@ type readerFunc func(unit.Unit) error
// It stores tracks, readers and allows to write data to readers. // It stores tracks, readers and allows to write data to readers.
type Stream struct { type Stream struct {
desc *description.Session desc *description.Session
bytesReceived *uint64
bytesReceived *uint64
smedias map[*description.Media]*streamMedia smedias map[*description.Media]*streamMedia
mutex sync.RWMutex mutex sync.RWMutex
rtspStream *gortsplib.ServerStream rtspStream *gortsplib.ServerStream
@ -34,12 +35,11 @@ func New(
udpMaxPayloadSize int, udpMaxPayloadSize int,
desc *description.Session, desc *description.Session,
generateRTPPackets bool, generateRTPPackets bool,
bytesReceived *uint64,
decodeErrLogger logger.Writer, decodeErrLogger logger.Writer,
) (*Stream, error) { ) (*Stream, error) {
s := &Stream{ s := &Stream{
bytesReceived: bytesReceived,
desc: desc, desc: desc,
bytesReceived: new(uint64),
} }
s.smedias = make(map[*description.Media]*streamMedia) s.smedias = make(map[*description.Media]*streamMedia)
@ -70,6 +70,11 @@ func (s *Stream) Desc() *description.Session {
return s.desc return s.desc
} }
// BytesReceived returns received bytes.
func (s *Stream) BytesReceived() uint64 {
return atomic.LoadUint64(s.bytesReceived)
}
// RTSPStream returns the RTSP stream. // RTSPStream returns the RTSP stream.
func (s *Stream) RTSPStream(server *gortsplib.Server) *gortsplib.ServerStream { func (s *Stream) RTSPStream(server *gortsplib.Server) *gortsplib.ServerStream {
s.mutex.Lock() s.mutex.Lock()

24
mediamtx.yml

@ -234,6 +234,28 @@ srt: yes
# Address of the SRT listener. # Address of the SRT listener.
srtAddress: :8890 srtAddress: :8890
###############################################
# Recording settings
# Record streams to disk.
record: no
# Path of recording segments.
# Extension is added automatically.
# Available variables are %path (path name), %Y %m %d %H %M %S %f (time in strftime format)
recordPath: ./recordings/%path/%Y-%m-%d_%H-%M-%S-%f
# Format of recorded segments.
# Currently the only available format is fmp4 (fragmented MP4).
recordFormat: fmp4
# fMP4 files are concatenation of small MP4 files (parts), each with this duration.
# When a system failure occurs, the last part gets lost.
# Therefore, the part duration is equal to the RPO (recovery point objective).
recordPartDuration: 100ms
# Minimum duration of each segment.
recordSegmentDuration: 1h
# Delete segments after this timespan.
# Set to 0 to disable automatic deletion.
recordDeleteAfter: 24h
############################################### ###############################################
# Path settings # Path settings
@ -280,6 +302,8 @@ paths:
sourceOnDemandCloseAfter: 10s sourceOnDemandCloseAfter: 10s
# Maximum number of readers. Zero means no limit. # Maximum number of readers. Zero means no limit.
maxReaders: 0 maxReaders: 0
# Record streams to disk (if global recording is enabled).
record: yes
############################################### ###############################################
# Authentication path settings # Authentication path settings

Loading…
Cancel
Save