Browse Source

implement resizing of oversized H265 RTP packets

pull/1242/head
aler9 2 years ago
parent
commit
6524130ab9
  1. 2
      go.mod
  2. 4
      go.sum
  3. 22
      internal/core/formatprocessor_h264.go
  4. 41
      internal/core/formatprocessor_h265.go
  5. 18
      internal/core/formatprocessor_mpeg4audio.go
  6. 488
      internal/core/rtsp_source_test.go

2
go.mod

@ -5,7 +5,7 @@ go 1.18 @@ -5,7 +5,7 @@ go 1.18
require (
code.cloudfoundry.org/bytefmt v0.0.0-20211005130812-5bb3c17173e5
github.com/abema/go-mp4 v0.8.0
github.com/aler9/gortsplib/v2 v2.0.0-20221214135702-6141afcfc4c5
github.com/aler9/gortsplib/v2 v2.0.0-20221214165733-d43cb0455e33
github.com/asticode/go-astits v1.10.1-0.20220319093903-4abe66a9b757
github.com/fsnotify/fsnotify v1.4.9
github.com/gin-gonic/gin v1.8.1

4
go.sum

@ -6,8 +6,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo @@ -6,8 +6,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib/v2 v2.0.0-20221214135702-6141afcfc4c5 h1:QHF5GXc8Q23sCYYAfD24mwOJt1CX9by3KY5gYolYEaw=
github.com/aler9/gortsplib/v2 v2.0.0-20221214135702-6141afcfc4c5/go.mod h1:zJ+fWtakOMN6cKV169EMNVBLPTITArrJKu/fyM+dov8=
github.com/aler9/gortsplib/v2 v2.0.0-20221214165733-d43cb0455e33 h1:7r2VpQoRSYOCU9qSXit9A4RKI7ufdI5UAxDHHjZ1Occ=
github.com/aler9/gortsplib/v2 v2.0.0-20221214165733-d43cb0455e33/go.mod h1:zJ+fWtakOMN6cKV169EMNVBLPTITArrJKu/fyM+dov8=
github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82 h1:9WgSzBLo3a9ToSVV7sRTBYZ1GGOZUpq4+5H3SN0UZq4=
github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82/go.mod h1:qsMrZCbeBf/mCLOeF16KDkPu4gktn/pOWyaq1aYQE7U=
github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8=

22
internal/core/formatprocessor_h264.go

@ -188,17 +188,7 @@ func (t *formatProcessorH264) remuxNALUs(nalus [][]byte) [][]byte { @@ -188,17 +188,7 @@ func (t *formatProcessorH264) remuxNALUs(nalus [][]byte) [][]byte {
return filteredNALUs
}
func (t *formatProcessorH264) generateRTPPackets(tdata *dataH264) error {
pkts, err := t.encoder.Encode(tdata.nalus, tdata.pts)
if err != nil {
return err
}
tdata.rtpPackets = pkts
return nil
}
func (t *formatProcessorH264) process(dat data, hasNonRTSPReaders bool) error {
func (t *formatProcessorH264) process(dat data, hasNonRTSPReaders bool) error { //nolint:dupl
tdata := dat.(*dataH264)
if tdata.rtpPackets != nil {
@ -210,7 +200,7 @@ func (t *formatProcessorH264) process(dat data, hasNonRTSPReaders bool) error { @@ -210,7 +200,7 @@ func (t *formatProcessorH264) process(dat data, hasNonRTSPReaders bool) error {
pkt.Header.Padding = false
pkt.PaddingSize = 0
// we need to re-encode since RTP packets exceed maximum size
// RTP packets exceed maximum size: start re-encoding them
if pkt.MarshalSize() > maxPacketSize {
v1 := pkt.SSRC
v2 := pkt.SequenceNumber
@ -258,5 +248,11 @@ func (t *formatProcessorH264) process(dat data, hasNonRTSPReaders bool) error { @@ -258,5 +248,11 @@ func (t *formatProcessorH264) process(dat data, hasNonRTSPReaders bool) error {
tdata.nalus = t.remuxNALUs(tdata.nalus)
}
return t.generateRTPPackets(tdata)
pkts, err := t.encoder.Encode(tdata.nalus, tdata.pts)
if err != nil {
return err
}
tdata.rtpPackets = pkts
return nil
}

41
internal/core/formatprocessor_h265.go

@ -2,7 +2,6 @@ package core @@ -2,7 +2,6 @@ package core
import (
"bytes"
"fmt"
"time"
"github.com/aler9/gortsplib/v2/pkg/format"
@ -137,17 +136,7 @@ func (t *formatProcessorH265) remuxNALUs(nalus [][]byte) [][]byte { @@ -137,17 +136,7 @@ func (t *formatProcessorH265) remuxNALUs(nalus [][]byte) [][]byte {
return nalus
}
func (t *formatProcessorH265) generateRTPPackets(tdata *dataH265) error {
pkts, err := t.encoder.Encode(tdata.nalus, tdata.pts)
if err != nil {
return err
}
tdata.rtpPackets = pkts
return nil
}
func (t *formatProcessorH265) process(dat data, hasNonRTSPReaders bool) error {
func (t *formatProcessorH265) process(dat data, hasNonRTSPReaders bool) error { //nolint:dupl
tdata := dat.(*dataH265)
if tdata.rtpPackets != nil {
@ -159,10 +148,19 @@ func (t *formatProcessorH265) process(dat data, hasNonRTSPReaders bool) error { @@ -159,10 +148,19 @@ func (t *formatProcessorH265) process(dat data, hasNonRTSPReaders bool) error {
pkt.Header.Padding = false
pkt.PaddingSize = 0
// TODO: re-encode if oversized instead of printing errors
// RTP packets exceed maximum size: start re-encoding them
if pkt.MarshalSize() > maxPacketSize {
return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), maxPacketSize)
v1 := pkt.SSRC
v2 := pkt.SequenceNumber
v3 := pkt.Timestamp
t.encoder = &rtph265.Encoder{
PayloadType: pkt.PayloadType,
SSRC: &v1,
InitialSequenceNumber: &v2,
InitialTimestamp: &v3,
MaxDONDiff: t.format.MaxDONDiff,
}
t.encoder.Init()
}
}
@ -172,7 +170,10 @@ func (t *formatProcessorH265) process(dat data, hasNonRTSPReaders bool) error { @@ -172,7 +170,10 @@ func (t *formatProcessorH265) process(dat data, hasNonRTSPReaders bool) error {
t.decoder = t.format.CreateDecoder()
}
nalus, pts, err := t.decoder.Decode(pkt)
tdata.rtpPackets = nil
// DecodeUntilMarker() is necessary, otherwise Encode() generates partial groups
nalus, pts, err := t.decoder.DecodeUntilMarker(pkt)
if err != nil {
if err == rtph265.ErrNonStartingPacketAndNoPrevious || err == rtph265.ErrMorePacketsNeeded {
return nil
@ -195,5 +196,11 @@ func (t *formatProcessorH265) process(dat data, hasNonRTSPReaders bool) error { @@ -195,5 +196,11 @@ func (t *formatProcessorH265) process(dat data, hasNonRTSPReaders bool) error {
tdata.nalus = t.remuxNALUs(tdata.nalus)
}
return t.generateRTPPackets(tdata)
pkts, err := t.encoder.Encode(tdata.nalus, tdata.pts)
if err != nil {
return err
}
tdata.rtpPackets = pkts
return nil
}

18
internal/core/formatprocessor_mpeg4audio.go

@ -45,16 +45,6 @@ func newFormatProcessorMPEG4Audio( @@ -45,16 +45,6 @@ func newFormatProcessorMPEG4Audio(
return t, nil
}
func (t *formatProcessorMPEG4Audio) generateRTPPackets(tdata *dataMPEG4Audio) error {
pkts, err := t.encoder.Encode(tdata.aus, tdata.pts)
if err != nil {
return err
}
tdata.rtpPackets = pkts
return nil
}
func (t *formatProcessorMPEG4Audio) process(dat data, hasNonRTSPReaders bool) error {
tdata := dat.(*dataMPEG4Audio)
@ -92,5 +82,11 @@ func (t *formatProcessorMPEG4Audio) process(dat data, hasNonRTSPReaders bool) er @@ -92,5 +82,11 @@ func (t *formatProcessorMPEG4Audio) process(dat data, hasNonRTSPReaders bool) er
return nil
}
return t.generateRTPPackets(tdata)
pkts, err := t.encoder.Encode(tdata.aus, tdata.pts)
if err != nil {
return err
}
tdata.rtpPackets = pkts
return nil
}

488
internal/core/rtsp_source_test.go

@ -516,238 +516,314 @@ func TestRTSPSourceRemovePadding(t *testing.T) { @@ -516,238 +516,314 @@ func TestRTSPSourceRemovePadding(t *testing.T) {
}
func TestRTSPSourceOversizedPackets(t *testing.T) {
l, err := net.Listen("tcp", "127.0.0.1:8555")
require.NoError(t, err)
defer l.Close()
for _, ca := range []string{"h264", "h265"} {
t.Run(ca, func(t *testing.T) {
l, err := net.Listen("tcp", "127.0.0.1:8555")
require.NoError(t, err)
defer l.Close()
connected := make(chan struct{})
connected := make(chan struct{})
serverDone := make(chan struct{})
defer func() { <-serverDone }()
go func() {
defer close(serverDone)
serverDone := make(chan struct{})
defer func() { <-serverDone }()
go func() {
defer close(serverDone)
nconn, err := l.Accept()
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
nconn, err := l.Accept()
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
req, err := conn.ReadRequest()
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Public": base.HeaderValue{strings.Join([]string{
string(base.Describe),
string(base.Setup),
string(base.Play),
}, ", ")},
},
})
require.NoError(t, err)
req, err := conn.ReadRequest()
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
req, err = conn.ReadRequest()
require.NoError(t, err)
require.Equal(t, base.Describe, req.Method)
err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Public": base.HeaderValue{strings.Join([]string{
string(base.Describe),
string(base.Setup),
string(base.Play),
}, ", ")},
},
})
require.NoError(t, err)
medias := media.Medias{testMediaH264}
byts, _ := medias.Marshal(false).Marshal()
req, err = conn.ReadRequest()
require.NoError(t, err)
require.Equal(t, base.Describe, req.Method)
err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Content-Type": base.HeaderValue{"application/sdp"},
},
Body: byts,
})
require.NoError(t, err)
medias := media.Medias{testMediaH264}
byts, _ := medias.Marshal(false).Marshal()
req, err = conn.ReadRequest()
require.NoError(t, err)
require.Equal(t, base.Setup, req.Method)
var inTH headers.Transport
err = inTH.Unmarshal(req.Header["Transport"])
require.NoError(t, err)
err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Content-Type": base.HeaderValue{"application/sdp"},
},
Body: byts,
})
require.NoError(t, err)
err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Transport": headers.Transport{
Delivery: func() *headers.TransportDelivery {
v := headers.TransportDeliveryUnicast
return &v
}(),
Protocol: headers.TransportProtocolTCP,
InterleavedIDs: inTH.InterleavedIDs,
}.Marshal(),
},
})
require.NoError(t, err)
req, err = conn.ReadRequest()
require.NoError(t, err)
require.Equal(t, base.Setup, req.Method)
req, err = conn.ReadRequest()
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
var inTH headers.Transport
err = inTH.Unmarshal(req.Header["Transport"])
require.NoError(t, err)
err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
})
require.NoError(t, err)
err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Transport": headers.Transport{
Delivery: func() *headers.TransportDelivery {
v := headers.TransportDeliveryUnicast
return &v
}(),
Protocol: headers.TransportProtocolTCP,
InterleavedIDs: inTH.InterleavedIDs,
}.Marshal(),
},
})
require.NoError(t, err)
<-connected
req, err = conn.ReadRequest()
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
byts, _ = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
Padding: true,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}.Marshal()
err = conn.WriteInterleavedFrame(&base.InterleavedFrame{
Channel: 0,
Payload: byts,
}, make([]byte, 1024))
require.NoError(t, err)
err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
})
require.NoError(t, err)
byts, _ = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 124,
Timestamp: 45343,
SSRC: 563423,
Padding: true,
},
Payload: append([]byte{0x1c, 0b10000000}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 2000/4)...),
}.Marshal()
err = conn.WriteInterleavedFrame(&base.InterleavedFrame{
Channel: 0,
Payload: byts,
}, make([]byte, 2048))
require.NoError(t, err)
<-connected
var tosend []*rtp.Packet
if ca == "h264" {
tosend = []*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
Padding: true,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
},
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 124,
Timestamp: 45343,
SSRC: 563423,
Padding: true,
},
Payload: append([]byte{0x1c, 0b10000000}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 2000/4)...),
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 125,
Timestamp: 45343,
SSRC: 563423,
Padding: true,
},
Payload: []byte{0x1c, 0b01000000, 0x01, 0x02, 0x03, 0x04},
},
}
} else {
tosend = []*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
Padding: true,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 124,
Timestamp: 45343,
SSRC: 563423,
Padding: true,
},
Payload: bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 2000/4),
},
}
}
byts, _ = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 125,
Timestamp: 45343,
SSRC: 563423,
Padding: true,
},
Payload: []byte{0x1c, 0b01000000, 0x01, 0x02, 0x03, 0x04},
}.Marshal()
err = conn.WriteInterleavedFrame(&base.InterleavedFrame{
Channel: 0,
Payload: byts,
}, make([]byte, 1024))
require.NoError(t, err)
for _, pkt := range tosend {
byts, _ = pkt.Marshal()
err = conn.WriteInterleavedFrame(&base.InterleavedFrame{
Channel: 0,
Payload: byts,
}, make([]byte, 2048))
require.NoError(t, err)
}
req, err = conn.ReadRequest()
require.NoError(t, err)
require.Equal(t, base.Teardown, req.Method)
req, err = conn.ReadRequest()
require.NoError(t, err)
require.Equal(t, base.Teardown, req.Method)
err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
})
require.NoError(t, err)
}()
err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
})
require.NoError(t, err)
}()
p, ok := newInstance("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
" proxied:\n" +
" source: rtsp://127.0.0.1:8555/teststream\n" +
" sourceProtocol: tcp\n")
require.Equal(t, true, ok)
defer p.Close()
p, ok := newInstance("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
" proxied:\n" +
" source: rtsp://127.0.0.1:8555/teststream\n" +
" sourceProtocol: tcp\n")
require.Equal(t, true, ok)
defer p.Close()
time.Sleep(1 * time.Second)
time.Sleep(1 * time.Second)
packetRecv := make(chan struct{})
i := 0
c := gortsplib.Client{}
c := gortsplib.Client{}
u, err := url.Parse("rtsp://127.0.0.1:8554/proxied")
require.NoError(t, err)
u, err := url.Parse("rtsp://127.0.0.1:8554/proxied")
require.NoError(t, err)
err = c.Start(u.Scheme, u.Host)
require.NoError(t, err)
defer c.Close()
err = c.Start(u.Scheme, u.Host)
require.NoError(t, err)
defer c.Close()
medias, baseURL, _, err := c.Describe(u)
require.NoError(t, err)
medias, baseURL, _, err := c.Describe(u)
require.NoError(t, err)
err = c.SetupAll(medias, baseURL)
require.NoError(t, err)
err = c.SetupAll(medias, baseURL)
require.NoError(t, err)
packetRecv := make(chan struct{})
i := 0
var expected []*rtp.Packet
if ca == "h264" {
expected = []*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
},
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 124,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: append(
append([]byte{0x1c, 0x80}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 364)...),
[]byte{0x01, 0x02}...,
),
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 125,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: append(
[]byte{0x1c, 0x40, 0x03, 0x04},
bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 136)...,
),
},
}
} else {
expected = []*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
},
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 124,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: append(
append([]byte{0x1c, 0x81, 0x02, 0x03, 0x04}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 363)...),
[]byte{0x01, 0x02, 0x03}...,
),
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 125,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: append(
[]byte{0x1c, 0x41, 0x04},
bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 135)...,
),
},
}
}
c.OnPacketRTP(medias[0], medias[0].Formats[0], func(pkt *rtp.Packet) {
switch i {
case 0:
require.Equal(t, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}, pkt)
case 1:
require.Equal(t, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 124,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: append(
append([]byte{0x1c, 0x80}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 364)...),
[]byte{0x01, 0x02}...,
),
}, pkt)
case 2:
require.Equal(t, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 125,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: append(
[]byte{0x1c, 0x40, 0x03, 0x04},
bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 136)...,
),
}, pkt)
close(packetRecv)
}
i++
})
c.OnPacketRTP(medias[0], medias[0].Formats[0], func(pkt *rtp.Packet) {
require.Equal(t, expected[i], pkt)
i++
if i >= len(expected) {
close(packetRecv)
}
})
_, err = c.Play(nil)
require.NoError(t, err)
_, err = c.Play(nil)
require.NoError(t, err)
close(connected)
<-packetRecv
close(connected)
<-packetRecv
})
}
}

Loading…
Cancel
Save