diff --git a/internal/formatprocessor/av1.go b/internal/formatprocessor/av1.go index 0c8622d4..9886c148 100644 --- a/internal/formatprocessor/av1.go +++ b/internal/formatprocessor/av1.go @@ -47,63 +47,68 @@ func (t *formatProcessorAV1) createEncoder() error { return t.encoder.Init() } -func (t *formatProcessorAV1) Process(u unit.Unit, hasNonRTSPReaders bool) error { //nolint:dupl - tunit := u.(*unit.AV1) +func (t *formatProcessorAV1) ProcessUnit(uu unit.Unit) error { //nolint:dupl + u := uu.(*unit.AV1) - if tunit.RTPPackets != nil { - pkt := tunit.RTPPackets[0] - - // remove padding - pkt.Header.Padding = false - pkt.PaddingSize = 0 - - if pkt.MarshalSize() > t.udpMaxPayloadSize { - return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", - pkt.MarshalSize(), t.udpMaxPayloadSize) - } - - // decode from RTP - if hasNonRTSPReaders || t.decoder != nil { - if t.decoder == nil { - var err error - t.decoder, err = t.format.CreateDecoder() - if err != nil { - return err - } - } - - tu, err := t.decoder.Decode(pkt) - if err != nil { - if err == rtpav1.ErrNonStartingPacketAndNoPrevious || err == rtpav1.ErrMorePacketsNeeded { - return nil - } - return err - } - - tunit.TU = tu - } - - // route packet as is - return nil - } - - // encode into RTP - pkts, err := t.encoder.Encode(tunit.TU) + pkts, err := t.encoder.Encode(u.TU) if err != nil { return err } - setTimestamp(pkts, tunit.RTPPackets, t.format.ClockRate(), tunit.PTS) - tunit.RTPPackets = pkts + + ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) + for _, pkt := range pkts { + pkt.Timestamp = ts + } + + u.RTPPackets = pkts return nil } -func (t *formatProcessorAV1) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit { - return &unit.AV1{ +func (t *formatProcessorAV1) ProcessRTPPacket( //nolint:dupl + pkt *rtp.Packet, + ntp time.Time, + pts time.Duration, + hasNonRTSPReaders bool, +) (Unit, error) { + u := &unit.AV1{ Base: unit.Base{ RTPPackets: []*rtp.Packet{pkt}, NTP: ntp, PTS: pts, }, } + + // remove padding + pkt.Header.Padding = false + pkt.PaddingSize = 0 + + if pkt.MarshalSize() > t.udpMaxPayloadSize { + return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", + pkt.MarshalSize(), t.udpMaxPayloadSize) + } + + // decode from RTP + if hasNonRTSPReaders || t.decoder != nil { + if t.decoder == nil { + var err error + t.decoder, err = t.format.CreateDecoder() + if err != nil { + return nil, err + } + } + + tu, err := t.decoder.Decode(pkt) + if err != nil { + if err == rtpav1.ErrNonStartingPacketAndNoPrevious || err == rtpav1.ErrMorePacketsNeeded { + return u, nil + } + return nil, err + } + + u.TU = tu + } + + // route packet as is + return u, nil } diff --git a/internal/formatprocessor/base_unit.go b/internal/formatprocessor/base_unit.go deleted file mode 100644 index e8355375..00000000 --- a/internal/formatprocessor/base_unit.go +++ /dev/null @@ -1,29 +0,0 @@ -package formatprocessor - -import ( - "time" - - "github.com/pion/rtp" -) - -// BaseUnit contains fields shared across all units. -type BaseUnit struct { - RTPPackets []*rtp.Packet - NTP time.Time - PTS time.Duration -} - -// GetRTPPackets implements Unit. -func (u *BaseUnit) GetRTPPackets() []*rtp.Packet { - return u.RTPPackets -} - -// GetNTP implements Unit. -func (u *BaseUnit) GetNTP() time.Time { - return u.NTP -} - -// GetPTS implements Unit. -func (u *BaseUnit) GetPTS() time.Duration { - return u.PTS -} diff --git a/internal/formatprocessor/generic.go b/internal/formatprocessor/generic.go index 9122a1f8..0140aa80 100644 --- a/internal/formatprocessor/generic.go +++ b/internal/formatprocessor/generic.go @@ -28,29 +28,32 @@ func newGeneric( }, nil } -func (t *formatProcessorGeneric) Process(u unit.Unit, _ bool) error { - tunit := u.(*unit.Generic) +func (t *formatProcessorGeneric) ProcessUnit(_ unit.Unit) error { + return fmt.Errorf("using a generic unit without RTP is not supported") +} - pkt := tunit.RTPPackets[0] +func (t *formatProcessorGeneric) ProcessRTPPacket( + pkt *rtp.Packet, + ntp time.Time, + pts time.Duration, + _ bool, +) (Unit, error) { + u := &unit.Generic{ + Base: unit.Base{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: ntp, + PTS: pts, + }, + } // remove padding pkt.Header.Padding = false pkt.PaddingSize = 0 if pkt.MarshalSize() > t.udpMaxPayloadSize { - return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", + return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", pkt.MarshalSize(), t.udpMaxPayloadSize) } - return nil -} - -func (t *formatProcessorGeneric) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit { - return &unit.Generic{ - Base: unit.Base{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: ntp, - PTS: pts, - }, - } + return u, nil } diff --git a/internal/formatprocessor/generic_test.go b/internal/formatprocessor/generic_test.go index 6b29d1b0..ab390fe6 100644 --- a/internal/formatprocessor/generic_test.go +++ b/internal/formatprocessor/generic_test.go @@ -2,12 +2,11 @@ package formatprocessor import ( "testing" + "time" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/pion/rtp" "github.com/stretchr/testify/require" - - "github.com/bluenviron/mediamtx/internal/unit" ) func TestGenericRemovePadding(t *testing.T) { @@ -31,15 +30,11 @@ func TestGenericRemovePadding(t *testing.T) { SSRC: 563423, Padding: true, }, - Payload: []byte{0x01, 0x02, 0x03, 0x04}, + Payload: []byte{1, 2, 3, 4}, PaddingSize: 20, } - err = p.Process(&unit.Generic{ - Base: unit.Base{ - RTPPackets: []*rtp.Packet{pkt}, - }, - }, false) + _, err = p.ProcessRTPPacket(pkt, time.Time{}, 0, false) require.NoError(t, err) require.Equal(t, &rtp.Packet{ @@ -51,6 +46,6 @@ func TestGenericRemovePadding(t *testing.T) { Timestamp: 45343, SSRC: 563423, }, - Payload: []byte{0x01, 0x02, 0x03, 0x04}, + Payload: []byte{1, 2, 3, 4}, }, pkt) } diff --git a/internal/formatprocessor/h264.go b/internal/formatprocessor/h264.go index d2673244..ffe0d134 100644 --- a/internal/formatprocessor/h264.go +++ b/internal/formatprocessor/h264.go @@ -220,83 +220,105 @@ func (t *formatProcessorH264) remuxAccessUnit(au [][]byte) [][]byte { return filteredNALUs } -func (t *formatProcessorH264) Process(u unit.Unit, hasNonRTSPReaders bool) error { //nolint:dupl - tunit := u.(*unit.H264) - - if tunit.RTPPackets != nil { - pkt := tunit.RTPPackets[0] - t.updateTrackParametersFromRTPPacket(pkt) - - if t.encoder == nil { - // remove padding - pkt.Header.Padding = false - pkt.PaddingSize = 0 - - // RTP packets exceed maximum size: start re-encoding them - if pkt.MarshalSize() > t.udpMaxPayloadSize { - v1 := pkt.SSRC - v2 := pkt.SequenceNumber - err := t.createEncoder(&v1, &v2) - if err != nil { - return err - } - } +func (t *formatProcessorH264) ProcessUnit(uu unit.Unit) error { + u := uu.(*unit.H264) + + t.updateTrackParametersFromAU(u.AU) + u.AU = t.remuxAccessUnit(u.AU) + + if u.AU != nil { + pkts, err := t.encoder.Encode(u.AU) + if err != nil { + return err } - // decode from RTP - if hasNonRTSPReaders || t.decoder != nil || t.encoder != nil { - if t.decoder == nil { - var err error - t.decoder, err = t.format.CreateDecoder() - if err != nil { - return err - } + ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) + for _, pkt := range pkts { + pkt.Timestamp = ts + } + + u.RTPPackets = pkts + } + + return nil +} + +func (t *formatProcessorH264) ProcessRTPPacket( //nolint:dupl + pkt *rtp.Packet, + ntp time.Time, + pts time.Duration, + hasNonRTSPReaders bool, +) (Unit, error) { + u := &unit.H264{ + Base: unit.Base{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: ntp, + PTS: pts, + }, + } + + t.updateTrackParametersFromRTPPacket(pkt) + + if t.encoder == nil { + // remove padding + pkt.Header.Padding = false + pkt.PaddingSize = 0 + + // RTP packets exceed maximum size: start re-encoding them + if pkt.MarshalSize() > t.udpMaxPayloadSize { + v1 := pkt.SSRC + v2 := pkt.SequenceNumber + err := t.createEncoder(&v1, &v2) + if err != nil { + return nil, err } + } + } - au, err := t.decoder.Decode(pkt) + // decode from RTP + if hasNonRTSPReaders || t.decoder != nil || t.encoder != nil { + if t.decoder == nil { + var err error + t.decoder, err = t.format.CreateDecoder() if err != nil { - if err == rtph264.ErrNonStartingPacketAndNoPrevious || err == rtph264.ErrMorePacketsNeeded { - if t.encoder != nil { - tunit.RTPPackets = nil - } - return nil - } - return err + return nil, err } + } + + au, err := t.decoder.Decode(pkt) - tunit.AU = t.remuxAccessUnit(au) + if t.encoder != nil { + u.RTPPackets = nil } - // route packet as is - if t.encoder == nil { - return nil + if err != nil { + if err == rtph264.ErrNonStartingPacketAndNoPrevious || err == rtph264.ErrMorePacketsNeeded { + return u, nil + } + return nil, err } - } else { - t.updateTrackParametersFromAU(tunit.AU) - tunit.AU = t.remuxAccessUnit(tunit.AU) + + u.AU = t.remuxAccessUnit(au) + } + + // route packet as is + if t.encoder == nil { + return u, nil } // encode into RTP - if len(tunit.AU) != 0 { - pkts, err := t.encoder.Encode(tunit.AU) + if len(u.AU) != 0 { + pkts, err := t.encoder.Encode(u.AU) if err != nil { - return err + return nil, err } - setTimestamp(pkts, tunit.RTPPackets, t.format.ClockRate(), tunit.PTS) - tunit.RTPPackets = pkts - } else { - tunit.RTPPackets = nil - } - return nil -} + for _, newPKT := range pkts { + newPKT.Timestamp = pkt.Timestamp + } -func (t *formatProcessorH264) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit { - return &unit.H264{ - Base: unit.Base{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: ntp, - PTS: pts, - }, + u.RTPPackets = pkts } + + return u, nil } diff --git a/internal/formatprocessor/h264_test.go b/internal/formatprocessor/h264_test.go index 46b02a16..9477676b 100644 --- a/internal/formatprocessor/h264_test.go +++ b/internal/formatprocessor/h264_test.go @@ -3,6 +3,7 @@ package formatprocessor import ( "bytes" "testing" + "time" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediacommon/pkg/codecs/h264" @@ -27,36 +28,23 @@ func TestH264DynamicParams(t *testing.T) { pkts, err := enc.Encode([][]byte{{byte(h264.NALUTypeIDR)}}) require.NoError(t, err) - data := &unit.H264{ - Base: unit.Base{ - RTPPackets: []*rtp.Packet{pkts[0]}, - }, - } - err = p.Process(data, true) + data, err := p.ProcessRTPPacket(pkts[0], time.Time{}, 0, true) require.NoError(t, err) require.Equal(t, [][]byte{ {byte(h264.NALUTypeIDR)}, - }, data.AU) + }, data.(*unit.H264).AU) pkts, err = enc.Encode([][]byte{{7, 4, 5, 6}}) // SPS require.NoError(t, err) - err = p.Process(&unit.H264{ - Base: unit.Base{ - RTPPackets: []*rtp.Packet{pkts[0]}, - }, - }, false) + _, err = p.ProcessRTPPacket(pkts[0], time.Time{}, 0, false) require.NoError(t, err) pkts, err = enc.Encode([][]byte{{8, 1}}) // PPS require.NoError(t, err) - err = p.Process(&unit.H264{ - Base: unit.Base{ - RTPPackets: []*rtp.Packet{pkts[0]}, - }, - }, false) + _, err = p.ProcessRTPPacket(pkts[0], time.Time{}, 0, false) require.NoError(t, err) require.Equal(t, []byte{7, 4, 5, 6}, forma.SPS) @@ -65,19 +53,14 @@ func TestH264DynamicParams(t *testing.T) { pkts, err = enc.Encode([][]byte{{byte(h264.NALUTypeIDR)}}) require.NoError(t, err) - data = &unit.H264{ - Base: unit.Base{ - RTPPackets: []*rtp.Packet{pkts[0]}, - }, - } - err = p.Process(data, true) + data, err = p.ProcessRTPPacket(pkts[0], time.Time{}, 0, true) require.NoError(t, err) require.Equal(t, [][]byte{ {0x07, 4, 5, 6}, {0x08, 1}, {byte(h264.NALUTypeIDR)}, - }, data.AU) + }, data.(*unit.H264).AU) } func TestH264OversizedPackets(t *testing.T) { @@ -131,15 +114,10 @@ func TestH264OversizedPackets(t *testing.T) { Payload: []byte{0x1c, 0b01000000, 0x01, 0x02, 0x03, 0x04}, }, } { - data := &unit.H264{ - Base: unit.Base{ - RTPPackets: []*rtp.Packet{pkt}, - }, - } - err := p.Process(data, false) + data, err := p.ProcessRTPPacket(pkt, time.Time{}, 0, false) require.NoError(t, err) - out = append(out, data.RTPPackets...) + out = append(out, data.GetRTPPackets()...) } require.Equal(t, []*rtp.Packet{ @@ -201,7 +179,7 @@ func TestH264EmptyPacket(t *testing.T) { }, } - err = p.Process(unit, false) + err = p.ProcessUnit(unit) require.NoError(t, err) // if all NALUs have been removed, no RTP packets must be generated. diff --git a/internal/formatprocessor/h265.go b/internal/formatprocessor/h265.go index f8fe646f..f2f9079a 100644 --- a/internal/formatprocessor/h265.go +++ b/internal/formatprocessor/h265.go @@ -242,83 +242,105 @@ func (t *formatProcessorH265) remuxAccessUnit(au [][]byte) [][]byte { return filteredNALUs } -func (t *formatProcessorH265) Process(u unit.Unit, hasNonRTSPReaders bool) error { //nolint:dupl - tunit := u.(*unit.H265) - - if tunit.RTPPackets != nil { - pkt := tunit.RTPPackets[0] - t.updateTrackParametersFromRTPPacket(pkt) - - if t.encoder == nil { - // remove padding - pkt.Header.Padding = false - pkt.PaddingSize = 0 - - // RTP packets exceed maximum size: start re-encoding them - if pkt.MarshalSize() > t.udpMaxPayloadSize { - v1 := pkt.SSRC - v2 := pkt.SequenceNumber - err := t.createEncoder(&v1, &v2) - if err != nil { - return err - } - } +func (t *formatProcessorH265) ProcessUnit(uu unit.Unit) error { //nolint:dupl + u := uu.(*unit.H265) + + t.updateTrackParametersFromAU(u.AU) + u.AU = t.remuxAccessUnit(u.AU) + + if u.AU != nil { + pkts, err := t.encoder.Encode(u.AU) + if err != nil { + return err } - // decode from RTP - if hasNonRTSPReaders || t.decoder != nil || t.encoder != nil { - if t.decoder == nil { - var err error - t.decoder, err = t.format.CreateDecoder() - if err != nil { - return err - } + ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) + for _, pkt := range pkts { + pkt.Timestamp = ts + } + + u.RTPPackets = pkts + } + + return nil +} + +func (t *formatProcessorH265) ProcessRTPPacket( //nolint:dupl + pkt *rtp.Packet, + ntp time.Time, + pts time.Duration, + hasNonRTSPReaders bool, +) (Unit, error) { + u := &unit.H265{ + Base: unit.Base{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: ntp, + PTS: pts, + }, + } + + t.updateTrackParametersFromRTPPacket(pkt) + + if t.encoder == nil { + // remove padding + pkt.Header.Padding = false + pkt.PaddingSize = 0 + + // RTP packets exceed maximum size: start re-encoding them + if pkt.MarshalSize() > t.udpMaxPayloadSize { + v1 := pkt.SSRC + v2 := pkt.SequenceNumber + err := t.createEncoder(&v1, &v2) + if err != nil { + return nil, err } + } + } - au, err := t.decoder.Decode(pkt) + // decode from RTP + if hasNonRTSPReaders || t.decoder != nil || t.encoder != nil { + if t.decoder == nil { + var err error + t.decoder, err = t.format.CreateDecoder() if err != nil { - if err == rtph265.ErrNonStartingPacketAndNoPrevious || err == rtph265.ErrMorePacketsNeeded { - if t.encoder != nil { - tunit.RTPPackets = nil - } - return nil - } - return err + return nil, err } + } + + au, err := t.decoder.Decode(pkt) - tunit.AU = t.remuxAccessUnit(au) + if t.encoder != nil { + u.RTPPackets = nil } - // route packet as is - if t.encoder == nil { - return nil + if err != nil { + if err == rtph265.ErrNonStartingPacketAndNoPrevious || err == rtph265.ErrMorePacketsNeeded { + return u, nil + } + return nil, err } - } else { - t.updateTrackParametersFromAU(tunit.AU) - tunit.AU = t.remuxAccessUnit(tunit.AU) + + u.AU = t.remuxAccessUnit(au) + } + + // route packet as is + if t.encoder == nil { + return u, nil } // encode into RTP - if len(tunit.AU) != 0 { - pkts, err := t.encoder.Encode(tunit.AU) + if len(u.AU) != 0 { + pkts, err := t.encoder.Encode(u.AU) if err != nil { - return err + return nil, err } - setTimestamp(pkts, tunit.RTPPackets, t.format.ClockRate(), tunit.PTS) - tunit.RTPPackets = pkts - } else { - tunit.RTPPackets = nil - } - return nil -} + for _, newPKT := range pkts { + newPKT.Timestamp = pkt.Timestamp + } -func (t *formatProcessorH265) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit { - return &unit.H265{ - Base: unit.Base{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: ntp, - PTS: pts, - }, + u.RTPPackets = pkts } + + return u, nil } diff --git a/internal/formatprocessor/h265_test.go b/internal/formatprocessor/h265_test.go index 078ca66b..c4b96cc1 100644 --- a/internal/formatprocessor/h265_test.go +++ b/internal/formatprocessor/h265_test.go @@ -3,6 +3,7 @@ package formatprocessor import ( "bytes" "testing" + "time" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediacommon/pkg/codecs/h265" @@ -26,46 +27,29 @@ func TestH265DynamicParams(t *testing.T) { pkts, err := enc.Encode([][]byte{{byte(h265.NALUType_CRA_NUT) << 1, 0}}) require.NoError(t, err) - data := &unit.H265{ - Base: unit.Base{ - RTPPackets: []*rtp.Packet{pkts[0]}, - }, - } - err = p.Process(data, true) + data, err := p.ProcessRTPPacket(pkts[0], time.Time{}, 0, true) require.NoError(t, err) require.Equal(t, [][]byte{ {byte(h265.NALUType_CRA_NUT) << 1, 0}, - }, data.AU) + }, data.(*unit.H265).AU) pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_VPS_NUT) << 1, 1, 2, 3}}) require.NoError(t, err) - err = p.Process(&unit.H265{ - Base: unit.Base{ - RTPPackets: []*rtp.Packet{pkts[0]}, - }, - }, false) + _, err = p.ProcessRTPPacket(pkts[0], time.Time{}, 0, false) require.NoError(t, err) pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_SPS_NUT) << 1, 4, 5, 6}}) require.NoError(t, err) - err = p.Process(&unit.H265{ - Base: unit.Base{ - RTPPackets: []*rtp.Packet{pkts[0]}, - }, - }, false) + _, err = p.ProcessRTPPacket(pkts[0], time.Time{}, 0, false) require.NoError(t, err) pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_PPS_NUT) << 1, 7, 8, 9}}) require.NoError(t, err) - err = p.Process(&unit.H265{ - Base: unit.Base{ - RTPPackets: []*rtp.Packet{pkts[0]}, - }, - }, false) + _, err = p.ProcessRTPPacket(pkts[0], time.Time{}, 0, false) require.NoError(t, err) require.Equal(t, []byte{byte(h265.NALUType_VPS_NUT) << 1, 1, 2, 3}, forma.VPS) @@ -75,12 +59,7 @@ func TestH265DynamicParams(t *testing.T) { pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_CRA_NUT) << 1, 0}}) require.NoError(t, err) - data = &unit.H265{ - Base: unit.Base{ - RTPPackets: []*rtp.Packet{pkts[0]}, - }, - } - err = p.Process(data, true) + data, err = p.ProcessRTPPacket(pkts[0], time.Time{}, 0, true) require.NoError(t, err) require.Equal(t, [][]byte{ @@ -88,7 +67,7 @@ func TestH265DynamicParams(t *testing.T) { {byte(h265.NALUType_SPS_NUT) << 1, 4, 5, 6}, {byte(h265.NALUType_PPS_NUT) << 1, 7, 8, 9}, {byte(h265.NALUType_CRA_NUT) << 1, 0}, - }, data.AU) + }, data.(*unit.H265).AU) } func TestH265OversizedPackets(t *testing.T) { @@ -130,15 +109,10 @@ func TestH265OversizedPackets(t *testing.T) { Payload: bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 2000/4), }, } { - data := &unit.H265{ - Base: unit.Base{ - RTPPackets: []*rtp.Packet{pkt}, - }, - } - err = p.Process(data, false) + data, err := p.ProcessRTPPacket(pkt, time.Time{}, 0, false) require.NoError(t, err) - out = append(out, data.RTPPackets...) + out = append(out, data.GetRTPPackets()...) } require.Equal(t, []*rtp.Packet{ @@ -200,7 +174,7 @@ func TestH265EmptyPacket(t *testing.T) { }, } - err = p.Process(unit, false) + err = p.ProcessUnit(unit) require.NoError(t, err) // if all NALUs have been removed, no RTP packets must be generated. diff --git a/internal/formatprocessor/mpeg1_audio.go b/internal/formatprocessor/mpeg1_audio.go new file mode 100644 index 00000000..0cbecd1f --- /dev/null +++ b/internal/formatprocessor/mpeg1_audio.go @@ -0,0 +1,112 @@ +package formatprocessor //nolint:dupl + +import ( + "fmt" + "time" + + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/format/rtpmpeg1audio" + "github.com/pion/rtp" + + "github.com/bluenviron/mediamtx/internal/unit" +) + +type formatProcessorMPEG1Audio struct { + udpMaxPayloadSize int + format *format.MPEG1Audio + encoder *rtpmpeg1audio.Encoder + decoder *rtpmpeg1audio.Decoder +} + +func newMPEG1Audio( + udpMaxPayloadSize int, + forma *format.MPEG1Audio, + generateRTPPackets bool, +) (*formatProcessorMPEG1Audio, error) { + t := &formatProcessorMPEG1Audio{ + udpMaxPayloadSize: udpMaxPayloadSize, + format: forma, + } + + if generateRTPPackets { + err := t.createEncoder() + if err != nil { + return nil, err + } + } + + return t, nil +} + +func (t *formatProcessorMPEG1Audio) createEncoder() error { + t.encoder = &rtpmpeg1audio.Encoder{ + PayloadMaxSize: t.udpMaxPayloadSize - 12, + } + return t.encoder.Init() +} + +func (t *formatProcessorMPEG1Audio) ProcessUnit(uu unit.Unit) error { //nolint:dupl + u := uu.(*unit.MPEG1Audio) + + pkts, err := t.encoder.Encode(u.Frames) + if err != nil { + return err + } + + ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) + for _, pkt := range pkts { + pkt.Timestamp = ts + } + + u.RTPPackets = pkts + + return nil +} + +func (t *formatProcessorMPEG1Audio) ProcessRTPPacket( //nolint:dupl + pkt *rtp.Packet, + ntp time.Time, + pts time.Duration, + hasNonRTSPReaders bool, +) (Unit, error) { + u := &unit.MPEG1Audio{ + Base: unit.Base{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: ntp, + PTS: pts, + }, + } + + // remove padding + pkt.Header.Padding = false + pkt.PaddingSize = 0 + + if pkt.MarshalSize() > t.udpMaxPayloadSize { + return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", + pkt.MarshalSize(), t.udpMaxPayloadSize) + } + + // decode from RTP + if hasNonRTSPReaders || t.decoder != nil { + if t.decoder == nil { + var err error + t.decoder, err = t.format.CreateDecoder() + if err != nil { + return nil, err + } + } + + frames, err := t.decoder.Decode(pkt) + if err != nil { + if err == rtpmpeg1audio.ErrNonStartingPacketAndNoPrevious || err == rtpmpeg1audio.ErrMorePacketsNeeded { + return u, nil + } + return nil, err + } + + u.Frames = frames + } + + // route packet as is + return u, nil +} diff --git a/internal/formatprocessor/mpeg1audio.go b/internal/formatprocessor/mpeg1audio.go deleted file mode 100644 index b30f4b81..00000000 --- a/internal/formatprocessor/mpeg1audio.go +++ /dev/null @@ -1,107 +0,0 @@ -package formatprocessor //nolint:dupl - -import ( - "fmt" - "time" - - "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/gortsplib/v4/pkg/format/rtpmpeg1audio" - "github.com/pion/rtp" - - "github.com/bluenviron/mediamtx/internal/unit" -) - -type formatProcessorMPEG1Audio struct { - udpMaxPayloadSize int - format *format.MPEG1Audio - encoder *rtpmpeg1audio.Encoder - decoder *rtpmpeg1audio.Decoder -} - -func newMPEG1Audio( - udpMaxPayloadSize int, - forma *format.MPEG1Audio, - generateRTPPackets bool, -) (*formatProcessorMPEG1Audio, error) { - t := &formatProcessorMPEG1Audio{ - udpMaxPayloadSize: udpMaxPayloadSize, - format: forma, - } - - if generateRTPPackets { - err := t.createEncoder() - if err != nil { - return nil, err - } - } - - return t, nil -} - -func (t *formatProcessorMPEG1Audio) createEncoder() error { - t.encoder = &rtpmpeg1audio.Encoder{ - PayloadMaxSize: t.udpMaxPayloadSize - 12, - } - return t.encoder.Init() -} - -func (t *formatProcessorMPEG1Audio) Process(u unit.Unit, hasNonRTSPReaders bool) error { //nolint:dupl - tunit := u.(*unit.MPEG1Audio) - - if tunit.RTPPackets != nil { - pkt := tunit.RTPPackets[0] - - // remove padding - pkt.Header.Padding = false - pkt.PaddingSize = 0 - - if pkt.MarshalSize() > t.udpMaxPayloadSize { - return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", - pkt.MarshalSize(), t.udpMaxPayloadSize) - } - - // decode from RTP - if hasNonRTSPReaders || t.decoder != nil { - if t.decoder == nil { - var err error - t.decoder, err = t.format.CreateDecoder() - if err != nil { - return err - } - } - - frames, err := t.decoder.Decode(pkt) - if err != nil { - if err == rtpmpeg1audio.ErrNonStartingPacketAndNoPrevious || err == rtpmpeg1audio.ErrMorePacketsNeeded { - return nil - } - return err - } - - tunit.Frames = frames - } - - // route packet as is - return nil - } - - // encode into RTP - pkts, err := t.encoder.Encode(tunit.Frames) - if err != nil { - return err - } - setTimestamp(pkts, tunit.RTPPackets, t.format.ClockRate(), tunit.PTS) - tunit.RTPPackets = pkts - - return nil -} - -func (t *formatProcessorMPEG1Audio) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit { - return &unit.MPEG1Audio{ - Base: unit.Base{ - RTPPackets: []*rtp.Packet{pkt}, - NTP: ntp, - PTS: pts, - }, - } -} diff --git a/internal/formatprocessor/mpeg4audio_generic.go b/internal/formatprocessor/mpeg4_audio_generic.go similarity index 54% rename from internal/formatprocessor/mpeg4audio_generic.go rename to internal/formatprocessor/mpeg4_audio_generic.go index ece42cbe..9d1410c6 100644 --- a/internal/formatprocessor/mpeg4audio_generic.go +++ b/internal/formatprocessor/mpeg4_audio_generic.go @@ -49,63 +49,68 @@ func (t *formatProcessorMPEG4AudioGeneric) createEncoder() error { return t.encoder.Init() } -func (t *formatProcessorMPEG4AudioGeneric) Process(u unit.Unit, hasNonRTSPReaders bool) error { //nolint:dupl - tunit := u.(*unit.MPEG4AudioGeneric) +func (t *formatProcessorMPEG4AudioGeneric) ProcessUnit(uu unit.Unit) error { //nolint:dupl + u := uu.(*unit.MPEG4AudioGeneric) - if tunit.RTPPackets != nil { - pkt := tunit.RTPPackets[0] - - // remove padding - pkt.Header.Padding = false - pkt.PaddingSize = 0 - - if pkt.MarshalSize() > t.udpMaxPayloadSize { - return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", - pkt.MarshalSize(), t.udpMaxPayloadSize) - } - - // decode from RTP - if hasNonRTSPReaders || t.decoder != nil || true { - if t.decoder == nil { - var err error - t.decoder, err = t.format.CreateDecoder() - if err != nil { - return err - } - } - - aus, err := t.decoder.Decode(pkt) - if err != nil { - if err == rtpmpeg4audio.ErrMorePacketsNeeded { - return nil - } - return err - } - - tunit.AUs = aus - } - - // route packet as is - return nil - } - - // encode into RTP - pkts, err := t.encoder.Encode(tunit.AUs) + pkts, err := t.encoder.Encode(u.AUs) if err != nil { return err } - setTimestamp(pkts, tunit.RTPPackets, t.format.ClockRate(), tunit.PTS) - tunit.RTPPackets = pkts + + ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) + for _, pkt := range pkts { + pkt.Timestamp = ts + } + + u.RTPPackets = pkts return nil } -func (t *formatProcessorMPEG4AudioGeneric) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit { - return &unit.MPEG4AudioGeneric{ +func (t *formatProcessorMPEG4AudioGeneric) ProcessRTPPacket( //nolint:dupl + pkt *rtp.Packet, + ntp time.Time, + pts time.Duration, + hasNonRTSPReaders bool, +) (Unit, error) { + u := &unit.MPEG4AudioGeneric{ Base: unit.Base{ RTPPackets: []*rtp.Packet{pkt}, NTP: ntp, PTS: pts, }, } + + // remove padding + pkt.Header.Padding = false + pkt.PaddingSize = 0 + + if pkt.MarshalSize() > t.udpMaxPayloadSize { + return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", + pkt.MarshalSize(), t.udpMaxPayloadSize) + } + + // decode from RTP + if hasNonRTSPReaders || t.decoder != nil { + if t.decoder == nil { + var err error + t.decoder, err = t.format.CreateDecoder() + if err != nil { + return nil, err + } + } + + aus, err := t.decoder.Decode(pkt) + if err != nil { + if err == rtpmpeg4audio.ErrMorePacketsNeeded { + return u, nil + } + return nil, err + } + + u.AUs = aus + } + + // route packet as is + return u, nil } diff --git a/internal/formatprocessor/mpeg4audio_latm.go b/internal/formatprocessor/mpeg4_audio_latm.go similarity index 51% rename from internal/formatprocessor/mpeg4audio_latm.go rename to internal/formatprocessor/mpeg4_audio_latm.go index b7f2a8c0..4562468e 100644 --- a/internal/formatprocessor/mpeg4audio_latm.go +++ b/internal/formatprocessor/mpeg4_audio_latm.go @@ -45,63 +45,68 @@ func (t *formatProcessorMPEG4AudioLATM) createEncoder() error { return t.encoder.Init() } -func (t *formatProcessorMPEG4AudioLATM) Process(u unit.Unit, hasNonRTSPReaders bool) error { //nolint:dupl - tunit := u.(*unit.MPEG4AudioLATM) +func (t *formatProcessorMPEG4AudioLATM) ProcessUnit(uu unit.Unit) error { //nolint:dupl + u := uu.(*unit.MPEG4AudioLATM) - if tunit.RTPPackets != nil { - pkt := tunit.RTPPackets[0] - - // remove padding - pkt.Header.Padding = false - pkt.PaddingSize = 0 - - if pkt.MarshalSize() > t.udpMaxPayloadSize { - return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", - pkt.MarshalSize(), t.udpMaxPayloadSize) - } - - // decode from RTP - if hasNonRTSPReaders || t.decoder != nil { - if t.decoder == nil { - var err error - t.decoder, err = t.format.CreateDecoder() - if err != nil { - return err - } - } - - au, err := t.decoder.Decode(pkt) - if err != nil { - if err == rtpmpeg4audiolatm.ErrMorePacketsNeeded { - return nil - } - return err - } - - tunit.AU = au - } - - // route packet as is - return nil - } - - // encode into RTP - pkts, err := t.encoder.Encode(tunit.AU) + pkts, err := t.encoder.Encode(u.AU) if err != nil { return err } - setTimestamp(pkts, tunit.RTPPackets, t.format.ClockRate(), tunit.PTS) - tunit.RTPPackets = pkts + + ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) + for _, pkt := range pkts { + pkt.Timestamp = ts + } + + u.RTPPackets = pkts return nil } -func (t *formatProcessorMPEG4AudioLATM) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit { - return &unit.MPEG4AudioLATM{ +func (t *formatProcessorMPEG4AudioLATM) ProcessRTPPacket( //nolint:dupl + pkt *rtp.Packet, + ntp time.Time, + pts time.Duration, + hasNonRTSPReaders bool, +) (Unit, error) { + u := &unit.MPEG4AudioLATM{ Base: unit.Base{ RTPPackets: []*rtp.Packet{pkt}, NTP: ntp, PTS: pts, }, } + + // remove padding + pkt.Header.Padding = false + pkt.PaddingSize = 0 + + if pkt.MarshalSize() > t.udpMaxPayloadSize { + return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", + pkt.MarshalSize(), t.udpMaxPayloadSize) + } + + // decode from RTP + if hasNonRTSPReaders || t.decoder != nil { + if t.decoder == nil { + var err error + t.decoder, err = t.format.CreateDecoder() + if err != nil { + return nil, err + } + } + + au, err := t.decoder.Decode(pkt) + if err != nil { + if err == rtpmpeg4audiolatm.ErrMorePacketsNeeded { + return u, nil + } + return nil, err + } + + u.AU = au + } + + // route packet as is + return u, nil } diff --git a/internal/formatprocessor/opus.go b/internal/formatprocessor/opus.go index 04bf21b4..f2cd348a 100644 --- a/internal/formatprocessor/opus.go +++ b/internal/formatprocessor/opus.go @@ -47,67 +47,71 @@ func (t *formatProcessorOpus) createEncoder() error { return t.encoder.Init() } -func (t *formatProcessorOpus) Process(u unit.Unit, hasNonRTSPReaders bool) error { //nolint:dupl - tunit := u.(*unit.Opus) +func (t *formatProcessorOpus) ProcessUnit(uu unit.Unit) error { //nolint:dupl + u := uu.(*unit.Opus) - if tunit.RTPPackets != nil { - pkt := tunit.RTPPackets[0] - - // remove padding - pkt.Header.Padding = false - pkt.PaddingSize = 0 - - if pkt.MarshalSize() > t.udpMaxPayloadSize { - return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", - pkt.MarshalSize(), t.udpMaxPayloadSize) - } - - // decode from RTP - if hasNonRTSPReaders || t.decoder != nil { - if t.decoder == nil { - var err error - t.decoder, err = t.format.CreateDecoder() - if err != nil { - return err - } - } - - packet, err := t.decoder.Decode(pkt) - if err != nil { - return err - } - - tunit.Packets = [][]byte{packet} - } - - // route packet as is - return nil - } - - // encode into RTP var rtpPackets []*rtp.Packet //nolint:prealloc - pts := tunit.PTS - for _, packet := range tunit.Packets { + pts := u.PTS + + for _, packet := range u.Packets { pkt, err := t.encoder.Encode(packet) if err != nil { return err } - setTimestamp([]*rtp.Packet{pkt}, tunit.RTPPackets, t.format.ClockRate(), pts) + + ts := uint32(multiplyAndDivide(pts, time.Duration(t.format.ClockRate()), time.Second)) + pkt.Timestamp = ts + rtpPackets = append(rtpPackets, pkt) pts += opus.PacketDuration(packet) } - tunit.RTPPackets = rtpPackets + u.RTPPackets = rtpPackets return nil } -func (t *formatProcessorOpus) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit { - return &unit.Opus{ +func (t *formatProcessorOpus) ProcessRTPPacket( + pkt *rtp.Packet, + ntp time.Time, + pts time.Duration, + hasNonRTSPReaders bool, +) (Unit, error) { + u := &unit.Opus{ Base: unit.Base{ RTPPackets: []*rtp.Packet{pkt}, NTP: ntp, PTS: pts, }, } + + // remove padding + pkt.Header.Padding = false + pkt.PaddingSize = 0 + + if pkt.MarshalSize() > t.udpMaxPayloadSize { + return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", + pkt.MarshalSize(), t.udpMaxPayloadSize) + } + + // decode from RTP + if hasNonRTSPReaders || t.decoder != nil { + if t.decoder == nil { + var err error + t.decoder, err = t.format.CreateDecoder() + if err != nil { + return nil, err + } + } + + packet, err := t.decoder.Decode(pkt) + if err != nil { + return nil, err + } + + u.Packets = [][]byte{packet} + } + + // route packet as is + return u, nil } diff --git a/internal/formatprocessor/processor.go b/internal/formatprocessor/processor.go index 6125bd6d..6bbfafc6 100644 --- a/internal/formatprocessor/processor.go +++ b/internal/formatprocessor/processor.go @@ -18,25 +18,18 @@ func multiplyAndDivide(v, m, d time.Duration) time.Duration { return (secs*m + dec*m/d) } -func setTimestamp(newPackets []*rtp.Packet, oldPackets []*rtp.Packet, clockRate int, pts time.Duration) { - if oldPackets != nil { // get timestamp from old packets - for _, pkt := range newPackets { - pkt.Timestamp = oldPackets[0].Timestamp - } - } else { // get timestamp from PTS - for _, pkt := range newPackets { - pkt.Timestamp = uint32(multiplyAndDivide(pts, time.Duration(clockRate), time.Second)) - } - } -} - // Processor cleans and normalizes streams. type Processor interface { - // cleans and normalizes a data unit. - Process(unit.Unit, bool) error - - // wraps a RTP packet into a Unit. - UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit + // process a Unit. + ProcessUnit(unit.Unit) error + + // process a RTP packet and convert it into a unit. + ProcessRTPPacket( + pkt *rtp.Packet, + ntp time.Time, + pts time.Duration, + hasNonRTSPReaders bool, + ) (Unit, error) } // New allocates a Processor. diff --git a/internal/formatprocessor/vp8.go b/internal/formatprocessor/vp8.go index 6b964e96..70f98ae6 100644 --- a/internal/formatprocessor/vp8.go +++ b/internal/formatprocessor/vp8.go @@ -46,63 +46,68 @@ func (t *formatProcessorVP8) createEncoder() error { return t.encoder.Init() } -func (t *formatProcessorVP8) Process(y unit.Unit, hasNonRTSPReaders bool) error { //nolint:dupl - tunit := y.(*unit.VP8) +func (t *formatProcessorVP8) ProcessUnit(uu unit.Unit) error { //nolint:dupl + u := uu.(*unit.VP8) - if tunit.RTPPackets != nil { - pkt := tunit.RTPPackets[0] - - // remove padding - pkt.Header.Padding = false - pkt.PaddingSize = 0 - - if pkt.MarshalSize() > t.udpMaxPayloadSize { - return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", - pkt.MarshalSize(), t.udpMaxPayloadSize) - } - - // decode from RTP - if hasNonRTSPReaders || t.decoder != nil { - if t.decoder == nil { - var err error - t.decoder, err = t.format.CreateDecoder() - if err != nil { - return err - } - } - - frame, err := t.decoder.Decode(pkt) - if err != nil { - if err == rtpvp8.ErrNonStartingPacketAndNoPrevious || err == rtpvp8.ErrMorePacketsNeeded { - return nil - } - return err - } - - tunit.Frame = frame - } - - // route packet as is - return nil - } - - // encode into RTP - pkts, err := t.encoder.Encode(tunit.Frame) + pkts, err := t.encoder.Encode(u.Frame) if err != nil { return err } - setTimestamp(pkts, tunit.RTPPackets, t.format.ClockRate(), tunit.PTS) - tunit.RTPPackets = pkts + + ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) + for _, pkt := range pkts { + pkt.Timestamp = ts + } + + u.RTPPackets = pkts return nil } -func (t *formatProcessorVP8) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit { - return &unit.VP8{ +func (t *formatProcessorVP8) ProcessRTPPacket( //nolint:dupl + pkt *rtp.Packet, + ntp time.Time, + pts time.Duration, + hasNonRTSPReaders bool, +) (Unit, error) { + u := &unit.VP8{ Base: unit.Base{ RTPPackets: []*rtp.Packet{pkt}, NTP: ntp, PTS: pts, }, } + + // remove padding + pkt.Header.Padding = false + pkt.PaddingSize = 0 + + if pkt.MarshalSize() > t.udpMaxPayloadSize { + return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", + pkt.MarshalSize(), t.udpMaxPayloadSize) + } + + // decode from RTP + if hasNonRTSPReaders || t.decoder != nil { + if t.decoder == nil { + var err error + t.decoder, err = t.format.CreateDecoder() + if err != nil { + return nil, err + } + } + + frame, err := t.decoder.Decode(pkt) + if err != nil { + if err == rtpvp8.ErrNonStartingPacketAndNoPrevious || err == rtpvp8.ErrMorePacketsNeeded { + return u, nil + } + return nil, err + } + + u.Frame = frame + } + + // route packet as is + return u, nil } diff --git a/internal/formatprocessor/vp9.go b/internal/formatprocessor/vp9.go index 9b815e69..9db50a4f 100644 --- a/internal/formatprocessor/vp9.go +++ b/internal/formatprocessor/vp9.go @@ -46,63 +46,68 @@ func (t *formatProcessorVP9) createEncoder() error { return t.encoder.Init() } -func (t *formatProcessorVP9) Process(u unit.Unit, hasNonRTSPReaders bool) error { //nolint:dupl - tunit := u.(*unit.VP9) +func (t *formatProcessorVP9) ProcessUnit(uu unit.Unit) error { //nolint:dupl + u := uu.(*unit.VP9) - if tunit.RTPPackets != nil { - pkt := tunit.RTPPackets[0] - - // remove padding - pkt.Header.Padding = false - pkt.PaddingSize = 0 - - if pkt.MarshalSize() > t.udpMaxPayloadSize { - return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", - pkt.MarshalSize(), t.udpMaxPayloadSize) - } - - // decode from RTP - if hasNonRTSPReaders || t.decoder != nil { - if t.decoder == nil { - var err error - t.decoder, err = t.format.CreateDecoder() - if err != nil { - return err - } - } - - frame, err := t.decoder.Decode(pkt) - if err != nil { - if err == rtpvp9.ErrNonStartingPacketAndNoPrevious || err == rtpvp9.ErrMorePacketsNeeded { - return nil - } - return err - } - - tunit.Frame = frame - } - - // route packet as is - return nil - } - - // encode into RTP - pkts, err := t.encoder.Encode(tunit.Frame) + pkts, err := t.encoder.Encode(u.Frame) if err != nil { return err } - setTimestamp(pkts, tunit.RTPPackets, t.format.ClockRate(), tunit.PTS) - tunit.RTPPackets = pkts + + ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) + for _, pkt := range pkts { + pkt.Timestamp = ts + } + + u.RTPPackets = pkts return nil } -func (t *formatProcessorVP9) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit { - return &unit.VP9{ +func (t *formatProcessorVP9) ProcessRTPPacket( //nolint:dupl + pkt *rtp.Packet, + ntp time.Time, + pts time.Duration, + hasNonRTSPReaders bool, +) (Unit, error) { + u := &unit.VP9{ Base: unit.Base{ RTPPackets: []*rtp.Packet{pkt}, NTP: ntp, PTS: pts, }, } + + // remove padding + pkt.Header.Padding = false + pkt.PaddingSize = 0 + + if pkt.MarshalSize() > t.udpMaxPayloadSize { + return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", + pkt.MarshalSize(), t.udpMaxPayloadSize) + } + + // decode from RTP + if hasNonRTSPReaders || t.decoder != nil { + if t.decoder == nil { + var err error + t.decoder, err = t.format.CreateDecoder() + if err != nil { + return nil, err + } + } + + frame, err := t.decoder.Decode(pkt) + if err != nil { + if err == rtpvp9.ErrNonStartingPacketAndNoPrevious || err == rtpvp9.ErrMorePacketsNeeded { + return u, nil + } + return nil, err + } + + u.Frame = frame + } + + // route packet as is + return u, nil } diff --git a/internal/stream/stream_format.go b/internal/stream/stream_format.go index 0cfa14a3..0899a9e7 100644 --- a/internal/stream/stream_format.go +++ b/internal/stream/stream_format.go @@ -57,14 +57,34 @@ func (sf *streamFormat) removeReader(r *asyncwriter.Writer) { } func (sf *streamFormat) writeUnit(s *Stream, medi *description.Media, u unit.Unit) { + err := sf.proc.ProcessUnit(u) + if err != nil { + sf.decodeErrLogger.Log(logger.Warn, err.Error()) + return + } + + sf.writeUnitInner(s, medi, u) +} + +func (sf *streamFormat) writeRTPPacket( + s *Stream, + medi *description.Media, + pkt *rtp.Packet, + ntp time.Time, + pts time.Duration, +) { hasNonRTSPReaders := len(sf.readers) > 0 - err := sf.proc.Process(u, hasNonRTSPReaders) + u, err := sf.proc.ProcessRTPPacket(pkt, ntp, pts, hasNonRTSPReaders) if err != nil { sf.decodeErrLogger.Log(logger.Warn, err.Error()) return } + sf.writeUnitInner(s, medi, u) +} + +func (sf *streamFormat) writeUnitInner(s *Stream, medi *description.Media, u unit.Unit) { atomic.AddUint64(s.bytesReceived, unitSize(u)) if s.rtspStream != nil { @@ -86,13 +106,3 @@ func (sf *streamFormat) writeUnit(s *Stream, medi *description.Media, u unit.Uni }) } } - -func (sf *streamFormat) writeRTPPacket( - s *Stream, - medi *description.Media, - pkt *rtp.Packet, - ntp time.Time, - pts time.Duration, -) { - sf.writeUnit(s, medi, sf.proc.UnitForRTPPacket(pkt, ntp, pts)) -} diff --git a/internal/unit/mpeg1audio.go b/internal/unit/mpeg1_audio.go similarity index 100% rename from internal/unit/mpeg1audio.go rename to internal/unit/mpeg1_audio.go diff --git a/internal/unit/mpeg4audio_generic.go b/internal/unit/mpeg4_audio_generic.go similarity index 100% rename from internal/unit/mpeg4audio_generic.go rename to internal/unit/mpeg4_audio_generic.go diff --git a/internal/unit/mpeg4audio_latm.go b/internal/unit/mpeg4_audio_latm.go similarity index 100% rename from internal/unit/mpeg4audio_latm.go rename to internal/unit/mpeg4_audio_latm.go