Browse Source

record: normalize decoding and encoding of segment paths (#2775)

pull/2777/head
Alessandro Ros 1 year ago committed by GitHub
parent
commit
7c8e593b0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 21
      internal/core/core.go
  2. 14
      internal/core/path.go
  3. 8
      internal/record/agent.go
  4. 28
      internal/record/agent_instance.go
  5. 30
      internal/record/agent_test.go
  6. 66
      internal/record/cleaner.go
  7. 15
      internal/record/cleaner_test.go
  8. 28
      internal/record/format_fmp4.go
  9. 10
      internal/record/format_fmp4_part.go
  10. 8
      internal/record/format_fmp4_segment.go
  11. 2
      internal/record/format_fmp4_track.go
  12. 24
      internal/record/format_mpegts.go
  13. 16
      internal/record/format_mpegts_segment.go
  14. 51
      internal/record/record_path_test.go
  15. 37
      internal/record/segment_path.go
  16. 51
      internal/record/segment_path_test.go

21
internal/core/core.go

@ -40,9 +40,9 @@ func gatherCleanerEntries(paths map[string]*conf.Path) []record.CleanerEntry { @@ -40,9 +40,9 @@ func gatherCleanerEntries(paths map[string]*conf.Path) []record.CleanerEntry {
for _, pa := range paths {
if pa.Record && pa.RecordDeleteAfter != 0 {
entry := record.CleanerEntry{
RecordPath: pa.RecordPath,
RecordFormat: pa.RecordFormat,
RecordDeleteAfter: time.Duration(pa.RecordDeleteAfter),
SegmentPathFormat: pa.RecordPath,
Format: pa.RecordFormat,
DeleteAfter: time.Duration(pa.RecordDeleteAfter),
}
out[entry] = struct{}{}
}
@ -57,10 +57,10 @@ func gatherCleanerEntries(paths map[string]*conf.Path) []record.CleanerEntry { @@ -57,10 +57,10 @@ func gatherCleanerEntries(paths map[string]*conf.Path) []record.CleanerEntry {
}
sort.Slice(out2, func(i, j int) bool {
if out2[i].RecordPath != out2[j].RecordPath {
return out2[i].RecordPath < out2[j].RecordPath
if out2[i].SegmentPathFormat != out2[j].SegmentPathFormat {
return out2[i].SegmentPathFormat < out2[j].SegmentPathFormat
}
return out2[i].RecordDeleteAfter < out2[j].RecordDeleteAfter
return out2[i].DeleteAfter < out2[j].DeleteAfter
})
return out2
@ -295,10 +295,11 @@ func (p *Core) createResources(initial bool) error { @@ -295,10 +295,11 @@ func (p *Core) createResources(initial bool) error {
cleanerEntries := gatherCleanerEntries(p.conf.Paths)
if len(cleanerEntries) != 0 &&
p.recordCleaner == nil {
p.recordCleaner = record.NewCleaner(
cleanerEntries,
p,
)
p.recordCleaner = &record.Cleaner{
Entries: cleanerEntries,
Parent: p,
}
p.recordCleaner.Initialize()
}
if p.pathManager == nil {

14
internal/core/path.go

@ -897,13 +897,13 @@ func (pa *path) setNotReady() { @@ -897,13 +897,13 @@ func (pa *path) setNotReady() {
func (pa *path) startRecording() {
pa.recordAgent = &record.Agent{
WriteQueueSize: pa.writeQueueSize,
RecordPath: pa.conf.RecordPath,
Format: pa.conf.RecordFormat,
PartDuration: time.Duration(pa.conf.RecordPartDuration),
SegmentDuration: time.Duration(pa.conf.RecordSegmentDuration),
PathName: pa.name,
Stream: pa.stream,
WriteQueueSize: pa.writeQueueSize,
SegmentPathFormat: pa.conf.RecordPath,
Format: pa.conf.RecordFormat,
PartDuration: time.Duration(pa.conf.RecordPartDuration),
SegmentDuration: time.Duration(pa.conf.RecordSegmentDuration),
PathName: pa.name,
Stream: pa.stream,
OnSegmentCreate: func(segmentPath string) {
if pa.conf.RunOnRecordSegmentCreate != "" {
env := pa.externalCmdEnv()

8
internal/record/agent.go

@ -8,10 +8,10 @@ import ( @@ -8,10 +8,10 @@ import (
"github.com/bluenviron/mediamtx/internal/stream"
)
// Agent is a record agent.
// Agent writes recordings to disk.
type Agent struct {
WriteQueueSize int
RecordPath string
SegmentPathFormat string
Format conf.RecordFormat
PartDuration time.Duration
SegmentDuration time.Duration
@ -47,7 +47,7 @@ func (w *Agent) Initialize() { @@ -47,7 +47,7 @@ func (w *Agent) Initialize() {
w.done = make(chan struct{})
w.currentInstance = &agentInstance{
wrapper: w,
agent: w,
}
w.currentInstance.initialize()
@ -85,7 +85,7 @@ func (w *Agent) run() { @@ -85,7 +85,7 @@ func (w *Agent) run() {
}
w.currentInstance = &agentInstance{
wrapper: w,
agent: w,
}
w.currentInstance.initialize()
}

28
internal/record/agent_instance.go

@ -20,33 +20,35 @@ type sample struct { @@ -20,33 +20,35 @@ type sample struct {
}
type agentInstance struct {
wrapper *Agent
agent *Agent
resolvedPath string
writer *asyncwriter.Writer
format format
segmentPathFormat string
writer *asyncwriter.Writer
format format
terminate chan struct{}
done chan struct{}
}
func (a *agentInstance) initialize() {
a.resolvedPath = strings.ReplaceAll(a.wrapper.RecordPath, "%path", a.wrapper.PathName)
a.segmentPathFormat = a.agent.SegmentPathFormat
switch a.wrapper.Format {
a.segmentPathFormat = strings.ReplaceAll(a.segmentPathFormat, "%path", a.agent.PathName)
switch a.agent.Format {
case conf.RecordFormatMPEGTS:
a.resolvedPath += ".ts"
a.segmentPathFormat += ".ts"
default:
a.resolvedPath += ".mp4"
a.segmentPathFormat += ".mp4"
}
a.terminate = make(chan struct{})
a.done = make(chan struct{})
a.writer = asyncwriter.New(a.wrapper.WriteQueueSize, a.wrapper)
a.writer = asyncwriter.New(a.agent.WriteQueueSize, a.agent)
switch a.wrapper.Format {
switch a.agent.Format {
case conf.RecordFormatMPEGTS:
a.format = &formatMPEGTS{
a: a,
@ -75,11 +77,11 @@ func (a *agentInstance) run() { @@ -75,11 +77,11 @@ func (a *agentInstance) run() {
select {
case err := <-a.writer.Error():
a.wrapper.Log(logger.Error, err.Error())
a.wrapper.Stream.RemoveReader(a.writer)
a.agent.Log(logger.Error, err.Error())
a.agent.Stream.RemoveReader(a.writer)
case <-a.terminate:
a.wrapper.Stream.RemoveReader(a.writer)
a.agent.Stream.RemoveReader(a.writer)
a.writer.Stop()
}

30
internal/record/agent_test.go

@ -156,13 +156,13 @@ func TestAgent(t *testing.T) { @@ -156,13 +156,13 @@ func TestAgent(t *testing.T) {
}
w := &Agent{
WriteQueueSize: 1024,
RecordPath: recordPath,
Format: f,
PartDuration: 100 * time.Millisecond,
SegmentDuration: 1 * time.Second,
PathName: "mypath",
Stream: stream,
WriteQueueSize: 1024,
SegmentPathFormat: recordPath,
Format: f,
PartDuration: 100 * time.Millisecond,
SegmentDuration: 1 * time.Second,
PathName: "mypath",
Stream: stream,
OnSegmentCreate: func(fpath string) {
segCreated <- struct{}{}
},
@ -266,14 +266,14 @@ func TestAgentFMP4NegativeDTS(t *testing.T) { @@ -266,14 +266,14 @@ func TestAgentFMP4NegativeDTS(t *testing.T) {
recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")
w := &Agent{
WriteQueueSize: 1024,
RecordPath: recordPath,
Format: conf.RecordFormatFMP4,
PartDuration: 100 * time.Millisecond,
SegmentDuration: 1 * time.Second,
PathName: "mypath",
Stream: stream,
Parent: &nilLogger{},
WriteQueueSize: 1024,
SegmentPathFormat: recordPath,
Format: conf.RecordFormatFMP4,
PartDuration: 100 * time.Millisecond,
SegmentDuration: 1 * time.Second,
PathName: "mypath",
Stream: stream,
Parent: &nilLogger{},
}
w.Initialize()

66
internal/record/cleaner.go

@ -41,39 +41,28 @@ func commonPath(v string) string { @@ -41,39 +41,28 @@ func commonPath(v string) string {
// CleanerEntry is a cleaner entry.
type CleanerEntry struct {
RecordPath string
RecordFormat conf.RecordFormat
RecordDeleteAfter time.Duration
SegmentPathFormat string
Format conf.RecordFormat
DeleteAfter time.Duration
}
// Cleaner removes expired recording segments from disk.
type Cleaner struct {
Entries []CleanerEntry
Parent logger.Writer
ctx context.Context
ctxCancel func()
entries []CleanerEntry
parent logger.Writer
done chan struct{}
}
// NewCleaner allocates a Cleaner.
func NewCleaner(
entries []CleanerEntry,
parent logger.Writer,
) *Cleaner {
ctx, ctxCancel := context.WithCancel(context.Background())
c := &Cleaner{
ctx: ctx,
ctxCancel: ctxCancel,
entries: entries,
parent: parent,
done: make(chan struct{}),
}
// Initialize initializes a Cleaner.
func (c *Cleaner) Initialize() {
c.ctx, c.ctxCancel = context.WithCancel(context.Background())
c.done = make(chan struct{})
go c.run()
return c
}
// Close closes the Cleaner.
@ -84,16 +73,16 @@ func (c *Cleaner) Close() { @@ -84,16 +73,16 @@ func (c *Cleaner) Close() {
// Log is the main logging function.
func (c *Cleaner) Log(level logger.Level, format string, args ...interface{}) {
c.parent.Log(level, "[record cleaner]"+format, args...)
c.Parent.Log(level, "[record cleaner]"+format, args...)
}
func (c *Cleaner) run() {
defer close(c.done)
interval := 30 * 60 * time.Second
for _, e := range c.entries {
if interval > (e.RecordDeleteAfter / 2) {
interval = e.RecordDeleteAfter / 2
for _, e := range c.Entries {
if interval > (e.DeleteAfter / 2) {
interval = e.DeleteAfter / 2
}
}
@ -111,27 +100,27 @@ func (c *Cleaner) run() { @@ -111,27 +100,27 @@ func (c *Cleaner) run() {
}
func (c *Cleaner) doRun() {
for _, e := range c.entries {
for _, e := range c.Entries {
c.doRunEntry(&e) //nolint:errcheck
}
}
func (c *Cleaner) doRunEntry(e *CleanerEntry) error {
recordPath := e.RecordPath
segmentPathFormat := e.SegmentPathFormat
// we have to convert to absolute paths
// otherwise, commonPath and fpath inside Walk() won't have common elements
recordPath, _ = filepath.Abs(recordPath)
switch e.RecordFormat {
switch e.Format {
case conf.RecordFormatMPEGTS:
recordPath += ".ts"
segmentPathFormat += ".ts"
default:
recordPath += ".mp4"
segmentPathFormat += ".mp4"
}
commonPath := commonPath(recordPath)
// we have to convert to absolute paths
// otherwise, commonPath and fpath inside Walk() won't have common elements
segmentPathFormat, _ = filepath.Abs(segmentPathFormat)
commonPath := commonPath(segmentPathFormat)
now := timeNow()
filepath.Walk(commonPath, func(fpath string, info fs.FileInfo, err error) error { //nolint:errcheck
@ -140,9 +129,10 @@ func (c *Cleaner) doRunEntry(e *CleanerEntry) error { @@ -140,9 +129,10 @@ func (c *Cleaner) doRunEntry(e *CleanerEntry) error {
}
if !info.IsDir() {
params := decodeRecordPath(recordPath, fpath)
if params != nil {
if now.Sub(params.time) > e.RecordDeleteAfter {
var pa segmentPath
ok := pa.decode(segmentPathFormat, fpath)
if ok {
if now.Sub(pa.time) > e.DeleteAfter {
c.Log(logger.Debug, "removing %s", fpath)
os.Remove(fpath)
}

15
internal/record/cleaner_test.go

@ -31,14 +31,15 @@ func TestCleaner(t *testing.T) { @@ -31,14 +31,15 @@ func TestCleaner(t *testing.T) {
err = os.WriteFile(filepath.Join(dir, "_-+*?^$()[]{}|_mypath", "2009-05-20_22-15-25-000427.mp4"), []byte{1}, 0o644)
require.NoError(t, err)
c := NewCleaner(
[]CleanerEntry{{
RecordPath: recordPath,
RecordFormat: conf.RecordFormatFMP4,
RecordDeleteAfter: 10 * time.Second,
c := &Cleaner{
Entries: []CleanerEntry{{
SegmentPathFormat: recordPath,
Format: conf.RecordFormatFMP4,
DeleteAfter: 10 * time.Second,
}},
nilLogger{},
)
Parent: nilLogger{},
}
c.Initialize()
defer c.Close()
time.Sleep(500 * time.Millisecond)

28
internal/record/format_fmp4.go

@ -132,7 +132,7 @@ func (f *formatFMP4) initialize() { @@ -132,7 +132,7 @@ func (f *formatFMP4) initialize() {
}
}
for _, media := range f.a.wrapper.Stream.Desc().Medias {
for _, media := range f.a.agent.Stream.Desc().Medias {
for _, forma := range media.Formats {
switch forma := forma.(type) {
case *rtspformat.AV1:
@ -145,7 +145,7 @@ func (f *formatFMP4) initialize() { @@ -145,7 +145,7 @@ func (f *formatFMP4) initialize() {
firstReceived := false
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.AV1)
if tunit.TU == nil {
return nil
@ -202,7 +202,7 @@ func (f *formatFMP4) initialize() { @@ -202,7 +202,7 @@ func (f *formatFMP4) initialize() {
firstReceived := false
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.VP9)
if tunit.Frame == nil {
return nil
@ -299,7 +299,7 @@ func (f *formatFMP4) initialize() { @@ -299,7 +299,7 @@ func (f *formatFMP4) initialize() {
var dtsExtractor *h265.DTSExtractor
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.H265)
if tunit.AU == nil {
return nil
@ -384,7 +384,7 @@ func (f *formatFMP4) initialize() { @@ -384,7 +384,7 @@ func (f *formatFMP4) initialize() {
var dtsExtractor *h264.DTSExtractor
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.H264)
if tunit.AU == nil {
return nil
@ -460,7 +460,7 @@ func (f *formatFMP4) initialize() { @@ -460,7 +460,7 @@ func (f *formatFMP4) initialize() {
firstReceived := false
var lastPTS time.Duration
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Video)
if tunit.Frame == nil {
return nil
@ -512,7 +512,7 @@ func (f *formatFMP4) initialize() { @@ -512,7 +512,7 @@ func (f *formatFMP4) initialize() {
firstReceived := false
var lastPTS time.Duration
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Video)
if tunit.Frame == nil {
return nil
@ -560,7 +560,7 @@ func (f *formatFMP4) initialize() { @@ -560,7 +560,7 @@ func (f *formatFMP4) initialize() {
parsed := false
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MJPEG)
if tunit.Frame == nil {
return nil
@ -596,7 +596,7 @@ func (f *formatFMP4) initialize() { @@ -596,7 +596,7 @@ func (f *formatFMP4) initialize() {
}
track := addTrack(codec)
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.Opus)
if tunit.Packets == nil {
return nil
@ -629,7 +629,7 @@ func (f *formatFMP4) initialize() { @@ -629,7 +629,7 @@ func (f *formatFMP4) initialize() {
sampleRate := time.Duration(forma.ClockRate())
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Audio)
if tunit.AUs == nil {
return nil
@ -662,7 +662,7 @@ func (f *formatFMP4) initialize() { @@ -662,7 +662,7 @@ func (f *formatFMP4) initialize() {
parsed := false
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Audio)
if tunit.Frames == nil {
return nil
@ -716,7 +716,7 @@ func (f *formatFMP4) initialize() { @@ -716,7 +716,7 @@ func (f *formatFMP4) initialize() {
parsed := false
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.AC3)
if tunit.Frames == nil {
return nil
@ -782,7 +782,7 @@ func (f *formatFMP4) initialize() { @@ -782,7 +782,7 @@ func (f *formatFMP4) initialize() {
}
track := addTrack(codec)
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.LPCM)
if tunit.Samples == nil {
return nil
@ -799,7 +799,7 @@ func (f *formatFMP4) initialize() { @@ -799,7 +799,7 @@ func (f *formatFMP4) initialize() {
}
}
f.a.wrapper.Log(logger.Info, "recording %d %s",
f.a.agent.Log(logger.Info, "recording %d %s",
len(f.tracks),
func() string {
if len(f.tracks) == 1 {

10
internal/record/format_fmp4_part.go

@ -65,20 +65,20 @@ func newFormatFMP4Part( @@ -65,20 +65,20 @@ func newFormatFMP4Part(
func (p *formatFMP4Part) close() error {
if p.s.fi == nil {
p.s.fpath = encodeRecordPath(&recordPathParams{time: p.created}, p.s.f.a.resolvedPath)
p.s.f.a.wrapper.Log(logger.Debug, "creating segment %s", p.s.fpath)
p.s.path = segmentPath{time: p.created}.encode(p.s.f.a.segmentPathFormat)
p.s.f.a.agent.Log(logger.Debug, "creating segment %s", p.s.path)
err := os.MkdirAll(filepath.Dir(p.s.fpath), 0o755)
err := os.MkdirAll(filepath.Dir(p.s.path), 0o755)
if err != nil {
return err
}
fi, err := os.Create(p.s.fpath)
fi, err := os.Create(p.s.path)
if err != nil {
return err
}
p.s.f.a.wrapper.OnSegmentCreate(p.s.fpath)
p.s.f.a.agent.OnSegmentCreate(p.s.path)
err = writeInit(fi, p.s.f.tracks)
if err != nil {

8
internal/record/format_fmp4_segment.go

@ -37,7 +37,7 @@ type formatFMP4Segment struct { @@ -37,7 +37,7 @@ type formatFMP4Segment struct {
f *formatFMP4
startDTS time.Duration
fpath string
path string
fi *os.File
curPart *formatFMP4Part
}
@ -60,14 +60,14 @@ func (s *formatFMP4Segment) close() error { @@ -60,14 +60,14 @@ func (s *formatFMP4Segment) close() error {
}
if s.fi != nil {
s.f.a.wrapper.Log(logger.Debug, "closing segment %s", s.fpath)
s.f.a.agent.Log(logger.Debug, "closing segment %s", s.path)
err2 := s.fi.Close()
if err == nil {
err = err2
}
if err2 == nil {
s.f.a.wrapper.OnSegmentComplete(s.fpath)
s.f.a.agent.OnSegmentComplete(s.path)
}
}
@ -78,7 +78,7 @@ func (s *formatFMP4Segment) record(track *formatFMP4Track, sample *sample) error @@ -78,7 +78,7 @@ func (s *formatFMP4Segment) record(track *formatFMP4Track, sample *sample) error
if s.curPart == nil {
s.curPart = newFormatFMP4Part(s, s.f.nextSequenceNumber, sample.dts)
s.f.nextSequenceNumber++
} else if s.curPart.duration() >= s.f.a.wrapper.PartDuration {
} else if s.curPart.duration() >= s.f.a.agent.PartDuration {
err := s.curPart.close()
s.curPart = nil

2
internal/record/format_fmp4_track.go

@ -47,7 +47,7 @@ func (t *formatFMP4Track) record(sample *sample) error { @@ -47,7 +47,7 @@ func (t *formatFMP4Track) record(sample *sample) error {
if (!t.f.hasVideo || t.initTrack.Codec.IsVideo()) &&
!t.nextSample.IsNonSyncSample &&
(t.nextSample.dts-t.f.currentSegment.startDTS) >= t.f.a.wrapper.SegmentDuration {
(t.nextSample.dts-t.f.currentSegment.startDTS) >= t.f.a.agent.SegmentDuration {
err := t.f.currentSegment.close()
if err != nil {
return err

24
internal/record/format_mpegts.go

@ -59,7 +59,7 @@ func (f *formatMPEGTS) initialize() { @@ -59,7 +59,7 @@ func (f *formatMPEGTS) initialize() {
return track
}
for _, media := range f.a.wrapper.Stream.Desc().Medias {
for _, media := range f.a.agent.Stream.Desc().Medias {
for _, forma := range media.Formats {
switch forma := forma.(type) {
case *rtspformat.H265:
@ -67,7 +67,7 @@ func (f *formatMPEGTS) initialize() { @@ -67,7 +67,7 @@ func (f *formatMPEGTS) initialize() {
var dtsExtractor *h265.DTSExtractor
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.H265)
if tunit.AU == nil {
return nil
@ -95,7 +95,7 @@ func (f *formatMPEGTS) initialize() { @@ -95,7 +95,7 @@ func (f *formatMPEGTS) initialize() {
var dtsExtractor *h264.DTSExtractor
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.H264)
if tunit.AU == nil {
return nil
@ -124,7 +124,7 @@ func (f *formatMPEGTS) initialize() { @@ -124,7 +124,7 @@ func (f *formatMPEGTS) initialize() {
firstReceived := false
var lastPTS time.Duration
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Video)
if tunit.Frame == nil {
return nil
@ -154,7 +154,7 @@ func (f *formatMPEGTS) initialize() { @@ -154,7 +154,7 @@ func (f *formatMPEGTS) initialize() {
firstReceived := false
var lastPTS time.Duration
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Video)
if tunit.Frame == nil {
return nil
@ -188,7 +188,7 @@ func (f *formatMPEGTS) initialize() { @@ -188,7 +188,7 @@ func (f *formatMPEGTS) initialize() {
}(),
})
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.Opus)
if tunit.Packets == nil {
return nil
@ -207,7 +207,7 @@ func (f *formatMPEGTS) initialize() { @@ -207,7 +207,7 @@ func (f *formatMPEGTS) initialize() {
Config: *forma.GetConfig(),
})
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Audio)
if tunit.AUs == nil {
return nil
@ -224,7 +224,7 @@ func (f *formatMPEGTS) initialize() { @@ -224,7 +224,7 @@ func (f *formatMPEGTS) initialize() {
case *rtspformat.MPEG1Audio:
track := addTrack(&mpegts.CodecMPEG1Audio{})
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Audio)
if tunit.Frames == nil {
return nil
@ -243,7 +243,7 @@ func (f *formatMPEGTS) initialize() { @@ -243,7 +243,7 @@ func (f *formatMPEGTS) initialize() {
sampleRate := time.Duration(forma.SampleRate)
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.AC3)
if tunit.Frames == nil {
return nil
@ -269,7 +269,7 @@ func (f *formatMPEGTS) initialize() { @@ -269,7 +269,7 @@ func (f *formatMPEGTS) initialize() {
f.bw = bufio.NewWriterSize(f.dw, mpegtsMaxBufferSize)
f.mw = mpegts.NewWriter(f.bw, tracks)
f.a.wrapper.Log(logger.Info, "recording %d %s",
f.a.agent.Log(logger.Info, "recording %d %s",
len(tracks),
func() string {
if len(tracks) == 1 {
@ -292,7 +292,7 @@ func (f *formatMPEGTS) setupSegment(dts time.Duration, isVideo bool, randomAcces @@ -292,7 +292,7 @@ func (f *formatMPEGTS) setupSegment(dts time.Duration, isVideo bool, randomAcces
case (!f.hasVideo || isVideo) &&
randomAccess &&
(dts-f.currentSegment.startDTS) >= f.a.wrapper.SegmentDuration:
(dts-f.currentSegment.startDTS) >= f.a.agent.SegmentDuration:
err := f.currentSegment.close()
if err != nil {
return err
@ -300,7 +300,7 @@ func (f *formatMPEGTS) setupSegment(dts time.Duration, isVideo bool, randomAcces @@ -300,7 +300,7 @@ func (f *formatMPEGTS) setupSegment(dts time.Duration, isVideo bool, randomAcces
f.currentSegment = newFormatMPEGTSSegment(f, dts)
case (dts - f.currentSegment.lastFlush) >= f.a.wrapper.PartDuration:
case (dts - f.currentSegment.lastFlush) >= f.a.agent.PartDuration:
err := f.bw.Flush()
if err != nil {
return err

16
internal/record/format_mpegts_segment.go

@ -14,7 +14,7 @@ type formatMPEGTSSegment struct { @@ -14,7 +14,7 @@ type formatMPEGTSSegment struct {
lastFlush time.Duration
created time.Time
fpath string
path string
fi *os.File
}
@ -35,14 +35,14 @@ func (s *formatMPEGTSSegment) close() error { @@ -35,14 +35,14 @@ func (s *formatMPEGTSSegment) close() error {
err := s.f.bw.Flush()
if s.fi != nil {
s.f.a.wrapper.Log(logger.Debug, "closing segment %s", s.fpath)
s.f.a.agent.Log(logger.Debug, "closing segment %s", s.path)
err2 := s.fi.Close()
if err == nil {
err = err2
}
if err2 == nil {
s.f.a.wrapper.OnSegmentComplete(s.fpath)
s.f.a.agent.OnSegmentComplete(s.path)
}
}
@ -51,20 +51,20 @@ func (s *formatMPEGTSSegment) close() error { @@ -51,20 +51,20 @@ func (s *formatMPEGTSSegment) close() error {
func (s *formatMPEGTSSegment) Write(p []byte) (int, error) {
if s.fi == nil {
s.fpath = encodeRecordPath(&recordPathParams{time: s.created}, s.f.a.resolvedPath)
s.f.a.wrapper.Log(logger.Debug, "creating segment %s", s.fpath)
s.path = segmentPath{time: s.created}.encode(s.f.a.segmentPathFormat)
s.f.a.agent.Log(logger.Debug, "creating segment %s", s.path)
err := os.MkdirAll(filepath.Dir(s.fpath), 0o755)
err := os.MkdirAll(filepath.Dir(s.path), 0o755)
if err != nil {
return 0, err
}
fi, err := os.Create(s.fpath)
fi, err := os.Create(s.path)
if err != nil {
return 0, err
}
s.f.a.wrapper.OnSegmentCreate(s.fpath)
s.f.a.agent.OnSegmentCreate(s.path)
s.fi = fi
}

51
internal/record/record_path_test.go

@ -1,51 +0,0 @@ @@ -1,51 +0,0 @@
package record
import (
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
)
var recordPathCases = []struct {
name string
format string
dec *recordPathParams
enc string
}{
{
"standard",
"%path/%Y-%m-%d_%H-%M-%S-%f.mp4",
&recordPathParams{
path: "mypath",
time: time.Date(2008, 11, 0o7, 11, 22, 4, 123456000, time.Local),
},
"mypath/2008-11-07_11-22-04-123456.mp4",
},
{
"unix seconds",
"%path/%s.mp4",
&recordPathParams{
path: "mypath",
time: time.Date(2021, 12, 2, 12, 15, 23, 0, time.UTC).Local(),
},
"mypath/1638447323.mp4",
},
}
func TestRecordPathDecode(t *testing.T) {
for _, ca := range recordPathCases {
t.Run(ca.name, func(t *testing.T) {
require.Equal(t, ca.dec, decodeRecordPath(ca.format, ca.enc))
})
}
}
func TestRecordPathEncode(t *testing.T) {
for _, ca := range recordPathCases {
t.Run(ca.name, func(t *testing.T) {
require.Equal(t, ca.enc, strings.ReplaceAll(encodeRecordPath(ca.dec, ca.format), "%path", "mypath"))
})
}
}

37
internal/record/record_path.go → internal/record/segment_path.go

@ -21,12 +21,11 @@ func leadingZeros(v int, size int) string { @@ -21,12 +21,11 @@ func leadingZeros(v int, size int) string {
return out2 + out
}
type recordPathParams struct {
path string
type segmentPath struct {
time time.Time
}
func decodeRecordPath(format string, v string) *recordPathParams {
func (p *segmentPath) decode(format string, v string) bool {
re := format
for _, ch := range []uint8{
@ -90,7 +89,7 @@ func decodeRecordPath(format string, v string) *recordPathParams { @@ -90,7 +89,7 @@ func decodeRecordPath(format string, v string) *recordPathParams {
matches := r.FindStringSubmatch(v)
if matches == nil {
return nil
return false
}
values := make(map[string]string)
@ -143,27 +142,23 @@ func decodeRecordPath(format string, v string) *recordPathParams { @@ -143,27 +142,23 @@ func decodeRecordPath(format string, v string) *recordPathParams {
}
}
var t time.Time
if unixSec > 0 {
t = time.Unix(unixSec, 0)
p.time = time.Unix(unixSec, 0)
} else {
t = time.Date(year, month, day, hour, minute, second, micros*1000, time.Local)
p.time = time.Date(year, month, day, hour, minute, second, micros*1000, time.Local)
}
return &recordPathParams{
path: values["%path"],
time: t,
}
return true
}
func encodeRecordPath(params *recordPathParams, v string) string {
v = strings.ReplaceAll(v, "%Y", strconv.FormatInt(int64(params.time.Year()), 10))
v = strings.ReplaceAll(v, "%m", leadingZeros(int(params.time.Month()), 2))
v = strings.ReplaceAll(v, "%d", leadingZeros(params.time.Day(), 2))
v = strings.ReplaceAll(v, "%H", leadingZeros(params.time.Hour(), 2))
v = strings.ReplaceAll(v, "%M", leadingZeros(params.time.Minute(), 2))
v = strings.ReplaceAll(v, "%S", leadingZeros(params.time.Second(), 2))
v = strings.ReplaceAll(v, "%f", leadingZeros(params.time.Nanosecond()/1000, 6))
v = strings.ReplaceAll(v, "%s", strconv.FormatInt(params.time.Unix(), 10))
return v
func (p segmentPath) encode(format string) string {
format = strings.ReplaceAll(format, "%Y", strconv.FormatInt(int64(p.time.Year()), 10))
format = strings.ReplaceAll(format, "%m", leadingZeros(int(p.time.Month()), 2))
format = strings.ReplaceAll(format, "%d", leadingZeros(p.time.Day(), 2))
format = strings.ReplaceAll(format, "%H", leadingZeros(p.time.Hour(), 2))
format = strings.ReplaceAll(format, "%M", leadingZeros(p.time.Minute(), 2))
format = strings.ReplaceAll(format, "%S", leadingZeros(p.time.Second(), 2))
format = strings.ReplaceAll(format, "%f", leadingZeros(p.time.Nanosecond()/1000, 6))
format = strings.ReplaceAll(format, "%s", strconv.FormatInt(p.time.Unix(), 10))
return format
}

51
internal/record/segment_path_test.go

@ -0,0 +1,51 @@ @@ -0,0 +1,51 @@
package record
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
var segmentPathCases = []struct {
name string
format string
dec segmentPath
enc string
}{
{
"standard",
"%path/%Y-%m-%d_%H-%M-%S-%f.mp4",
segmentPath{
time: time.Date(2008, 11, 0o7, 11, 22, 4, 123456000, time.Local),
},
"%path/2008-11-07_11-22-04-123456.mp4",
},
{
"unix seconds",
"%path/%s.mp4",
segmentPath{
time: time.Date(2021, 12, 2, 12, 15, 23, 0, time.UTC).Local(),
},
"%path/1638447323.mp4",
},
}
func TestSegmentPathDecode(t *testing.T) {
for _, ca := range segmentPathCases {
t.Run(ca.name, func(t *testing.T) {
var dec segmentPath
ok := dec.decode(ca.format, ca.enc)
require.Equal(t, true, ok)
require.Equal(t, ca.dec, dec)
})
}
}
func TestSegmentPathEncode(t *testing.T) {
for _, ca := range segmentPathCases {
t.Run(ca.name, func(t *testing.T) {
require.Equal(t, ca.enc, ca.dec.encode(ca.format))
})
}
}
Loading…
Cancel
Save