Browse Source

record: use reception time instead of current time in segment names (#2925)

pull/2928/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
9b270adc03
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      internal/record/agent_instance.go
  2. 34
      internal/record/agent_test.go
  3. 2
      internal/record/cleaner.go
  4. 19
      internal/record/cleaner_test.go
  5. 43
      internal/record/format_fmp4.go
  6. 17
      internal/record/format_fmp4_part.go
  7. 26
      internal/record/format_fmp4_segment.go
  8. 14
      internal/record/format_fmp4_track.go
  9. 49
      internal/record/format_mpegts.go
  10. 28
      internal/record/format_mpegts_segment.go

1
internal/record/agent_instance.go

@ -17,6 +17,7 @@ type OnSegmentFunc = func(string)
type sample struct { type sample struct {
*fmp4.PartSample *fmp4.PartSample
dts time.Duration dts time.Duration
ntp time.Time
} }
type agentInstance struct { type agentInstance struct {

34
internal/record/agent_test.go

@ -73,11 +73,12 @@ func TestAgent(t *testing.T) {
}, },
}} }}
writeToStream := func(stream *stream.Stream) { writeToStream := func(stream *stream.Stream, ntp time.Time) {
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H265{ stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H265{
Base: unit.Base{ Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second, PTS: (50 + time.Duration(i)) * time.Second,
NTP: ntp.Add(time.Duration(i) * 60 * time.Second),
}, },
AU: [][]byte{ AU: [][]byte{
{ // VPS { // VPS
@ -144,24 +145,6 @@ func TestAgent(t *testing.T) {
for _, ca := range []string{"fmp4", "mpegts"} { for _, ca := range []string{"fmp4", "mpegts"} {
t.Run(ca, func(t *testing.T) { t.Run(ca, func(t *testing.T) {
n := 0
timeNow = func() time.Time {
n++
switch n {
case 1:
return time.Date(2008, 0o5, 20, 22, 15, 25, 0, time.UTC)
case 2:
return time.Date(2009, 0o5, 20, 22, 15, 25, 0, time.UTC)
case 3:
return time.Date(2010, 0o5, 20, 22, 15, 25, 0, time.UTC)
default:
return time.Date(2011, 0o5, 20, 22, 15, 25, 0, time.UTC)
}
}
stream, err := stream.New( stream, err := stream.New(
1460, 1460,
desc, desc,
@ -206,7 +189,7 @@ func TestAgent(t *testing.T) {
} }
w.Initialize() w.Initialize()
writeToStream(stream) writeToStream(stream, time.Date(2008, 0o5, 20, 22, 15, 25, 0, time.UTC))
// simulate a write error // simulate a write error
stream.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.H264{ stream.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.H264{
@ -233,12 +216,12 @@ func TestAgent(t *testing.T) {
_, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext)) _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext))
require.NoError(t, err) require.NoError(t, err)
_, err = os.Stat(filepath.Join(dir, "mypath", "2009-05-20_22-15-25-000000."+ext)) _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-16-25-000000."+ext))
require.NoError(t, err) require.NoError(t, err)
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
writeToStream(stream) writeToStream(stream, time.Date(2010, 0o5, 20, 22, 15, 25, 0, time.UTC))
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
@ -247,7 +230,7 @@ func TestAgent(t *testing.T) {
_, err = os.Stat(filepath.Join(dir, "mypath", "2010-05-20_22-15-25-000000."+ext)) _, err = os.Stat(filepath.Join(dir, "mypath", "2010-05-20_22-15-25-000000."+ext))
require.NoError(t, err) require.NoError(t, err)
_, err = os.Stat(filepath.Join(dir, "mypath", "2011-05-20_22-15-25-000000."+ext)) _, err = os.Stat(filepath.Join(dir, "mypath", "2010-05-20_22-16-25-000000."+ext))
require.NoError(t, err) require.NoError(t, err)
}) })
} }
@ -278,10 +261,6 @@ func TestAgentFMP4NegativeDTS(t *testing.T) {
}, },
}} }}
timeNow = func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 25, 0, time.UTC)
}
stream, err := stream.New( stream, err := stream.New(
1460, 1460,
desc, desc,
@ -313,6 +292,7 @@ func TestAgentFMP4NegativeDTS(t *testing.T) {
stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{ stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{
Base: unit.Base{ Base: unit.Base{
PTS: -50*time.Millisecond + (time.Duration(i) * 200 * time.Millisecond), PTS: -50*time.Millisecond + (time.Duration(i) * 200 * time.Millisecond),
NTP: time.Date(2008, 0o5, 20, 22, 15, 25, 0, time.UTC),
}, },
AU: [][]byte{ AU: [][]byte{
{ // SPS { // SPS

2
internal/record/cleaner.go

@ -12,6 +12,8 @@ import (
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
) )
var timeNow = time.Now
func commonPath(v string) string { func commonPath(v string) string {
common := "" common := ""
remaining := v remaining := v

19
internal/record/cleaner_test.go

@ -6,34 +6,33 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require"
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/stretchr/testify/require"
) )
func TestCleaner(t *testing.T) { func TestCleaner(t *testing.T) {
timeNow = func() time.Time { timeNow = func() time.Time {
return time.Date(2009, 0o5, 20, 22, 15, 25, 427000, time.UTC) return time.Date(2009, 0o5, 20, 22, 15, 25, 427000, time.Local)
} }
dir, err := os.MkdirTemp("", "mediamtx-cleaner") dir, err := os.MkdirTemp("", "mediamtx-cleaner")
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
recordPath := filepath.Join(dir, "_-+*?^$()[]{}|_%path/%Y-%m-%d_%H-%M-%S-%f") const specialChars = "_-+*?^$()[]{}|"
err = os.Mkdir(filepath.Join(dir, "_-+*?^$()[]{}|_mypath"), 0o755) err = os.Mkdir(filepath.Join(dir, specialChars+"_mypath"), 0o755)
require.NoError(t, err) require.NoError(t, err)
err = os.WriteFile(filepath.Join(dir, "_-+*?^$()[]{}|_mypath", "2008-05-20_22-15-25-000125.mp4"), []byte{1}, 0o644) err = os.WriteFile(filepath.Join(dir, specialChars+"_mypath", "2008-05-20_22-15-25-000125.mp4"), []byte{1}, 0o644)
require.NoError(t, err) require.NoError(t, err)
err = os.WriteFile(filepath.Join(dir, "_-+*?^$()[]{}|_mypath", "2009-05-20_22-15-25-000427.mp4"), []byte{1}, 0o644) err = os.WriteFile(filepath.Join(dir, specialChars+"_mypath", "2009-05-20_22-15-25-000427.mp4"), []byte{1}, 0o644)
require.NoError(t, err) require.NoError(t, err)
c := &Cleaner{ c := &Cleaner{
Entries: []CleanerEntry{{ Entries: []CleanerEntry{{
PathFormat: recordPath, PathFormat: filepath.Join(dir, specialChars+"_%path/%Y-%m-%d_%H-%M-%S-%f"),
Format: conf.RecordFormatFMP4, Format: conf.RecordFormatFMP4,
DeleteAfter: 10 * time.Second, DeleteAfter: 10 * time.Second,
}}, }},
@ -44,9 +43,9 @@ func TestCleaner(t *testing.T) {
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
_, err = os.Stat(filepath.Join(dir, "_-+*?^$()[]{}|_mypath", "2008-05-20_22-15-25-000125.mp4")) _, err = os.Stat(filepath.Join(dir, specialChars+"_mypath", "2008-05-20_22-15-25-000125.mp4"))
require.Error(t, err) require.Error(t, err)
_, err = os.Stat(filepath.Join(dir, "_-+*?^$()[]{}|_mypath", "2009-05-20_22-15-25-000427.mp4")) _, err = os.Stat(filepath.Join(dir, specialChars+"_mypath", "2009-05-20_22-15-25-000427.mp4"))
require.NoError(t, err) require.NoError(t, err)
} }

43
internal/record/format_fmp4.go

@ -193,6 +193,7 @@ func (f *formatFMP4) initialize() {
return track.record(&sample{ return track.record(&sample{
PartSample: sampl, PartSample: sampl,
dts: tunit.PTS, dts: tunit.PTS,
ntp: tunit.NTP,
}) })
}) })
@ -265,6 +266,7 @@ func (f *formatFMP4) initialize() {
Payload: tunit.Frame, Payload: tunit.Frame,
}, },
dts: tunit.PTS, dts: tunit.PTS,
ntp: tunit.NTP,
}) })
}) })
@ -364,6 +366,7 @@ func (f *formatFMP4) initialize() {
return track.record(&sample{ return track.record(&sample{
PartSample: sampl, PartSample: sampl,
dts: dts, dts: dts,
ntp: tunit.NTP,
}) })
}) })
@ -442,6 +445,7 @@ func (f *formatFMP4) initialize() {
return track.record(&sample{ return track.record(&sample{
PartSample: sampl, PartSample: sampl,
dts: dts, dts: dts,
ntp: tunit.NTP,
}) })
}) })
@ -503,6 +507,7 @@ func (f *formatFMP4) initialize() {
IsNonSyncSample: !randomAccess, IsNonSyncSample: !randomAccess,
}, },
dts: tunit.PTS, dts: tunit.PTS,
ntp: tunit.NTP,
}) })
}) })
@ -555,6 +560,7 @@ func (f *formatFMP4) initialize() {
IsNonSyncSample: !randomAccess, IsNonSyncSample: !randomAccess,
}, },
dts: tunit.PTS, dts: tunit.PTS,
ntp: tunit.NTP,
}) })
}) })
@ -589,6 +595,7 @@ func (f *formatFMP4) initialize() {
Payload: tunit.Frame, Payload: tunit.Frame,
}, },
dts: tunit.PTS, dts: tunit.PTS,
ntp: tunit.NTP,
}) })
}) })
@ -609,20 +616,21 @@ func (f *formatFMP4) initialize() {
return nil return nil
} }
pts := tunit.PTS var dt time.Duration
for _, packet := range tunit.Packets { for _, packet := range tunit.Packets {
err := track.record(&sample{ err := track.record(&sample{
PartSample: &fmp4.PartSample{ PartSample: &fmp4.PartSample{
Payload: packet, Payload: packet,
}, },
dts: pts, dts: tunit.PTS + dt,
ntp: tunit.NTP.Add(dt),
}) })
if err != nil { if err != nil {
return err return err
} }
pts += opus.PacketDuration(packet) dt += opus.PacketDuration(packet)
} }
return nil return nil
@ -643,14 +651,15 @@ func (f *formatFMP4) initialize() {
} }
for i, au := range tunit.AUs { for i, au := range tunit.AUs {
auPTS := tunit.PTS + time.Duration(i)*mpeg4audio.SamplesPerAccessUnit* dt := time.Duration(i) * mpeg4audio.SamplesPerAccessUnit *
time.Second/sampleRate time.Second / sampleRate
err := track.record(&sample{ err := track.record(&sample{
PartSample: &fmp4.PartSample{ PartSample: &fmp4.PartSample{
Payload: au, Payload: au,
}, },
dts: auPTS, dts: tunit.PTS + dt,
ntp: tunit.NTP.Add(dt),
}) })
if err != nil { if err != nil {
return err return err
@ -675,7 +684,7 @@ func (f *formatFMP4) initialize() {
return nil return nil
} }
pts := tunit.PTS var dt time.Duration
for _, frame := range tunit.Frames { for _, frame := range tunit.Frames {
var h mpeg1audio.FrameHeader var h mpeg1audio.FrameHeader
@ -695,13 +704,14 @@ func (f *formatFMP4) initialize() {
PartSample: &fmp4.PartSample{ PartSample: &fmp4.PartSample{
Payload: frame, Payload: frame,
}, },
dts: pts, dts: tunit.PTS + tunit.PTS,
ntp: tunit.NTP,
}) })
if err != nil { if err != nil {
return err return err
} }
pts += time.Duration(h.SampleCount()) * dt += time.Duration(h.SampleCount()) *
time.Second / time.Duration(h.SampleRate) time.Second / time.Duration(h.SampleRate)
} }
@ -729,9 +739,7 @@ func (f *formatFMP4) initialize() {
return nil return nil
} }
pts := tunit.PTS for i, frame := range tunit.Frames {
for _, frame := range tunit.Frames {
var syncInfo ac3.SyncInfo var syncInfo ac3.SyncInfo
err := syncInfo.Unmarshal(frame) err := syncInfo.Unmarshal(frame)
if err != nil { if err != nil {
@ -757,18 +765,19 @@ func (f *formatFMP4) initialize() {
updateCodecs() updateCodecs()
} }
dt := time.Duration(i) * time.Duration(ac3.SamplesPerFrame) *
time.Second / time.Duration(codec.SampleRate)
err = track.record(&sample{ err = track.record(&sample{
PartSample: &fmp4.PartSample{ PartSample: &fmp4.PartSample{
Payload: frame, Payload: frame,
}, },
dts: pts, dts: tunit.PTS + dt,
ntp: tunit.NTP.Add(dt),
}) })
if err != nil { if err != nil {
return err return err
} }
pts += time.Duration(ac3.SamplesPerFrame) *
time.Second / time.Duration(codec.SampleRate)
} }
return nil return nil
@ -804,6 +813,7 @@ func (f *formatFMP4) initialize() {
Payload: out, Payload: out,
}, },
dts: tunit.PTS, dts: tunit.PTS,
ntp: tunit.NTP,
}) })
}) })
@ -827,6 +837,7 @@ func (f *formatFMP4) initialize() {
Payload: tunit.Samples, Payload: tunit.Samples,
}, },
dts: tunit.PTS, dts: tunit.PTS,
ntp: tunit.NTP,
}) })
}) })
} }

17
internal/record/format_fmp4_part.go

@ -44,28 +44,17 @@ type formatFMP4Part struct {
sequenceNumber uint32 sequenceNumber uint32
startDTS time.Duration startDTS time.Duration
created time.Time
partTracks map[*formatFMP4Track]*fmp4.PartTrack partTracks map[*formatFMP4Track]*fmp4.PartTrack
endDTS time.Duration endDTS time.Duration
} }
func newFormatFMP4Part( func (p *formatFMP4Part) initialize() {
s *formatFMP4Segment, p.partTracks = make(map[*formatFMP4Track]*fmp4.PartTrack)
sequenceNumber uint32,
startDTS time.Duration,
) *formatFMP4Part {
return &formatFMP4Part{
s: s,
startDTS: startDTS,
sequenceNumber: sequenceNumber,
created: timeNow(),
partTracks: make(map[*formatFMP4Track]*fmp4.PartTrack),
}
} }
func (p *formatFMP4Part) close() error { func (p *formatFMP4Part) close() error {
if p.s.fi == nil { if p.s.fi == nil {
p.s.path = path(p.created).encode(p.s.f.a.pathFormat) p.s.path = path(p.s.startNTP).encode(p.s.f.a.pathFormat)
p.s.f.a.agent.Log(logger.Debug, "creating segment %s", p.s.path) p.s.f.a.agent.Log(logger.Debug, "creating segment %s", p.s.path)
err := os.MkdirAll(filepath.Dir(p.s.path), 0o755) err := os.MkdirAll(filepath.Dir(p.s.path), 0o755)

26
internal/record/format_fmp4_segment.go

@ -11,8 +11,6 @@ import (
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
) )
var timeNow = time.Now
func writeInit(f io.Writer, tracks []*formatFMP4Track) error { func writeInit(f io.Writer, tracks []*formatFMP4Track) error {
fmp4Tracks := make([]*fmp4.InitTrack, len(tracks)) fmp4Tracks := make([]*fmp4.InitTrack, len(tracks))
for i, track := range tracks { for i, track := range tracks {
@ -36,20 +34,14 @@ func writeInit(f io.Writer, tracks []*formatFMP4Track) error {
type formatFMP4Segment struct { type formatFMP4Segment struct {
f *formatFMP4 f *formatFMP4
startDTS time.Duration startDTS time.Duration
startNTP time.Time
path string path string
fi *os.File fi *os.File
curPart *formatFMP4Part curPart *formatFMP4Part
} }
func newFormatFMP4Segment( func (s *formatFMP4Segment) initialize() {
f *formatFMP4,
startDTS time.Duration,
) *formatFMP4Segment {
return &formatFMP4Segment{
f: f,
startDTS: startDTS,
}
} }
func (s *formatFMP4Segment) close() error { func (s *formatFMP4Segment) close() error {
@ -76,7 +68,12 @@ func (s *formatFMP4Segment) close() error {
func (s *formatFMP4Segment) record(track *formatFMP4Track, sample *sample) error { func (s *formatFMP4Segment) record(track *formatFMP4Track, sample *sample) error {
if s.curPart == nil { if s.curPart == nil {
s.curPart = newFormatFMP4Part(s, s.f.nextSequenceNumber, sample.dts) s.curPart = &formatFMP4Part{
s: s,
sequenceNumber: s.f.nextSequenceNumber,
startDTS: sample.dts,
}
s.curPart.initialize()
s.f.nextSequenceNumber++ s.f.nextSequenceNumber++
} else if s.curPart.duration() >= s.f.a.agent.PartDuration { } else if s.curPart.duration() >= s.f.a.agent.PartDuration {
err := s.curPart.close() err := s.curPart.close()
@ -86,7 +83,12 @@ func (s *formatFMP4Segment) record(track *formatFMP4Track, sample *sample) error
return err return err
} }
s.curPart = newFormatFMP4Part(s, s.f.nextSequenceNumber, sample.dts) s.curPart = &formatFMP4Part{
s: s,
sequenceNumber: s.f.nextSequenceNumber,
startDTS: sample.dts,
}
s.curPart.initialize()
s.f.nextSequenceNumber++ s.f.nextSequenceNumber++
} }

14
internal/record/format_fmp4_track.go

@ -24,7 +24,12 @@ func (t *formatFMP4Track) record(sample *sample) error {
sample.Duration = uint32(durationGoToMp4(t.nextSample.dts-sample.dts, t.initTrack.TimeScale)) sample.Duration = uint32(durationGoToMp4(t.nextSample.dts-sample.dts, t.initTrack.TimeScale))
if t.f.currentSegment == nil { if t.f.currentSegment == nil {
t.f.currentSegment = newFormatFMP4Segment(t.f, sample.dts) t.f.currentSegment = &formatFMP4Segment{
f: t.f,
startDTS: sample.dts,
startNTP: sample.ntp,
}
t.f.currentSegment.initialize()
// BaseTime is negative, this is not supported by fMP4. Reject the sample silently. // BaseTime is negative, this is not supported by fMP4. Reject the sample silently.
} else if (sample.dts - t.f.currentSegment.startDTS) < 0 { } else if (sample.dts - t.f.currentSegment.startDTS) < 0 {
return nil return nil
@ -43,7 +48,12 @@ func (t *formatFMP4Track) record(sample *sample) error {
return err return err
} }
t.f.currentSegment = newFormatFMP4Segment(t.f, t.nextSample.dts) t.f.currentSegment = &formatFMP4Segment{
f: t.f,
startDTS: t.nextSample.dts,
startNTP: t.nextSample.ntp,
}
t.f.currentSegment.initialize()
} }
return nil return nil

49
internal/record/format_mpegts.go

@ -91,7 +91,7 @@ func (f *formatMPEGTS) initialize() {
return err return err
} }
return f.recordH26x(track, dts, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU) return f.recordH26x(track, tunit.PTS, dts, tunit.NTP, randomAccess, tunit.AU)
}) })
case *rtspformat.H264: case *rtspformat.H264:
@ -119,7 +119,7 @@ func (f *formatMPEGTS) initialize() {
return err return err
} }
return f.recordH26x(track, dts, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), idrPresent, tunit.AU) return f.recordH26x(track, tunit.PTS, dts, tunit.NTP, idrPresent, tunit.AU)
}) })
case *rtspformat.MPEG4Video: case *rtspformat.MPEG4Video:
@ -144,7 +144,7 @@ func (f *formatMPEGTS) initialize() {
f.hasVideo = true f.hasVideo = true
randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)})
err := f.setupSegment(tunit.PTS, true, randomAccess) err := f.setupSegment(tunit.PTS, tunit.NTP, true, randomAccess)
if err != nil { if err != nil {
return err return err
} }
@ -174,7 +174,7 @@ func (f *formatMPEGTS) initialize() {
f.hasVideo = true f.hasVideo = true
randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, 0xB8}) randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, 0xB8})
err := f.setupSegment(tunit.PTS, true, randomAccess) err := f.setupSegment(tunit.PTS, tunit.NTP, true, randomAccess)
if err != nil { if err != nil {
return err return err
} }
@ -198,7 +198,7 @@ func (f *formatMPEGTS) initialize() {
return nil return nil
} }
err := f.setupSegment(tunit.PTS, false, true) err := f.setupSegment(tunit.PTS, tunit.NTP, false, true)
if err != nil { if err != nil {
return err return err
} }
@ -217,7 +217,7 @@ func (f *formatMPEGTS) initialize() {
return nil return nil
} }
err := f.setupSegment(tunit.PTS, false, true) err := f.setupSegment(tunit.PTS, tunit.NTP, false, true)
if err != nil { if err != nil {
return err return err
} }
@ -234,7 +234,7 @@ func (f *formatMPEGTS) initialize() {
return nil return nil
} }
err := f.setupSegment(tunit.PTS, false, true) err := f.setupSegment(tunit.PTS, tunit.NTP, false, true)
if err != nil { if err != nil {
return err return err
} }
@ -283,11 +283,20 @@ func (f *formatMPEGTS) close() {
} }
} }
func (f *formatMPEGTS) setupSegment(dts time.Duration, isVideo bool, randomAccess bool) error { func (f *formatMPEGTS) setupSegment(
dts time.Duration,
ntp time.Time,
isVideo bool,
randomAccess bool,
) error {
switch { switch {
case f.currentSegment == nil: case f.currentSegment == nil:
f.currentSegment = newFormatMPEGTSSegment(f, dts) f.currentSegment = &formatMPEGTSSegment{
f: f,
startDTS: dts,
startNTP: ntp,
}
f.currentSegment.initialize()
case (!f.hasVideo || isVideo) && case (!f.hasVideo || isVideo) &&
randomAccess && randomAccess &&
(dts-f.currentSegment.startDTS) >= f.a.agent.SegmentDuration: (dts-f.currentSegment.startDTS) >= f.a.agent.SegmentDuration:
@ -296,7 +305,12 @@ func (f *formatMPEGTS) setupSegment(dts time.Duration, isVideo bool, randomAcces
return err return err
} }
f.currentSegment = newFormatMPEGTSSegment(f, dts) f.currentSegment = &formatMPEGTSSegment{
f: f,
startDTS: dts,
startNTP: ntp,
}
f.currentSegment.initialize()
case (dts - f.currentSegment.lastFlush) >= f.a.agent.PartDuration: case (dts - f.currentSegment.lastFlush) >= f.a.agent.PartDuration:
err := f.bw.Flush() err := f.bw.Flush()
@ -310,15 +324,20 @@ func (f *formatMPEGTS) setupSegment(dts time.Duration, isVideo bool, randomAcces
return nil return nil
} }
func (f *formatMPEGTS) recordH26x(track *mpegts.Track, goDTS time.Duration, func (f *formatMPEGTS) recordH26x(
pts int64, dts int64, randomAccess bool, au [][]byte, track *mpegts.Track,
pts time.Duration,
dts time.Duration,
ntp time.Time,
randomAccess bool,
au [][]byte,
) error { ) error {
f.hasVideo = true f.hasVideo = true
err := f.setupSegment(goDTS, true, randomAccess) err := f.setupSegment(dts, ntp, true, randomAccess)
if err != nil { if err != nil {
return err return err
} }
return f.mw.WriteH26x(track, pts, dts, randomAccess, au) return f.mw.WriteH26x(track, durationGoToMPEGTS(pts), durationGoToMPEGTS(dts), randomAccess, au)
} }

28
internal/record/format_mpegts_segment.go

@ -9,26 +9,18 @@ import (
) )
type formatMPEGTSSegment struct { type formatMPEGTSSegment struct {
f *formatMPEGTS f *formatMPEGTS
startDTS time.Duration startDTS time.Duration
lastFlush time.Duration startNTP time.Time
created time.Time lastFlush time.Duration
path string path string
fi *os.File fi *os.File
} }
func newFormatMPEGTSSegment(f *formatMPEGTS, startDTS time.Duration) *formatMPEGTSSegment { func (s *formatMPEGTSSegment) initialize() {
s := &formatMPEGTSSegment{ s.lastFlush = s.startDTS
f: f, s.f.dw.setTarget(s)
startDTS: startDTS,
lastFlush: startDTS,
created: timeNow(),
}
f.dw.setTarget(s)
return s
} }
func (s *formatMPEGTSSegment) close() error { func (s *formatMPEGTSSegment) close() error {
@ -51,7 +43,7 @@ func (s *formatMPEGTSSegment) close() error {
func (s *formatMPEGTSSegment) Write(p []byte) (int, error) { func (s *formatMPEGTSSegment) Write(p []byte) (int, error) {
if s.fi == nil { if s.fi == nil {
s.path = path(s.created).encode(s.f.a.pathFormat) s.path = path(s.startNTP).encode(s.f.a.pathFormat)
s.f.a.agent.Log(logger.Debug, "creating segment %s", s.path) s.f.a.agent.Log(logger.Debug, "creating segment %s", s.path)
err := os.MkdirAll(filepath.Dir(s.path), 0o755) err := os.MkdirAll(filepath.Dir(s.path), 0o755)

Loading…
Cancel
Save