diff --git a/internal/formatprocessor/av1.go b/internal/formatprocessor/av1.go index 8e847307..3425e396 100644 --- a/internal/formatprocessor/av1.go +++ b/internal/formatprocessor/av1.go @@ -35,9 +35,10 @@ type formatProcessorAV1 struct { format *formats.AV1 log logger.Writer - encoder *rtpav1.Encoder - decoder *rtpav1.Decoder - lastKeyFrameReceived time.Time + encoder *rtpav1.Encoder + decoder *rtpav1.Decoder + lastKeyFrameTimeReceived bool + lastKeyFrameTime time.Time } func newAV1( @@ -57,27 +58,27 @@ func newAV1( PayloadMaxSize: t.udpMaxPayloadSize - 12, } t.encoder.Init() - t.lastKeyFrameReceived = time.Now() } return t, nil } -func (t *formatProcessorAV1) checkKeyFrameInterval(containsKeyFrame bool) { - if containsKeyFrame { - t.lastKeyFrameReceived = time.Now() - } else { - now := time.Now() - if now.Sub(t.lastKeyFrameReceived) >= maxKeyFrameInterval { - t.lastKeyFrameReceived = now - t.log.Log(logger.Warn, "no AV1 key frames received in %v, stream can't be decoded", maxKeyFrameInterval) - } +func (t *formatProcessorAV1) checkKeyFrameInterval(ntp time.Time, isKeyFrame bool) { + if !t.lastKeyFrameTimeReceived || isKeyFrame { + t.lastKeyFrameTimeReceived = true + t.lastKeyFrameTime = ntp + return + } + + if ntp.Sub(t.lastKeyFrameTime) >= maxKeyFrameInterval { + t.lastKeyFrameTime = ntp + t.log.Log(logger.Warn, "no AV1 key frames received in %v, stream can't be decoded", maxKeyFrameInterval) } } -func (t *formatProcessorAV1) checkOBUs(obus [][]byte) { +func (t *formatProcessorAV1) checkOBUs(ntp time.Time, obus [][]byte) { containsKeyFrame, _ := av1.ContainsKeyFrame(obus) - t.checkKeyFrameInterval(containsKeyFrame) + t.checkKeyFrameInterval(ntp, containsKeyFrame) } func (t *formatProcessorAV1) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl @@ -99,7 +100,6 @@ func (t *formatProcessorAV1) Process(unit Unit, hasNonRTSPReaders bool) error { if hasNonRTSPReaders || t.decoder != nil { if t.decoder == nil { t.decoder = t.format.CreateDecoder() - t.lastKeyFrameReceived = time.Now() } // DecodeUntilMarker() is necessary, otherwise Encode() generates partial groups @@ -112,7 +112,7 @@ func (t *formatProcessorAV1) Process(unit Unit, hasNonRTSPReaders bool) error { } tunit.OBUs = obus - t.checkOBUs(obus) + t.checkOBUs(tunit.NTP, obus) tunit.PTS = pts } @@ -120,7 +120,7 @@ func (t *formatProcessorAV1) Process(unit Unit, hasNonRTSPReaders bool) error { return nil } - t.checkOBUs(tunit.OBUs) + t.checkOBUs(tunit.NTP, tunit.OBUs) // encode into RTP pkts, err := t.encoder.Encode(tunit.OBUs, tunit.PTS) diff --git a/internal/formatprocessor/av1_test.go b/internal/formatprocessor/av1_test.go new file mode 100644 index 00000000..49eb2ce0 --- /dev/null +++ b/internal/formatprocessor/av1_test.go @@ -0,0 +1,40 @@ +package formatprocessor + +import ( + "testing" + "time" + + "github.com/bluenviron/gortsplib/v3/pkg/formats" + "github.com/stretchr/testify/require" +) + +func TestAV1KeyFrameWarning(t *testing.T) { + forma := &formats.AV1{ + PayloadTyp: 96, + } + + w := &testLogWriter{recv: make(chan string, 1)} + p, err := New(1472, forma, true, w) + require.NoError(t, err) + + ntp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) + err = p.Process(&UnitAV1{ + OBUs: [][]byte{ + {0x01}, + }, + NTP: ntp, + }, false) + require.NoError(t, err) + + ntp = ntp.Add(30 * time.Second) + err = p.Process(&UnitAV1{ + OBUs: [][]byte{ + {0x01}, + }, + NTP: ntp, + }, false) + require.NoError(t, err) + + logl := <-w.recv + require.Equal(t, "no AV1 key frames received in 10s, stream can't be decoded", logl) +} diff --git a/internal/formatprocessor/h264.go b/internal/formatprocessor/h264.go index 6673b871..41d88524 100644 --- a/internal/formatprocessor/h264.go +++ b/internal/formatprocessor/h264.go @@ -92,9 +92,10 @@ type formatProcessorH264 struct { format *formats.H264 log logger.Writer - encoder *rtph264.Encoder - decoder *rtph264.Decoder - lastKeyFrameReceived time.Time + encoder *rtph264.Encoder + decoder *rtph264.Decoder + lastKeyFrameTimeReceived bool + lastKeyFrameTime time.Time } func newH264( @@ -116,7 +117,6 @@ func newH264( PacketizationMode: forma.PacketizationMode, } t.encoder.Init() - t.lastKeyFrameReceived = time.Now() } return t, nil @@ -173,19 +173,20 @@ func (t *formatProcessorH264) updateTrackParametersFromNALUs(nalus [][]byte) { } } -func (t *formatProcessorH264) checkKeyFrameInterval(isKeyFrame bool) { - if isKeyFrame { - t.lastKeyFrameReceived = time.Now() - } else { - now := time.Now() - if now.Sub(t.lastKeyFrameReceived) >= maxKeyFrameInterval { - t.lastKeyFrameReceived = now - t.log.Log(logger.Warn, "no H264 key frames received in %v, stream can't be decoded") - } +func (t *formatProcessorH264) checkKeyFrameInterval(ntp time.Time, isKeyFrame bool) { + if !t.lastKeyFrameTimeReceived || isKeyFrame { + t.lastKeyFrameTimeReceived = true + t.lastKeyFrameTime = ntp + return + } + + if ntp.Sub(t.lastKeyFrameTime) >= maxKeyFrameInterval { + t.lastKeyFrameTime = ntp + t.log.Log(logger.Warn, "no H264 key frames received in %v, stream can't be decoded", maxKeyFrameInterval) } } -func (t *formatProcessorH264) remuxAccessUnit(nalus [][]byte) [][]byte { +func (t *formatProcessorH264) remuxAccessUnit(ntp time.Time, nalus [][]byte) [][]byte { isKeyFrame := false n := 0 @@ -212,7 +213,7 @@ func (t *formatProcessorH264) remuxAccessUnit(nalus [][]byte) [][]byte { n++ } - t.checkKeyFrameInterval(isKeyFrame) + t.checkKeyFrameInterval(ntp, isKeyFrame) if n == 0 { return nil @@ -278,7 +279,6 @@ func (t *formatProcessorH264) Process(unit Unit, hasNonRTSPReaders bool) error { if hasNonRTSPReaders || t.decoder != nil || t.encoder != nil { if t.decoder == nil { t.decoder = t.format.CreateDecoder() - t.lastKeyFrameReceived = time.Now() } if t.encoder != nil { @@ -294,7 +294,7 @@ func (t *formatProcessorH264) Process(unit Unit, hasNonRTSPReaders bool) error { return err } - tunit.AU = t.remuxAccessUnit(au) + tunit.AU = t.remuxAccessUnit(tunit.NTP, au) tunit.PTS = pts } @@ -304,7 +304,7 @@ func (t *formatProcessorH264) Process(unit Unit, hasNonRTSPReaders bool) error { } } else { t.updateTrackParametersFromNALUs(tunit.AU) - tunit.AU = t.remuxAccessUnit(tunit.AU) + tunit.AU = t.remuxAccessUnit(tunit.NTP, tunit.AU) } // encode into RTP diff --git a/internal/formatprocessor/h264_test.go b/internal/formatprocessor/h264_test.go index 268624b4..7b2b4cf9 100644 --- a/internal/formatprocessor/h264_test.go +++ b/internal/formatprocessor/h264_test.go @@ -2,14 +2,26 @@ package formatprocessor import ( "bytes" + "fmt" "testing" + "time" "github.com/bluenviron/gortsplib/v3/pkg/formats" "github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/pion/rtp" "github.com/stretchr/testify/require" + + "github.com/bluenviron/mediamtx/internal/logger" ) +type testLogWriter struct { + recv chan string +} + +func (w *testLogWriter) Log(level logger.Level, format string, args ...interface{}) { + w.recv <- fmt.Sprintf(format, args...) +} + func TestH264DynamicParams(t *testing.T) { forma := &formats.H264{ PayloadTyp: 96, @@ -173,3 +185,35 @@ func TestH264EmptyPacket(t *testing.T) { // if all NALUs have been removed, no RTP packets must be generated. require.Equal(t, []*rtp.Packet(nil), unit.RTPPackets) } + +func TestH264KeyFrameWarning(t *testing.T) { + forma := &formats.H264{ + PayloadTyp: 96, + PacketizationMode: 1, + } + + w := &testLogWriter{recv: make(chan string, 1)} + p, err := New(1472, forma, true, w) + require.NoError(t, err) + + ntp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) + err = p.Process(&UnitH264{ + AU: [][]byte{ + {0x01}, + }, + NTP: ntp, + }, false) + require.NoError(t, err) + + ntp = ntp.Add(30 * time.Second) + err = p.Process(&UnitH264{ + AU: [][]byte{ + {0x01}, + }, + NTP: ntp, + }, false) + require.NoError(t, err) + + logl := <-w.recv + require.Equal(t, "no H264 key frames received in 10s, stream can't be decoded", logl) +} diff --git a/internal/formatprocessor/h265.go b/internal/formatprocessor/h265.go index 30168934..e46dee15 100644 --- a/internal/formatprocessor/h265.go +++ b/internal/formatprocessor/h265.go @@ -99,9 +99,10 @@ type formatProcessorH265 struct { format *formats.H265 log logger.Writer - encoder *rtph265.Encoder - decoder *rtph265.Decoder - lastKeyFrameReceived time.Time + encoder *rtph265.Encoder + decoder *rtph265.Decoder + lastKeyFrameTimeReceived bool + lastKeyFrameTime time.Time } func newH265( @@ -122,7 +123,6 @@ func newH265( PayloadType: forma.PayloadTyp, } t.encoder.Init() - t.lastKeyFrameReceived = time.Now() } return t, nil @@ -193,19 +193,20 @@ func (t *formatProcessorH265) updateTrackParametersFromNALUs(nalus [][]byte) { } } -func (t *formatProcessorH265) checkKeyFrameInterval(isKeyFrame bool) { - if isKeyFrame { - t.lastKeyFrameReceived = time.Now() - } else { - now := time.Now() - if now.Sub(t.lastKeyFrameReceived) >= maxKeyFrameInterval { - t.lastKeyFrameReceived = now - t.log.Log(logger.Warn, "no H265 key frames received in %v, stream can't be decoded") - } +func (t *formatProcessorH265) checkKeyFrameInterval(ntp time.Time, isKeyFrame bool) { + if !t.lastKeyFrameTimeReceived || isKeyFrame { + t.lastKeyFrameTimeReceived = true + t.lastKeyFrameTime = ntp + return + } + + if ntp.Sub(t.lastKeyFrameTime) >= maxKeyFrameInterval { + t.lastKeyFrameTime = ntp + t.log.Log(logger.Warn, "no H265 key frames received in %v, stream can't be decoded", maxKeyFrameInterval) } } -func (t *formatProcessorH265) remuxAccessUnit(nalus [][]byte) [][]byte { +func (t *formatProcessorH265) remuxAccessUnit(ntp time.Time, nalus [][]byte) [][]byte { isKeyFrame := false n := 0 @@ -232,7 +233,7 @@ func (t *formatProcessorH265) remuxAccessUnit(nalus [][]byte) [][]byte { n++ } - t.checkKeyFrameInterval(isKeyFrame) + t.checkKeyFrameInterval(ntp, isKeyFrame) if n == 0 { return nil @@ -299,7 +300,6 @@ func (t *formatProcessorH265) Process(unit Unit, hasNonRTSPReaders bool) error { if hasNonRTSPReaders || t.decoder != nil || t.encoder != nil { if t.decoder == nil { t.decoder = t.format.CreateDecoder() - t.lastKeyFrameReceived = time.Now() } if t.encoder != nil { @@ -315,7 +315,7 @@ func (t *formatProcessorH265) Process(unit Unit, hasNonRTSPReaders bool) error { return err } - tunit.AU = t.remuxAccessUnit(au) + tunit.AU = t.remuxAccessUnit(tunit.NTP, au) tunit.PTS = pts } @@ -325,7 +325,7 @@ func (t *formatProcessorH265) Process(unit Unit, hasNonRTSPReaders bool) error { } } else { t.updateTrackParametersFromNALUs(tunit.AU) - tunit.AU = t.remuxAccessUnit(tunit.AU) + tunit.AU = t.remuxAccessUnit(tunit.NTP, tunit.AU) } // encode into RTP diff --git a/internal/formatprocessor/h265_test.go b/internal/formatprocessor/h265_test.go index 14af29ea..ea470ab5 100644 --- a/internal/formatprocessor/h265_test.go +++ b/internal/formatprocessor/h265_test.go @@ -3,6 +3,7 @@ package formatprocessor import ( "bytes" "testing" + "time" "github.com/bluenviron/gortsplib/v3/pkg/formats" "github.com/bluenviron/mediacommon/pkg/codecs/h265" @@ -166,3 +167,34 @@ func TestH265EmptyPacket(t *testing.T) { // if all NALUs have been removed, no RTP packets must be generated. require.Equal(t, []*rtp.Packet(nil), unit.RTPPackets) } + +func TestH265KeyFrameWarning(t *testing.T) { + forma := &formats.H265{ + PayloadTyp: 96, + } + + w := &testLogWriter{recv: make(chan string, 1)} + p, err := New(1472, forma, true, w) + require.NoError(t, err) + + ntp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) + err = p.Process(&UnitH265{ + AU: [][]byte{ + {0x01}, + }, + NTP: ntp, + }, false) + require.NoError(t, err) + + ntp = ntp.Add(30 * time.Second) + err = p.Process(&UnitH265{ + AU: [][]byte{ + {0x01}, + }, + NTP: ntp, + }, false) + require.NoError(t, err) + + logl := <-w.recv + require.Equal(t, "no H265 key frames received in 10s, stream can't be decoded", logl) +}