Browse Source

restart recordings in case of errors (#2439) (#2571)

pull/2572/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
3ebc585539
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      internal/core/path.go
  2. 1
      internal/httpserv/wrapped_server_test.go
  3. 156
      internal/record/agent.go
  4. 87
      internal/record/agent_instance.go
  5. 246
      internal/record/agent_test.go
  6. 1
      internal/record/rec_format.go
  7. 39
      internal/record/rec_format_fmp4.go
  8. 6
      internal/record/rec_format_fmp4_part.go
  9. 6
      internal/record/rec_format_fmp4_segment.go
  10. 2
      internal/record/rec_format_fmp4_track.go
  11. 34
      internal/record/rec_format_mpegts.go
  12. 10
      internal/record/rec_format_mpegts_segment.go

25
internal/core/path.go

@ -917,15 +917,15 @@ func (pa *path) setNotReady() { @@ -917,15 +917,15 @@ func (pa *path) setNotReady() {
}
func (pa *path) startRecording() {
pa.recordAgent = record.NewAgent(
pa.writeQueueSize,
pa.conf.RecordPath,
pa.conf.RecordFormat,
time.Duration(pa.conf.RecordPartDuration),
time.Duration(pa.conf.RecordSegmentDuration),
pa.name,
pa.stream,
func(segmentPath string) {
pa.recordAgent = &record.Agent{
WriteQueueSize: pa.writeQueueSize,
RecordPath: pa.conf.RecordPath,
Format: pa.conf.RecordFormat,
PartDuration: time.Duration(pa.conf.RecordPartDuration),
SegmentDuration: time.Duration(pa.conf.RecordSegmentDuration),
PathName: pa.name,
Stream: pa.stream,
OnSegmentCreate: func(segmentPath string) {
if pa.conf.RunOnRecordSegmentCreate != "" {
env := pa.externalCmdEnv()
env["MTX_SEGMENT_PATH"] = segmentPath
@ -939,7 +939,7 @@ func (pa *path) startRecording() { @@ -939,7 +939,7 @@ func (pa *path) startRecording() {
nil)
}
},
func(segmentPath string) {
OnSegmentComplete: func(segmentPath string) {
if pa.conf.RunOnRecordSegmentComplete != "" {
env := pa.externalCmdEnv()
env["MTX_SEGMENT_PATH"] = segmentPath
@ -953,8 +953,9 @@ func (pa *path) startRecording() { @@ -953,8 +953,9 @@ func (pa *path) startRecording() {
nil)
}
},
pa,
)
Parent: pa,
}
pa.recordAgent.Initialize()
}
func (pa *path) executeRemoveReader(r reader) {

1
internal/httpserv/wrapped_server_test.go

@ -14,7 +14,6 @@ import ( @@ -14,7 +14,6 @@ import (
type testLogger struct{}
func (testLogger) Log(_ logger.Level, _ string, _ ...interface{}) {
// fmt.Printf(format, args...)
}
func TestFilterEmptyPath(t *testing.T) {

156
internal/record/agent.go

@ -1,124 +1,84 @@ @@ -1,124 +1,84 @@
package record
import (
"context"
"strings"
"time"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
)
// OnSegmentFunc is the prototype of the function passed as runOnSegmentStart / runOnSegmentComplete
type OnSegmentFunc = func(string)
type sample struct {
*fmp4.PartSample
dts time.Duration
}
// Agent saves streams on disk.
// Agent is a record agent.
type Agent struct {
path string
partDuration time.Duration
segmentDuration time.Duration
stream *stream.Stream
onSegmentCreate OnSegmentFunc
onSegmentComplete OnSegmentFunc
parent logger.Writer
ctx context.Context
ctxCancel func()
writer *asyncwriter.Writer
format recFormat
done chan struct{}
WriteQueueSize int
RecordPath string
Format conf.RecordFormat
PartDuration time.Duration
SegmentDuration time.Duration
PathName string
Stream *stream.Stream
OnSegmentCreate OnSegmentFunc
OnSegmentComplete OnSegmentFunc
Parent logger.Writer
restartPause time.Duration
currentInstance *agentInstance
terminate chan struct{}
done chan struct{}
}
// NewAgent allocates an Agent.
func NewAgent(
writeQueueSize int,
path string,
format conf.RecordFormat,
partDuration time.Duration,
segmentDuration time.Duration,
pathName string,
stream *stream.Stream,
onSegmentCreate OnSegmentFunc,
onSegmentComplete OnSegmentFunc,
parent logger.Writer,
) *Agent {
path = strings.ReplaceAll(path, "%path", pathName)
switch format {
case conf.RecordFormatMPEGTS:
path += ".ts"
default:
path += ".mp4"
// Initialize initializes Agent.
func (w *Agent) Initialize() {
if w.restartPause == 0 {
w.restartPause = 2 * time.Second
}
ctx, ctxCancel := context.WithCancel(context.Background())
a := &Agent{
path: path,
partDuration: partDuration,
segmentDuration: segmentDuration,
stream: stream,
onSegmentCreate: onSegmentCreate,
onSegmentComplete: onSegmentComplete,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
done: make(chan struct{}),
}
a.writer = asyncwriter.New(writeQueueSize, a)
w.terminate = make(chan struct{})
w.done = make(chan struct{})
switch format {
case conf.RecordFormatMPEGTS:
a.format = newRecFormatMPEGTS(a)
default:
a.format = newRecFormatFMP4(a)
w.currentInstance = &agentInstance{
wrapper: w,
}
w.currentInstance.initialize()
go a.run()
return a
}
// Close closes the Agent.
func (a *Agent) Close() {
a.Log(logger.Info, "recording stopped")
a.ctxCancel()
<-a.done
go w.run()
}
// Log is the main logging function.
func (a *Agent) Log(level logger.Level, format string, args ...interface{}) {
a.parent.Log(level, "[record] "+format, args...)
func (w *Agent) Log(level logger.Level, format string, args ...interface{}) {
w.Parent.Log(level, "[record] "+format, args...)
}
func (a *Agent) run() {
defer close(a.done)
a.writer.Start()
select {
case err := <-a.writer.Error():
a.Log(logger.Error, err.Error())
a.stream.RemoveReader(a.writer)
// Close closes the agent.
func (w *Agent) Close() {
w.Log(logger.Info, "recording stopped")
close(w.terminate)
<-w.done
}
case <-a.ctx.Done():
a.stream.RemoveReader(a.writer)
a.writer.Stop()
func (w *Agent) run() {
defer close(w.done)
for {
select {
case <-w.currentInstance.done:
w.currentInstance.close()
case <-w.terminate:
w.currentInstance.close()
return
}
select {
case <-time.After(w.restartPause):
case <-w.terminate:
return
}
w.currentInstance = &agentInstance{
wrapper: w,
}
w.currentInstance.initialize()
}
a.format.close()
}

87
internal/record/agent_instance.go

@ -0,0 +1,87 @@ @@ -0,0 +1,87 @@
package record
import (
"strings"
"time"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
)
// OnSegmentFunc is the prototype of the function passed as runOnSegmentStart / runOnSegmentComplete
type OnSegmentFunc = func(string)
type sample struct {
*fmp4.PartSample
dts time.Duration
}
type agentInstance struct {
wrapper *Agent
resolvedPath string
writer *asyncwriter.Writer
format recFormat
terminate chan struct{}
done chan struct{}
}
func (a *agentInstance) initialize() {
a.resolvedPath = strings.ReplaceAll(a.wrapper.RecordPath, "%path", a.wrapper.PathName)
switch a.wrapper.Format {
case conf.RecordFormatMPEGTS:
a.resolvedPath += ".ts"
default:
a.resolvedPath += ".mp4"
}
a.terminate = make(chan struct{})
a.done = make(chan struct{})
a.writer = asyncwriter.New(a.wrapper.WriteQueueSize, a.wrapper)
switch a.wrapper.Format {
case conf.RecordFormatMPEGTS:
a.format = &recFormatMPEGTS{
a: a,
}
a.format.initialize()
default:
a.format = &recFormatFMP4{
a: a,
}
a.format.initialize()
}
go a.run()
}
func (a *agentInstance) close() {
close(a.terminate)
<-a.done
}
func (a *agentInstance) run() {
defer close(a.done)
a.writer.Start()
select {
case err := <-a.writer.Error():
a.wrapper.Log(logger.Error, err.Error())
a.wrapper.Stream.RemoveReader(a.writer)
case <-a.terminate:
a.wrapper.Stream.RemoveReader(a.writer)
a.writer.Stop()
}
a.format.close()
}

246
internal/record/agent_test.go

@ -24,47 +24,111 @@ func (nilLogger) Log(_ logger.Level, _ string, _ ...interface{}) { @@ -24,47 +24,111 @@ func (nilLogger) Log(_ logger.Level, _ string, _ ...interface{}) {
}
func TestAgent(t *testing.T) {
desc := &description.Session{Medias: []*description.Media{
{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H265{
PayloadTyp: 96,
}},
},
{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H264{
PayloadTyp: 96,
PacketizationMode: 1,
}},
},
{
Type: description.MediaTypeAudio,
Formats: []format.Format{&format.MPEG4Audio{
PayloadTyp: 96,
Config: &mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
},
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
}},
},
}}
writeToStream := func(stream *stream.Stream) {
for i := 0; i < 3; i++ {
stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H265{
Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second,
},
AU: [][]byte{
{ // VPS
0x40, 0x01, 0x0c, 0x01, 0xff, 0xff, 0x02, 0x20,
0x00, 0x00, 0x03, 0x00, 0xb0, 0x00, 0x00, 0x03,
0x00, 0x00, 0x03, 0x00, 0x7b, 0x18, 0xb0, 0x24,
},
{ // SPS
0x42, 0x01, 0x01, 0x02, 0x20, 0x00, 0x00, 0x03,
0x00, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
0x00, 0x7b, 0xa0, 0x07, 0x82, 0x00, 0x88, 0x7d,
0xb6, 0x71, 0x8b, 0x92, 0x44, 0x80, 0x53, 0x88,
0x88, 0x92, 0xcf, 0x24, 0xa6, 0x92, 0x72, 0xc9,
0x12, 0x49, 0x22, 0xdc, 0x91, 0xaa, 0x48, 0xfc,
0xa2, 0x23, 0xff, 0x00, 0x01, 0x00, 0x01, 0x6a,
0x02, 0x02, 0x02, 0x01,
},
{ // PPS
0x44, 0x01, 0xc0, 0x25, 0x2f, 0x05, 0x32, 0x40,
},
{byte(h265.NALUType_CRA_NUT) << 1, 0}, // IDR
},
})
stream.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.H264{
Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second,
},
AU: [][]byte{
{ // SPS
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20,
},
{ // PPS
0x08, 0x06, 0x07, 0x08,
},
{5}, // IDR
},
})
stream.WriteUnit(desc.Medias[2], desc.Medias[2].Formats[0], &unit.MPEG4Audio{
Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second,
},
AUs: [][]byte{{1, 2, 3, 4}},
})
}
}
for _, ca := range []string{"fmp4", "mpegts"} {
t.Run(ca, func(t *testing.T) {
n := 0
timeNow = func() time.Time {
n++
if n >= 2 {
return time.Date(2008, 0o5, 20, 22, 15, 25, 125000, time.UTC)
switch n {
case 1:
return time.Date(2008, 0o5, 20, 22, 15, 25, 0, time.UTC)
case 2:
return time.Date(2009, 0o5, 20, 22, 15, 25, 0, time.UTC)
case 3:
return time.Date(2010, 0o5, 20, 22, 15, 25, 0, time.UTC)
default:
return time.Date(2011, 0o5, 20, 22, 15, 25, 0, time.UTC)
}
return time.Date(2009, 0o5, 20, 22, 15, 25, 427000, time.UTC)
}
desc := &description.Session{Medias: []*description.Media{
{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H265{
PayloadTyp: 96,
}},
},
{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H264{
PayloadTyp: 96,
PacketizationMode: 1,
}},
},
{
Type: description.MediaTypeAudio,
Formats: []format.Format{&format.MPEG4Audio{
PayloadTyp: 96,
Config: &mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
},
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
}},
},
}}
stream, err := stream.New(
1460,
desc,
@ -80,8 +144,8 @@ func TestAgent(t *testing.T) { @@ -80,8 +144,8 @@ func TestAgent(t *testing.T) {
recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")
segCreated := make(chan struct{}, 2)
segDone := make(chan struct{}, 2)
segCreated := make(chan struct{}, 4)
segDone := make(chan struct{}, 4)
var f conf.RecordFormat
if ca == "fmp4" {
@ -90,83 +154,42 @@ func TestAgent(t *testing.T) { @@ -90,83 +154,42 @@ func TestAgent(t *testing.T) {
f = conf.RecordFormatMPEGTS
}
a := NewAgent(
1024,
recordPath,
f,
100*time.Millisecond,
1*time.Second,
"mypath",
stream,
func(fpath string) {
w := &Agent{
WriteQueueSize: 1024,
RecordPath: recordPath,
Format: f,
PartDuration: 100 * time.Millisecond,
SegmentDuration: 1 * time.Second,
PathName: "mypath",
Stream: stream,
OnSegmentCreate: func(fpath string) {
segCreated <- struct{}{}
},
func(fpath string) {
OnSegmentComplete: func(fpath string) {
segDone <- struct{}{}
},
&nilLogger{},
)
for i := 0; i < 3; i++ {
stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H265{
Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second,
},
AU: [][]byte{
{ // VPS
0x40, 0x01, 0x0c, 0x01, 0xff, 0xff, 0x02, 0x20,
0x00, 0x00, 0x03, 0x00, 0xb0, 0x00, 0x00, 0x03,
0x00, 0x00, 0x03, 0x00, 0x7b, 0x18, 0xb0, 0x24,
},
{ // SPS
0x42, 0x01, 0x01, 0x02, 0x20, 0x00, 0x00, 0x03,
0x00, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
0x00, 0x7b, 0xa0, 0x07, 0x82, 0x00, 0x88, 0x7d,
0xb6, 0x71, 0x8b, 0x92, 0x44, 0x80, 0x53, 0x88,
0x88, 0x92, 0xcf, 0x24, 0xa6, 0x92, 0x72, 0xc9,
0x12, 0x49, 0x22, 0xdc, 0x91, 0xaa, 0x48, 0xfc,
0xa2, 0x23, 0xff, 0x00, 0x01, 0x00, 0x01, 0x6a,
0x02, 0x02, 0x02, 0x01,
},
{ // PPS
0x44, 0x01, 0xc0, 0x25, 0x2f, 0x05, 0x32, 0x40,
},
{byte(h265.NALUType_CRA_NUT) << 1, 0}, // IDR
},
})
Parent: &nilLogger{},
restartPause: 1 * time.Millisecond,
}
w.Initialize()
stream.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.H264{
Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second,
},
AU: [][]byte{
{ // SPS
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20,
},
{ // PPS
0x08, 0x06, 0x07, 0x08,
},
{5}, // IDR
},
})
writeToStream(stream)
stream.WriteUnit(desc.Medias[2], desc.Medias[2].Formats[0], &unit.MPEG4Audio{
Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second,
},
AUs: [][]byte{{1, 2, 3, 4}},
})
}
// simulate a write error
stream.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.H264{
Base: unit.Base{
PTS: 0,
},
AU: [][]byte{
{5}, // IDR
},
})
for i := 0; i < 2; i++ {
<-segCreated
<-segDone
}
a.Close()
var ext string
if ca == "fmp4" {
ext = "mp4"
@ -174,10 +197,29 @@ func TestAgent(t *testing.T) { @@ -174,10 +197,29 @@ func TestAgent(t *testing.T) {
ext = "ts"
}
_, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000125."+ext))
_, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext))
require.NoError(t, err)
_, err = os.Stat(filepath.Join(dir, "mypath", "2009-05-20_22-15-25-000000."+ext))
require.NoError(t, err)
time.Sleep(50 * time.Millisecond)
writeToStream(stream)
time.Sleep(50 * time.Millisecond)
w.Close()
for i := 0; i < 2; i++ {
<-segCreated
<-segDone
}
_, err = os.Stat(filepath.Join(dir, "mypath", "2010-05-20_22-15-25-000000."+ext))
require.NoError(t, err)
_, err = os.Stat(filepath.Join(dir, "mypath", "2009-05-20_22-15-25-000427."+ext))
_, err = os.Stat(filepath.Join(dir, "mypath", "2011-05-20_22-15-25-000000."+ext))
require.NoError(t, err)
})
}

1
internal/record/rec_format.go

@ -1,5 +1,6 @@ @@ -1,5 +1,6 @@
package record
type recFormat interface {
initialize()
close()
}

39
internal/record/rec_format_fmp4.go

@ -97,18 +97,15 @@ func jpegExtractSize(image []byte) (int, int, error) { @@ -97,18 +97,15 @@ func jpegExtractSize(image []byte) (int, int, error) {
}
type recFormatFMP4 struct {
a *Agent
a *agentInstance
tracks []*recFormatFMP4Track
hasVideo bool
currentSegment *recFormatFMP4Segment
nextSequenceNumber uint32
}
func newRecFormatFMP4(a *Agent) recFormat {
f := &recFormatFMP4{
a: a,
}
func (f *recFormatFMP4) initialize() {
nextID := 1
addTrack := func(codec fmp4.Codec) *recFormatFMP4Track {
@ -135,7 +132,7 @@ func newRecFormatFMP4(a *Agent) recFormat { @@ -135,7 +132,7 @@ func newRecFormatFMP4(a *Agent) recFormat {
}
}
for _, media := range a.stream.Desc().Medias {
for _, media := range f.a.wrapper.Stream.Desc().Medias {
for _, forma := range media.Formats {
switch forma := forma.(type) {
case *format.AV1:
@ -148,7 +145,7 @@ func newRecFormatFMP4(a *Agent) recFormat { @@ -148,7 +145,7 @@ func newRecFormatFMP4(a *Agent) recFormat {
firstReceived := false
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.AV1)
if tunit.TU == nil {
return nil
@ -205,7 +202,7 @@ func newRecFormatFMP4(a *Agent) recFormat { @@ -205,7 +202,7 @@ func newRecFormatFMP4(a *Agent) recFormat {
firstReceived := false
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.VP9)
if tunit.Frame == nil {
return nil
@ -302,7 +299,7 @@ func newRecFormatFMP4(a *Agent) recFormat { @@ -302,7 +299,7 @@ func newRecFormatFMP4(a *Agent) recFormat {
var dtsExtractor *h265.DTSExtractor
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.H265)
if tunit.AU == nil {
return nil
@ -387,7 +384,7 @@ func newRecFormatFMP4(a *Agent) recFormat { @@ -387,7 +384,7 @@ func newRecFormatFMP4(a *Agent) recFormat {
var dtsExtractor *h264.DTSExtractor
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.H264)
if tunit.AU == nil {
return nil
@ -463,7 +460,7 @@ func newRecFormatFMP4(a *Agent) recFormat { @@ -463,7 +460,7 @@ func newRecFormatFMP4(a *Agent) recFormat {
firstReceived := false
var lastPTS time.Duration
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Video)
if tunit.Frame == nil {
return nil
@ -515,7 +512,7 @@ func newRecFormatFMP4(a *Agent) recFormat { @@ -515,7 +512,7 @@ func newRecFormatFMP4(a *Agent) recFormat {
firstReceived := false
var lastPTS time.Duration
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Video)
if tunit.Frame == nil {
return nil
@ -563,7 +560,7 @@ func newRecFormatFMP4(a *Agent) recFormat { @@ -563,7 +560,7 @@ func newRecFormatFMP4(a *Agent) recFormat {
parsed := false
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MJPEG)
if tunit.Frame == nil {
return nil
@ -599,7 +596,7 @@ func newRecFormatFMP4(a *Agent) recFormat { @@ -599,7 +596,7 @@ func newRecFormatFMP4(a *Agent) recFormat {
}
track := addTrack(codec)
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.Opus)
if tunit.Packets == nil {
return nil
@ -632,7 +629,7 @@ func newRecFormatFMP4(a *Agent) recFormat { @@ -632,7 +629,7 @@ func newRecFormatFMP4(a *Agent) recFormat {
sampleRate := time.Duration(forma.ClockRate())
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Audio)
if tunit.AUs == nil {
return nil
@ -665,7 +662,7 @@ func newRecFormatFMP4(a *Agent) recFormat { @@ -665,7 +662,7 @@ func newRecFormatFMP4(a *Agent) recFormat {
parsed := false
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Audio)
if tunit.Frames == nil {
return nil
@ -719,7 +716,7 @@ func newRecFormatFMP4(a *Agent) recFormat { @@ -719,7 +716,7 @@ func newRecFormatFMP4(a *Agent) recFormat {
parsed := false
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.AC3)
if tunit.Frames == nil {
return nil
@ -785,7 +782,7 @@ func newRecFormatFMP4(a *Agent) recFormat { @@ -785,7 +782,7 @@ func newRecFormatFMP4(a *Agent) recFormat {
}
track := addTrack(codec)
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.LPCM)
if tunit.Samples == nil {
return nil
@ -802,7 +799,7 @@ func newRecFormatFMP4(a *Agent) recFormat { @@ -802,7 +799,7 @@ func newRecFormatFMP4(a *Agent) recFormat {
}
}
a.Log(logger.Info, "recording %d %s",
f.a.wrapper.Log(logger.Info, "recording %d %s",
len(f.tracks),
func() string {
if len(f.tracks) == 1 {
@ -810,8 +807,6 @@ func newRecFormatFMP4(a *Agent) recFormat { @@ -810,8 +807,6 @@ func newRecFormatFMP4(a *Agent) recFormat {
}
return "tracks"
}())
return f
}
func (f *recFormatFMP4) close() {

6
internal/record/rec_format_fmp4_part.go

@ -65,8 +65,8 @@ func newRecFormatFMP4Part( @@ -65,8 +65,8 @@ func newRecFormatFMP4Part(
func (p *recFormatFMP4Part) close() error {
if p.s.fi == nil {
p.s.fpath = encodeRecordPath(&recordPathParams{time: p.created}, p.s.f.a.path)
p.s.f.a.Log(logger.Debug, "creating segment %s", p.s.fpath)
p.s.fpath = encodeRecordPath(&recordPathParams{time: p.created}, p.s.f.a.resolvedPath)
p.s.f.a.wrapper.Log(logger.Debug, "creating segment %s", p.s.fpath)
err := os.MkdirAll(filepath.Dir(p.s.fpath), 0o755)
if err != nil {
@ -78,7 +78,7 @@ func (p *recFormatFMP4Part) close() error { @@ -78,7 +78,7 @@ func (p *recFormatFMP4Part) close() error {
return err
}
p.s.f.a.onSegmentCreate(p.s.fpath)
p.s.f.a.wrapper.OnSegmentCreate(p.s.fpath)
err = writeInit(fi, p.s.f.tracks)
if err != nil {

6
internal/record/rec_format_fmp4_segment.go

@ -60,14 +60,14 @@ func (s *recFormatFMP4Segment) close() error { @@ -60,14 +60,14 @@ func (s *recFormatFMP4Segment) close() error {
}
if s.fi != nil {
s.f.a.Log(logger.Debug, "closing segment %s", s.fpath)
s.f.a.wrapper.Log(logger.Debug, "closing segment %s", s.fpath)
err2 := s.fi.Close()
if err == nil {
err = err2
}
if err2 == nil {
s.f.a.onSegmentComplete(s.fpath)
s.f.a.wrapper.OnSegmentComplete(s.fpath)
}
}
@ -78,7 +78,7 @@ func (s *recFormatFMP4Segment) record(track *recFormatFMP4Track, sample *sample) @@ -78,7 +78,7 @@ func (s *recFormatFMP4Segment) record(track *recFormatFMP4Track, sample *sample)
if s.curPart == nil {
s.curPart = newRecFormatFMP4Part(s, s.f.nextSequenceNumber, sample.dts)
s.f.nextSequenceNumber++
} else if s.curPart.duration() >= s.f.a.partDuration {
} else if s.curPart.duration() >= s.f.a.wrapper.PartDuration {
err := s.curPart.close()
s.curPart = nil

2
internal/record/rec_format_fmp4_track.go

@ -44,7 +44,7 @@ func (t *recFormatFMP4Track) record(sample *sample) error { @@ -44,7 +44,7 @@ func (t *recFormatFMP4Track) record(sample *sample) error {
if (!t.f.hasVideo || t.initTrack.Codec.IsVideo()) &&
!t.nextSample.IsNonSyncSample &&
(t.nextSample.dts-t.f.currentSegment.startDTS) >= t.f.a.segmentDuration {
(t.nextSample.dts-t.f.currentSegment.startDTS) >= t.f.a.wrapper.SegmentDuration {
err := t.f.currentSegment.close()
if err != nil {
return err

34
internal/record/rec_format_mpegts.go

@ -39,7 +39,7 @@ func (d *dynamicWriter) setTarget(w io.Writer) { @@ -39,7 +39,7 @@ func (d *dynamicWriter) setTarget(w io.Writer) {
}
type recFormatMPEGTS struct {
a *Agent
a *agentInstance
dw *dynamicWriter
bw *bufio.Writer
@ -48,11 +48,7 @@ type recFormatMPEGTS struct { @@ -48,11 +48,7 @@ type recFormatMPEGTS struct {
currentSegment *recFormatMPEGTSSegment
}
func newRecFormatMPEGTS(a *Agent) recFormat {
f := &recFormatMPEGTS{
a: a,
}
func (f *recFormatMPEGTS) initialize() {
var tracks []*mpegts.Track
addTrack := func(codec mpegts.Codec) *mpegts.Track {
@ -63,7 +59,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat { @@ -63,7 +59,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat {
return track
}
for _, media := range a.stream.Desc().Medias {
for _, media := range f.a.wrapper.Stream.Desc().Medias {
for _, forma := range media.Formats {
switch forma := forma.(type) {
case *format.H265:
@ -71,7 +67,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat { @@ -71,7 +67,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat {
var dtsExtractor *h265.DTSExtractor
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.H265)
if tunit.AU == nil {
return nil
@ -99,7 +95,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat { @@ -99,7 +95,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat {
var dtsExtractor *h264.DTSExtractor
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.H264)
if tunit.AU == nil {
return nil
@ -128,7 +124,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat { @@ -128,7 +124,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat {
firstReceived := false
var lastPTS time.Duration
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Video)
if tunit.Frame == nil {
return nil
@ -158,7 +154,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat { @@ -158,7 +154,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat {
firstReceived := false
var lastPTS time.Duration
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Video)
if tunit.Frame == nil {
return nil
@ -192,7 +188,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat { @@ -192,7 +188,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat {
}(),
})
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.Opus)
if tunit.Packets == nil {
return nil
@ -211,7 +207,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat { @@ -211,7 +207,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat {
Config: *forma.GetConfig(),
})
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Audio)
if tunit.AUs == nil {
return nil
@ -228,7 +224,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat { @@ -228,7 +224,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat {
case *format.MPEG1Audio:
track := addTrack(&mpegts.CodecMPEG1Audio{})
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Audio)
if tunit.Frames == nil {
return nil
@ -247,7 +243,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat { @@ -247,7 +243,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat {
sampleRate := time.Duration(forma.SampleRate)
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
f.a.wrapper.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.AC3)
if tunit.Frames == nil {
return nil
@ -273,7 +269,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat { @@ -273,7 +269,7 @@ func newRecFormatMPEGTS(a *Agent) recFormat {
f.bw = bufio.NewWriterSize(f.dw, mpegtsMaxBufferSize)
f.mw = mpegts.NewWriter(f.bw, tracks)
a.Log(logger.Info, "recording %d %s",
f.a.wrapper.Log(logger.Info, "recording %d %s",
len(tracks),
func() string {
if len(tracks) == 1 {
@ -281,8 +277,6 @@ func newRecFormatMPEGTS(a *Agent) recFormat { @@ -281,8 +277,6 @@ func newRecFormatMPEGTS(a *Agent) recFormat {
}
return "tracks"
}())
return f
}
func (f *recFormatMPEGTS) close() {
@ -298,7 +292,7 @@ func (f *recFormatMPEGTS) setupSegment(dts time.Duration, isVideo bool, randomAc @@ -298,7 +292,7 @@ func (f *recFormatMPEGTS) setupSegment(dts time.Duration, isVideo bool, randomAc
case (!f.hasVideo || isVideo) &&
randomAccess &&
(dts-f.currentSegment.startDTS) >= f.a.segmentDuration:
(dts-f.currentSegment.startDTS) >= f.a.wrapper.SegmentDuration:
err := f.currentSegment.close()
if err != nil {
return err
@ -306,7 +300,7 @@ func (f *recFormatMPEGTS) setupSegment(dts time.Duration, isVideo bool, randomAc @@ -306,7 +300,7 @@ func (f *recFormatMPEGTS) setupSegment(dts time.Duration, isVideo bool, randomAc
f.currentSegment = newRecFormatMPEGTSSegment(f, dts)
case (dts - f.currentSegment.lastFlush) >= f.a.partDuration:
case (dts - f.currentSegment.lastFlush) >= f.a.wrapper.PartDuration:
err := f.bw.Flush()
if err != nil {
return err

10
internal/record/rec_format_mpegts_segment.go

@ -35,14 +35,14 @@ func (s *recFormatMPEGTSSegment) close() error { @@ -35,14 +35,14 @@ func (s *recFormatMPEGTSSegment) close() error {
err := s.f.bw.Flush()
if s.fi != nil {
s.f.a.Log(logger.Debug, "closing segment %s", s.fpath)
s.f.a.wrapper.Log(logger.Debug, "closing segment %s", s.fpath)
err2 := s.fi.Close()
if err == nil {
err = err2
}
if err2 == nil {
s.f.a.onSegmentComplete(s.fpath)
s.f.a.wrapper.OnSegmentComplete(s.fpath)
}
}
@ -51,8 +51,8 @@ func (s *recFormatMPEGTSSegment) close() error { @@ -51,8 +51,8 @@ func (s *recFormatMPEGTSSegment) close() error {
func (s *recFormatMPEGTSSegment) Write(p []byte) (int, error) {
if s.fi == nil {
s.fpath = encodeRecordPath(&recordPathParams{time: s.created}, s.f.a.path)
s.f.a.Log(logger.Debug, "creating segment %s", s.fpath)
s.fpath = encodeRecordPath(&recordPathParams{time: s.created}, s.f.a.resolvedPath)
s.f.a.wrapper.Log(logger.Debug, "creating segment %s", s.fpath)
err := os.MkdirAll(filepath.Dir(s.fpath), 0o755)
if err != nil {
@ -64,7 +64,7 @@ func (s *recFormatMPEGTSSegment) Write(p []byte) (int, error) { @@ -64,7 +64,7 @@ func (s *recFormatMPEGTSSegment) Write(p []byte) (int, error) {
return 0, err
}
s.f.a.onSegmentCreate(s.fpath)
s.f.a.wrapper.OnSegmentCreate(s.fpath)
s.fi = fi
}

Loading…
Cancel
Save