Browse Source

rtmp: support publishing G711 and LPCM tracks (#2857) (#2884)

pull/2885/head
Alessandro Ros 1 year ago committed by GitHub
parent
commit
27975d8b67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      README.md
  2. 2
      go.mod
  3. 4
      go.sum
  4. 5
      internal/formatprocessor/ac3.go
  5. 5
      internal/formatprocessor/av1.go
  6. 1
      internal/formatprocessor/av1_test.go
  7. 6
      internal/formatprocessor/g711.go
  8. 62
      internal/formatprocessor/g711_test.go
  9. 10
      internal/formatprocessor/h264.go
  10. 10
      internal/formatprocessor/h265.go
  11. 8
      internal/formatprocessor/lpcm.go
  12. 37
      internal/formatprocessor/lpcm_test.go
  13. 5
      internal/formatprocessor/mjpeg.go
  14. 5
      internal/formatprocessor/mpeg1_audio.go
  15. 5
      internal/formatprocessor/mpeg1_video.go
  16. 5
      internal/formatprocessor/mpeg4_audio.go
  17. 2
      internal/formatprocessor/mpeg4_video.go
  18. 5
      internal/formatprocessor/vp8.go
  19. 5
      internal/formatprocessor/vp9.go
  20. 28
      internal/protocols/rtmp/chunk/chunk_test.go
  21. 2
      internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk0Read/582528ddfad69eb5
  22. 2
      internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk0Read/5f73a77c7f93e5f8
  23. 2
      internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk1Read/553384c8664fe971
  24. 2
      internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk1Read/582528ddfad69eb5
  25. 2
      internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk2Read/582528ddfad69eb5
  26. 2
      internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk2Read/feb2b2a8b4ba63ba
  27. 2
      internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk3Read/582528ddfad69eb5
  28. 2
      internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk3Read/caf81e9797b19c76
  29. 39
      internal/protocols/rtmp/message/audio.go
  30. 12
      internal/protocols/rtmp/message/reader_test.go
  31. 8
      internal/protocols/rtmp/message/video.go
  32. 109
      internal/protocols/rtmp/reader.go
  33. 167
      internal/protocols/rtmp/reader_test.go
  34. 47
      internal/protocols/rtmp/writer.go
  35. 6
      internal/protocols/rtmp/writer_test.go
  36. 22
      internal/servers/rtmp/conn.go

16
README.md

@ -22,11 +22,11 @@ Live streams can be published to the server with:
|--------|--------|------------|------------| |--------|--------|------------|------------|
|[SRT clients](#srt-clients)||H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3| |[SRT clients](#srt-clients)||H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3|
|[SRT cameras and servers](#srt-cameras-and-servers)||H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3| |[SRT cameras and servers](#srt-cameras-and-servers)||H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3|
|[WebRTC clients](#webrtc-clients)|Browser-based, WHIP|AV1, VP9, VP8, H264|Opus, G722, G711| |[WebRTC clients](#webrtc-clients)|Browser-based, WHIP|AV1, VP9, VP8, H264|Opus, G722, G711 (PCMA, PCMU)|
|[WebRTC servers](#webrtc-servers)|WHEP|AV1, VP9, VP8, H264|Opus, G722, G711| |[WebRTC servers](#webrtc-servers)|WHEP|AV1, VP9, VP8, H264|Opus, G722, G711 (PCMA, PCMU)|
|[RTSP clients](#rtsp-clients)|UDP, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711, LPCM and any RTP-compatible codec| |[RTSP clients](#rtsp-clients)|UDP, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711 (PCMA, PCMU), LPCM and any RTP-compatible codec|
|[RTSP cameras and servers](#rtsp-cameras-and-servers)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711, LPCM and any RTP-compatible codec| |[RTSP cameras and servers](#rtsp-cameras-and-servers)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711 (PCMA, PCMU), LPCM and any RTP-compatible codec|
|[RTMP clients](#rtmp-clients)|RTMP, RTMPS, Enhanced RTMP|AV1, VP9, H265, H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[RTMP clients](#rtmp-clients)|RTMP, RTMPS, Enhanced RTMP|AV1, VP9, H265, H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G711 (PCMA, PCMU), LPCM|
|[RTMP cameras and servers](#rtmp-cameras-and-servers)|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[RTMP cameras and servers](#rtmp-cameras-and-servers)|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)|
|[HLS cameras and servers](#hls-cameras-and-servers)|Low-Latency HLS, MP4-based HLS, legacy HLS|AV1, VP9, H265, H264|Opus, MPEG-4 Audio (AAC)| |[HLS cameras and servers](#hls-cameras-and-servers)|Low-Latency HLS, MP4-based HLS, legacy HLS|AV1, VP9, H265, H264|Opus, MPEG-4 Audio (AAC)|
|[UDP/MPEG-TS](#udpmpeg-ts)|Unicast, broadcast, multicast|H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3| |[UDP/MPEG-TS](#udpmpeg-ts)|Unicast, broadcast, multicast|H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3|
@ -37,8 +37,8 @@ And can be read from the server with:
|protocol|variants|video codecs|audio codecs| |protocol|variants|video codecs|audio codecs|
|--------|--------|------------|------------| |--------|--------|------------|------------|
|[SRT](#srt)||H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3| |[SRT](#srt)||H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3|
|[WebRTC](#webrtc)|Browser-based, WHEP|AV1, VP9, VP8, H264|Opus, G722, G711| |[WebRTC](#webrtc)|Browser-based, WHEP|AV1, VP9, VP8, H264|Opus, G722, G711 (PCMA, PCMU)|
|[RTSP](#rtsp)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711, LPCM and any RTP-compatible codec| |[RTSP](#rtsp)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711 (PCMA, PCMU), LPCM and any RTP-compatible codec|
|[RTMP](#rtmp)|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[RTMP](#rtmp)|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)|
|[HLS](#hls)|Low-Latency HLS, MP4-based HLS, legacy HLS|AV1, VP9, H265, H264|Opus, MPEG-4 Audio (AAC)| |[HLS](#hls)|Low-Latency HLS, MP4-based HLS, legacy HLS|AV1, VP9, H265, H264|Opus, MPEG-4 Audio (AAC)|
@ -46,7 +46,7 @@ And can be recorded with:
|format|video codecs|audio codecs| |format|video codecs|audio codecs|
|------|------------|------------| |------|------------|------------|
|[fMP4](#record-streams-to-disk)|AV1, VP9, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G711, LPCM| |[fMP4](#record-streams-to-disk)|AV1, VP9, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G711 (PCMA, PCMU), LPCM|
|[MPEG-TS](#record-streams-to-disk)|H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3| |[MPEG-TS](#record-streams-to-disk)|H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3|
**Features** **Features**

2
go.mod

@ -8,7 +8,7 @@ require (
github.com/alecthomas/kong v0.8.1 github.com/alecthomas/kong v0.8.1
github.com/aler9/writerseeker v1.1.0 github.com/aler9/writerseeker v1.1.0
github.com/bluenviron/gohlslib v1.2.0 github.com/bluenviron/gohlslib v1.2.0
github.com/bluenviron/gortsplib/v4 v4.6.2 github.com/bluenviron/gortsplib/v4 v4.6.3-0.20240107120136-f9eb8e573bbe
github.com/bluenviron/mediacommon v1.6.1-0.20231228213201-7bb211dba7e1 github.com/bluenviron/mediacommon v1.6.1-0.20231228213201-7bb211dba7e1
github.com/datarhei/gosrt v0.5.5 github.com/datarhei/gosrt v0.5.5
github.com/fsnotify/fsnotify v1.7.0 github.com/fsnotify/fsnotify v1.7.0

4
go.sum

@ -22,8 +22,8 @@ github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYh
github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI= github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI=
github.com/bluenviron/gohlslib v1.2.0 h1:Hrx2/n/AcmKKIV+MjZLKs5kmW+O7xCdUSPJQoS39JKw= github.com/bluenviron/gohlslib v1.2.0 h1:Hrx2/n/AcmKKIV+MjZLKs5kmW+O7xCdUSPJQoS39JKw=
github.com/bluenviron/gohlslib v1.2.0/go.mod h1:kG/Sjebsxnf5asMGaGcQ0aSvtFGNChJPgctds2wDHOI= github.com/bluenviron/gohlslib v1.2.0/go.mod h1:kG/Sjebsxnf5asMGaGcQ0aSvtFGNChJPgctds2wDHOI=
github.com/bluenviron/gortsplib/v4 v4.6.2 h1:CGIsxpnUFvSlIxnSFS0oFSSfwsHMmBCmYcrGAtIcwXc= github.com/bluenviron/gortsplib/v4 v4.6.3-0.20240107120136-f9eb8e573bbe h1:eVx4BU4mF26UK/SQlsnIkeCIraQR8AcJf42ymYR3pQE=
github.com/bluenviron/gortsplib/v4 v4.6.2/go.mod h1:dN1YjyPNMfy/NwC17Ga6MiIMiUoQfg5GL7LGsVHa0Jo= github.com/bluenviron/gortsplib/v4 v4.6.3-0.20240107120136-f9eb8e573bbe/go.mod h1:UqdkRR5YvKHP/wHEQQySJFKJm6tIZcftdP7cNszIZ1g=
github.com/bluenviron/mediacommon v1.6.1-0.20231228213201-7bb211dba7e1 h1:f8XDAHvgPbT+n5qf73REdUo9kLmGpP4HNgphKI/8fGE= github.com/bluenviron/mediacommon v1.6.1-0.20231228213201-7bb211dba7e1 h1:f8XDAHvgPbT+n5qf73REdUo9kLmGpP4HNgphKI/8fGE=
github.com/bluenviron/mediacommon v1.6.1-0.20231228213201-7bb211dba7e1/go.mod h1:Ij/kE1LEucSjryNBVTyPL/gBI0d6/Css3f5PyrM957w= github.com/bluenviron/mediacommon v1.6.1-0.20231228213201-7bb211dba7e1/go.mod h1:Ij/kE1LEucSjryNBVTyPL/gBI0d6/Css3f5PyrM957w=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=

5
internal/formatprocessor/ac3.go

@ -53,14 +53,13 @@ func (t *formatProcessorAC3) ProcessUnit(uu unit.Unit) error { //nolint:dupl
if err != nil { if err != nil {
return err return err
} }
u.RTPPackets = pkts
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
for _, pkt := range pkts { for _, pkt := range u.RTPPackets {
pkt.Timestamp += ts pkt.Timestamp += ts
} }
u.RTPPackets = pkts
return nil return nil
} }

5
internal/formatprocessor/av1.go

@ -55,14 +55,13 @@ func (t *formatProcessorAV1) ProcessUnit(uu unit.Unit) error { //nolint:dupl
if err != nil { if err != nil {
return err return err
} }
u.RTPPackets = pkts
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
for _, pkt := range pkts { for _, pkt := range u.RTPPackets {
pkt.Timestamp += ts pkt.Timestamp += ts
} }
u.RTPPackets = pkts
return nil return nil
} }

1
internal/formatprocessor/av1_test.go

@ -1 +0,0 @@
package formatprocessor

6
internal/formatprocessor/g711.go

@ -41,6 +41,7 @@ func newG711(
func (t *formatProcessorG711) createEncoder() error { func (t *formatProcessorG711) createEncoder() error {
t.encoder = &rtpsimpleaudio.Encoder{ t.encoder = &rtpsimpleaudio.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12, PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: t.format.PayloadType(),
} }
return t.encoder.Init() return t.encoder.Init()
} }
@ -52,11 +53,10 @@ func (t *formatProcessorG711) ProcessUnit(uu unit.Unit) error { //nolint:dupl
if err != nil { if err != nil {
return err return err
} }
u.RTPPackets = []*rtp.Packet{pkt}
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
pkt.Timestamp += ts u.RTPPackets[0].Timestamp += ts
u.RTPPackets = []*rtp.Packet{pkt}
return nil return nil
} }

62
internal/formatprocessor/g711_test.go

@ -0,0 +1,62 @@
package formatprocessor
import (
"testing"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediamtx/internal/unit"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
func TestG611Encode(t *testing.T) {
t.Run("alaw", func(t *testing.T) {
forma := &format.G711{
MULaw: false,
}
p, err := New(1472, forma, true)
require.NoError(t, err)
unit := &unit.G711{
Samples: []byte{1, 2, 3, 4},
}
err = p.ProcessUnit(unit)
require.NoError(t, err)
require.Equal(t, []*rtp.Packet{{
Header: rtp.Header{
Version: 2,
PayloadType: 8,
SequenceNumber: unit.RTPPackets[0].SequenceNumber,
SSRC: unit.RTPPackets[0].SSRC,
},
Payload: []byte{1, 2, 3, 4},
}}, unit.RTPPackets)
})
t.Run("mulaw", func(t *testing.T) {
forma := &format.G711{
MULaw: true,
}
p, err := New(1472, forma, true)
require.NoError(t, err)
unit := &unit.G711{
Samples: []byte{1, 2, 3, 4},
}
err = p.ProcessUnit(unit)
require.NoError(t, err)
require.Equal(t, []*rtp.Packet{{
Header: rtp.Header{
Version: 2,
PayloadType: 0,
SequenceNumber: unit.RTPPackets[0].SequenceNumber,
SSRC: unit.RTPPackets[0].SSRC,
},
Payload: []byte{1, 2, 3, 4},
}}, unit.RTPPackets)
})
}

10
internal/formatprocessor/h264.go

@ -224,13 +224,12 @@ func (t *formatProcessorH264) ProcessUnit(uu unit.Unit) error {
if err != nil { if err != nil {
return err return err
} }
u.RTPPackets = pkts
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
for _, pkt := range pkts { for _, pkt := range u.RTPPackets {
pkt.Timestamp += ts pkt.Timestamp += ts
} }
u.RTPPackets = pkts
} }
return nil return nil
@ -306,12 +305,11 @@ func (t *formatProcessorH264) ProcessRTPPacket( //nolint:dupl
if err != nil { if err != nil {
return nil, err return nil, err
} }
u.RTPPackets = pkts
for _, newPKT := range pkts { for _, newPKT := range u.RTPPackets {
newPKT.Timestamp = pkt.Timestamp newPKT.Timestamp = pkt.Timestamp
} }
u.RTPPackets = pkts
} }
return u, nil return u, nil

10
internal/formatprocessor/h265.go

@ -243,13 +243,12 @@ func (t *formatProcessorH265) ProcessUnit(uu unit.Unit) error { //nolint:dupl
if err != nil { if err != nil {
return err return err
} }
u.RTPPackets = pkts
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
for _, pkt := range pkts { for _, pkt := range u.RTPPackets {
pkt.Timestamp += ts pkt.Timestamp += ts
} }
u.RTPPackets = pkts
} }
return nil return nil
@ -325,12 +324,11 @@ func (t *formatProcessorH265) ProcessRTPPacket( //nolint:dupl
if err != nil { if err != nil {
return nil, err return nil, err
} }
u.RTPPackets = pkts
for _, newPKT := range pkts { for _, newPKT := range u.RTPPackets {
newPKT.Timestamp = pkt.Timestamp newPKT.Timestamp = pkt.Timestamp
} }
u.RTPPackets = pkts
} }
return u, nil return u, nil

8
internal/formatprocessor/lpcm.go

@ -41,6 +41,9 @@ func newLPCM(
func (t *formatProcessorLPCM) createEncoder() error { func (t *formatProcessorLPCM) createEncoder() error {
t.encoder = &rtplpcm.Encoder{ t.encoder = &rtplpcm.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12, PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: t.format.PayloadTyp,
BitDepth: t.format.BitDepth,
ChannelCount: t.format.ChannelCount,
} }
return t.encoder.Init() return t.encoder.Init()
} }
@ -52,14 +55,13 @@ func (t *formatProcessorLPCM) ProcessUnit(uu unit.Unit) error { //nolint:dupl
if err != nil { if err != nil {
return err return err
} }
u.RTPPackets = pkts
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
for _, pkt := range pkts { for _, pkt := range u.RTPPackets {
pkt.Timestamp += ts pkt.Timestamp += ts
} }
u.RTPPackets = pkts
return nil return nil
} }

37
internal/formatprocessor/lpcm_test.go

@ -0,0 +1,37 @@
package formatprocessor
import (
"testing"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediamtx/internal/unit"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
func TestLPCMEncode(t *testing.T) {
forma := &format.LPCM{
PayloadTyp: 96,
BitDepth: 16,
ChannelCount: 2,
}
p, err := New(1472, forma, true)
require.NoError(t, err)
unit := &unit.LPCM{
Samples: []byte{1, 2, 3, 4},
}
err = p.ProcessUnit(unit)
require.NoError(t, err)
require.Equal(t, []*rtp.Packet{{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
SequenceNumber: unit.RTPPackets[0].SequenceNumber,
SSRC: unit.RTPPackets[0].SSRC,
},
Payload: []byte{1, 2, 3, 4},
}}, unit.RTPPackets)
}

5
internal/formatprocessor/mjpeg.go

@ -54,14 +54,13 @@ func (t *formatProcessorMJPEG) ProcessUnit(uu unit.Unit) error { //nolint:dupl
if err != nil { if err != nil {
return err return err
} }
u.RTPPackets = pkts
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
for _, pkt := range pkts { for _, pkt := range u.RTPPackets {
pkt.Timestamp += ts pkt.Timestamp += ts
} }
u.RTPPackets = pkts
return nil return nil
} }

5
internal/formatprocessor/mpeg1_audio.go

@ -53,14 +53,13 @@ func (t *formatProcessorMPEG1Audio) ProcessUnit(uu unit.Unit) error { //nolint:d
if err != nil { if err != nil {
return err return err
} }
u.RTPPackets = pkts
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
for _, pkt := range pkts { for _, pkt := range u.RTPPackets {
pkt.Timestamp += ts pkt.Timestamp += ts
} }
u.RTPPackets = pkts
return nil return nil
} }

5
internal/formatprocessor/mpeg1_video.go

@ -54,14 +54,13 @@ func (t *formatProcessorMPEG1Video) ProcessUnit(uu unit.Unit) error { //nolint:d
if err != nil { if err != nil {
return err return err
} }
u.RTPPackets = pkts
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
for _, pkt := range pkts { for _, pkt := range u.RTPPackets {
pkt.Timestamp += ts pkt.Timestamp += ts
} }
u.RTPPackets = pkts
return nil return nil
} }

5
internal/formatprocessor/mpeg4_audio.go

@ -57,14 +57,13 @@ func (t *formatProcessorMPEG4Audio) ProcessUnit(uu unit.Unit) error { //nolint:d
if err != nil { if err != nil {
return err return err
} }
u.RTPPackets = pkts
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
for _, pkt := range pkts { for _, pkt := range u.RTPPackets {
pkt.Timestamp += ts pkt.Timestamp += ts
} }
u.RTPPackets = pkts
return nil return nil
} }

2
internal/formatprocessor/mpeg4_video.go

@ -94,7 +94,7 @@ func (t *formatProcessorMPEG4Video) ProcessUnit(uu unit.Unit) error { //nolint:d
} }
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
for _, pkt := range pkts { for _, pkt := range u.RTPPackets {
pkt.Timestamp += ts pkt.Timestamp += ts
} }

5
internal/formatprocessor/vp8.go

@ -54,14 +54,13 @@ func (t *formatProcessorVP8) ProcessUnit(uu unit.Unit) error { //nolint:dupl
if err != nil { if err != nil {
return err return err
} }
u.RTPPackets = pkts
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
for _, pkt := range pkts { for _, pkt := range u.RTPPackets {
pkt.Timestamp += ts pkt.Timestamp += ts
} }
u.RTPPackets = pkts
return nil return nil
} }

5
internal/formatprocessor/vp9.go

@ -54,14 +54,13 @@ func (t *formatProcessorVP9) ProcessUnit(uu unit.Unit) error { //nolint:dupl
if err != nil { if err != nil {
return err return err
} }
u.RTPPackets = pkts
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
for _, pkt := range pkts { for _, pkt := range u.RTPPackets {
pkt.Timestamp += ts pkt.Timestamp += ts
} }
u.RTPPackets = pkts
return nil return nil
} }

28
internal/protocols/rtmp/chunk/chunk_test.go

@ -156,3 +156,31 @@ func TestChunkMarshal(t *testing.T) {
}) })
} }
} }
func FuzzChunk0Read(f *testing.F) {
f.Fuzz(func(t *testing.T, b []byte) {
var chunk Chunk0
chunk.Read(bytes.NewReader(b), 65536, false) //nolint:errcheck
})
}
func FuzzChunk1Read(f *testing.F) {
f.Fuzz(func(t *testing.T, b []byte) {
var chunk Chunk1
chunk.Read(bytes.NewReader(b), 65536, false) //nolint:errcheck
})
}
func FuzzChunk2Read(f *testing.F) {
f.Fuzz(func(t *testing.T, b []byte) {
var chunk Chunk2
chunk.Read(bytes.NewReader(b), 65536, false) //nolint:errcheck
})
}
func FuzzChunk3Read(f *testing.F) {
f.Fuzz(func(t *testing.T, b []byte) {
var chunk Chunk3
chunk.Read(bytes.NewReader(b), 65536, true) //nolint:errcheck
})
}

2
internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk0Read/582528ddfad69eb5 vendored

@ -0,0 +1,2 @@
go test fuzz v1
[]byte("0")

2
internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk0Read/5f73a77c7f93e5f8 vendored

@ -0,0 +1,2 @@
go test fuzz v1
[]byte("0\xff\xff\xff00000000")

2
internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk1Read/553384c8664fe971 vendored

@ -0,0 +1,2 @@
go test fuzz v1
[]byte("0\xff\xff\xff0000")

2
internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk1Read/582528ddfad69eb5 vendored

@ -0,0 +1,2 @@
go test fuzz v1
[]byte("0")

2
internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk2Read/582528ddfad69eb5 vendored

@ -0,0 +1,2 @@
go test fuzz v1
[]byte("0")

2
internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk2Read/feb2b2a8b4ba63ba vendored

@ -0,0 +1,2 @@
go test fuzz v1
[]byte("0\xff\xff\xff")

2
internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk3Read/582528ddfad69eb5 vendored

@ -0,0 +1,2 @@
go test fuzz v1
[]byte("0")

2
internal/protocols/rtmp/chunk/testdata/fuzz/FuzzChunk3Read/caf81e9797b19c76 vendored

@ -0,0 +1,2 @@
go test fuzz v1
[]byte("")

39
internal/protocols/rtmp/message/audio.go

@ -12,12 +12,35 @@ const (
AudioChunkStreamID = 4 AudioChunkStreamID = 4
) )
// supported audio codecs // audio codecs
const ( const (
CodecMPEG1Audio = 2 CodecMPEG1Audio = 2
CodecLPCM = 3
CodecPCMA = 7
CodecPCMU = 8
CodecMPEG4Audio = 10 CodecMPEG4Audio = 10
) )
// audio rates
const (
Rate5512 = 0
Rate11025 = 1
Rate22050 = 2
Rate44100 = 3
)
// audio depths
const (
Depth8 = 0
Depth16 = 1
)
// audio channels
const (
ChannelsMono = 0
ChannelsStereo = 1
)
// AudioAACType is the AAC type of a Audio. // AudioAACType is the AAC type of a Audio.
type AudioAACType uint8 type AudioAACType uint8
@ -52,7 +75,7 @@ func (m *Audio) Unmarshal(raw *rawmessage.Message) error {
m.Codec = raw.Body[0] >> 4 m.Codec = raw.Body[0] >> 4
switch m.Codec { switch m.Codec {
case CodecMPEG1Audio, CodecMPEG4Audio: case CodecMPEG4Audio, CodecMPEG1Audio, CodecPCMA, CodecPCMU, CodecLPCM:
default: default:
return fmt.Errorf("unsupported audio codec: %d", m.Codec) return fmt.Errorf("unsupported audio codec: %d", m.Codec)
} }
@ -61,9 +84,7 @@ func (m *Audio) Unmarshal(raw *rawmessage.Message) error {
m.Depth = (raw.Body[0] >> 1) & 0x01 m.Depth = (raw.Body[0] >> 1) & 0x01
m.Channels = raw.Body[0] & 0x01 m.Channels = raw.Body[0] & 0x01
if m.Codec == CodecMPEG1Audio { if m.Codec == CodecMPEG4Audio {
m.Payload = raw.Body[1:]
} else {
m.AACType = AudioAACType(raw.Body[1]) m.AACType = AudioAACType(raw.Body[1])
switch m.AACType { switch m.AACType {
case AudioAACTypeConfig, AudioAACTypeAU: case AudioAACTypeConfig, AudioAACTypeAU:
@ -72,6 +93,8 @@ func (m *Audio) Unmarshal(raw *rawmessage.Message) error {
} }
m.Payload = raw.Body[2:] m.Payload = raw.Body[2:]
} else {
m.Payload = raw.Body[1:]
} }
return nil return nil
@ -93,11 +116,11 @@ func (m Audio) Marshal() (*rawmessage.Message, error) {
body[0] = m.Codec<<4 | m.Rate<<2 | m.Depth<<1 | m.Channels body[0] = m.Codec<<4 | m.Rate<<2 | m.Depth<<1 | m.Channels
if m.Codec == CodecMPEG1Audio { if m.Codec == CodecMPEG4Audio {
copy(body[1:], m.Payload)
} else {
body[1] = uint8(m.AACType) body[1] = uint8(m.AACType)
copy(body[2:], m.Payload) copy(body[2:], m.Payload)
} else {
copy(body[1:], m.Payload)
} }
return &rawmessage.Message{ return &rawmessage.Message{

12
internal/protocols/rtmp/message/reader_test.go

@ -33,9 +33,9 @@ var readWriterCases = []struct {
DTS: 6013806 * time.Millisecond, DTS: 6013806 * time.Millisecond,
MessageStreamID: 4534543, MessageStreamID: 4534543,
Codec: CodecMPEG1Audio, Codec: CodecMPEG1Audio,
Rate: flvio.SOUND_44Khz, Rate: Rate44100,
Depth: flvio.SOUND_16BIT, Depth: Depth16,
Channels: flvio.SOUND_STEREO, Channels: ChannelsStereo,
Payload: []byte{0x01, 0x02, 0x03, 0x04}, Payload: []byte{0x01, 0x02, 0x03, 0x04},
}, },
[]byte{ []byte{
@ -50,9 +50,9 @@ var readWriterCases = []struct {
DTS: 6013806 * time.Millisecond, DTS: 6013806 * time.Millisecond,
MessageStreamID: 4534543, MessageStreamID: 4534543,
Codec: CodecMPEG4Audio, Codec: CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz, Rate: Rate44100,
Depth: flvio.SOUND_16BIT, Depth: Depth16,
Channels: flvio.SOUND_STEREO, Channels: ChannelsStereo,
AACType: AudioAACTypeAU, AACType: AudioAACTypeAU,
Payload: []byte{0x5A, 0xC0, 0x77, 0x40}, Payload: []byte{0x5A, 0xC0, 0x77, 0x40},
}, },

8
internal/protocols/rtmp/message/video.go

@ -4,8 +4,6 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/bluenviron/mediamtx/internal/protocols/rtmp/rawmessage" "github.com/bluenviron/mediamtx/internal/protocols/rtmp/rawmessage"
) )
@ -51,7 +49,7 @@ func (m *Video) Unmarshal(raw *rawmessage.Message) error {
return fmt.Errorf("invalid body size") return fmt.Errorf("invalid body size")
} }
m.IsKeyFrame = (raw.Body[0] >> 4) == flvio.FRAME_KEY m.IsKeyFrame = (raw.Body[0] >> 4) == 1
m.Codec = raw.Body[0] & 0x0F m.Codec = raw.Body[0] & 0x0F
switch m.Codec { switch m.Codec {
@ -83,9 +81,9 @@ func (m Video) Marshal() (*rawmessage.Message, error) {
body := make([]byte, m.marshalBodySize()) body := make([]byte, m.marshalBodySize())
if m.IsKeyFrame { if m.IsKeyFrame {
body[0] = flvio.FRAME_KEY << 4 body[0] = 1 << 4
} else { } else {
body[0] = flvio.FRAME_INTER << 4 body[0] = 2 << 4
} }
body[0] |= m.Codec body[0] |= m.Codec
body[1] = uint8(m.Type) body[1] = uint8(m.Type)

109
internal/protocols/rtmp/reader.go

@ -37,6 +37,12 @@ type OnDataMPEG4AudioFunc func(pts time.Duration, au []byte)
// OnDataMPEG1AudioFunc is the prototype of the callback passed to OnDataMPEG1Audio(). // OnDataMPEG1AudioFunc is the prototype of the callback passed to OnDataMPEG1Audio().
type OnDataMPEG1AudioFunc func(pts time.Duration, frame []byte) type OnDataMPEG1AudioFunc func(pts time.Duration, frame []byte)
// OnDataG711Func is the prototype of the callback passed to OnDataG711().
type OnDataG711Func func(pts time.Duration, samples []byte)
// OnDataLPCMFunc is the prototype of the callback passed to OnDataLPCM().
type OnDataLPCMFunc func(pts time.Duration, samples []byte)
func hasVideo(md flvio.AMFMap) (bool, error) { func hasVideo(md flvio.AMFMap) (bool, error) {
v, ok := md.GetV("videocodecid") v, ok := md.GetV("videocodecid")
if !ok { if !ok {
@ -81,11 +87,25 @@ func hasAudio(md flvio.AMFMap, audioTrack *format.Format) (bool, error) {
case 0: case 0:
return false, nil return false, nil
case message.CodecMPEG4Audio, message.CodecLPCM:
return true, nil
case message.CodecMPEG1Audio: case message.CodecMPEG1Audio:
*audioTrack = &format.MPEG1Audio{} *audioTrack = &format.MPEG1Audio{}
return true, nil return true, nil
case message.CodecMPEG4Audio: case message.CodecPCMA:
v, ok := md.GetV("stereo")
if ok && v == true {
return false, fmt.Errorf("stereo PCMA is not supported")
}
return true, nil
case message.CodecPCMU:
v, ok := md.GetV("stereo")
if ok && v == true {
return false, fmt.Errorf("stereo PCMU is not supported")
}
return true, nil return true, nil
} }
@ -95,7 +115,7 @@ func hasAudio(md flvio.AMFMap, audioTrack *format.Format) (bool, error) {
} }
} }
return false, fmt.Errorf("unsupported audio codec %v", v) return false, fmt.Errorf("unsupported audio codec: %v", v)
} }
func h265FindNALU(array []mp4.HEVCNaluArray, typ h265.NALUType) []byte { func h265FindNALU(array []mp4.HEVCNaluArray, typ h265.NALUType) []byte {
@ -244,6 +264,10 @@ func tracksFromMetadata(conn *Conn, payload []interface{}) (format.Format, forma
} }
case *message.ExtendedSequenceStart: case *message.ExtendedSequenceStart:
if !hasVideo {
return nil, nil, fmt.Errorf("unexpected video packet")
}
if videoTrack == nil { if videoTrack == nil {
switch msg.FourCC { switch msg.FourCC {
case message.FourCCHEVC: case message.FourCCHEVC:
@ -302,12 +326,46 @@ func tracksFromMetadata(conn *Conn, payload []interface{}) (format.Format, forma
return nil, nil, fmt.Errorf("unexpected audio packet") return nil, nil, fmt.Errorf("unexpected audio packet")
} }
if audioTrack == nil && if audioTrack == nil {
msg.Codec == message.CodecMPEG4Audio && switch {
msg.AACType == message.AudioAACTypeConfig { case msg.Codec == message.CodecMPEG4Audio &&
audioTrack, err = trackFromAACDecoderConfig(msg.Payload) msg.AACType == message.AudioAACTypeConfig:
if err != nil { audioTrack, err = trackFromAACDecoderConfig(msg.Payload)
return nil, nil, err if err != nil {
return nil, nil, err
}
case msg.Codec == message.CodecPCMA:
if msg.Channels == message.ChannelsStereo {
return nil, nil, fmt.Errorf("stereo PCMA is not supported")
}
audioTrack = &format.G711{MULaw: false}
case msg.Codec == message.CodecPCMU:
if msg.Channels == message.ChannelsStereo {
return nil, nil, fmt.Errorf("stereo PCMU is not supported")
}
audioTrack = &format.G711{MULaw: true}
case msg.Codec == message.CodecLPCM:
audioTrack = &format.LPCM{
PayloadTyp: 96,
BitDepth: func() int {
if msg.Depth == message.Depth16 {
return 16
}
return 8
}(),
SampleRate: audioRateRTMPToInt(msg.Rate),
ChannelCount: func() int {
if msg.Channels == message.ChannelsMono {
return 1
}
return 2
}(),
}
} }
} }
} }
@ -580,6 +638,41 @@ func (r *Reader) OnDataMPEG1Audio(cb OnDataMPEG1AudioFunc) {
} }
} }
// OnDataG711 sets a callback that is called when G711 data is received.
func (r *Reader) OnDataG711(cb OnDataG711Func) {
r.onDataAudio = func(msg *message.Audio) error {
cb(msg.DTS, msg.Payload)
return nil
}
}
// OnDataLPCM sets a callback that is called when LPCM data is received.
func (r *Reader) OnDataLPCM(cb OnDataLPCMFunc) {
bitDepth := r.audioTrack.(*format.LPCM).BitDepth
if bitDepth == 16 {
r.onDataAudio = func(msg *message.Audio) error {
le := len(msg.Payload)
if le%2 != 0 {
return fmt.Errorf("invalid payload length: %d", le)
}
// convert from little endian to big endian
for i := 0; i < le; i += 2 {
msg.Payload[i], msg.Payload[i+1] = msg.Payload[i+1], msg.Payload[i]
}
cb(msg.DTS, msg.Payload)
return nil
}
} else {
r.onDataAudio = func(msg *message.Audio) error {
cb(msg.DTS, msg.Payload)
return nil
}
}
}
// Read reads data. // Read reads data.
func (r *Reader) Read() error { func (r *Reader) Read() error {
msg, err := r.conn.Read() msg, err := r.conn.Read()

167
internal/protocols/rtmp/reader_test.go

@ -109,7 +109,7 @@ func TestReadTracks(t *testing.T) {
messages []message.Message messages []message.Message
}{ }{
{ {
"video+audio", "h264 + aac",
&format.H264{ &format.H264{
PayloadTyp: 96, PayloadTyp: 96,
SPS: h264SPS, SPS: h264SPS,
@ -172,9 +172,9 @@ func TestReadTracks(t *testing.T) {
ChunkStreamID: message.AudioChunkStreamID, ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000, MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio, Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz, Rate: message.Rate44100,
Depth: flvio.SOUND_16BIT, Depth: message.Depth16,
Channels: flvio.SOUND_STEREO, Channels: message.ChannelsStereo,
AACType: message.AudioAACTypeConfig, AACType: message.AudioAACTypeConfig,
Payload: func() []byte { Payload: func() []byte {
enc, err := mpeg4audio.Config{ enc, err := mpeg4audio.Config{
@ -189,7 +189,7 @@ func TestReadTracks(t *testing.T) {
}, },
}, },
{ {
"video", "h264",
&format.H264{ &format.H264{
PayloadTyp: 96, PayloadTyp: 96,
SPS: h264SPS, SPS: h264SPS,
@ -241,7 +241,7 @@ func TestReadTracks(t *testing.T) {
}, },
}, },
{ {
"issue mediamtx/386 (missing metadata), video+audio", "h264 + aac, issue mediamtx/386 (missing metadata)",
&format.H264{ &format.H264{
PayloadTyp: 96, PayloadTyp: 96,
SPS: h264SPS, SPS: h264SPS,
@ -292,9 +292,9 @@ func TestReadTracks(t *testing.T) {
ChunkStreamID: message.AudioChunkStreamID, ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000, MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio, Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz, Rate: message.Rate44100,
Depth: flvio.SOUND_16BIT, Depth: message.Depth16,
Channels: flvio.SOUND_STEREO, Channels: message.ChannelsStereo,
AACType: message.AudioAACTypeConfig, AACType: message.AudioAACTypeConfig,
Payload: func() []byte { Payload: func() []byte {
enc, err := mpeg4audio.Config{ enc, err := mpeg4audio.Config{
@ -309,7 +309,7 @@ func TestReadTracks(t *testing.T) {
}, },
}, },
{ {
"issue mediamtx/386 (missing metadata), audio", "aac, issue mediamtx/386 (missing metadata)",
nil, nil,
&format.MPEG4Audio{ &format.MPEG4Audio{
PayloadTyp: 96, PayloadTyp: 96,
@ -327,9 +327,9 @@ func TestReadTracks(t *testing.T) {
ChunkStreamID: message.AudioChunkStreamID, ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000, MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio, Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz, Rate: message.Rate44100,
Depth: flvio.SOUND_16BIT, Depth: message.Depth16,
Channels: flvio.SOUND_STEREO, Channels: message.ChannelsStereo,
AACType: message.AudioAACTypeConfig, AACType: message.AudioAACTypeConfig,
Payload: func() []byte { Payload: func() []byte {
enc, err := mpeg4audio.Config{ enc, err := mpeg4audio.Config{
@ -345,9 +345,9 @@ func TestReadTracks(t *testing.T) {
ChunkStreamID: message.AudioChunkStreamID, ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000, MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio, Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz, Rate: message.Rate44100,
Depth: flvio.SOUND_16BIT, Depth: message.Depth16,
Channels: flvio.SOUND_STEREO, Channels: message.ChannelsStereo,
AACType: message.AudioAACTypeConfig, AACType: message.AudioAACTypeConfig,
Payload: func() []byte { Payload: func() []byte {
enc, err := mpeg4audio.Config{ enc, err := mpeg4audio.Config{
@ -363,7 +363,7 @@ func TestReadTracks(t *testing.T) {
}, },
}, },
{ {
"obs studio pre 29.1 h265", "h265 + aac, obs studio pre 29.1 h265",
&format.H265{ &format.H265{
PayloadTyp: 96, PayloadTyp: 96,
VPS: h265VPS, VPS: h265VPS,
@ -428,9 +428,9 @@ func TestReadTracks(t *testing.T) {
ChunkStreamID: message.AudioChunkStreamID, ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000, MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio, Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz, Rate: message.Rate44100,
Depth: flvio.SOUND_16BIT, Depth: message.Depth16,
Channels: flvio.SOUND_STEREO, Channels: message.ChannelsStereo,
AACType: message.AudioAACTypeConfig, AACType: message.AudioAACTypeConfig,
Payload: func() []byte { Payload: func() []byte {
enc, err := mpeg4audio.Config{ enc, err := mpeg4audio.Config{
@ -445,7 +445,7 @@ func TestReadTracks(t *testing.T) {
}, },
}, },
{ {
"issue mediamtx/2232 (xsplit broadcaster)", "h265, issue mediamtx/2232 (xsplit broadcaster)",
&format.H265{ &format.H265{
PayloadTyp: 96, PayloadTyp: 96,
VPS: h265VPS, VPS: h265VPS,
@ -494,7 +494,7 @@ func TestReadTracks(t *testing.T) {
}, },
}, },
{ {
"obs 30", "h265, obs 30.0",
&format.H265{ &format.H265{
PayloadTyp: 96, PayloadTyp: 96,
VPS: h265VPS, VPS: h265VPS,
@ -543,7 +543,7 @@ func TestReadTracks(t *testing.T) {
}, },
}, },
{ {
"ffmpeg av1", "av1, ffmpeg",
&format.AV1{ &format.AV1{
PayloadTyp: 96, PayloadTyp: 96,
}, },
@ -604,7 +604,7 @@ func TestReadTracks(t *testing.T) {
}, },
}, },
{ {
"issue mediamtx/2289 (missing videocodecid)", "h264 + aac, issue mediamtx/2289 (missing videocodecid)",
&format.H264{ &format.H264{
PayloadTyp: 96, PayloadTyp: 96,
SPS: []byte{ SPS: []byte{
@ -680,7 +680,7 @@ func TestReadTracks(t *testing.T) {
}, },
}, },
{ {
"issue mediamtx/2352 (missing audio)", "h264, issue mediamtx/2352",
&format.H264{ &format.H264{
PayloadTyp: 96, PayloadTyp: 96,
SPS: h264SPS, SPS: h264SPS,
@ -762,6 +762,123 @@ func TestReadTracks(t *testing.T) {
}, },
}, },
}, },
{
"mpeg-1 audio",
nil,
&format.MPEG1Audio{},
[]message.Message{
&message.DataAMF0{
ChunkStreamID: 4,
MessageStreamID: 1,
Payload: []interface{}{
"@setDataFrame",
"onMetaData",
flvio.AMFMap{
{K: "duration", V: 0},
{K: "audiocodecid", V: 2},
{K: "encoder", V: "Lavf58.45.100"},
{K: "filesize", V: 0},
},
},
},
},
},
{
"pcma",
nil,
&format.G711{},
[]message.Message{
&message.DataAMF0{
ChunkStreamID: 4,
MessageStreamID: 1,
Payload: []interface{}{
"@setDataFrame",
"onMetaData",
flvio.AMFMap{
{K: "duration", V: 0},
{K: "audiocodecid", V: 7},
{K: "encoder", V: "Lavf58.45.100"},
{K: "filesize", V: 0},
},
},
},
&message.Audio{
ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecPCMA,
Rate: message.Rate5512,
Depth: message.Depth16,
Channels: message.ChannelsMono,
Payload: []byte{1, 2, 3, 4},
},
},
},
{
"pcmu",
nil,
&format.G711{
MULaw: true,
},
[]message.Message{
&message.DataAMF0{
ChunkStreamID: 4,
MessageStreamID: 1,
Payload: []interface{}{
"@setDataFrame",
"onMetaData",
flvio.AMFMap{
{K: "duration", V: 0},
{K: "audiocodecid", V: 8},
{K: "encoder", V: "Lavf58.45.100"},
{K: "filesize", V: 0},
},
},
},
&message.Audio{
ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecPCMU,
Rate: message.Rate5512,
Depth: message.Depth16,
Channels: message.ChannelsMono,
Payload: []byte{1, 2, 3, 4},
},
},
},
{
"lpcm gstreamer",
nil,
&format.LPCM{
PayloadTyp: 96,
BitDepth: 16,
SampleRate: 44100,
ChannelCount: 2,
},
[]message.Message{
&message.DataAMF0{
ChunkStreamID: 4,
MessageStreamID: 1,
Payload: []interface{}{
"@setDataFrame",
"onMetaData",
flvio.AMFMap{
{K: "duration", V: 0},
{K: "audiocodecid", V: 3},
{K: "filesize", V: 0},
},
},
},
&message.Audio{
ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecLPCM,
Rate: message.Rate44100,
Depth: message.Depth16,
Channels: message.ChannelsStereo,
Payload: []byte{1, 2, 3, 4},
},
},
},
} { } {
t.Run(ca.name, func(t *testing.T) { t.Run(ca.name, func(t *testing.T) {
var buf bytes.Buffer var buf bytes.Buffer

47
internal/protocols/rtmp/writer.go

@ -13,24 +13,37 @@ import (
"github.com/bluenviron/mediamtx/internal/protocols/rtmp/message" "github.com/bluenviron/mediamtx/internal/protocols/rtmp/message"
) )
func mpeg1AudioRate(sr int) uint8 { func audioRateRTMPToInt(v uint8) int {
switch sr { switch v {
case 5500: case message.Rate5512:
return flvio.SOUND_5_5Khz return 5512
case message.Rate11025:
return 11025
case message.Rate22050:
return 22050
default:
return 44100
}
}
func audioRateIntToRTMP(v int) uint8 {
switch v {
case 5512:
return message.Rate5512
case 11025: case 11025:
return flvio.SOUND_11Khz return message.Rate11025
case 22050: case 22050:
return flvio.SOUND_22Khz return message.Rate22050
default: default:
return flvio.SOUND_44Khz return message.Rate44100
} }
} }
func mpeg1AudioChannels(m mpeg1audio.ChannelMode) uint8 { func mpeg1AudioChannels(m mpeg1audio.ChannelMode) uint8 {
if m == mpeg1audio.ChannelModeMono { if m == mpeg1audio.ChannelModeMono {
return flvio.SOUND_MONO return message.ChannelsMono
} }
return flvio.SOUND_STEREO return message.ChannelsStereo
} }
// Writer is a wrapper around Conn that provides utilities to mux outgoing data. // Writer is a wrapper around Conn that provides utilities to mux outgoing data.
@ -141,9 +154,9 @@ func (w *Writer) writeTracks(videoTrack format.Format, audioTrack format.Format)
ChunkStreamID: message.AudioChunkStreamID, ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000, MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio, Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz, Rate: message.Rate44100,
Depth: flvio.SOUND_16BIT, Depth: message.Depth16,
Channels: flvio.SOUND_STEREO, Channels: message.ChannelsStereo,
AACType: message.AudioAACTypeConfig, AACType: message.AudioAACTypeConfig,
Payload: enc, Payload: enc,
}) })
@ -180,9 +193,9 @@ func (w *Writer) WriteMPEG4Audio(pts time.Duration, au []byte) error {
ChunkStreamID: message.AudioChunkStreamID, ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000, MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio, Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz, Rate: message.Rate44100,
Depth: flvio.SOUND_16BIT, Depth: message.Depth16,
Channels: flvio.SOUND_STEREO, Channels: message.ChannelsStereo,
AACType: message.AudioAACTypeAU, AACType: message.AudioAACTypeAU,
Payload: au, Payload: au,
DTS: pts, DTS: pts,
@ -195,8 +208,8 @@ func (w *Writer) WriteMPEG1Audio(pts time.Duration, h *mpeg1audio.FrameHeader, f
ChunkStreamID: message.AudioChunkStreamID, ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000, MessageStreamID: 0x1000000,
Codec: message.CodecMPEG1Audio, Codec: message.CodecMPEG1Audio,
Rate: mpeg1AudioRate(h.SampleRate), Rate: audioRateIntToRTMP(h.SampleRate),
Depth: flvio.SOUND_16BIT, Depth: message.Depth16,
Channels: mpeg1AudioChannels(h.ChannelMode), Channels: mpeg1AudioChannels(h.ChannelMode),
Payload: frame, Payload: frame,
DTS: pts, DTS: pts,

6
internal/protocols/rtmp/writer_test.go

@ -89,9 +89,9 @@ func TestWriteTracks(t *testing.T) {
ChunkStreamID: message.AudioChunkStreamID, ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000, MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio, Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz, Rate: message.Rate44100,
Depth: flvio.SOUND_16BIT, Depth: message.Depth16,
Channels: flvio.SOUND_STEREO, Channels: message.ChannelsStereo,
AACType: message.AudioAACTypeConfig, AACType: message.AudioAACTypeConfig,
Payload: []byte{0x12, 0x10}, Payload: []byte{0x12, 0x10},
}, msg) }, msg)

22
internal/servers/rtmp/conn.go

@ -524,6 +524,28 @@ func (c *conn) runPublish(conn *rtmp.Conn, u *url.URL) error {
}) })
}) })
case *format.G711:
r.OnDataG711(func(pts time.Duration, samples []byte) {
stream.WriteUnit(audioMedia, audioFormat, &unit.G711{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
Samples: samples,
})
})
case *format.LPCM:
r.OnDataLPCM(func(pts time.Duration, samples []byte) {
stream.WriteUnit(audioMedia, audioFormat, &unit.LPCM{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
Samples: samples,
})
})
default: default:
return fmt.Errorf("unsupported audio codec: %T", audioFormat) return fmt.Errorf("unsupported audio codec: %T", audioFormat)
} }

Loading…
Cancel
Save