Browse Source

fix missing H264/H265 keyframe warning message (#1825)

pull/1829/head
Alessandro Ros 3 years ago committed by GitHub
parent
commit
581a840771
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 36
      internal/formatprocessor/av1.go
  2. 40
      internal/formatprocessor/av1_test.go
  3. 36
      internal/formatprocessor/h264.go
  4. 44
      internal/formatprocessor/h264_test.go
  5. 36
      internal/formatprocessor/h265.go
  6. 32
      internal/formatprocessor/h265_test.go

36
internal/formatprocessor/av1.go

@ -35,9 +35,10 @@ type formatProcessorAV1 struct { @@ -35,9 +35,10 @@ type formatProcessorAV1 struct {
format *formats.AV1
log logger.Writer
encoder *rtpav1.Encoder
decoder *rtpav1.Decoder
lastKeyFrameReceived time.Time
encoder *rtpav1.Encoder
decoder *rtpav1.Decoder
lastKeyFrameTimeReceived bool
lastKeyFrameTime time.Time
}
func newAV1(
@ -57,27 +58,27 @@ func newAV1( @@ -57,27 +58,27 @@ func newAV1(
PayloadMaxSize: t.udpMaxPayloadSize - 12,
}
t.encoder.Init()
t.lastKeyFrameReceived = time.Now()
}
return t, nil
}
func (t *formatProcessorAV1) checkKeyFrameInterval(containsKeyFrame bool) {
if containsKeyFrame {
t.lastKeyFrameReceived = time.Now()
} else {
now := time.Now()
if now.Sub(t.lastKeyFrameReceived) >= maxKeyFrameInterval {
t.lastKeyFrameReceived = now
t.log.Log(logger.Warn, "no AV1 key frames received in %v, stream can't be decoded", maxKeyFrameInterval)
}
func (t *formatProcessorAV1) checkKeyFrameInterval(ntp time.Time, isKeyFrame bool) {
if !t.lastKeyFrameTimeReceived || isKeyFrame {
t.lastKeyFrameTimeReceived = true
t.lastKeyFrameTime = ntp
return
}
if ntp.Sub(t.lastKeyFrameTime) >= maxKeyFrameInterval {
t.lastKeyFrameTime = ntp
t.log.Log(logger.Warn, "no AV1 key frames received in %v, stream can't be decoded", maxKeyFrameInterval)
}
}
func (t *formatProcessorAV1) checkOBUs(obus [][]byte) {
func (t *formatProcessorAV1) checkOBUs(ntp time.Time, obus [][]byte) {
containsKeyFrame, _ := av1.ContainsKeyFrame(obus)
t.checkKeyFrameInterval(containsKeyFrame)
t.checkKeyFrameInterval(ntp, containsKeyFrame)
}
func (t *formatProcessorAV1) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl
@ -99,7 +100,6 @@ func (t *formatProcessorAV1) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -99,7 +100,6 @@ func (t *formatProcessorAV1) Process(unit Unit, hasNonRTSPReaders bool) error {
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
t.decoder = t.format.CreateDecoder()
t.lastKeyFrameReceived = time.Now()
}
// DecodeUntilMarker() is necessary, otherwise Encode() generates partial groups
@ -112,7 +112,7 @@ func (t *formatProcessorAV1) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -112,7 +112,7 @@ func (t *formatProcessorAV1) Process(unit Unit, hasNonRTSPReaders bool) error {
}
tunit.OBUs = obus
t.checkOBUs(obus)
t.checkOBUs(tunit.NTP, obus)
tunit.PTS = pts
}
@ -120,7 +120,7 @@ func (t *formatProcessorAV1) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -120,7 +120,7 @@ func (t *formatProcessorAV1) Process(unit Unit, hasNonRTSPReaders bool) error {
return nil
}
t.checkOBUs(tunit.OBUs)
t.checkOBUs(tunit.NTP, tunit.OBUs)
// encode into RTP
pkts, err := t.encoder.Encode(tunit.OBUs, tunit.PTS)

40
internal/formatprocessor/av1_test.go

@ -0,0 +1,40 @@ @@ -0,0 +1,40 @@
package formatprocessor
import (
"testing"
"time"
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/stretchr/testify/require"
)
func TestAV1KeyFrameWarning(t *testing.T) {
forma := &formats.AV1{
PayloadTyp: 96,
}
w := &testLogWriter{recv: make(chan string, 1)}
p, err := New(1472, forma, true, w)
require.NoError(t, err)
ntp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
err = p.Process(&UnitAV1{
OBUs: [][]byte{
{0x01},
},
NTP: ntp,
}, false)
require.NoError(t, err)
ntp = ntp.Add(30 * time.Second)
err = p.Process(&UnitAV1{
OBUs: [][]byte{
{0x01},
},
NTP: ntp,
}, false)
require.NoError(t, err)
logl := <-w.recv
require.Equal(t, "no AV1 key frames received in 10s, stream can't be decoded", logl)
}

36
internal/formatprocessor/h264.go

@ -92,9 +92,10 @@ type formatProcessorH264 struct { @@ -92,9 +92,10 @@ type formatProcessorH264 struct {
format *formats.H264
log logger.Writer
encoder *rtph264.Encoder
decoder *rtph264.Decoder
lastKeyFrameReceived time.Time
encoder *rtph264.Encoder
decoder *rtph264.Decoder
lastKeyFrameTimeReceived bool
lastKeyFrameTime time.Time
}
func newH264(
@ -116,7 +117,6 @@ func newH264( @@ -116,7 +117,6 @@ func newH264(
PacketizationMode: forma.PacketizationMode,
}
t.encoder.Init()
t.lastKeyFrameReceived = time.Now()
}
return t, nil
@ -173,19 +173,20 @@ func (t *formatProcessorH264) updateTrackParametersFromNALUs(nalus [][]byte) { @@ -173,19 +173,20 @@ func (t *formatProcessorH264) updateTrackParametersFromNALUs(nalus [][]byte) {
}
}
func (t *formatProcessorH264) checkKeyFrameInterval(isKeyFrame bool) {
if isKeyFrame {
t.lastKeyFrameReceived = time.Now()
} else {
now := time.Now()
if now.Sub(t.lastKeyFrameReceived) >= maxKeyFrameInterval {
t.lastKeyFrameReceived = now
t.log.Log(logger.Warn, "no H264 key frames received in %v, stream can't be decoded")
}
func (t *formatProcessorH264) checkKeyFrameInterval(ntp time.Time, isKeyFrame bool) {
if !t.lastKeyFrameTimeReceived || isKeyFrame {
t.lastKeyFrameTimeReceived = true
t.lastKeyFrameTime = ntp
return
}
if ntp.Sub(t.lastKeyFrameTime) >= maxKeyFrameInterval {
t.lastKeyFrameTime = ntp
t.log.Log(logger.Warn, "no H264 key frames received in %v, stream can't be decoded", maxKeyFrameInterval)
}
}
func (t *formatProcessorH264) remuxAccessUnit(nalus [][]byte) [][]byte {
func (t *formatProcessorH264) remuxAccessUnit(ntp time.Time, nalus [][]byte) [][]byte {
isKeyFrame := false
n := 0
@ -212,7 +213,7 @@ func (t *formatProcessorH264) remuxAccessUnit(nalus [][]byte) [][]byte { @@ -212,7 +213,7 @@ func (t *formatProcessorH264) remuxAccessUnit(nalus [][]byte) [][]byte {
n++
}
t.checkKeyFrameInterval(isKeyFrame)
t.checkKeyFrameInterval(ntp, isKeyFrame)
if n == 0 {
return nil
@ -278,7 +279,6 @@ func (t *formatProcessorH264) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -278,7 +279,6 @@ func (t *formatProcessorH264) Process(unit Unit, hasNonRTSPReaders bool) error {
if hasNonRTSPReaders || t.decoder != nil || t.encoder != nil {
if t.decoder == nil {
t.decoder = t.format.CreateDecoder()
t.lastKeyFrameReceived = time.Now()
}
if t.encoder != nil {
@ -294,7 +294,7 @@ func (t *formatProcessorH264) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -294,7 +294,7 @@ func (t *formatProcessorH264) Process(unit Unit, hasNonRTSPReaders bool) error {
return err
}
tunit.AU = t.remuxAccessUnit(au)
tunit.AU = t.remuxAccessUnit(tunit.NTP, au)
tunit.PTS = pts
}
@ -304,7 +304,7 @@ func (t *formatProcessorH264) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -304,7 +304,7 @@ func (t *formatProcessorH264) Process(unit Unit, hasNonRTSPReaders bool) error {
}
} else {
t.updateTrackParametersFromNALUs(tunit.AU)
tunit.AU = t.remuxAccessUnit(tunit.AU)
tunit.AU = t.remuxAccessUnit(tunit.NTP, tunit.AU)
}
// encode into RTP

44
internal/formatprocessor/h264_test.go

@ -2,14 +2,26 @@ package formatprocessor @@ -2,14 +2,26 @@ package formatprocessor
import (
"bytes"
"fmt"
"testing"
"time"
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
"github.com/bluenviron/mediamtx/internal/logger"
)
type testLogWriter struct {
recv chan string
}
func (w *testLogWriter) Log(level logger.Level, format string, args ...interface{}) {
w.recv <- fmt.Sprintf(format, args...)
}
func TestH264DynamicParams(t *testing.T) {
forma := &formats.H264{
PayloadTyp: 96,
@ -173,3 +185,35 @@ func TestH264EmptyPacket(t *testing.T) { @@ -173,3 +185,35 @@ func TestH264EmptyPacket(t *testing.T) {
// if all NALUs have been removed, no RTP packets must be generated.
require.Equal(t, []*rtp.Packet(nil), unit.RTPPackets)
}
func TestH264KeyFrameWarning(t *testing.T) {
forma := &formats.H264{
PayloadTyp: 96,
PacketizationMode: 1,
}
w := &testLogWriter{recv: make(chan string, 1)}
p, err := New(1472, forma, true, w)
require.NoError(t, err)
ntp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
err = p.Process(&UnitH264{
AU: [][]byte{
{0x01},
},
NTP: ntp,
}, false)
require.NoError(t, err)
ntp = ntp.Add(30 * time.Second)
err = p.Process(&UnitH264{
AU: [][]byte{
{0x01},
},
NTP: ntp,
}, false)
require.NoError(t, err)
logl := <-w.recv
require.Equal(t, "no H264 key frames received in 10s, stream can't be decoded", logl)
}

36
internal/formatprocessor/h265.go

@ -99,9 +99,10 @@ type formatProcessorH265 struct { @@ -99,9 +99,10 @@ type formatProcessorH265 struct {
format *formats.H265
log logger.Writer
encoder *rtph265.Encoder
decoder *rtph265.Decoder
lastKeyFrameReceived time.Time
encoder *rtph265.Encoder
decoder *rtph265.Decoder
lastKeyFrameTimeReceived bool
lastKeyFrameTime time.Time
}
func newH265(
@ -122,7 +123,6 @@ func newH265( @@ -122,7 +123,6 @@ func newH265(
PayloadType: forma.PayloadTyp,
}
t.encoder.Init()
t.lastKeyFrameReceived = time.Now()
}
return t, nil
@ -193,19 +193,20 @@ func (t *formatProcessorH265) updateTrackParametersFromNALUs(nalus [][]byte) { @@ -193,19 +193,20 @@ func (t *formatProcessorH265) updateTrackParametersFromNALUs(nalus [][]byte) {
}
}
func (t *formatProcessorH265) checkKeyFrameInterval(isKeyFrame bool) {
if isKeyFrame {
t.lastKeyFrameReceived = time.Now()
} else {
now := time.Now()
if now.Sub(t.lastKeyFrameReceived) >= maxKeyFrameInterval {
t.lastKeyFrameReceived = now
t.log.Log(logger.Warn, "no H265 key frames received in %v, stream can't be decoded")
}
func (t *formatProcessorH265) checkKeyFrameInterval(ntp time.Time, isKeyFrame bool) {
if !t.lastKeyFrameTimeReceived || isKeyFrame {
t.lastKeyFrameTimeReceived = true
t.lastKeyFrameTime = ntp
return
}
if ntp.Sub(t.lastKeyFrameTime) >= maxKeyFrameInterval {
t.lastKeyFrameTime = ntp
t.log.Log(logger.Warn, "no H265 key frames received in %v, stream can't be decoded", maxKeyFrameInterval)
}
}
func (t *formatProcessorH265) remuxAccessUnit(nalus [][]byte) [][]byte {
func (t *formatProcessorH265) remuxAccessUnit(ntp time.Time, nalus [][]byte) [][]byte {
isKeyFrame := false
n := 0
@ -232,7 +233,7 @@ func (t *formatProcessorH265) remuxAccessUnit(nalus [][]byte) [][]byte { @@ -232,7 +233,7 @@ func (t *formatProcessorH265) remuxAccessUnit(nalus [][]byte) [][]byte {
n++
}
t.checkKeyFrameInterval(isKeyFrame)
t.checkKeyFrameInterval(ntp, isKeyFrame)
if n == 0 {
return nil
@ -299,7 +300,6 @@ func (t *formatProcessorH265) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -299,7 +300,6 @@ func (t *formatProcessorH265) Process(unit Unit, hasNonRTSPReaders bool) error {
if hasNonRTSPReaders || t.decoder != nil || t.encoder != nil {
if t.decoder == nil {
t.decoder = t.format.CreateDecoder()
t.lastKeyFrameReceived = time.Now()
}
if t.encoder != nil {
@ -315,7 +315,7 @@ func (t *formatProcessorH265) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -315,7 +315,7 @@ func (t *formatProcessorH265) Process(unit Unit, hasNonRTSPReaders bool) error {
return err
}
tunit.AU = t.remuxAccessUnit(au)
tunit.AU = t.remuxAccessUnit(tunit.NTP, au)
tunit.PTS = pts
}
@ -325,7 +325,7 @@ func (t *formatProcessorH265) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -325,7 +325,7 @@ func (t *formatProcessorH265) Process(unit Unit, hasNonRTSPReaders bool) error {
}
} else {
t.updateTrackParametersFromNALUs(tunit.AU)
tunit.AU = t.remuxAccessUnit(tunit.AU)
tunit.AU = t.remuxAccessUnit(tunit.NTP, tunit.AU)
}
// encode into RTP

32
internal/formatprocessor/h265_test.go

@ -3,6 +3,7 @@ package formatprocessor @@ -3,6 +3,7 @@ package formatprocessor
import (
"bytes"
"testing"
"time"
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/mediacommon/pkg/codecs/h265"
@ -166,3 +167,34 @@ func TestH265EmptyPacket(t *testing.T) { @@ -166,3 +167,34 @@ func TestH265EmptyPacket(t *testing.T) {
// if all NALUs have been removed, no RTP packets must be generated.
require.Equal(t, []*rtp.Packet(nil), unit.RTPPackets)
}
func TestH265KeyFrameWarning(t *testing.T) {
forma := &formats.H265{
PayloadTyp: 96,
}
w := &testLogWriter{recv: make(chan string, 1)}
p, err := New(1472, forma, true, w)
require.NoError(t, err)
ntp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
err = p.Process(&UnitH265{
AU: [][]byte{
{0x01},
},
NTP: ntp,
}, false)
require.NoError(t, err)
ntp = ntp.Add(30 * time.Second)
err = p.Process(&UnitH265{
AU: [][]byte{
{0x01},
},
NTP: ntp,
}, false)
require.NoError(t, err)
logl := <-w.recv
require.Equal(t, "no H265 key frames received in 10s, stream can't be decoded", logl)
}

Loading…
Cancel
Save