Browse Source

add option to set max size of outgoing UDP packets (#1588) (#1601)

pull/1633/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
5b61983fa6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      apidocs/openapi.yaml
  2. 9
      internal/conf/conf.go
  3. 2
      internal/core/core.go
  4. 10
      internal/core/path.go
  5. 4
      internal/core/path_manager.go
  6. 3
      internal/core/stream.go
  7. 8
      internal/core/stream_format.go
  8. 7
      internal/core/stream_media.go
  9. 3
      internal/core/udp_source.go
  10. 23
      internal/formatprocessor/generic.go
  11. 2
      internal/formatprocessor/generic_test.go
  12. 6
      internal/formatprocessor/h264.go
  13. 6
      internal/formatprocessor/h264_test.go
  14. 6
      internal/formatprocessor/h265.go
  15. 6
      internal/formatprocessor/h265_test.go
  16. 7
      internal/formatprocessor/mpeg4audio.go
  17. 7
      internal/formatprocessor/opus.go
  18. 20
      internal/formatprocessor/processor.go
  19. 7
      internal/formatprocessor/vp8.go
  20. 7
      internal/formatprocessor/vp9.go
  21. 3
      rtsp-simple-server.yml

2
apidocs/openapi.yaml

@ -33,6 +33,8 @@ components: @@ -33,6 +33,8 @@ components:
type: string
readBufferCount:
type: integer
udpMaxPayloadSize:
type: integer
externalAuthenticationURL:
type: string
api:

9
internal/conf/conf.go

@ -175,6 +175,7 @@ type Conf struct { @@ -175,6 +175,7 @@ type Conf struct {
ReadTimeout StringDuration `json:"readTimeout"`
WriteTimeout StringDuration `json:"writeTimeout"`
ReadBufferCount int `json:"readBufferCount"`
UDPMaxPayloadSize int `json:"udpMaxPayloadSize"`
ExternalAuthenticationURL string `json:"externalAuthenticationURL"`
API bool `json:"api"`
APIAddress string `json:"apiAddress"`
@ -285,7 +286,13 @@ func (conf *Conf) CheckAndFillMissing() error { @@ -285,7 +286,13 @@ func (conf *Conf) CheckAndFillMissing() error {
conf.ReadBufferCount = 512
}
if (conf.ReadBufferCount & (conf.ReadBufferCount - 1)) != 0 {
return fmt.Errorf("'ReadBufferCount' must be a power of two")
return fmt.Errorf("'readBufferCount' must be a power of two")
}
if conf.UDPMaxPayloadSize == 0 {
conf.UDPMaxPayloadSize = 1500
}
if conf.UDPMaxPayloadSize > 1500 {
return fmt.Errorf("'udpMaxPayloadSize' must be less than 1500")
}
if conf.ExternalAuthenticationURL != "" {
if !strings.HasPrefix(conf.ExternalAuthenticationURL, "http://") &&

2
internal/core/core.go

@ -244,6 +244,7 @@ func (p *Core) createResources(initial bool) error { @@ -244,6 +244,7 @@ func (p *Core) createResources(initial bool) error {
p.conf.ReadTimeout,
p.conf.WriteTimeout,
p.conf.ReadBufferCount,
p.conf.UDPMaxPayloadSize,
p.conf.Paths,
p.externalCmdPool,
p.metrics,
@ -486,6 +487,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { @@ -486,6 +487,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
newConf.ReadTimeout != p.conf.ReadTimeout ||
newConf.WriteTimeout != p.conf.WriteTimeout ||
newConf.ReadBufferCount != p.conf.ReadBufferCount ||
newConf.UDPMaxPayloadSize != p.conf.UDPMaxPayloadSize ||
closeMetrics
if !closePathManager && !reflect.DeepEqual(newConf.Paths, p.conf.Paths) {
p.pathManager.confReload(newConf.Paths)

10
internal/core/path.go

@ -191,6 +191,7 @@ type path struct { @@ -191,6 +191,7 @@ type path struct {
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
readBufferCount int
udpMaxPayloadSize int
confName string
conf *conf.PathConf
name string
@ -240,6 +241,7 @@ func newPath( @@ -240,6 +241,7 @@ func newPath(
readTimeout conf.StringDuration,
writeTimeout conf.StringDuration,
readBufferCount int,
udpMaxPayloadSize int,
confName string,
cnf *conf.PathConf,
name string,
@ -255,6 +257,7 @@ func newPath( @@ -255,6 +257,7 @@ func newPath(
readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
udpMaxPayloadSize: udpMaxPayloadSize,
confName: confName,
conf: cnf,
name: name,
@ -632,7 +635,12 @@ func (pa *path) onDemandPublisherStop() { @@ -632,7 +635,12 @@ func (pa *path) onDemandPublisherStop() {
}
func (pa *path) sourceSetReady(medias media.Medias, allocateEncoder bool) error {
stream, err := newStream(medias, allocateEncoder, pa.bytesReceived)
stream, err := newStream(
pa.udpMaxPayloadSize,
medias,
allocateEncoder,
pa.bytesReceived,
)
if err != nil {
return err
}

4
internal/core/path_manager.go

@ -44,6 +44,7 @@ type pathManager struct { @@ -44,6 +44,7 @@ type pathManager struct {
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
readBufferCount int
udpMaxPayloadSize int
pathConfs map[string]*conf.PathConf
externalCmdPool *externalcmd.Pool
metrics *metrics
@ -74,6 +75,7 @@ func newPathManager( @@ -74,6 +75,7 @@ func newPathManager(
readTimeout conf.StringDuration,
writeTimeout conf.StringDuration,
readBufferCount int,
udpMaxPayloadSize int,
pathConfs map[string]*conf.PathConf,
externalCmdPool *externalcmd.Pool,
metrics *metrics,
@ -86,6 +88,7 @@ func newPathManager( @@ -86,6 +88,7 @@ func newPathManager(
readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
udpMaxPayloadSize: udpMaxPayloadSize,
pathConfs: pathConfs,
externalCmdPool: externalCmdPool,
metrics: metrics,
@ -303,6 +306,7 @@ func (pm *pathManager) createPath( @@ -303,6 +306,7 @@ func (pm *pathManager) createPath(
pm.readTimeout,
pm.writeTimeout,
pm.readBufferCount,
pm.udpMaxPayloadSize,
pathConfName,
pathConf,
name,

3
internal/core/stream.go

@ -15,6 +15,7 @@ type stream struct { @@ -15,6 +15,7 @@ type stream struct {
}
func newStream(
udpMaxPayloadSize int,
medias media.Medias,
generateRTPPackets bool,
bytesReceived *uint64,
@ -28,7 +29,7 @@ func newStream( @@ -28,7 +29,7 @@ func newStream(
for _, media := range s.rtspStream.Medias() {
var err error
s.smedias[media], err = newStreamMedia(media, generateRTPPackets)
s.smedias[media], err = newStreamMedia(udpMaxPayloadSize, media, generateRTPPackets)
if err != nil {
return nil, err
}

8
internal/core/stream_format.go

@ -16,8 +16,12 @@ type streamFormat struct { @@ -16,8 +16,12 @@ type streamFormat struct {
nonRTSPReaders map[reader]func(formatprocessor.Unit)
}
func newStreamFormat(forma format.Format, generateRTPPackets bool) (*streamFormat, error) {
proc, err := formatprocessor.New(forma, generateRTPPackets)
func newStreamFormat(
udpMaxPayloadSize int,
forma format.Format,
generateRTPPackets bool,
) (*streamFormat, error) {
proc, err := formatprocessor.New(udpMaxPayloadSize, forma, generateRTPPackets)
if err != nil {
return nil, err
}

7
internal/core/stream_media.go

@ -9,14 +9,17 @@ type streamMedia struct { @@ -9,14 +9,17 @@ type streamMedia struct {
formats map[format.Format]*streamFormat
}
func newStreamMedia(medi *media.Media, generateRTPPackets bool) (*streamMedia, error) {
func newStreamMedia(udpMaxPayloadSize int,
medi *media.Media,
generateRTPPackets bool,
) (*streamMedia, error) {
sm := &streamMedia{
formats: make(map[format.Format]*streamFormat),
}
for _, forma := range medi.Formats {
var err error
sm.formats[forma], err = newStreamFormat(forma, generateRTPPackets)
sm.formats[forma], err = newStreamFormat(udpMaxPayloadSize, forma, generateRTPPackets)
if err != nil {
return nil, err
}

3
internal/core/udp_source.go

@ -21,6 +21,7 @@ import ( @@ -21,6 +21,7 @@ import (
const (
multicastTTL = 16
udpMTU = 1472
)
var opusDurations = [32]int{
@ -127,7 +128,7 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan @@ -127,7 +128,7 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
}
}
midbuffer := make([]byte, 0, 1472) // UDP MTU
midbuffer := make([]byte, 0, udpMTU)
midbufferPos := 0
readPacket := func(buf []byte) (int, error) {

23
internal/formatprocessor/generic.go

@ -8,11 +8,6 @@ import ( @@ -8,11 +8,6 @@ import (
"github.com/pion/rtp"
)
const (
// 1500 (UDP MTU) - 20 (IP header) - 8 (UDP header)
maxPacketSize = 1472
)
// UnitGeneric is a generic data unit.
type UnitGeneric struct {
RTPPackets []*rtp.Packet
@ -29,14 +24,22 @@ func (d *UnitGeneric) GetNTP() time.Time { @@ -29,14 +24,22 @@ func (d *UnitGeneric) GetNTP() time.Time {
return d.NTP
}
type formatProcessorGeneric struct{}
type formatProcessorGeneric struct {
udpMaxPayloadSize int
}
func newGeneric(forma format.Format, generateRTPPackets bool) (*formatProcessorGeneric, error) {
func newGeneric(
udpMaxPayloadSize int,
forma format.Format,
generateRTPPackets bool,
) (*formatProcessorGeneric, error) {
if generateRTPPackets {
return nil, fmt.Errorf("we don't know how to generate RTP packets of format %+v", forma)
}
return &formatProcessorGeneric{}, nil
return &formatProcessorGeneric{
udpMaxPayloadSize: udpMaxPayloadSize,
}, nil
}
func (t *formatProcessorGeneric) Process(unit Unit, hasNonRTSPReaders bool) error {
@ -48,9 +51,9 @@ func (t *formatProcessorGeneric) Process(unit Unit, hasNonRTSPReaders bool) erro @@ -48,9 +51,9 @@ func (t *formatProcessorGeneric) Process(unit Unit, hasNonRTSPReaders bool) erro
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > maxPacketSize {
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), maxPacketSize)
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
return nil

2
internal/formatprocessor/generic_test.go

@ -15,7 +15,7 @@ func TestGenericRemovePadding(t *testing.T) { @@ -15,7 +15,7 @@ func TestGenericRemovePadding(t *testing.T) {
}
forma.Init()
p, err := New(forma, false)
p, err := New(1472, forma, false)
require.NoError(t, err)
pkt := &rtp.Packet{

6
internal/formatprocessor/h264.go

@ -86,6 +86,7 @@ func (d *UnitH264) GetNTP() time.Time { @@ -86,6 +86,7 @@ func (d *UnitH264) GetNTP() time.Time {
}
type formatProcessorH264 struct {
udpMaxPayloadSize int
format *format.H264
encoder *rtph264.Encoder
@ -93,10 +94,12 @@ type formatProcessorH264 struct { @@ -93,10 +94,12 @@ type formatProcessorH264 struct {
}
func newH264(
udpMaxPayloadSize int,
forma *format.H264,
allocateEncoder bool,
) (*formatProcessorH264, error) {
t := &formatProcessorH264{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
@ -211,11 +214,12 @@ func (t *formatProcessorH264) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -211,11 +214,12 @@ func (t *formatProcessorH264) Process(unit Unit, hasNonRTSPReaders bool) error {
pkt.PaddingSize = 0
// RTP packets exceed maximum size: start re-encoding them
if pkt.MarshalSize() > maxPacketSize {
if pkt.MarshalSize() > t.udpMaxPayloadSize {
v1 := pkt.SSRC
v2 := pkt.SequenceNumber
v3 := pkt.Timestamp
t.encoder = &rtph264.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: pkt.PayloadType,
SSRC: &v1,
InitialSequenceNumber: &v2,

6
internal/formatprocessor/h264_test.go

@ -16,7 +16,7 @@ func TestH264DynamicParams(t *testing.T) { @@ -16,7 +16,7 @@ func TestH264DynamicParams(t *testing.T) {
PacketizationMode: 1,
}
p, err := New(forma, false)
p, err := New(1472, forma, false)
require.NoError(t, err)
enc := forma.CreateEncoder()
@ -61,7 +61,7 @@ func TestH264OversizedPackets(t *testing.T) { @@ -61,7 +61,7 @@ func TestH264OversizedPackets(t *testing.T) {
PacketizationMode: 1,
}
p, err := New(forma, false)
p, err := New(1472, forma, false)
require.NoError(t, err)
var out []*rtp.Packet
@ -158,7 +158,7 @@ func TestH264EmptyPacket(t *testing.T) { @@ -158,7 +158,7 @@ func TestH264EmptyPacket(t *testing.T) {
PacketizationMode: 1,
}
p, err := New(forma, true)
p, err := New(1472, forma, true)
require.NoError(t, err)
unit := &UnitH264{

6
internal/formatprocessor/h265.go

@ -93,6 +93,7 @@ func (d *UnitH265) GetNTP() time.Time { @@ -93,6 +93,7 @@ func (d *UnitH265) GetNTP() time.Time {
}
type formatProcessorH265 struct {
udpMaxPayloadSize int
format *format.H265
encoder *rtph265.Encoder
@ -100,10 +101,12 @@ type formatProcessorH265 struct { @@ -100,10 +101,12 @@ type formatProcessorH265 struct {
}
func newH265(
udpMaxPayloadSize int,
forma *format.H265,
allocateEncoder bool,
) (*formatProcessorH265, error) {
t := &formatProcessorH265{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
@ -232,11 +235,12 @@ func (t *formatProcessorH265) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -232,11 +235,12 @@ func (t *formatProcessorH265) Process(unit Unit, hasNonRTSPReaders bool) error {
pkt.PaddingSize = 0
// RTP packets exceed maximum size: start re-encoding them
if pkt.MarshalSize() > maxPacketSize {
if pkt.MarshalSize() > t.udpMaxPayloadSize {
v1 := pkt.SSRC
v2 := pkt.SequenceNumber
v3 := pkt.Timestamp
t.encoder = &rtph265.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: pkt.PayloadType,
SSRC: &v1,
InitialSequenceNumber: &v2,

6
internal/formatprocessor/h265_test.go

@ -15,7 +15,7 @@ func TestH265DynamicParams(t *testing.T) { @@ -15,7 +15,7 @@ func TestH265DynamicParams(t *testing.T) {
PayloadTyp: 96,
}
p, err := New(forma, false)
p, err := New(1472, forma, false)
require.NoError(t, err)
enc := forma.CreateEncoder()
@ -66,7 +66,7 @@ func TestH265OversizedPackets(t *testing.T) { @@ -66,7 +66,7 @@ func TestH265OversizedPackets(t *testing.T) {
PPS: []byte{byte(h265.NALUType_PPS_NUT) << 1, 16, 17, 18},
}
p, err := New(forma, false)
p, err := New(1472, forma, false)
require.NoError(t, err)
var out []*rtp.Packet
@ -150,7 +150,7 @@ func TestH265EmptyPacket(t *testing.T) { @@ -150,7 +150,7 @@ func TestH265EmptyPacket(t *testing.T) {
PayloadTyp: 96,
}
p, err := New(forma, true)
p, err := New(1472, forma, true)
require.NoError(t, err)
unit := &UnitH265{

7
internal/formatprocessor/mpeg4audio.go

@ -28,16 +28,19 @@ func (d *UnitMPEG4Audio) GetNTP() time.Time { @@ -28,16 +28,19 @@ func (d *UnitMPEG4Audio) GetNTP() time.Time {
}
type formatProcessorMPEG4Audio struct {
udpMaxPayloadSize int
format *format.MPEG4Audio
encoder *rtpmpeg4audio.Encoder
decoder *rtpmpeg4audio.Decoder
}
func newMPEG4Audio(
udpMaxPayloadSize int,
forma *format.MPEG4Audio,
allocateEncoder bool,
) (*formatProcessorMPEG4Audio, error) {
t := &formatProcessorMPEG4Audio{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
@ -58,9 +61,9 @@ func (t *formatProcessorMPEG4Audio) Process(unit Unit, hasNonRTSPReaders bool) e @@ -58,9 +61,9 @@ func (t *formatProcessorMPEG4Audio) Process(unit Unit, hasNonRTSPReaders bool) e
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > maxPacketSize {
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), maxPacketSize)
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP

7
internal/formatprocessor/opus.go

@ -28,16 +28,19 @@ func (d *UnitOpus) GetNTP() time.Time { @@ -28,16 +28,19 @@ func (d *UnitOpus) GetNTP() time.Time {
}
type formatProcessorOpus struct {
udpMaxPayloadSize int
format *format.Opus
encoder *rtpsimpleaudio.Encoder
decoder *rtpsimpleaudio.Decoder
}
func newOpus(
udpMaxPayloadSize int,
forma *format.Opus,
allocateEncoder bool,
) (*formatProcessorOpus, error) {
t := &formatProcessorOpus{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
@ -58,9 +61,9 @@ func (t *formatProcessorOpus) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -58,9 +61,9 @@ func (t *formatProcessorOpus) Process(unit Unit, hasNonRTSPReaders bool) error {
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > maxPacketSize {
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), maxPacketSize)
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP

20
internal/formatprocessor/processor.go

@ -12,27 +12,31 @@ type Processor interface { @@ -12,27 +12,31 @@ type Processor interface {
}
// New allocates a Processor.
func New(forma format.Format, generateRTPPackets bool) (Processor, error) {
func New(
udpMaxPayloadSize int,
forma format.Format,
generateRTPPackets bool,
) (Processor, error) {
switch forma := forma.(type) {
case *format.H264:
return newH264(forma, generateRTPPackets)
return newH264(udpMaxPayloadSize, forma, generateRTPPackets)
case *format.H265:
return newH265(forma, generateRTPPackets)
return newH265(udpMaxPayloadSize, forma, generateRTPPackets)
case *format.VP8:
return newVP8(forma, generateRTPPackets)
return newVP8(udpMaxPayloadSize, forma, generateRTPPackets)
case *format.VP9:
return newVP9(forma, generateRTPPackets)
return newVP9(udpMaxPayloadSize, forma, generateRTPPackets)
case *format.MPEG4Audio:
return newMPEG4Audio(forma, generateRTPPackets)
return newMPEG4Audio(udpMaxPayloadSize, forma, generateRTPPackets)
case *format.Opus:
return newOpus(forma, generateRTPPackets)
return newOpus(udpMaxPayloadSize, forma, generateRTPPackets)
default:
return newGeneric(forma, generateRTPPackets)
return newGeneric(udpMaxPayloadSize, forma, generateRTPPackets)
}
}

7
internal/formatprocessor/vp8.go

@ -28,16 +28,19 @@ func (d *UnitVP8) GetNTP() time.Time { @@ -28,16 +28,19 @@ func (d *UnitVP8) GetNTP() time.Time {
}
type formatProcessorVP8 struct {
udpMaxPayloadSize int
format *format.VP8
encoder *rtpvp8.Encoder
decoder *rtpvp8.Decoder
}
func newVP8(
udpMaxPayloadSize int,
forma *format.VP8,
allocateEncoder bool,
) (*formatProcessorVP8, error) {
t := &formatProcessorVP8{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
@ -58,9 +61,9 @@ func (t *formatProcessorVP8) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -58,9 +61,9 @@ func (t *formatProcessorVP8) Process(unit Unit, hasNonRTSPReaders bool) error {
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > maxPacketSize {
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), maxPacketSize)
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP

7
internal/formatprocessor/vp9.go

@ -28,16 +28,19 @@ func (d *UnitVP9) GetNTP() time.Time { @@ -28,16 +28,19 @@ func (d *UnitVP9) GetNTP() time.Time {
}
type formatProcessorVP9 struct {
udpMaxPayloadSize int
format *format.VP9
encoder *rtpvp9.Encoder
decoder *rtpvp9.Decoder
}
func newVP9(
udpMaxPayloadSize int,
forma *format.VP9,
allocateEncoder bool,
) (*formatProcessorVP9, error) {
t := &formatProcessorVP9{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
@ -58,9 +61,9 @@ func (t *formatProcessorVP9) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -58,9 +61,9 @@ func (t *formatProcessorVP9) Process(unit Unit, hasNonRTSPReaders bool) error {
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > maxPacketSize {
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), maxPacketSize)
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP

3
rtsp-simple-server.yml

@ -16,6 +16,9 @@ writeTimeout: 10s @@ -16,6 +16,9 @@ writeTimeout: 10s
# Number of read buffers.
# A higher value allows a wider throughput, a lower value allows to save RAM.
readBufferCount: 512
# Maximum size of payload of outgoing UDP packets.
# This can be decreased to avoid fragmentation on networks with a low UDP MTU.
udpMaxPayloadSize: 1472
# HTTP URL to perform external authentication.
# Every time a user wants to authenticate, the server calls this URL

Loading…
Cancel
Save