Browse Source

support publishing H265 and AV1 tracks with Enhanced RTMP (#1393) (#1446) (#1621) (#1756)

pull/1768/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
e8124e2f56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 32
      README.md
  2. 6
      go.mod
  3. 8
      go.sum
  4. 100
      internal/core/rtmp_conn.go
  5. 12
      internal/core/rtmp_server_test.go
  6. 9
      internal/core/rtmp_source.go
  7. 6
      internal/core/rtmp_source_test.go
  8. 3
      internal/core/webrtc_conn.go
  9. 133
      internal/formatprocessor/av1.go
  10. 2
      internal/formatprocessor/h264.go
  11. 2
      internal/formatprocessor/h265.go
  12. 3
      internal/formatprocessor/processor.go
  13. 6
      internal/rtmp/chunk/chunk0.go
  14. 2
      internal/rtmp/chunk/chunk0_test.go
  15. 6
      internal/rtmp/chunk/chunk1.go
  16. 2
      internal/rtmp/chunk/chunk1_test.go
  17. 24
      internal/rtmp/chunk/messagetype.go
  18. 476
      internal/rtmp/conn.go
  19. 992
      internal/rtmp/conn_test.go
  20. 11
      internal/rtmp/message/acknowledge.go
  21. 31
      internal/rtmp/message/audio.go
  22. 11
      internal/rtmp/message/command_amf0.go
  23. 11
      internal/rtmp/message/data_amf0.go
  24. 71
      internal/rtmp/message/extended_coded_frames.go
  25. 44
      internal/rtmp/message/extended_frames_x.go
  26. 24
      internal/rtmp/message/extended_metadata.go
  27. 24
      internal/rtmp/message/extended_mpeg2ts_sequence_start.go
  28. 28
      internal/rtmp/message/extended_sequence_end.go
  29. 26
      internal/rtmp/message/extended_sequence_start.go
  30. 60
      internal/rtmp/message/message.go
  31. 12
      internal/rtmp/message/msg_usercontrol.go
  32. 105
      internal/rtmp/message/reader.go
  33. 73
      internal/rtmp/message/reader_test.go
  34. 8
      internal/rtmp/message/readwriter.go
  35. 6
      internal/rtmp/message/readwriter_test.go
  36. 13
      internal/rtmp/message/set_chunk_size.go
  37. 13
      internal/rtmp/message/set_peer_bandwidth.go
  38. 13
      internal/rtmp/message/set_window_ack_size.go
  39. 11
      internal/rtmp/message/user_control_ping_request.go
  40. 11
      internal/rtmp/message/user_control_ping_response.go
  41. 11
      internal/rtmp/message/user_control_set_buffer_length.go
  42. 11
      internal/rtmp/message/user_control_stream_begin.go
  43. 11
      internal/rtmp/message/user_control_stream_dry.go
  44. 11
      internal/rtmp/message/user_control_stream_eof.go
  45. 11
      internal/rtmp/message/user_control_stream_is_recorded.go
  46. 36
      internal/rtmp/message/video.go
  47. 4
      internal/rtmp/message/writer.go
  48. 4
      internal/rtmp/rawmessage/message.go
  49. 2
      internal/rtmp/rawmessage/reader.go
  50. 30
      internal/rtmp/rawmessage/reader_test.go
  51. 2
      internal/rtmp/rawmessage/writer.go
  52. 4
      internal/rtmp/rawmessage/writer_test.go
  53. 37
      internal/rtmp/tracks/boxes_av1.go
  54. 396
      internal/rtmp/tracks/read.go
  55. 484
      internal/rtmp/tracks/read_test.go
  56. 107
      internal/rtmp/tracks/write.go
  57. 96
      internal/rtmp/tracks/write_test.go

32
README.md

@ -8,24 +8,24 @@ @@ -8,24 +8,24 @@
Live streams can be published to the server with:
|protocol|variants|codecs|
|--------|--------|------|
|RTSP clients (FFmpeg, GStreamer, etc)|UDP, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG-2 Video, M-JPEG, MPEG-4 Video (H263, Xvid), MPEG-2 Audio (MP3), MPEG-4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec|
|RTSP servers and cameras|UDP, UDP-Multicast, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG-2 Video, M-JPEG, MPEG-4 Video (H263, Xvid), MPEG-2 Audio (MP3), MPEG-4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec|
|RTMP clients (OBS Studio)|RTMP, RTMPS|H264, H265, MPEG-2 Audio (MP3), MPEG-4 Audio (AAC)|
|RTMP servers and cameras|RTMP, RTMPS|H264, MPEG-2 Audio (MP3), MPEG-4 Audio (AAC)|
|HLS servers and cameras|Low-Latency HLS, MP4-based HLS, legacy HLS|H264, H265, MPEG-4 Audio (AAC), Opus|
|UDP/MPEG-TS streams|Unicast, broadcast, multicast|H264, H265, MPEG-4 Audio (AAC), Opus|
|Raspberry Pi Cameras||H264|
|protocol|variants|video codecs|audio codecs|
|--------|--------|------------|------------|
|RTSP clients (FFmpeg, GStreamer)|UDP, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-2 Audio (MP3), G722, G711, LPCM and any RTP-compatible codec|
|RTSP servers and cameras|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-2 Audio (MP3), G722, G711, LPCM and any RTP-compatible codec|
|RTMP clients (OBS Studio)|RTMP, RTMPS, Enhanced RTMP|AV1, H265, H264|MPEG-4 Audio (AAC), MPEG-2 Audio (MP3)|
|RTMP servers and cameras|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-2 Audio (MP3)|
|HLS servers and cameras|Low-Latency HLS, MP4-based HLS, legacy HLS|H265, H264|Opus, MPEG-4 Audio (AAC)|
|UDP/MPEG-TS streams|Unicast, broadcast, multicast|H265, H264|Opus, MPEG-4 Audio (AAC)|
|Raspberry Pi Cameras||H264||
And can be read from the server with:
|protocol|variants|codecs|
|--------|--------|------|
|RTSP|UDP, UDP-Multicast, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG-2 Video, M-JPEG, MPEG-4 Video (H263, Xvid), MPEG-2 Audio (MP3), MPEG-4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec|
|RTMP|RTMP, RTMPS|H264, MPEG-2 Audio (MP3), MPEG-4 Audio (AAC)|
|HLS|Low-Latency HLS, MP4-based HLS, legacy HLS|H264, H265, MPEG-4 Audio (AAC), Opus|
|WebRTC||H264, VP8, VP9, Opus, G711, G722|
|protocol|variants|video codecs|audio codecs|
|--------|--------|------------|------------|
|RTSP|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-2 Audio (MP3), G722, G711, LPCM and any RTP-compatible codec|
|RTMP|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-2 Audio (MP3)|
|HLS|Low-Latency HLS, MP4-based HLS, legacy HLS|H265, H264|Opus, MPEG-4 Audio (AAC)|
|WebRTC||VP9, VP8, H264|Opus, G722, G711|
Features:
@ -1201,7 +1201,9 @@ For more advanced options, you can create and serve a custom web page by startin @@ -1201,7 +1201,9 @@ For more advanced options, you can create and serve a custom web page by startin
* [RTSP/RTP/RTCP standards](https://github.com/bluenviron/gortsplib#standards)
* [HLS standards](https://github.com/bluenviron/gohlslib#standards)
* [Codec standards](https://github.com/bluenviron/mediacommon#standards)
* [RTMP specification](https://rtmp.veriskope.com/pdf/rtmp_specification_1.0.pdf)
* [Enhanced RTMP](https://raw.githubusercontent.com/veovera/enhanced-rtmp/main/enhanced-rtmp-v1.pdf)
* [Golang project layout](https://github.com/golang-standards/project-layout)
## Links

6
go.mod

@ -4,11 +4,12 @@ go 1.20 @@ -4,11 +4,12 @@ go 1.20
require (
code.cloudfoundry.org/bytefmt v0.0.0
github.com/abema/go-mp4 v0.10.1
github.com/alecthomas/kong v0.7.1
github.com/asticode/go-astits v1.11.0
github.com/bluenviron/gohlslib v0.2.3
github.com/bluenviron/gortsplib/v3 v3.4.0
github.com/bluenviron/mediacommon v0.4.2
github.com/bluenviron/gortsplib/v3 v3.5.0
github.com/bluenviron/mediacommon v0.5.0
github.com/fsnotify/fsnotify v1.6.0
github.com/gin-gonic/gin v1.9.0
github.com/google/uuid v1.3.0
@ -27,7 +28,6 @@ require ( @@ -27,7 +28,6 @@ require (
)
require (
github.com/abema/go-mp4 v0.10.1 // indirect
github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82 // indirect
github.com/asticode/go-astikit v0.30.0 // indirect
github.com/bytedance/sonic v1.8.0 // indirect

8
go.sum

@ -14,10 +14,10 @@ github.com/asticode/go-astits v1.11.0 h1:GTHUXht0ZXAJXsVbsLIcyfHr1Bchi4QQwMARw2Z @@ -14,10 +14,10 @@ github.com/asticode/go-astits v1.11.0 h1:GTHUXht0ZXAJXsVbsLIcyfHr1Bchi4QQwMARw2Z
github.com/asticode/go-astits v1.11.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI=
github.com/bluenviron/gohlslib v0.2.3 h1:vZmpjh2qWHaCvwwha04tgu8Kz9p4CuSBRLayD2yf89A=
github.com/bluenviron/gohlslib v0.2.3/go.mod h1:loD97sTtBh/nBcw8yZJgXc71A6XQb0FsDWXFRkl7Yj4=
github.com/bluenviron/gortsplib/v3 v3.4.0 h1:N4ticlV5YqRFDNvU52CRJgBQ0hHnxerDLfsd5wf5GI0=
github.com/bluenviron/gortsplib/v3 v3.4.0/go.mod h1:Th3S/suqfnpV81y31YpE1hcOP9odMqvIjOB7RV1+2lU=
github.com/bluenviron/mediacommon v0.4.2 h1:rdghY3g70+fdviapO2hL6CHpOGeTd7KbH1aEZnMwh88=
github.com/bluenviron/mediacommon v0.4.2/go.mod h1:t0dqPsWUTchyvib0MhixIwXEgvDX4V9G+I0GzWLQRb8=
github.com/bluenviron/gortsplib/v3 v3.5.0 h1:8d6DYcwVhghObgBFOnoJwK6xf1ZiAQ8Vi7DRv6DGLdw=
github.com/bluenviron/gortsplib/v3 v3.5.0/go.mod h1:gc6Z8pBUMC9QBqYxcOY9eVxjDPOrmFcwVH61Xs3Gu2A=
github.com/bluenviron/mediacommon v0.5.0 h1:YsVFlEknaXWhZGfz+Y1QbuzXLMVSmHODc7OnRqZoITY=
github.com/bluenviron/mediacommon v0.5.0/go.mod h1:t0dqPsWUTchyvib0MhixIwXEgvDX4V9G+I0GzWLQRb8=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.8.0 h1:ea0Xadu+sHlu7x5O3gKhRpQ1IKiMrSiHttPF0ybECuA=
github.com/bytedance/sonic v1.8.0/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=

100
internal/core/rtmp_conn.go

@ -13,6 +13,7 @@ import ( @@ -13,6 +13,7 @@ import (
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/gortsplib/v3/pkg/media"
"github.com/bluenviron/gortsplib/v3/pkg/ringbuffer"
"github.com/bluenviron/mediacommon/pkg/codecs/av1"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg2audio"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
@ -46,10 +47,10 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream) @@ -46,10 +47,10 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream)
switch format.(type) {
case *formats.H264:
return func(msg interface{}) error {
tmsg := msg.(*message.MsgVideo)
tmsg := msg.(*message.Video)
switch tmsg.Type {
case message.MsgVideoTypeConfig:
case message.VideoTypeConfig:
var conf h264conf.Conf
err := conf.Unmarshal(tmsg.Payload)
if err != nil {
@ -67,7 +68,7 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream) @@ -67,7 +68,7 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream)
NTP: time.Now(),
})
case message.MsgVideoTypeAU:
case message.VideoTypeAU:
au, err := h264.AVCCUnmarshal(tmsg.Payload)
if err != nil {
return fmt.Errorf("unable to decode AVCC: %v", err)
@ -85,25 +86,68 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream) @@ -85,25 +86,68 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream)
case *formats.H265:
return func(msg interface{}) error {
tmsg := msg.(*message.MsgVideo)
switch tmsg := msg.(type) {
case *message.Video:
au, err := h264.AVCCUnmarshal(tmsg.Payload)
if err != nil {
return fmt.Errorf("unable to decode AVCC: %v", err)
}
au, err := h264.AVCCUnmarshal(tmsg.Payload)
if err != nil {
return fmt.Errorf("unable to decode AVCC: %v", err)
stream.writeUnit(medi, format, &formatprocessor.UnitH265{
PTS: tmsg.DTS + tmsg.PTSDelta,
AU: au,
NTP: time.Now(),
})
case *message.ExtendedFramesX:
au, err := h264.AVCCUnmarshal(tmsg.Payload)
if err != nil {
return fmt.Errorf("unable to decode AVCC: %v", err)
}
stream.writeUnit(medi, format, &formatprocessor.UnitH265{
PTS: tmsg.DTS,
AU: au,
NTP: time.Now(),
})
case *message.ExtendedCodedFrames:
au, err := h264.AVCCUnmarshal(tmsg.Payload)
if err != nil {
return fmt.Errorf("unable to decode AVCC: %v", err)
}
stream.writeUnit(medi, format, &formatprocessor.UnitH265{
PTS: tmsg.DTS + tmsg.PTSDelta,
AU: au,
NTP: time.Now(),
})
}
stream.writeUnit(medi, format, &formatprocessor.UnitH265{
PTS: tmsg.DTS + tmsg.PTSDelta,
AU: au,
NTP: time.Now(),
})
return nil
}
case *formats.AV1:
return func(msg interface{}) error {
if tmsg, ok := msg.(*message.ExtendedCodedFrames); ok {
obus, err := av1.BitstreamUnmarshal(tmsg.Payload, true)
if err != nil {
return fmt.Errorf("unable to decode bitstream: %v", err)
}
stream.writeUnit(medi, format, &formatprocessor.UnitAV1{
PTS: tmsg.DTS,
OBUs: obus,
NTP: time.Now(),
})
}
return nil
}
case *formats.MPEG2Audio:
return func(msg interface{}) error {
tmsg := msg.(*message.MsgAudio)
tmsg := msg.(*message.Audio)
stream.writeUnit(medi, format, &formatprocessor.UnitMPEG2Audio{
PTS: tmsg.DTS,
@ -116,9 +160,9 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream) @@ -116,9 +160,9 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream)
case *formats.MPEG4Audio:
return func(msg interface{}) error {
tmsg := msg.(*message.MsgAudio)
tmsg := msg.(*message.Audio)
if tmsg.AACType == message.MsgAudioAACTypeAU {
if tmsg.AACType == message.AudioAACTypeAU {
stream.writeUnit(medi, format, &formatprocessor.UnitMPEG4Audio{
PTS: tmsg.DTS,
AUs: [][]byte{tmsg.Payload},
@ -496,12 +540,12 @@ func (c *rtmpConn) findVideoFormat(stream *stream, ringBuffer *ringbuffer.RingBu @@ -496,12 +540,12 @@ func (c *rtmpConn) findVideoFormat(stream *stream, ringBuffer *ringbuffer.RingBu
}
c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err = c.conn.WriteMessage(&message.MsgVideo{
ChunkStreamID: message.MsgVideoChunkStreamID,
err = c.conn.WriteMessage(&message.Video{
ChunkStreamID: message.VideoChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecH264,
IsKeyFrame: idrPresent,
Type: message.MsgVideoTypeAU,
Type: message.VideoTypeAU,
Payload: avcc,
DTS: dts,
PTSDelta: pts - dts,
@ -557,14 +601,14 @@ func (c *rtmpConn) findAudioFormat(stream *stream, ringBuffer *ringbuffer.RingBu @@ -557,14 +601,14 @@ func (c *rtmpConn) findAudioFormat(stream *stream, ringBuffer *ringbuffer.RingBu
for i, au := range tunit.AUs {
c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err := c.conn.WriteMessage(&message.MsgAudio{
ChunkStreamID: message.MsgAudioChunkStreamID,
err := c.conn.WriteMessage(&message.Audio{
ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,
AACType: message.MsgAudioAACTypeAU,
AACType: message.AudioAACTypeAU,
Payload: au,
DTS: pts + time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*
time.Second/time.Duration(audioFormatMPEG4.ClockRate()),
@ -635,8 +679,8 @@ func (c *rtmpConn) findAudioFormat(stream *stream, ringBuffer *ringbuffer.RingBu @@ -635,8 +679,8 @@ func (c *rtmpConn) findAudioFormat(stream *stream, ringBuffer *ringbuffer.RingBu
rate = flvio.SOUND_22Khz
}
msg := &message.MsgAudio{
ChunkStreamID: message.MsgAudioChunkStreamID,
msg := &message.Audio{
ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG2Audio,
Rate: rate,
@ -751,23 +795,23 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { @@ -751,23 +795,23 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
return err
}
switch tmsg := msg.(type) {
case *message.MsgVideo:
switch msg.(type) {
case *message.Video, *message.ExtendedFramesX, *message.ExtendedCodedFrames:
if videoFormat == nil {
return fmt.Errorf("received a video packet, but track is not set up")
}
err := videoWriteFunc(tmsg)
err := videoWriteFunc(msg)
if err != nil {
c.Log(logger.Warn, "%v", err)
}
case *message.MsgAudio:
case *message.Audio:
if audioFormat == nil {
return fmt.Errorf("received an audio packet, but track is not set up")
}
err := audioWriteFunc(tmsg)
err := audioWriteFunc(msg)
if err != nil {
c.Log(logger.Warn, "%v", err)
}

12
internal/core/rtmp_server_test.go

@ -145,12 +145,12 @@ func TestRTMPServerPublishRead(t *testing.T) { @@ -145,12 +145,12 @@ func TestRTMPServerPublishRead(t *testing.T) {
require.Equal(t, videoTrack, videoTrack1)
require.Equal(t, audioTrack, audioTrack2)
err = conn1.WriteMessage(&message.MsgVideo{
ChunkStreamID: message.MsgVideoChunkStreamID,
err = conn1.WriteMessage(&message.Video{
ChunkStreamID: message.VideoChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecH264,
IsKeyFrame: true,
Type: message.MsgVideoTypeAU,
Type: message.VideoTypeAU,
Payload: []byte{
0x00, 0x00, 0x00, 0x04, 0x05, 0x02, 0x03, 0x04, // IDR 1
0x00, 0x00, 0x00, 0x04, 0x05, 0x02, 0x03, 0x04, // IDR 2
@ -160,12 +160,12 @@ func TestRTMPServerPublishRead(t *testing.T) { @@ -160,12 +160,12 @@ func TestRTMPServerPublishRead(t *testing.T) {
msg1, err := conn2.ReadMessage()
require.NoError(t, err)
require.Equal(t, &message.MsgVideo{
ChunkStreamID: message.MsgVideoChunkStreamID,
require.Equal(t, &message.Video{
ChunkStreamID: message.VideoChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecH264,
IsKeyFrame: true,
Type: message.MsgVideoTypeAU,
Type: message.VideoTypeAU,
Payload: []byte{
0x00, 0x00, 0x00, 0x19, // SPS
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,

9
internal/core/rtmp_source.go

@ -113,8 +113,9 @@ func (s *rtmpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha @@ -113,8 +113,9 @@ func (s *rtmpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha
return err
}
if _, ok := videoFormat.(*formats.H265); ok {
return fmt.Errorf("proxying H265 streams with RTMP is not supported")
switch videoFormat.(type) {
case *formats.H265, *formats.AV1:
return fmt.Errorf("proxying H265 or AV1 tracks with RTMP is not supported")
}
var medias media.Medias
@ -165,7 +166,7 @@ func (s *rtmpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha @@ -165,7 +166,7 @@ func (s *rtmpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha
}
switch tmsg := msg.(type) {
case *message.MsgVideo:
case *message.Video:
if videoFormat == nil {
return fmt.Errorf("received an H264 packet, but track is not set up")
}
@ -175,7 +176,7 @@ func (s *rtmpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha @@ -175,7 +176,7 @@ func (s *rtmpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha
s.Log(logger.Warn, "%v", err)
}
case *message.MsgAudio:
case *message.Audio:
if audioFormat == nil {
return fmt.Errorf("received an AAC packet, but track is not set up")
}

6
internal/core/rtmp_source_test.go

@ -86,12 +86,12 @@ func TestRTMPSource(t *testing.T) { @@ -86,12 +86,12 @@ func TestRTMPSource(t *testing.T) {
<-connected
err = conn.WriteMessage(&message.MsgVideo{
ChunkStreamID: message.MsgVideoChunkStreamID,
err = conn.WriteMessage(&message.Video{
ChunkStreamID: message.VideoChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecH264,
IsKeyFrame: true,
Type: message.MsgVideoTypeAU,
Type: message.VideoTypeAU,
Payload: []byte{0x00, 0x00, 0x00, 0x04, 0x05, 0x02, 0x03, 0x04},
})
require.NoError(t, err)

3
internal/core/webrtc_conn.go

@ -46,6 +46,7 @@ func newPeerConnection(configuration webrtc.Configuration, @@ -46,6 +46,7 @@ func newPeerConnection(configuration webrtc.Configuration,
options ...func(*webrtc.API),
) (*webrtc.PeerConnection, error) {
m := &webrtc.MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
return nil, err
}
@ -328,7 +329,7 @@ func (c *webRTCConn) runInner(ctx context.Context) error { @@ -328,7 +329,7 @@ func (c *webRTCConn) runInner(ctx context.Context) error {
if tracks == nil {
return fmt.Errorf(
"the stream doesn't contain any supported codec (which are currently VP9, VP8, H264, Opus, G722, G711)")
"the stream doesn't contain any supported codec, which are currently H264, VP8, VP9, G711, G722, Opus")
}
err = c.wsconn.WriteJSON(c.genICEServers())

133
internal/formatprocessor/av1.go

@ -0,0 +1,133 @@ @@ -0,0 +1,133 @@
package formatprocessor
import (
"fmt"
"time"
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/gortsplib/v3/pkg/formats/rtpav1"
"github.com/bluenviron/mediacommon/pkg/codecs/av1"
"github.com/pion/rtp"
"github.com/aler9/mediamtx/internal/logger"
)
// UnitAV1 is an AV1 data unit.
type UnitAV1 struct {
RTPPackets []*rtp.Packet
NTP time.Time
PTS time.Duration
OBUs [][]byte
}
// GetRTPPackets implements Unit.
func (d *UnitAV1) GetRTPPackets() []*rtp.Packet {
return d.RTPPackets
}
// GetNTP implements Unit.
func (d *UnitAV1) GetNTP() time.Time {
return d.NTP
}
type formatProcessorAV1 struct {
udpMaxPayloadSize int
format *formats.AV1
log logger.Writer
encoder *rtpav1.Encoder
decoder *rtpav1.Decoder
lastKeyFrameReceived time.Time
}
func newAV1(
udpMaxPayloadSize int,
forma *formats.AV1,
generateRTPPackets bool,
log logger.Writer,
) (*formatProcessorAV1, error) {
t := &formatProcessorAV1{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
log: log,
}
if generateRTPPackets {
t.encoder = &rtpav1.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
}
t.encoder.Init()
t.lastKeyFrameReceived = time.Now()
}
return t, nil
}
func (t *formatProcessorAV1) checkKeyFrameInterval(containsKeyFrame bool) {
if containsKeyFrame {
t.lastKeyFrameReceived = time.Now()
} else {
now := time.Now()
if now.Sub(t.lastKeyFrameReceived) >= maxKeyFrameInterval {
t.lastKeyFrameReceived = now
t.log.Log(logger.Warn, "no AV1 key frames received in %v, stream can't be decoded", maxKeyFrameInterval)
}
}
}
func (t *formatProcessorAV1) checkOBUs(obus [][]byte) {
containsKeyFrame, _ := av1.ContainsKeyFrame(obus)
t.checkKeyFrameInterval(containsKeyFrame)
}
func (t *formatProcessorAV1) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl
tunit := unit.(*UnitAV1)
if tunit.RTPPackets != nil {
pkt := tunit.RTPPackets[0]
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP
if hasNonRTSPReaders {
if t.decoder == nil {
t.decoder = t.format.CreateDecoder()
t.lastKeyFrameReceived = time.Now()
}
// DecodeUntilMarker() is necessary, otherwise Encode() generates partial groups
obus, pts, err := t.decoder.DecodeUntilMarker(pkt)
if err != nil {
if err == rtpav1.ErrNonStartingPacketAndNoPrevious || err == rtpav1.ErrMorePacketsNeeded {
return nil
}
return err
}
tunit.OBUs = obus
t.checkOBUs(obus)
tunit.PTS = pts
}
// route packet as is
return nil
}
t.checkOBUs(tunit.OBUs)
// encode into RTP
pkts, err := t.encoder.Encode(tunit.OBUs, tunit.PTS)
if err != nil {
return err
}
tunit.RTPPackets = pkts
return nil
}

2
internal/formatprocessor/h264.go

@ -180,7 +180,7 @@ func (t *formatProcessorH264) checkKeyFrameInterval(isKeyFrame bool) { @@ -180,7 +180,7 @@ func (t *formatProcessorH264) checkKeyFrameInterval(isKeyFrame bool) {
now := time.Now()
if now.Sub(t.lastKeyFrameReceived) >= maxKeyFrameInterval {
t.lastKeyFrameReceived = now
t.log.Log(logger.Warn, "no key frames received in %v, stream can't be decoded")
t.log.Log(logger.Warn, "no H264 key frames received in %v, stream can't be decoded")
}
}
}

2
internal/formatprocessor/h265.go

@ -200,7 +200,7 @@ func (t *formatProcessorH265) checkKeyFrameInterval(isKeyFrame bool) { @@ -200,7 +200,7 @@ func (t *formatProcessorH265) checkKeyFrameInterval(isKeyFrame bool) {
now := time.Now()
if now.Sub(t.lastKeyFrameReceived) >= maxKeyFrameInterval {
t.lastKeyFrameReceived = now
t.log.Log(logger.Warn, "no key frames received in %v, stream can't be decoded")
t.log.Log(logger.Warn, "no H265 key frames received in %v, stream can't be decoded")
}
}
}

3
internal/formatprocessor/processor.go

@ -39,6 +39,9 @@ func New( @@ -39,6 +39,9 @@ func New(
case *formats.VP9:
return newVP9(udpMaxPayloadSize, forma, generateRTPPackets, log)
case *formats.AV1:
return newAV1(udpMaxPayloadSize, forma, generateRTPPackets, log)
case *formats.MPEG2Audio:
return newMPEG2Audio(udpMaxPayloadSize, forma, generateRTPPackets, log)

6
internal/rtmp/chunk/chunk0.go

@ -11,7 +11,7 @@ import ( @@ -11,7 +11,7 @@ import (
type Chunk0 struct {
ChunkStreamID byte
Timestamp uint32
Type MessageType
Type uint8
MessageStreamID uint32
BodyLen uint32
Body []byte
@ -28,7 +28,7 @@ func (c *Chunk0) Read(r io.Reader, chunkMaxBodyLen uint32) error { @@ -28,7 +28,7 @@ func (c *Chunk0) Read(r io.Reader, chunkMaxBodyLen uint32) error {
c.ChunkStreamID = header[0] & 0x3F
c.Timestamp = uint32(header[1])<<16 | uint32(header[2])<<8 | uint32(header[3])
c.BodyLen = uint32(header[4])<<16 | uint32(header[5])<<8 | uint32(header[6])
c.Type = MessageType(header[7])
c.Type = header[7]
c.MessageStreamID = uint32(header[8])<<24 | uint32(header[9])<<16 | uint32(header[10])<<8 | uint32(header[11])
chunkBodyLen := c.BodyLen
@ -51,7 +51,7 @@ func (c Chunk0) Marshal() ([]byte, error) { @@ -51,7 +51,7 @@ func (c Chunk0) Marshal() ([]byte, error) {
buf[4] = byte(c.BodyLen >> 16)
buf[5] = byte(c.BodyLen >> 8)
buf[6] = byte(c.BodyLen)
buf[7] = byte(c.Type)
buf[7] = c.Type
buf[8] = byte(c.MessageStreamID >> 24)
buf[9] = byte(c.MessageStreamID >> 16)
buf[10] = byte(c.MessageStreamID >> 8)

2
internal/rtmp/chunk/chunk0_test.go

@ -15,7 +15,7 @@ var chunk0enc = []byte{ @@ -15,7 +15,7 @@ var chunk0enc = []byte{
var chunk0dec = Chunk0{
ChunkStreamID: 25,
Timestamp: 11641233,
Type: MessageTypeCommandAMF0,
Type: 20,
MessageStreamID: 56432445,
BodyLen: 20,
Body: []byte{0x01, 0x02, 0x03, 0x04},

6
internal/rtmp/chunk/chunk1.go

@ -13,7 +13,7 @@ import ( @@ -13,7 +13,7 @@ import (
type Chunk1 struct {
ChunkStreamID byte
TimestampDelta uint32
Type MessageType
Type uint8
BodyLen uint32
Body []byte
}
@ -29,7 +29,7 @@ func (c *Chunk1) Read(r io.Reader, chunkMaxBodyLen uint32) error { @@ -29,7 +29,7 @@ func (c *Chunk1) Read(r io.Reader, chunkMaxBodyLen uint32) error {
c.ChunkStreamID = header[0] & 0x3F
c.TimestampDelta = uint32(header[1])<<16 | uint32(header[2])<<8 | uint32(header[3])
c.BodyLen = uint32(header[4])<<16 | uint32(header[5])<<8 | uint32(header[6])
c.Type = MessageType(header[7])
c.Type = header[7]
chunkBodyLen := (c.BodyLen)
if chunkBodyLen > chunkMaxBodyLen {
@ -51,7 +51,7 @@ func (c Chunk1) Marshal() ([]byte, error) { @@ -51,7 +51,7 @@ func (c Chunk1) Marshal() ([]byte, error) {
buf[4] = byte(c.BodyLen >> 16)
buf[5] = byte(c.BodyLen >> 8)
buf[6] = byte(c.BodyLen)
buf[7] = byte(c.Type)
buf[7] = c.Type
copy(buf[8:], c.Body)
return buf, nil
}

2
internal/rtmp/chunk/chunk1_test.go

@ -15,7 +15,7 @@ var chunk1enc = []byte{ @@ -15,7 +15,7 @@ var chunk1enc = []byte{
var chunk1dec = Chunk1{
ChunkStreamID: 25,
TimestampDelta: 11641233,
Type: MessageTypeCommandAMF0,
Type: 20,
BodyLen: 20,
Body: []byte{0x01, 0x02, 0x03, 0x04},
}

24
internal/rtmp/chunk/messagetype.go

@ -1,24 +0,0 @@ @@ -1,24 +0,0 @@
package chunk
// MessageType is a message type.
type MessageType byte
// message types.
const (
MessageTypeSetChunkSize MessageType = 1
MessageTypeAbortMessage MessageType = 2
MessageTypeAcknowledge MessageType = 3
MessageTypeSetWindowAckSize MessageType = 5
MessageTypeSetPeerBandwidth MessageType = 6
MessageTypeUserControl MessageType = 4
MessageTypeCommandAMF3 MessageType = 17
MessageTypeCommandAMF0 MessageType = 20
MessageTypeDataAMF3 MessageType = 15
MessageTypeDataAMF0 MessageType = 18
MessageTypeAudio MessageType = 8
MessageTypeVideo MessageType = 9
)

476
internal/rtmp/conn.go

@ -2,26 +2,21 @@ @@ -2,26 +2,21 @@
package rtmp
import (
"errors"
"fmt"
"io"
"net/url"
"strings"
"time"
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/codecs/h265"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/aler9/mediamtx/internal/rtmp/bytecounter"
"github.com/aler9/mediamtx/internal/rtmp/h264conf"
"github.com/aler9/mediamtx/internal/rtmp/handshake"
"github.com/aler9/mediamtx/internal/rtmp/message"
"github.com/aler9/mediamtx/internal/rtmp/tracks"
)
func resultIsOK1(res *message.MsgCommandAMF0) bool {
func resultIsOK1(res *message.CommandAMF0) bool {
if len(res.Arguments) < 2 {
return false
}
@ -39,7 +34,7 @@ func resultIsOK1(res *message.MsgCommandAMF0) bool { @@ -39,7 +34,7 @@ func resultIsOK1(res *message.MsgCommandAMF0) bool {
return v == "status"
}
func resultIsOK2(res *message.MsgCommandAMF0) bool {
func resultIsOK2(res *message.CommandAMF0) bool {
if len(res.Arguments) < 2 {
return false
}
@ -126,27 +121,27 @@ func (c *Conn) BytesSent() uint64 { @@ -126,27 +121,27 @@ func (c *Conn) BytesSent() uint64 {
return c.bc.Writer.Count()
}
func (c *Conn) readCommand() (*message.MsgCommandAMF0, error) {
func (c *Conn) readCommand() (*message.CommandAMF0, error) {
for {
msg, err := c.mrw.Read()
if err != nil {
return nil, err
}
if cmd, ok := msg.(*message.MsgCommandAMF0); ok {
if cmd, ok := msg.(*message.CommandAMF0); ok {
return cmd, nil
}
}
}
func (c *Conn) readCommandResult(commandID int, commandName string, isValid func(*message.MsgCommandAMF0) bool) error {
func (c *Conn) readCommandResult(commandID int, commandName string, isValid func(*message.CommandAMF0) bool) error {
for {
msg, err := c.mrw.Read()
if err != nil {
return err
}
if cmd, ok := msg.(*message.MsgCommandAMF0); ok {
if cmd, ok := msg.(*message.CommandAMF0); ok {
if cmd.CommandID == commandID && cmd.Name == commandName {
if !isValid(cmd) {
return fmt.Errorf("server refused connect request")
@ -169,14 +164,14 @@ func (c *Conn) InitializeClient(u *url.URL, isPublishing bool) error { @@ -169,14 +164,14 @@ func (c *Conn) InitializeClient(u *url.URL, isPublishing bool) error {
c.mrw = message.NewReadWriter(c.bc, false)
err = c.mrw.Write(&message.MsgSetWindowAckSize{
err = c.mrw.Write(&message.SetWindowAckSize{
Value: 2500000,
})
if err != nil {
return err
}
err = c.mrw.Write(&message.MsgSetPeerBandwidth{
err = c.mrw.Write(&message.SetPeerBandwidth{
Value: 2500000,
Type: 2,
})
@ -184,14 +179,14 @@ func (c *Conn) InitializeClient(u *url.URL, isPublishing bool) error { @@ -184,14 +179,14 @@ func (c *Conn) InitializeClient(u *url.URL, isPublishing bool) error {
return err
}
err = c.mrw.Write(&message.MsgSetChunkSize{
err = c.mrw.Write(&message.SetChunkSize{
Value: 65536,
})
if err != nil {
return err
}
err = c.mrw.Write(&message.MsgCommandAMF0{
err = c.mrw.Write(&message.CommandAMF0{
ChunkStreamID: 3,
Name: "connect",
CommandID: 1,
@ -218,7 +213,7 @@ func (c *Conn) InitializeClient(u *url.URL, isPublishing bool) error { @@ -218,7 +213,7 @@ func (c *Conn) InitializeClient(u *url.URL, isPublishing bool) error {
}
if !isPublishing {
err = c.mrw.Write(&message.MsgCommandAMF0{
err = c.mrw.Write(&message.CommandAMF0{
ChunkStreamID: 3,
Name: "createStream",
CommandID: 2,
@ -235,14 +230,14 @@ func (c *Conn) InitializeClient(u *url.URL, isPublishing bool) error { @@ -235,14 +230,14 @@ func (c *Conn) InitializeClient(u *url.URL, isPublishing bool) error {
return err
}
err = c.mrw.Write(&message.MsgUserControlSetBufferLength{
err = c.mrw.Write(&message.UserControlSetBufferLength{
BufferLength: 0x64,
})
if err != nil {
return err
}
err = c.mrw.Write(&message.MsgCommandAMF0{
err = c.mrw.Write(&message.CommandAMF0{
ChunkStreamID: 4,
MessageStreamID: 0x1000000,
Name: "play",
@ -259,7 +254,7 @@ func (c *Conn) InitializeClient(u *url.URL, isPublishing bool) error { @@ -259,7 +254,7 @@ func (c *Conn) InitializeClient(u *url.URL, isPublishing bool) error {
return c.readCommandResult(3, "onStatus", resultIsOK1)
}
err = c.mrw.Write(&message.MsgCommandAMF0{
err = c.mrw.Write(&message.CommandAMF0{
ChunkStreamID: 3,
Name: "releaseStream",
CommandID: 2,
@ -272,7 +267,7 @@ func (c *Conn) InitializeClient(u *url.URL, isPublishing bool) error { @@ -272,7 +267,7 @@ func (c *Conn) InitializeClient(u *url.URL, isPublishing bool) error {
return err
}
err = c.mrw.Write(&message.MsgCommandAMF0{
err = c.mrw.Write(&message.CommandAMF0{
ChunkStreamID: 3,
Name: "FCPublish",
CommandID: 3,
@ -285,7 +280,7 @@ func (c *Conn) InitializeClient(u *url.URL, isPublishing bool) error { @@ -285,7 +280,7 @@ func (c *Conn) InitializeClient(u *url.URL, isPublishing bool) error {
return err
}
err = c.mrw.Write(&message.MsgCommandAMF0{
err = c.mrw.Write(&message.CommandAMF0{
ChunkStreamID: 3,
Name: "createStream",
CommandID: 4,
@ -302,7 +297,7 @@ func (c *Conn) InitializeClient(u *url.URL, isPublishing bool) error { @@ -302,7 +297,7 @@ func (c *Conn) InitializeClient(u *url.URL, isPublishing bool) error {
return err
}
err = c.mrw.Write(&message.MsgCommandAMF0{
err = c.mrw.Write(&message.CommandAMF0{
ChunkStreamID: 4,
MessageStreamID: 0x1000000,
Name: "publish",
@ -362,14 +357,14 @@ func (c *Conn) InitializeServer() (*url.URL, bool, error) { @@ -362,14 +357,14 @@ func (c *Conn) InitializeServer() (*url.URL, bool, error) {
tcURL = strings.Trim(tcURL, "'")
err = c.mrw.Write(&message.MsgSetWindowAckSize{
err = c.mrw.Write(&message.SetWindowAckSize{
Value: 2500000,
})
if err != nil {
return nil, false, err
}
err = c.mrw.Write(&message.MsgSetPeerBandwidth{
err = c.mrw.Write(&message.SetPeerBandwidth{
Value: 2500000,
Type: 2,
})
@ -377,7 +372,7 @@ func (c *Conn) InitializeServer() (*url.URL, bool, error) { @@ -377,7 +372,7 @@ func (c *Conn) InitializeServer() (*url.URL, bool, error) {
return nil, false, err
}
err = c.mrw.Write(&message.MsgSetChunkSize{
err = c.mrw.Write(&message.SetChunkSize{
Value: 65536,
})
if err != nil {
@ -386,7 +381,7 @@ func (c *Conn) InitializeServer() (*url.URL, bool, error) { @@ -386,7 +381,7 @@ func (c *Conn) InitializeServer() (*url.URL, bool, error) {
oe, _ := ma.GetFloat64("objectEncoding")
err = c.mrw.Write(&message.MsgCommandAMF0{
err = c.mrw.Write(&message.CommandAMF0{
ChunkStreamID: cmd.ChunkStreamID,
Name: "_result",
CommandID: cmd.CommandID,
@ -415,7 +410,7 @@ func (c *Conn) InitializeServer() (*url.URL, bool, error) { @@ -415,7 +410,7 @@ func (c *Conn) InitializeServer() (*url.URL, bool, error) {
switch cmd.Name {
case "createStream":
err = c.mrw.Write(&message.MsgCommandAMF0{
err = c.mrw.Write(&message.CommandAMF0{
ChunkStreamID: cmd.ChunkStreamID,
Name: "_result",
CommandID: cmd.CommandID,
@ -443,21 +438,21 @@ func (c *Conn) InitializeServer() (*url.URL, bool, error) { @@ -443,21 +438,21 @@ func (c *Conn) InitializeServer() (*url.URL, bool, error) {
return nil, false, err
}
err = c.mrw.Write(&message.MsgUserControlStreamIsRecorded{
err = c.mrw.Write(&message.UserControlStreamIsRecorded{
StreamID: 1,
})
if err != nil {
return nil, false, err
}
err = c.mrw.Write(&message.MsgUserControlStreamBegin{
err = c.mrw.Write(&message.UserControlStreamBegin{
StreamID: 1,
})
if err != nil {
return nil, false, err
}
err = c.mrw.Write(&message.MsgCommandAMF0{
err = c.mrw.Write(&message.CommandAMF0{
ChunkStreamID: 5,
MessageStreamID: 0x1000000,
Name: "onStatus",
@ -475,7 +470,7 @@ func (c *Conn) InitializeServer() (*url.URL, bool, error) { @@ -475,7 +470,7 @@ func (c *Conn) InitializeServer() (*url.URL, bool, error) {
return nil, false, err
}
err = c.mrw.Write(&message.MsgCommandAMF0{
err = c.mrw.Write(&message.CommandAMF0{
ChunkStreamID: 5,
MessageStreamID: 0x1000000,
Name: "onStatus",
@ -493,7 +488,7 @@ func (c *Conn) InitializeServer() (*url.URL, bool, error) { @@ -493,7 +488,7 @@ func (c *Conn) InitializeServer() (*url.URL, bool, error) {
return nil, false, err
}
err = c.mrw.Write(&message.MsgCommandAMF0{
err = c.mrw.Write(&message.CommandAMF0{
ChunkStreamID: 5,
MessageStreamID: 0x1000000,
Name: "onStatus",
@ -511,7 +506,7 @@ func (c *Conn) InitializeServer() (*url.URL, bool, error) { @@ -511,7 +506,7 @@ func (c *Conn) InitializeServer() (*url.URL, bool, error) {
return nil, false, err
}
err = c.mrw.Write(&message.MsgCommandAMF0{
err = c.mrw.Write(&message.CommandAMF0{
ChunkStreamID: 5,
MessageStreamID: 0x1000000,
Name: "onStatus",
@ -546,7 +541,7 @@ func (c *Conn) InitializeServer() (*url.URL, bool, error) { @@ -546,7 +541,7 @@ func (c *Conn) InitializeServer() (*url.URL, bool, error) {
return nil, false, err
}
err = c.mrw.Write(&message.MsgCommandAMF0{
err = c.mrw.Write(&message.CommandAMF0{
ChunkStreamID: 5,
Name: "onStatus",
CommandID: cmd.CommandID,
@ -579,420 +574,13 @@ func (c *Conn) WriteMessage(msg message.Message) error { @@ -579,420 +574,13 @@ func (c *Conn) WriteMessage(msg message.Message) error {
return c.mrw.Write(msg)
}
func trackFromH264DecoderConfig(data []byte) (formats.Format, error) {
var conf h264conf.Conf
err := conf.Unmarshal(data)
if err != nil {
return nil, fmt.Errorf("unable to parse H264 config: %v", err)
}
return &formats.H264{
PayloadTyp: 96,
SPS: conf.SPS,
PPS: conf.PPS,
PacketizationMode: 1,
}, nil
}
func trackFromAACDecoderConfig(data []byte) (*formats.MPEG4Audio, error) {
var mpegConf mpeg4audio.Config
err := mpegConf.Unmarshal(data)
if err != nil {
return nil, err
}
return &formats.MPEG4Audio{
PayloadTyp: 96,
Config: &mpegConf,
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
}, nil
}
var errEmptyMetadata = errors.New("metadata is empty")
func (c *Conn) readTracksFromMetadata(payload []interface{}) (formats.Format, formats.Format, error) {
if len(payload) != 1 {
return nil, nil, fmt.Errorf("invalid metadata")
}
md, ok := payload[0].(flvio.AMFMap)
if !ok {
return nil, nil, fmt.Errorf("invalid metadata")
}
var videoTrack formats.Format
var audioTrack formats.Format
hasVideo, err := func() (bool, error) {
v, ok := md.GetV("videocodecid")
if !ok {
return false, nil
}
switch vt := v.(type) {
case float64:
switch vt {
case 0:
return false, nil
case message.CodecH264:
return true, nil
}
case string:
if vt == "avc1" {
return true, nil
}
}
return false, fmt.Errorf("unsupported video codec: %v", v)
}()
if err != nil {
return nil, nil, err
}
hasAudio, err := func() (bool, error) {
v, ok := md.GetV("audiocodecid")
if !ok {
return false, nil
}
switch vt := v.(type) {
case float64:
switch vt {
case 0:
return false, nil
case message.CodecMPEG2Audio:
audioTrack = &formats.MPEG2Audio{}
return true, nil
case message.CodecMPEG4Audio:
return true, nil
}
case string:
if vt == "mp4a" {
return true, nil
}
}
return false, fmt.Errorf("unsupported audio codec %v", v)
}()
if err != nil {
return nil, nil, err
}
if !hasVideo && !hasAudio {
return nil, nil, errEmptyMetadata
}
for {
if (!hasVideo || videoTrack != nil) &&
(!hasAudio || audioTrack != nil) {
return videoTrack, audioTrack, nil
}
msg, err := c.ReadMessage()
if err != nil {
return nil, nil, err
}
switch tmsg := msg.(type) {
case *message.MsgVideo:
if !hasVideo {
return nil, nil, fmt.Errorf("unexpected video packet")
}
if videoTrack == nil {
if tmsg.Type == message.MsgVideoTypeConfig {
videoTrack, err = trackFromH264DecoderConfig(tmsg.Payload)
if err != nil {
return nil, nil, err
}
} else if tmsg.Type == message.MsgVideoTypeAU && tmsg.IsKeyFrame {
nalus, err := h264.AVCCUnmarshal(tmsg.Payload)
if err != nil {
return nil, nil, err
}
var h265VPS []byte
var h265SPS []byte
var h265PPS []byte
for _, nalu := range nalus {
typ := h265.NALUType((nalu[0] >> 1) & 0b111111)
switch typ {
case h265.NALUType_VPS_NUT:
h265VPS = append([]byte(nil), nalu...)
case h265.NALUType_SPS_NUT:
h265SPS = append([]byte(nil), nalu...)
case h265.NALUType_PPS_NUT:
h265PPS = append([]byte(nil), nalu...)
}
}
if h265VPS != nil && h265SPS != nil && h265PPS != nil {
videoTrack = &formats.H265{
PayloadTyp: 96,
VPS: h265VPS,
SPS: h265SPS,
PPS: h265PPS,
}
}
}
}
case *message.MsgAudio:
if !hasAudio {
return nil, nil, fmt.Errorf("unexpected audio packet")
}
if audioTrack == nil {
if tmsg.Codec == message.CodecMPEG4Audio && tmsg.AACType == message.MsgAudioAACTypeConfig {
audioTrack, err = trackFromAACDecoderConfig(tmsg.Payload)
if err != nil {
return nil, nil, err
}
}
}
}
}
}
func (c *Conn) readTracksFromMessages(msg message.Message) (formats.Format, *formats.MPEG4Audio, error) {
var startTime *time.Duration
var videoTrack formats.Format
var audioTrack *formats.MPEG4Audio
// analyze 1 second of packets
outer:
for {
switch tmsg := msg.(type) {
case *message.MsgVideo:
if startTime == nil {
v := tmsg.DTS
startTime = &v
}
if tmsg.Type == message.MsgVideoTypeConfig {
if videoTrack == nil {
var err error
videoTrack, err = trackFromH264DecoderConfig(tmsg.Payload)
if err != nil {
return nil, nil, err
}
// stop the analysis if both tracks are found
if videoTrack != nil && audioTrack != nil {
return videoTrack, audioTrack, nil
}
}
}
if (tmsg.DTS - *startTime) >= 1*time.Second {
break outer
}
case *message.MsgAudio:
if startTime == nil {
v := tmsg.DTS
startTime = &v
}
if tmsg.AACType == message.MsgAudioAACTypeConfig {
if audioTrack == nil {
var err error
audioTrack, err = trackFromAACDecoderConfig(tmsg.Payload)
if err != nil {
return nil, nil, err
}
// stop the analysis if both tracks are found
if videoTrack != nil && audioTrack != nil {
return videoTrack, audioTrack, nil
}
}
}
if (tmsg.DTS - *startTime) >= 1*time.Second {
break outer
}
}
var err error
msg, err = c.ReadMessage()
if err != nil {
return nil, nil, err
}
}
if videoTrack == nil && audioTrack == nil {
return nil, nil, fmt.Errorf("no tracks found")
}
return videoTrack, audioTrack, nil
}
// ReadTracks reads track informations.
// It returns the video track and the audio track.
func (c *Conn) ReadTracks() (formats.Format, formats.Format, error) {
msg, err := func() (message.Message, error) {
for {
msg, err := c.ReadMessage()
if err != nil {
return nil, err
}
// skip play start and data start
if cmd, ok := msg.(*message.MsgCommandAMF0); ok && cmd.Name == "onStatus" {
continue
}
// skip RtmpSampleAccess
if data, ok := msg.(*message.MsgDataAMF0); ok && len(data.Payload) >= 1 {
if s, ok := data.Payload[0].(string); ok && s == "|RtmpSampleAccess" {
continue
}
}
return msg, nil
}
}()
if err != nil {
return nil, nil, err
}
if data, ok := msg.(*message.MsgDataAMF0); ok && len(data.Payload) >= 1 {
payload := data.Payload
if s, ok := payload[0].(string); ok && s == "@setDataFrame" {
payload = payload[1:]
}
if len(payload) >= 1 {
if s, ok := payload[0].(string); ok && s == "onMetaData" {
videoTrack, audioTrack, err := c.readTracksFromMetadata(payload[1:])
if err != nil {
if err == errEmptyMetadata {
msg, err := c.ReadMessage()
if err != nil {
return nil, nil, err
}
return c.readTracksFromMessages(msg)
}
return nil, nil, err
}
return videoTrack, audioTrack, nil
}
}
}
return c.readTracksFromMessages(msg)
return tracks.Read(c.mrw)
}
// WriteTracks writes track informations.
func (c *Conn) WriteTracks(videoTrack formats.Format, audioTrack formats.Format) error {
err := c.WriteMessage(&message.MsgDataAMF0{
ChunkStreamID: 4,
MessageStreamID: 0x1000000,
Payload: []interface{}{
"@setDataFrame",
"onMetaData",
flvio.AMFMap{
{
K: "videodatarate",
V: float64(0),
},
{
K: "videocodecid",
V: func() float64 {
switch videoTrack.(type) {
case *formats.H264:
return message.CodecH264
default:
return 0
}
}(),
},
{
K: "audiodatarate",
V: float64(0),
},
{
K: "audiocodecid",
V: func() float64 {
switch audioTrack.(type) {
case *formats.MPEG2Audio:
return message.CodecMPEG2Audio
case *formats.MPEG4Audio:
return message.CodecMPEG4Audio
default:
return 0
}
}(),
},
},
},
})
if err != nil {
return err
}
if videoTrack, ok := videoTrack.(*formats.H264); ok {
// write decoder config only if SPS and PPS are available.
// if they're not available yet, they're sent later.
if sps, pps := videoTrack.SafeParams(); sps != nil && pps != nil {
buf, _ := h264conf.Conf{
SPS: sps,
PPS: pps,
}.Marshal()
err = c.WriteMessage(&message.MsgVideo{
ChunkStreamID: message.MsgVideoChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecH264,
IsKeyFrame: true,
Type: message.MsgVideoTypeConfig,
Payload: buf,
})
if err != nil {
return err
}
}
}
if mpeg4audioTrack, ok := audioTrack.(*formats.MPEG4Audio); ok {
enc, err := mpeg4audioTrack.Config.Marshal()
if err != nil {
return err
}
err = c.WriteMessage(&message.MsgAudio{
ChunkStreamID: message.MsgAudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,
AACType: message.MsgAudioAACTypeConfig,
Payload: enc,
})
if err != nil {
return err
}
}
return nil
return tracks.Write(c.mrw, videoTrack, audioTrack)
}

992
internal/rtmp/conn_test.go

File diff suppressed because it is too large Load Diff

11
internal/rtmp/message/msg_acknowledge.go → internal/rtmp/message/acknowledge.go

@ -3,17 +3,16 @@ package message //nolint:dupl @@ -3,17 +3,16 @@ package message //nolint:dupl
import (
"fmt"
"github.com/aler9/mediamtx/internal/rtmp/chunk"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
// MsgAcknowledge is an acknowledgement message.
type MsgAcknowledge struct {
// Acknowledge is an acknowledgement message.
type Acknowledge struct {
Value uint32
}
// Unmarshal implements Message.
func (m *MsgAcknowledge) Unmarshal(raw *rawmessage.Message) error {
func (m *Acknowledge) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
@ -28,7 +27,7 @@ func (m *MsgAcknowledge) Unmarshal(raw *rawmessage.Message) error { @@ -28,7 +27,7 @@ func (m *MsgAcknowledge) Unmarshal(raw *rawmessage.Message) error {
}
// Marshal implements Message.
func (m *MsgAcknowledge) Marshal() (*rawmessage.Message, error) {
func (m *Acknowledge) Marshal() (*rawmessage.Message, error) {
buf := make([]byte, 4)
buf[0] = byte(m.Value >> 24)
@ -38,7 +37,7 @@ func (m *MsgAcknowledge) Marshal() (*rawmessage.Message, error) { @@ -38,7 +37,7 @@ func (m *MsgAcknowledge) Marshal() (*rawmessage.Message, error) {
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeAcknowledge,
Type: uint8(TypeAcknowledge),
Body: buf,
}, nil
}

31
internal/rtmp/message/msg_audio.go → internal/rtmp/message/audio.go

@ -4,13 +4,12 @@ import ( @@ -4,13 +4,12 @@ import (
"fmt"
"time"
"github.com/aler9/mediamtx/internal/rtmp/chunk"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
const (
// MsgAudioChunkStreamID is the chunk stream ID that is usually used to send MsgAudio{}
MsgAudioChunkStreamID = 4
// AudioChunkStreamID is the chunk stream ID that is usually used to send Audio{}
AudioChunkStreamID = 4
)
// supported audio codecs
@ -19,17 +18,17 @@ const ( @@ -19,17 +18,17 @@ const (
CodecMPEG4Audio = 10
)
// MsgAudioAACType is the AAC type of a MsgAudio.
type MsgAudioAACType uint8
// AudioAACType is the AAC type of a Audio.
type AudioAACType uint8
// MsgAudioAACType values.
// AudioAACType values.
const (
MsgAudioAACTypeConfig MsgAudioAACType = 0
MsgAudioAACTypeAU MsgAudioAACType = 1
AudioAACTypeConfig AudioAACType = 0
AudioAACTypeAU AudioAACType = 1
)
// MsgAudio is an audio message.
type MsgAudio struct {
// Audio is an audio message.
type Audio struct {
ChunkStreamID byte
DTS time.Duration
MessageStreamID uint32
@ -37,12 +36,12 @@ type MsgAudio struct { @@ -37,12 +36,12 @@ type MsgAudio struct {
Rate uint8
Depth uint8
Channels uint8
AACType MsgAudioAACType // only for CodecMPEG4Audio
AACType AudioAACType // only for CodecMPEG4Audio
Payload []byte
}
// Unmarshal implements Message.
func (m *MsgAudio) Unmarshal(raw *rawmessage.Message) error {
func (m *Audio) Unmarshal(raw *rawmessage.Message) error {
m.ChunkStreamID = raw.ChunkStreamID
m.DTS = raw.Timestamp
m.MessageStreamID = raw.MessageStreamID
@ -65,9 +64,9 @@ func (m *MsgAudio) Unmarshal(raw *rawmessage.Message) error { @@ -65,9 +64,9 @@ func (m *MsgAudio) Unmarshal(raw *rawmessage.Message) error {
if m.Codec == CodecMPEG2Audio {
m.Payload = raw.Body[1:]
} else {
m.AACType = MsgAudioAACType(raw.Body[1])
m.AACType = AudioAACType(raw.Body[1])
switch m.AACType {
case MsgAudioAACTypeConfig, MsgAudioAACTypeAU:
case AudioAACTypeConfig, AudioAACTypeAU:
default:
return fmt.Errorf("unsupported audio message type: %d", m.AACType)
}
@ -79,7 +78,7 @@ func (m *MsgAudio) Unmarshal(raw *rawmessage.Message) error { @@ -79,7 +78,7 @@ func (m *MsgAudio) Unmarshal(raw *rawmessage.Message) error {
}
// Marshal implements Message.
func (m MsgAudio) Marshal() (*rawmessage.Message, error) {
func (m Audio) Marshal() (*rawmessage.Message, error) {
var l int
if m.Codec == CodecMPEG2Audio {
l = 1 + len(m.Payload)
@ -100,7 +99,7 @@ func (m MsgAudio) Marshal() (*rawmessage.Message, error) { @@ -100,7 +99,7 @@ func (m MsgAudio) Marshal() (*rawmessage.Message, error) {
return &rawmessage.Message{
ChunkStreamID: m.ChunkStreamID,
Timestamp: m.DTS,
Type: chunk.MessageTypeAudio,
Type: uint8(TypeAudio),
MessageStreamID: m.MessageStreamID,
Body: body,
}, nil

11
internal/rtmp/message/msg_command_amf0.go → internal/rtmp/message/command_amf0.go

@ -5,12 +5,11 @@ import ( @@ -5,12 +5,11 @@ import (
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/aler9/mediamtx/internal/rtmp/chunk"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
// MsgCommandAMF0 is a AMF0 command message.
type MsgCommandAMF0 struct {
// CommandAMF0 is a AMF0 command message.
type CommandAMF0 struct {
ChunkStreamID byte
MessageStreamID uint32
Name string
@ -19,7 +18,7 @@ type MsgCommandAMF0 struct { @@ -19,7 +18,7 @@ type MsgCommandAMF0 struct {
}
// Unmarshal implements Message.
func (m *MsgCommandAMF0) Unmarshal(raw *rawmessage.Message) error {
func (m *CommandAMF0) Unmarshal(raw *rawmessage.Message) error {
m.ChunkStreamID = raw.ChunkStreamID
m.MessageStreamID = raw.MessageStreamID
@ -50,10 +49,10 @@ func (m *MsgCommandAMF0) Unmarshal(raw *rawmessage.Message) error { @@ -50,10 +49,10 @@ func (m *MsgCommandAMF0) Unmarshal(raw *rawmessage.Message) error {
}
// Marshal implements Message.
func (m MsgCommandAMF0) Marshal() (*rawmessage.Message, error) {
func (m CommandAMF0) Marshal() (*rawmessage.Message, error) {
return &rawmessage.Message{
ChunkStreamID: m.ChunkStreamID,
Type: chunk.MessageTypeCommandAMF0,
Type: uint8(TypeCommandAMF0),
MessageStreamID: m.MessageStreamID,
Body: flvio.FillAMF0ValsMalloc(append([]interface{}{
m.Name,

11
internal/rtmp/message/msg_data_amf0.go → internal/rtmp/message/data_amf0.go

@ -3,19 +3,18 @@ package message @@ -3,19 +3,18 @@ package message
import (
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/aler9/mediamtx/internal/rtmp/chunk"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
// MsgDataAMF0 is a AMF0 data message.
type MsgDataAMF0 struct {
// DataAMF0 is a AMF0 data message.
type DataAMF0 struct {
ChunkStreamID byte
MessageStreamID uint32
Payload []interface{}
}
// Unmarshal implements Message.
func (m *MsgDataAMF0) Unmarshal(raw *rawmessage.Message) error {
func (m *DataAMF0) Unmarshal(raw *rawmessage.Message) error {
m.ChunkStreamID = raw.ChunkStreamID
m.MessageStreamID = raw.MessageStreamID
@ -29,10 +28,10 @@ func (m *MsgDataAMF0) Unmarshal(raw *rawmessage.Message) error { @@ -29,10 +28,10 @@ func (m *MsgDataAMF0) Unmarshal(raw *rawmessage.Message) error {
}
// Marshal implements Message.
func (m MsgDataAMF0) Marshal() (*rawmessage.Message, error) {
func (m DataAMF0) Marshal() (*rawmessage.Message, error) {
return &rawmessage.Message{
ChunkStreamID: m.ChunkStreamID,
Type: chunk.MessageTypeDataAMF0,
Type: uint8(TypeDataAMF0),
MessageStreamID: m.MessageStreamID,
Body: flvio.FillAMF0ValsMalloc(m.Payload),
}, nil

71
internal/rtmp/message/extended_coded_frames.go

@ -0,0 +1,71 @@ @@ -0,0 +1,71 @@
package message
import (
"fmt"
"time"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
// ExtendedCodedFrames is a CodedFrames extended message.
type ExtendedCodedFrames struct {
ChunkStreamID byte
DTS time.Duration
MessageStreamID uint32
FourCC [4]byte
PTSDelta time.Duration
Payload []byte
}
// Unmarshal implements Message.
func (m *ExtendedCodedFrames) Unmarshal(raw *rawmessage.Message) error {
if len(raw.Body) < 8 {
return fmt.Errorf("not enough bytes")
}
m.ChunkStreamID = raw.ChunkStreamID
m.DTS = raw.Timestamp
m.MessageStreamID = raw.MessageStreamID
copy(m.FourCC[:], raw.Body[1:5])
if m.FourCC == FourCCHEVC {
m.PTSDelta = time.Duration(uint32(raw.Body[5])<<16|uint32(raw.Body[6])<<8|uint32(raw.Body[7])) * time.Millisecond
m.Payload = raw.Body[8:]
} else {
m.Payload = raw.Body[5:]
}
return nil
}
// Marshal implements Message.
func (m ExtendedCodedFrames) Marshal() (*rawmessage.Message, error) {
var l int
if m.FourCC == FourCCHEVC {
l = 8 + len(m.Payload)
} else {
l = 5 + len(m.Payload)
}
body := make([]byte, l)
body[0] = 0b10000000 | byte(ExtendedTypeCodedFrames)
copy(body[1:5], m.FourCC[:])
if m.FourCC == FourCCHEVC {
tmp := uint32(m.PTSDelta / time.Millisecond)
body[5] = uint8(tmp >> 16)
body[6] = uint8(tmp >> 8)
body[7] = uint8(tmp)
copy(body[8:], m.Payload)
} else {
copy(body[5:], m.Payload)
}
return &rawmessage.Message{
ChunkStreamID: m.ChunkStreamID,
Timestamp: m.DTS,
Type: uint8(TypeVideo),
MessageStreamID: m.MessageStreamID,
Body: body,
}, nil
}

44
internal/rtmp/message/extended_frames_x.go

@ -0,0 +1,44 @@ @@ -0,0 +1,44 @@
package message
import (
"time"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
// ExtendedFramesX is a FramesX extended message.
type ExtendedFramesX struct {
ChunkStreamID byte
DTS time.Duration
MessageStreamID uint32
FourCC [4]byte
Payload []byte
}
// Unmarshal implements Message.
func (m *ExtendedFramesX) Unmarshal(raw *rawmessage.Message) error {
m.ChunkStreamID = raw.ChunkStreamID
m.DTS = raw.Timestamp
m.MessageStreamID = raw.MessageStreamID
copy(m.FourCC[:], raw.Body[1:5])
m.Payload = raw.Body[5:]
return nil
}
// Marshal implements Message.
func (m ExtendedFramesX) Marshal() (*rawmessage.Message, error) {
body := make([]byte, 5+len(m.Payload))
body[0] = 0b10000000 | byte(ExtendedTypeFramesX)
copy(body[1:5], m.FourCC[:])
copy(body[5:], m.Payload)
return &rawmessage.Message{
ChunkStreamID: m.ChunkStreamID,
Timestamp: m.DTS,
Type: uint8(TypeVideo),
MessageStreamID: m.MessageStreamID,
Body: body,
}, nil
}

24
internal/rtmp/message/extended_metadata.go

@ -0,0 +1,24 @@ @@ -0,0 +1,24 @@
package message
import (
"fmt"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
// ExtendedMetadata is a metadata extended message.
type ExtendedMetadata struct {
FourCC [4]byte
}
// Unmarshal implements Message.
func (m *ExtendedMetadata) Unmarshal(raw *rawmessage.Message) error {
copy(m.FourCC[:], raw.Body[1:5])
return fmt.Errorf("ExtendedMetadata is not implemented yet")
}
// Marshal implements Message.
func (m ExtendedMetadata) Marshal() (*rawmessage.Message, error) {
return nil, fmt.Errorf("TODO")
}

24
internal/rtmp/message/extended_mpeg2ts_sequence_start.go

@ -0,0 +1,24 @@ @@ -0,0 +1,24 @@
package message
import (
"fmt"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
// ExtendedMPEG2TSSequenceStart is a MPEG2-TS sequence start extended message.
type ExtendedMPEG2TSSequenceStart struct {
FourCC [4]byte
}
// Unmarshal implements Message.
func (m *ExtendedMPEG2TSSequenceStart) Unmarshal(raw *rawmessage.Message) error {
copy(m.FourCC[:], raw.Body[1:5])
return fmt.Errorf("ExtendedMPEG2TSSequenceStart is not implemented yet")
}
// Marshal implements Message.
func (m ExtendedMPEG2TSSequenceStart) Marshal() (*rawmessage.Message, error) {
return nil, fmt.Errorf("TODO")
}

28
internal/rtmp/message/extended_sequence_end.go

@ -0,0 +1,28 @@ @@ -0,0 +1,28 @@
package message
import (
"fmt"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
// ExtendedSequenceEnd is a sequence end extended message.
type ExtendedSequenceEnd struct {
FourCC [4]byte
}
// Unmarshal implements Message.
func (m *ExtendedSequenceEnd) Unmarshal(raw *rawmessage.Message) error {
if len(raw.Body) != 5 {
return fmt.Errorf("invalid body size")
}
copy(m.FourCC[:], raw.Body[1:5])
return nil
}
// Marshal implements Message.
func (m ExtendedSequenceEnd) Marshal() (*rawmessage.Message, error) {
return nil, fmt.Errorf("TODO")
}

26
internal/rtmp/message/extended_sequence_start.go

@ -0,0 +1,26 @@ @@ -0,0 +1,26 @@
package message
import (
"fmt"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
// ExtendedSequenceStart is a sequence start extended message.
type ExtendedSequenceStart struct {
FourCC [4]byte
Config []byte
}
// Unmarshal implements Message.
func (m *ExtendedSequenceStart) Unmarshal(raw *rawmessage.Message) error {
copy(m.FourCC[:], raw.Body[1:5])
m.Config = raw.Body[5:]
return nil
}
// Marshal implements Message.
func (m ExtendedSequenceStart) Marshal() (*rawmessage.Message, error) {
return nil, fmt.Errorf("TODO")
}

60
internal/rtmp/message/message.go

@ -10,6 +10,66 @@ const ( @@ -10,6 +10,66 @@ const (
ControlChunkStreamID = 2
)
// Type is a message type.
type Type byte
// message types.
const (
TypeSetChunkSize Type = 1
TypeAbortMessage Type = 2
TypeAcknowledge Type = 3
TypeSetWindowAckSize Type = 5
TypeSetPeerBandwidth Type = 6
TypeUserControl Type = 4
TypeCommandAMF3 Type = 17
TypeCommandAMF0 Type = 20
TypeDataAMF3 Type = 15
TypeDataAMF0 Type = 18
TypeAudio Type = 8
TypeVideo Type = 9
)
// UserControlType is a user control type.
type UserControlType uint16
// user control types.
const (
UserControlTypeStreamBegin UserControlType = 0
UserControlTypeStreamEOF UserControlType = 1
UserControlTypeStreamDry UserControlType = 2
UserControlTypeSetBufferLength UserControlType = 3
UserControlTypeStreamIsRecorded UserControlType = 4
UserControlTypePingRequest UserControlType = 6
UserControlTypePingResponse UserControlType = 7
)
// ExtendedType is a message extended type.
type ExtendedType uint8
// message extended types.
const (
ExtendedTypeSequenceStart ExtendedType = 0
ExtendedTypeCodedFrames ExtendedType = 1
ExtendedTypeSequenceEnd ExtendedType = 2
ExtendedTypeFramesX ExtendedType = 3
ExtendedTypeMetadata ExtendedType = 4
ExtendedTypeMPEG2TSSequenceStart ExtendedType = 5
)
// FourCC is an identifier of a video codec.
type FourCC [4]byte
// video codec identifiers.
var (
FourCCAV1 FourCC = [4]byte{'a', 'v', '0', '1'}
FourCCVP9 FourCC = [4]byte{'v', 'p', '0', '9'}
FourCCHEVC FourCC = [4]byte{'h', 'v', 'c', '1'}
)
// Message is a message.
type Message interface {
Unmarshal(*rawmessage.Message) error

12
internal/rtmp/message/msg_usercontrol.go

@ -1,12 +0,0 @@ @@ -1,12 +0,0 @@
package message
// user control types.
const (
UserControlTypeStreamBegin = 0
UserControlTypeStreamEOF = 1
UserControlTypeStreamDry = 2
UserControlTypeSetBufferLength = 3
UserControlTypeStreamIsRecorded = 4
UserControlTypePingRequest = 6
UserControlTypePingResponse = 7
)

105
internal/rtmp/message/reader.go

@ -4,70 +4,109 @@ import ( @@ -4,70 +4,109 @@ import (
"fmt"
"github.com/aler9/mediamtx/internal/rtmp/bytecounter"
"github.com/aler9/mediamtx/internal/rtmp/chunk"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
func allocateMessage(raw *rawmessage.Message) (Message, error) {
switch raw.Type {
case chunk.MessageTypeSetChunkSize:
return &MsgSetChunkSize{}, nil
switch Type(raw.Type) {
case TypeSetChunkSize:
return &SetChunkSize{}, nil
case chunk.MessageTypeAcknowledge:
return &MsgAcknowledge{}, nil
case TypeAcknowledge:
return &Acknowledge{}, nil
case chunk.MessageTypeSetWindowAckSize:
return &MsgSetWindowAckSize{}, nil
case TypeSetWindowAckSize:
return &SetWindowAckSize{}, nil
case chunk.MessageTypeSetPeerBandwidth:
return &MsgSetPeerBandwidth{}, nil
case TypeSetPeerBandwidth:
return &SetPeerBandwidth{}, nil
case chunk.MessageTypeUserControl:
case TypeUserControl:
if len(raw.Body) < 2 {
return nil, fmt.Errorf("invalid body size")
return nil, fmt.Errorf("not enough bytes")
}
subType := uint16(raw.Body[0])<<8 | uint16(raw.Body[1])
switch subType {
userControlType := UserControlType(uint16(raw.Body[0])<<8 | uint16(raw.Body[1]))
switch userControlType {
case UserControlTypeStreamBegin:
return &MsgUserControlStreamBegin{}, nil
return &UserControlStreamBegin{}, nil
case UserControlTypeStreamEOF:
return &MsgUserControlStreamEOF{}, nil
return &UserControlStreamEOF{}, nil
case UserControlTypeStreamDry:
return &MsgUserControlStreamDry{}, nil
return &UserControlStreamDry{}, nil
case UserControlTypeSetBufferLength:
return &MsgUserControlSetBufferLength{}, nil
return &UserControlSetBufferLength{}, nil
case UserControlTypeStreamIsRecorded:
return &MsgUserControlStreamIsRecorded{}, nil
return &UserControlStreamIsRecorded{}, nil
case UserControlTypePingRequest:
return &MsgUserControlPingRequest{}, nil
return &UserControlPingRequest{}, nil
case UserControlTypePingResponse:
return &MsgUserControlPingResponse{}, nil
return &UserControlPingResponse{}, nil
default:
return nil, fmt.Errorf("invalid user control type")
return nil, fmt.Errorf("invalid user control type: %v", userControlType)
}
case chunk.MessageTypeCommandAMF0:
return &MsgCommandAMF0{}, nil
case TypeCommandAMF0:
return &CommandAMF0{}, nil
case TypeDataAMF0:
return &DataAMF0{}, nil
case TypeAudio:
return &Audio{}, nil
case TypeVideo:
if len(raw.Body) < 5 {
return nil, fmt.Errorf("not enough bytes")
}
if (raw.Body[0] & 0b10000000) != 0 {
var fourCC [4]byte
copy(fourCC[:], raw.Body[1:5])
switch fourCC {
case FourCCAV1, FourCCVP9, FourCCHEVC:
default:
return nil, fmt.Errorf("invalid fourCC: %v", fourCC)
}
case chunk.MessageTypeDataAMF0:
return &MsgDataAMF0{}, nil
extendedType := ExtendedType(raw.Body[0] & 0x0F)
case chunk.MessageTypeAudio:
return &MsgAudio{}, nil
switch extendedType {
case ExtendedTypeSequenceStart:
return &ExtendedSequenceStart{}, nil
case chunk.MessageTypeVideo:
return &MsgVideo{}, nil
case ExtendedTypeCodedFrames:
return &ExtendedCodedFrames{}, nil
case ExtendedTypeSequenceEnd:
return &ExtendedSequenceEnd{}, nil
case ExtendedTypeFramesX:
return &ExtendedFramesX{}, nil
case ExtendedTypeMetadata:
return &ExtendedMetadata{}, nil
case ExtendedTypeMPEG2TSSequenceStart:
return &ExtendedMPEG2TSSequenceStart{}, nil
default:
return nil, fmt.Errorf("invalid extended type: %v", extendedType)
}
}
return &Video{}, nil
default:
return nil, fmt.Errorf("unhandled message type (%v)", raw.Type)
return nil, fmt.Errorf("invalid message type: %v", raw.Type)
}
}
@ -101,10 +140,10 @@ func (r *Reader) Read() (Message, error) { @@ -101,10 +140,10 @@ func (r *Reader) Read() (Message, error) {
}
switch tmsg := msg.(type) {
case *MsgSetChunkSize:
case *SetChunkSize:
r.r.SetChunkSize(tmsg.Value)
case *MsgSetWindowAckSize:
case *SetWindowAckSize:
r.r.SetWindowAckSize(tmsg.Value)
}

73
internal/rtmp/message/reader_test.go

@ -18,7 +18,7 @@ var readWriterCases = []struct { @@ -18,7 +18,7 @@ var readWriterCases = []struct {
}{
{
"acknowledge",
&MsgAcknowledge{
&Acknowledge{
Value: 45953968,
},
[]byte{
@ -28,7 +28,7 @@ var readWriterCases = []struct { @@ -28,7 +28,7 @@ var readWriterCases = []struct {
},
{
"audio mpeg2",
&MsgAudio{
&Audio{
ChunkStreamID: 7,
DTS: 6013806 * time.Millisecond,
MessageStreamID: 4534543,
@ -45,7 +45,7 @@ var readWriterCases = []struct { @@ -45,7 +45,7 @@ var readWriterCases = []struct {
},
{
"audio mpeg4",
&MsgAudio{
&Audio{
ChunkStreamID: 7,
DTS: 6013806 * time.Millisecond,
MessageStreamID: 4534543,
@ -53,7 +53,7 @@ var readWriterCases = []struct { @@ -53,7 +53,7 @@ var readWriterCases = []struct {
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,
AACType: MsgAudioAACTypeAU,
AACType: AudioAACTypeAU,
Payload: []byte{0x5A, 0xC0, 0x77, 0x40},
},
[]byte{
@ -64,7 +64,7 @@ var readWriterCases = []struct { @@ -64,7 +64,7 @@ var readWriterCases = []struct {
},
{
"command amf0",
&MsgCommandAMF0{
&CommandAMF0{
ChunkStreamID: 3,
MessageStreamID: 345243,
Name: "i8yythrergre",
@ -90,7 +90,7 @@ var readWriterCases = []struct { @@ -90,7 +90,7 @@ var readWriterCases = []struct {
},
{
"data amf0",
&MsgDataAMF0{
&DataAMF0{
ChunkStreamID: 3,
MessageStreamID: 345243,
Payload: []interface{}{
@ -108,7 +108,7 @@ var readWriterCases = []struct { @@ -108,7 +108,7 @@ var readWriterCases = []struct {
},
{
"set chunk size",
&MsgSetChunkSize{
&SetChunkSize{
Value: 10000,
},
[]byte{
@ -118,7 +118,7 @@ var readWriterCases = []struct { @@ -118,7 +118,7 @@ var readWriterCases = []struct {
},
{
"set peer bandwidth",
&MsgSetChunkSize{
&SetChunkSize{
Value: 10000,
},
[]byte{
@ -128,7 +128,7 @@ var readWriterCases = []struct { @@ -128,7 +128,7 @@ var readWriterCases = []struct {
},
{
"set window ack size",
&MsgSetChunkSize{
&SetChunkSize{
Value: 10000,
},
[]byte{
@ -138,7 +138,7 @@ var readWriterCases = []struct { @@ -138,7 +138,7 @@ var readWriterCases = []struct {
},
{
"user control ping request",
&MsgUserControlPingRequest{
&UserControlPingRequest{
ServerTime: 569834435,
},
[]byte{
@ -149,7 +149,7 @@ var readWriterCases = []struct { @@ -149,7 +149,7 @@ var readWriterCases = []struct {
},
{
"user control ping response",
&MsgUserControlPingResponse{
&UserControlPingResponse{
ServerTime: 569834435,
},
[]byte{
@ -160,7 +160,7 @@ var readWriterCases = []struct { @@ -160,7 +160,7 @@ var readWriterCases = []struct {
},
{
"user control set buffer length",
&MsgUserControlSetBufferLength{
&UserControlSetBufferLength{
StreamID: 35534,
BufferLength: 235345,
},
@ -172,7 +172,7 @@ var readWriterCases = []struct { @@ -172,7 +172,7 @@ var readWriterCases = []struct {
},
{
"user control stream begin",
&MsgUserControlStreamBegin{
&UserControlStreamBegin{
StreamID: 35534,
},
[]byte{
@ -183,7 +183,7 @@ var readWriterCases = []struct { @@ -183,7 +183,7 @@ var readWriterCases = []struct {
},
{
"user control stream dry",
&MsgUserControlStreamDry{
&UserControlStreamDry{
StreamID: 35534,
},
[]byte{
@ -194,7 +194,7 @@ var readWriterCases = []struct { @@ -194,7 +194,7 @@ var readWriterCases = []struct {
},
{
"user control stream eof",
&MsgUserControlStreamEOF{
&UserControlStreamEOF{
StreamID: 35534,
},
[]byte{
@ -205,7 +205,7 @@ var readWriterCases = []struct { @@ -205,7 +205,7 @@ var readWriterCases = []struct {
},
{
"user control stream is recorded",
&MsgUserControlStreamIsRecorded{
&UserControlStreamIsRecorded{
StreamID: 35534,
},
[]byte{
@ -216,20 +216,51 @@ var readWriterCases = []struct { @@ -216,20 +216,51 @@ var readWriterCases = []struct {
},
{
"video",
&MsgVideo{
&Video{
ChunkStreamID: 6,
DTS: 2543534 * time.Millisecond,
MessageStreamID: 0x1000000,
Codec: CodecH264,
IsKeyFrame: true,
Type: MsgVideoTypeConfig,
Type: VideoTypeConfig,
PTSDelta: 10 * time.Millisecond,
Payload: []byte{0x01, 0x02, 0x03},
},
[]byte{
0x6, 0x26, 0xcf, 0xae, 0x0, 0x0, 0x8, 0x9,
0x1, 0x0, 0x0, 0x0, 0x17, 0x0, 0x0, 0x0,
0xa, 0x1, 0x2, 0x3,
0x06, 0x26, 0xcf, 0xae, 0x00, 0x00, 0x08, 0x09,
0x01, 0x00, 0x00, 0x00, 0x17, 0x00, 0x00, 0x00,
0x0a, 0x01, 0x02, 0x03,
},
},
{
"extended coded frames",
&ExtendedCodedFrames{
ChunkStreamID: 4,
DTS: 15100 * time.Millisecond,
MessageStreamID: 0x1000000,
FourCC: FourCCHEVC,
PTSDelta: 30 * time.Millisecond,
Payload: []byte{0x01, 0x02, 0x03},
},
[]byte{
0x04, 0x00, 0x3a, 0xfc, 0x00, 0x00, 0x0b, 0x09,
0x01, 0x00, 0x00, 0x00, 0x81, 0x68, 0x76, 0x63,
0x31, 0x00, 0x00, 0x1e, 0x01, 0x02, 0x03,
},
},
{
"extended frames x",
&ExtendedFramesX{
ChunkStreamID: 4,
DTS: 15100 * time.Millisecond,
MessageStreamID: 0x1000000,
FourCC: FourCCHEVC,
Payload: []byte{0x01, 0x02, 0x03},
},
[]byte{
0x04, 0x00, 0x3a, 0xfc, 0x00, 0x00, 0x08, 0x09,
0x01, 0x00, 0x00, 0x00, 0x83, 0x68, 0x76, 0x63,
0x31, 0x01, 0x02, 0x03,
},
},
}

8
internal/rtmp/message/readwriter.go

@ -15,7 +15,7 @@ func NewReadWriter(bc *bytecounter.ReadWriter, checkAcknowledge bool) *ReadWrite @@ -15,7 +15,7 @@ func NewReadWriter(bc *bytecounter.ReadWriter, checkAcknowledge bool) *ReadWrite
w := NewWriter(bc.Writer, checkAcknowledge)
r := NewReader(bc.Reader, func(count uint32) error {
return w.Write(&MsgAcknowledge{
return w.Write(&Acknowledge{
Value: count,
})
})
@ -34,11 +34,11 @@ func (rw *ReadWriter) Read() (Message, error) { @@ -34,11 +34,11 @@ func (rw *ReadWriter) Read() (Message, error) {
}
switch tmsg := msg.(type) {
case *MsgAcknowledge:
case *Acknowledge:
rw.w.SetAcknowledgeValue(tmsg.Value)
case *MsgUserControlPingRequest:
rw.w.Write(&MsgUserControlPingResponse{
case *UserControlPingRequest:
rw.w.Write(&UserControlPingResponse{
ServerTime: tmsg.ServerTime,
})
}

6
internal/rtmp/message/readwriter_test.go

@ -31,7 +31,7 @@ func TestReadWriterAcknowledge(t *testing.T) { @@ -31,7 +31,7 @@ func TestReadWriterAcknowledge(t *testing.T) {
Reader: &buf2,
Writer: &buf1,
}), true)
err := rw1.Write(&MsgAcknowledge{
err := rw1.Write(&Acknowledge{
Value: 7863534,
})
require.NoError(t, err)
@ -52,7 +52,7 @@ func TestReadWriterPing(t *testing.T) { @@ -52,7 +52,7 @@ func TestReadWriterPing(t *testing.T) {
Reader: &buf2,
Writer: &buf1,
}), true)
err := rw1.Write(&MsgUserControlPingRequest{
err := rw1.Write(&UserControlPingRequest{
ServerTime: 143424312,
})
require.NoError(t, err)
@ -66,7 +66,7 @@ func TestReadWriterPing(t *testing.T) { @@ -66,7 +66,7 @@ func TestReadWriterPing(t *testing.T) {
msg, err := rw1.Read()
require.NoError(t, err)
require.Equal(t, &MsgUserControlPingResponse{
require.Equal(t, &UserControlPingResponse{
ServerTime: 143424312,
}, msg)
}

13
internal/rtmp/message/msg_setchunksize.go → internal/rtmp/message/set_chunk_size.go

@ -3,23 +3,22 @@ package message //nolint:dupl @@ -3,23 +3,22 @@ package message //nolint:dupl
import (
"fmt"
"github.com/aler9/mediamtx/internal/rtmp/chunk"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
// MsgSetChunkSize is a set chunk size message.
type MsgSetChunkSize struct {
// SetChunkSize is a set chunk size message.
type SetChunkSize struct {
Value uint32
}
// Unmarshal implements Message.
func (m *MsgSetChunkSize) Unmarshal(raw *rawmessage.Message) error {
func (m *SetChunkSize) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
if len(raw.Body) != 4 {
return fmt.Errorf("unexpected body size")
return fmt.Errorf("invalid body size")
}
m.Value = uint32(raw.Body[0])<<24 | uint32(raw.Body[1])<<16 | uint32(raw.Body[2])<<8 | uint32(raw.Body[3])
@ -28,7 +27,7 @@ func (m *MsgSetChunkSize) Unmarshal(raw *rawmessage.Message) error { @@ -28,7 +27,7 @@ func (m *MsgSetChunkSize) Unmarshal(raw *rawmessage.Message) error {
}
// Marshal implements Message.
func (m *MsgSetChunkSize) Marshal() (*rawmessage.Message, error) {
func (m *SetChunkSize) Marshal() (*rawmessage.Message, error) {
buf := make([]byte, 4)
buf[0] = byte(m.Value >> 24)
@ -38,7 +37,7 @@ func (m *MsgSetChunkSize) Marshal() (*rawmessage.Message, error) { @@ -38,7 +37,7 @@ func (m *MsgSetChunkSize) Marshal() (*rawmessage.Message, error) {
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeSetChunkSize,
Type: uint8(TypeSetChunkSize),
Body: buf,
}, nil
}

13
internal/rtmp/message/msg_setpeerbandwidth.go → internal/rtmp/message/set_peer_bandwidth.go

@ -3,24 +3,23 @@ package message //nolint:dupl @@ -3,24 +3,23 @@ package message //nolint:dupl
import (
"fmt"
"github.com/aler9/mediamtx/internal/rtmp/chunk"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
// MsgSetPeerBandwidth is a set peer bandwidth message.
type MsgSetPeerBandwidth struct {
// SetPeerBandwidth is a set peer bandwidth message.
type SetPeerBandwidth struct {
Value uint32
Type byte
}
// Unmarshal implements Message.
func (m *MsgSetPeerBandwidth) Unmarshal(raw *rawmessage.Message) error {
func (m *SetPeerBandwidth) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
if len(raw.Body) != 5 {
return fmt.Errorf("unexpected body size")
return fmt.Errorf("invalid body size")
}
m.Value = uint32(raw.Body[0])<<24 | uint32(raw.Body[1])<<16 | uint32(raw.Body[2])<<8 | uint32(raw.Body[3])
@ -30,7 +29,7 @@ func (m *MsgSetPeerBandwidth) Unmarshal(raw *rawmessage.Message) error { @@ -30,7 +29,7 @@ func (m *MsgSetPeerBandwidth) Unmarshal(raw *rawmessage.Message) error {
}
// Marshal implements Message.
func (m *MsgSetPeerBandwidth) Marshal() (*rawmessage.Message, error) {
func (m *SetPeerBandwidth) Marshal() (*rawmessage.Message, error) {
buf := make([]byte, 5)
buf[0] = byte(m.Value >> 24)
@ -41,7 +40,7 @@ func (m *MsgSetPeerBandwidth) Marshal() (*rawmessage.Message, error) { @@ -41,7 +40,7 @@ func (m *MsgSetPeerBandwidth) Marshal() (*rawmessage.Message, error) {
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeSetPeerBandwidth,
Type: uint8(TypeSetPeerBandwidth),
Body: buf,
}, nil
}

13
internal/rtmp/message/msg_setwindowacksize.go → internal/rtmp/message/set_window_ack_size.go

@ -3,23 +3,22 @@ package message //nolint:dupl @@ -3,23 +3,22 @@ package message //nolint:dupl
import (
"fmt"
"github.com/aler9/mediamtx/internal/rtmp/chunk"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
// MsgSetWindowAckSize is a set window acknowledgement message.
type MsgSetWindowAckSize struct {
// SetWindowAckSize is a set window acknowledgement message.
type SetWindowAckSize struct {
Value uint32
}
// Unmarshal implements Message.
func (m *MsgSetWindowAckSize) Unmarshal(raw *rawmessage.Message) error {
func (m *SetWindowAckSize) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
if len(raw.Body) != 4 {
return fmt.Errorf("unexpected body size")
return fmt.Errorf("invalid body size")
}
m.Value = uint32(raw.Body[0])<<24 | uint32(raw.Body[1])<<16 | uint32(raw.Body[2])<<8 | uint32(raw.Body[3])
@ -28,7 +27,7 @@ func (m *MsgSetWindowAckSize) Unmarshal(raw *rawmessage.Message) error { @@ -28,7 +27,7 @@ func (m *MsgSetWindowAckSize) Unmarshal(raw *rawmessage.Message) error {
}
// Marshal implements Message.
func (m *MsgSetWindowAckSize) Marshal() (*rawmessage.Message, error) {
func (m *SetWindowAckSize) Marshal() (*rawmessage.Message, error) {
buf := make([]byte, 4)
buf[0] = byte(m.Value >> 24)
@ -38,7 +37,7 @@ func (m *MsgSetWindowAckSize) Marshal() (*rawmessage.Message, error) { @@ -38,7 +37,7 @@ func (m *MsgSetWindowAckSize) Marshal() (*rawmessage.Message, error) {
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeSetWindowAckSize,
Type: uint8(TypeSetWindowAckSize),
Body: buf,
}, nil
}

11
internal/rtmp/message/msg_usercontrol_pingrequest.go → internal/rtmp/message/user_control_ping_request.go

@ -3,17 +3,16 @@ package message //nolint:dupl @@ -3,17 +3,16 @@ package message //nolint:dupl
import (
"fmt"
"github.com/aler9/mediamtx/internal/rtmp/chunk"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
// MsgUserControlPingRequest is a user control message.
type MsgUserControlPingRequest struct {
// UserControlPingRequest is a user control message.
type UserControlPingRequest struct {
ServerTime uint32
}
// Unmarshal implements Message.
func (m *MsgUserControlPingRequest) Unmarshal(raw *rawmessage.Message) error {
func (m *UserControlPingRequest) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
@ -28,7 +27,7 @@ func (m *MsgUserControlPingRequest) Unmarshal(raw *rawmessage.Message) error { @@ -28,7 +27,7 @@ func (m *MsgUserControlPingRequest) Unmarshal(raw *rawmessage.Message) error {
}
// Marshal implements Message.
func (m MsgUserControlPingRequest) Marshal() (*rawmessage.Message, error) {
func (m UserControlPingRequest) Marshal() (*rawmessage.Message, error) {
buf := make([]byte, 6)
buf[0] = byte(UserControlTypePingRequest >> 8)
@ -40,7 +39,7 @@ func (m MsgUserControlPingRequest) Marshal() (*rawmessage.Message, error) { @@ -40,7 +39,7 @@ func (m MsgUserControlPingRequest) Marshal() (*rawmessage.Message, error) {
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeUserControl,
Type: uint8(TypeUserControl),
Body: buf,
}, nil
}

11
internal/rtmp/message/msg_usercontrol_pingresponse.go → internal/rtmp/message/user_control_ping_response.go

@ -3,17 +3,16 @@ package message //nolint:dupl @@ -3,17 +3,16 @@ package message //nolint:dupl
import (
"fmt"
"github.com/aler9/mediamtx/internal/rtmp/chunk"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
// MsgUserControlPingResponse is a user control message.
type MsgUserControlPingResponse struct {
// UserControlPingResponse is a user control message.
type UserControlPingResponse struct {
ServerTime uint32
}
// Unmarshal implements Message.
func (m *MsgUserControlPingResponse) Unmarshal(raw *rawmessage.Message) error {
func (m *UserControlPingResponse) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
@ -28,7 +27,7 @@ func (m *MsgUserControlPingResponse) Unmarshal(raw *rawmessage.Message) error { @@ -28,7 +27,7 @@ func (m *MsgUserControlPingResponse) Unmarshal(raw *rawmessage.Message) error {
}
// Marshal implements Message.
func (m MsgUserControlPingResponse) Marshal() (*rawmessage.Message, error) {
func (m UserControlPingResponse) Marshal() (*rawmessage.Message, error) {
buf := make([]byte, 6)
buf[0] = byte(UserControlTypePingResponse >> 8)
@ -40,7 +39,7 @@ func (m MsgUserControlPingResponse) Marshal() (*rawmessage.Message, error) { @@ -40,7 +39,7 @@ func (m MsgUserControlPingResponse) Marshal() (*rawmessage.Message, error) {
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeUserControl,
Type: uint8(TypeUserControl),
Body: buf,
}, nil
}

11
internal/rtmp/message/msg_usercontrol_setbufferlength.go → internal/rtmp/message/user_control_set_buffer_length.go

@ -3,18 +3,17 @@ package message //nolint:dupl @@ -3,18 +3,17 @@ package message //nolint:dupl
import (
"fmt"
"github.com/aler9/mediamtx/internal/rtmp/chunk"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
// MsgUserControlSetBufferLength is a user control message.
type MsgUserControlSetBufferLength struct {
// UserControlSetBufferLength is a user control message.
type UserControlSetBufferLength struct {
StreamID uint32
BufferLength uint32
}
// Unmarshal implements Message.
func (m *MsgUserControlSetBufferLength) Unmarshal(raw *rawmessage.Message) error {
func (m *UserControlSetBufferLength) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
@ -30,7 +29,7 @@ func (m *MsgUserControlSetBufferLength) Unmarshal(raw *rawmessage.Message) error @@ -30,7 +29,7 @@ func (m *MsgUserControlSetBufferLength) Unmarshal(raw *rawmessage.Message) error
}
// Marshal implements Message.
func (m MsgUserControlSetBufferLength) Marshal() (*rawmessage.Message, error) {
func (m UserControlSetBufferLength) Marshal() (*rawmessage.Message, error) {
buf := make([]byte, 10)
buf[0] = byte(UserControlTypeSetBufferLength >> 8)
@ -46,7 +45,7 @@ func (m MsgUserControlSetBufferLength) Marshal() (*rawmessage.Message, error) { @@ -46,7 +45,7 @@ func (m MsgUserControlSetBufferLength) Marshal() (*rawmessage.Message, error) {
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeUserControl,
Type: uint8(TypeUserControl),
Body: buf,
}, nil
}

11
internal/rtmp/message/msg_usercontrol_streambegin.go → internal/rtmp/message/user_control_stream_begin.go

@ -3,17 +3,16 @@ package message //nolint:dupl @@ -3,17 +3,16 @@ package message //nolint:dupl
import (
"fmt"
"github.com/aler9/mediamtx/internal/rtmp/chunk"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
// MsgUserControlStreamBegin is a user control message.
type MsgUserControlStreamBegin struct {
// UserControlStreamBegin is a user control message.
type UserControlStreamBegin struct {
StreamID uint32
}
// Unmarshal implements Message.
func (m *MsgUserControlStreamBegin) Unmarshal(raw *rawmessage.Message) error {
func (m *UserControlStreamBegin) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
@ -28,7 +27,7 @@ func (m *MsgUserControlStreamBegin) Unmarshal(raw *rawmessage.Message) error { @@ -28,7 +27,7 @@ func (m *MsgUserControlStreamBegin) Unmarshal(raw *rawmessage.Message) error {
}
// Marshal implements Message.
func (m MsgUserControlStreamBegin) Marshal() (*rawmessage.Message, error) {
func (m UserControlStreamBegin) Marshal() (*rawmessage.Message, error) {
buf := make([]byte, 6)
buf[0] = byte(UserControlTypeStreamBegin >> 8)
@ -40,7 +39,7 @@ func (m MsgUserControlStreamBegin) Marshal() (*rawmessage.Message, error) { @@ -40,7 +39,7 @@ func (m MsgUserControlStreamBegin) Marshal() (*rawmessage.Message, error) {
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeUserControl,
Type: uint8(TypeUserControl),
Body: buf,
}, nil
}

11
internal/rtmp/message/msg_usercontrol_streamdry.go → internal/rtmp/message/user_control_stream_dry.go

@ -3,17 +3,16 @@ package message //nolint:dupl @@ -3,17 +3,16 @@ package message //nolint:dupl
import (
"fmt"
"github.com/aler9/mediamtx/internal/rtmp/chunk"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
// MsgUserControlStreamDry is a user control message.
type MsgUserControlStreamDry struct {
// UserControlStreamDry is a user control message.
type UserControlStreamDry struct {
StreamID uint32
}
// Unmarshal implements Message.
func (m *MsgUserControlStreamDry) Unmarshal(raw *rawmessage.Message) error {
func (m *UserControlStreamDry) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
@ -28,7 +27,7 @@ func (m *MsgUserControlStreamDry) Unmarshal(raw *rawmessage.Message) error { @@ -28,7 +27,7 @@ func (m *MsgUserControlStreamDry) Unmarshal(raw *rawmessage.Message) error {
}
// Marshal implements Message.
func (m MsgUserControlStreamDry) Marshal() (*rawmessage.Message, error) {
func (m UserControlStreamDry) Marshal() (*rawmessage.Message, error) {
buf := make([]byte, 6)
buf[0] = byte(UserControlTypeStreamDry >> 8)
@ -40,7 +39,7 @@ func (m MsgUserControlStreamDry) Marshal() (*rawmessage.Message, error) { @@ -40,7 +39,7 @@ func (m MsgUserControlStreamDry) Marshal() (*rawmessage.Message, error) {
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeUserControl,
Type: uint8(TypeUserControl),
Body: buf,
}, nil
}

11
internal/rtmp/message/msg_usercontrol_streameof.go → internal/rtmp/message/user_control_stream_eof.go

@ -3,17 +3,16 @@ package message //nolint:dupl @@ -3,17 +3,16 @@ package message //nolint:dupl
import (
"fmt"
"github.com/aler9/mediamtx/internal/rtmp/chunk"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
// MsgUserControlStreamEOF is a user control message.
type MsgUserControlStreamEOF struct {
// UserControlStreamEOF is a user control message.
type UserControlStreamEOF struct {
StreamID uint32
}
// Unmarshal implements Message.
func (m *MsgUserControlStreamEOF) Unmarshal(raw *rawmessage.Message) error {
func (m *UserControlStreamEOF) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
@ -28,7 +27,7 @@ func (m *MsgUserControlStreamEOF) Unmarshal(raw *rawmessage.Message) error { @@ -28,7 +27,7 @@ func (m *MsgUserControlStreamEOF) Unmarshal(raw *rawmessage.Message) error {
}
// Marshal implements Message.
func (m MsgUserControlStreamEOF) Marshal() (*rawmessage.Message, error) {
func (m UserControlStreamEOF) Marshal() (*rawmessage.Message, error) {
buf := make([]byte, 6)
buf[0] = byte(UserControlTypeStreamEOF >> 8)
@ -40,7 +39,7 @@ func (m MsgUserControlStreamEOF) Marshal() (*rawmessage.Message, error) { @@ -40,7 +39,7 @@ func (m MsgUserControlStreamEOF) Marshal() (*rawmessage.Message, error) {
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeUserControl,
Type: uint8(TypeUserControl),
Body: buf,
}, nil
}

11
internal/rtmp/message/msg_usercontrol_streamisrecorded.go → internal/rtmp/message/user_control_stream_is_recorded.go

@ -3,17 +3,16 @@ package message //nolint:dupl @@ -3,17 +3,16 @@ package message //nolint:dupl
import (
"fmt"
"github.com/aler9/mediamtx/internal/rtmp/chunk"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
// MsgUserControlStreamIsRecorded is a user control message.
type MsgUserControlStreamIsRecorded struct {
// UserControlStreamIsRecorded is a user control message.
type UserControlStreamIsRecorded struct {
StreamID uint32
}
// Unmarshal implements Message.
func (m *MsgUserControlStreamIsRecorded) Unmarshal(raw *rawmessage.Message) error {
func (m *UserControlStreamIsRecorded) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
@ -28,7 +27,7 @@ func (m *MsgUserControlStreamIsRecorded) Unmarshal(raw *rawmessage.Message) erro @@ -28,7 +27,7 @@ func (m *MsgUserControlStreamIsRecorded) Unmarshal(raw *rawmessage.Message) erro
}
// Marshal implements Message.
func (m MsgUserControlStreamIsRecorded) Marshal() (*rawmessage.Message, error) {
func (m UserControlStreamIsRecorded) Marshal() (*rawmessage.Message, error) {
buf := make([]byte, 6)
buf[0] = byte(UserControlTypeStreamIsRecorded >> 8)
@ -40,7 +39,7 @@ func (m MsgUserControlStreamIsRecorded) Marshal() (*rawmessage.Message, error) { @@ -40,7 +39,7 @@ func (m MsgUserControlStreamIsRecorded) Marshal() (*rawmessage.Message, error) {
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeUserControl,
Type: uint8(TypeUserControl),
Body: buf,
}, nil
}

36
internal/rtmp/message/msg_video.go → internal/rtmp/message/video.go

@ -6,13 +6,12 @@ import ( @@ -6,13 +6,12 @@ import (
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/aler9/mediamtx/internal/rtmp/chunk"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
const (
// MsgVideoChunkStreamID is the chunk stream ID that is usually used to send MsgVideo{}
MsgVideoChunkStreamID = 6
// VideoChunkStreamID is the chunk stream ID that is usually used to send Video{}
VideoChunkStreamID = 6
)
// supported video codecs
@ -20,30 +19,30 @@ const ( @@ -20,30 +19,30 @@ const (
CodecH264 = 7
)
// MsgVideoType is the type of a video message.
type MsgVideoType uint8
// VideoType is the type of a video message.
type VideoType uint8
// MsgVideoType values.
// VideoType values.
const (
MsgVideoTypeConfig MsgVideoType = 0
MsgVideoTypeAU MsgVideoType = 1
MsgVideoTypeEOS MsgVideoType = 2
VideoTypeConfig VideoType = 0
VideoTypeAU VideoType = 1
VideoTypeEOS VideoType = 2
)
// MsgVideo is a video message.
type MsgVideo struct {
// Video is a video message.
type Video struct {
ChunkStreamID byte
DTS time.Duration
MessageStreamID uint32
Codec uint8
IsKeyFrame bool
Type MsgVideoType
Type VideoType
PTSDelta time.Duration
Payload []byte
}
// Unmarshal implements Message.
func (m *MsgVideo) Unmarshal(raw *rawmessage.Message) error {
func (m *Video) Unmarshal(raw *rawmessage.Message) error {
m.ChunkStreamID = raw.ChunkStreamID
m.DTS = raw.Timestamp
m.MessageStreamID = raw.MessageStreamID
@ -61,15 +60,14 @@ func (m *MsgVideo) Unmarshal(raw *rawmessage.Message) error { @@ -61,15 +60,14 @@ func (m *MsgVideo) Unmarshal(raw *rawmessage.Message) error {
return fmt.Errorf("unsupported video codec: %d", m.Codec)
}
m.Type = MsgVideoType(raw.Body[1])
m.Type = VideoType(raw.Body[1])
switch m.Type {
case MsgVideoTypeConfig, MsgVideoTypeAU, MsgVideoTypeEOS:
case VideoTypeConfig, VideoTypeAU, VideoTypeEOS:
default:
return fmt.Errorf("unsupported video message type: %d", m.Type)
}
tmp := uint32(raw.Body[2])<<16 | uint32(raw.Body[3])<<8 | uint32(raw.Body[4])
m.PTSDelta = time.Duration(tmp) * time.Millisecond
m.PTSDelta = time.Duration(uint32(raw.Body[2])<<16|uint32(raw.Body[3])<<8|uint32(raw.Body[4])) * time.Millisecond
m.Payload = raw.Body[5:]
@ -77,7 +75,7 @@ func (m *MsgVideo) Unmarshal(raw *rawmessage.Message) error { @@ -77,7 +75,7 @@ func (m *MsgVideo) Unmarshal(raw *rawmessage.Message) error {
}
// Marshal implements Message.
func (m MsgVideo) Marshal() (*rawmessage.Message, error) {
func (m Video) Marshal() (*rawmessage.Message, error) {
body := make([]byte, 5+len(m.Payload))
if m.IsKeyFrame {
@ -98,7 +96,7 @@ func (m MsgVideo) Marshal() (*rawmessage.Message, error) { @@ -98,7 +96,7 @@ func (m MsgVideo) Marshal() (*rawmessage.Message, error) {
return &rawmessage.Message{
ChunkStreamID: m.ChunkStreamID,
Timestamp: m.DTS,
Type: chunk.MessageTypeVideo,
Type: uint8(TypeVideo),
MessageStreamID: m.MessageStreamID,
Body: body,
}, nil

4
internal/rtmp/message/writer.go

@ -35,10 +35,10 @@ func (w *Writer) Write(msg Message) error { @@ -35,10 +35,10 @@ func (w *Writer) Write(msg Message) error {
}
switch tmsg := msg.(type) {
case *MsgSetChunkSize:
case *SetChunkSize:
w.w.SetChunkSize(tmsg.Value)
case *MsgSetWindowAckSize:
case *SetWindowAckSize:
w.w.SetWindowAckSize(tmsg.Value)
}

4
internal/rtmp/rawmessage/message.go

@ -3,15 +3,13 @@ package rawmessage @@ -3,15 +3,13 @@ package rawmessage
import (
"time"
"github.com/aler9/mediamtx/internal/rtmp/chunk"
)
// Message is a raw message.
type Message struct {
ChunkStreamID byte
Timestamp time.Duration
Type chunk.MessageType
Type uint8
MessageStreamID uint32
Body []byte
}

2
internal/rtmp/rawmessage/reader.go

@ -15,7 +15,7 @@ var errMoreChunksNeeded = errors.New("more chunks are needed") @@ -15,7 +15,7 @@ var errMoreChunksNeeded = errors.New("more chunks are needed")
type readerChunkStream struct {
mr *Reader
curTimestamp *uint32
curType *chunk.MessageType
curType *uint8
curMessageStreamID *uint32
curBodyLen *uint32
curBody []byte

30
internal/rtmp/rawmessage/reader_test.go

@ -23,14 +23,14 @@ var cases = []struct { @@ -23,14 +23,14 @@ var cases = []struct {
{
ChunkStreamID: 27,
Timestamp: 18576 * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
Type: 6,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x03}, 64),
},
{
ChunkStreamID: 27,
Timestamp: (18576 + 15) * time.Millisecond,
Type: chunk.MessageTypeSetWindowAckSize,
Type: 5,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x04}, 64),
},
@ -39,7 +39,7 @@ var cases = []struct { @@ -39,7 +39,7 @@ var cases = []struct {
&chunk.Chunk0{
ChunkStreamID: 27,
Timestamp: 18576,
Type: chunk.MessageTypeSetPeerBandwidth,
Type: 6,
MessageStreamID: 3123,
BodyLen: 64,
Body: bytes.Repeat([]byte{0x03}, 64),
@ -47,7 +47,7 @@ var cases = []struct { @@ -47,7 +47,7 @@ var cases = []struct {
&chunk.Chunk1{
ChunkStreamID: 27,
TimestampDelta: 15,
Type: chunk.MessageTypeSetWindowAckSize,
Type: 5,
BodyLen: 64,
Body: bytes.Repeat([]byte{0x04}, 64),
},
@ -63,21 +63,21 @@ var cases = []struct { @@ -63,21 +63,21 @@ var cases = []struct {
{
ChunkStreamID: 27,
Timestamp: 18576 * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
Type: 6,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x03}, 64),
},
{
ChunkStreamID: 27,
Timestamp: (18576 + 15) * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
Type: 6,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x04}, 64),
},
{
ChunkStreamID: 27,
Timestamp: (18576 + 15 + 15) * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
Type: 6,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x05}, 64),
},
@ -86,7 +86,7 @@ var cases = []struct { @@ -86,7 +86,7 @@ var cases = []struct {
&chunk.Chunk0{
ChunkStreamID: 27,
Timestamp: 18576,
Type: chunk.MessageTypeSetPeerBandwidth,
Type: 6,
MessageStreamID: 3123,
BodyLen: 64,
Body: bytes.Repeat([]byte{0x03}, 64),
@ -113,28 +113,28 @@ var cases = []struct { @@ -113,28 +113,28 @@ var cases = []struct {
{
ChunkStreamID: 27,
Timestamp: 18576 * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
Type: 6,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x03}, 190),
},
{
ChunkStreamID: 27,
Timestamp: 18576 * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
Type: 6,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x04}, 192),
},
{
ChunkStreamID: 27,
Timestamp: (18576 + 15) * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
Type: 6,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x05}, 192),
},
{
ChunkStreamID: 27,
Timestamp: (18576 + 15 + 15) * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
Type: 6,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x06}, 192),
},
@ -143,7 +143,7 @@ var cases = []struct { @@ -143,7 +143,7 @@ var cases = []struct {
&chunk.Chunk0{
ChunkStreamID: 27,
Timestamp: 18576,
Type: chunk.MessageTypeSetPeerBandwidth,
Type: 6,
MessageStreamID: 3123,
BodyLen: 190,
Body: bytes.Repeat([]byte{0x03}, 128),
@ -155,7 +155,7 @@ var cases = []struct { @@ -155,7 +155,7 @@ var cases = []struct {
&chunk.Chunk1{
ChunkStreamID: 27,
TimestampDelta: 0,
Type: chunk.MessageTypeSetPeerBandwidth,
Type: 6,
BodyLen: 192,
Body: bytes.Repeat([]byte{0x04}, 128),
},
@ -240,7 +240,7 @@ func TestReaderAcknowledge(t *testing.T) { @@ -240,7 +240,7 @@ func TestReaderAcknowledge(t *testing.T) {
buf2, err := chunk.Chunk0{
ChunkStreamID: 27,
Timestamp: 18576,
Type: chunk.MessageTypeSetPeerBandwidth,
Type: 6,
MessageStreamID: 3123,
BodyLen: 200,
Body: bytes.Repeat([]byte{0x03}, 200),

2
internal/rtmp/rawmessage/writer.go

@ -12,7 +12,7 @@ import ( @@ -12,7 +12,7 @@ import (
type writerChunkStream struct {
mw *Writer
lastMessageStreamID *uint32
lastType *chunk.MessageType
lastType *uint8
lastBodyLen *uint32
lastTimestamp *time.Duration
lastTimestampDelta *time.Duration

4
internal/rtmp/rawmessage/writer_test.go

@ -50,7 +50,7 @@ func TestWriterAcknowledge(t *testing.T) { @@ -50,7 +50,7 @@ func TestWriterAcknowledge(t *testing.T) {
err := w.Write(&Message{
ChunkStreamID: 27,
Timestamp: 18576 * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
Type: 6,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x03}, 200),
})
@ -59,7 +59,7 @@ func TestWriterAcknowledge(t *testing.T) { @@ -59,7 +59,7 @@ func TestWriterAcknowledge(t *testing.T) {
err = w.Write(&Message{
ChunkStreamID: 27,
Timestamp: 18576 * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
Type: 6,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x03}, 200),
})

37
internal/rtmp/tracks/boxes_av1.go

@ -0,0 +1,37 @@ @@ -0,0 +1,37 @@
package tracks
import (
gomp4 "github.com/abema/go-mp4"
)
// BoxTypeAv1C returns the box type.
func BoxTypeAv1C() gomp4.BoxType { return gomp4.StrToBoxType("av1C") }
func init() { //nolint:gochecknoinits
gomp4.AddBoxDef(&Av1C{})
}
// Av1C is a Av1C ISO-BMFF box.
type Av1C struct {
gomp4.Box
Marker uint8 `mp4:"0,size=1,const=1"`
Version uint8 `mp4:"1,size=7,const=1"`
SeqProfile uint8 `mp4:"2,size=3"`
SeqLevelIdx0 uint8 `mp4:"3,size=5"`
SeqTier0 uint8 `mp4:"4,size=1"`
HighBitdepth uint8 `mp4:"5,size=1"`
TwelveBit uint8 `mp4:"6,size=1"`
Monochrome uint8 `mp4:"7,size=1"`
ChromaSubsamplingX uint8 `mp4:"8,size=1"`
ChromaSubsamplingY uint8 `mp4:"9,size=1"`
ChromaSamplePosition uint8 `mp4:"10,size=2"`
Reserved uint8 `mp4:"11,size=3,const=0"`
InitialPresentationDelayPresent uint8 `mp4:"12,size=1"`
InitialPresentationDelayMinusOne uint8 `mp4:"13,size=4"`
ConfigOBUs []uint8 `mp4:"14,size=8"`
}
// GetType returns the box type.
func (Av1C) GetType() gomp4.BoxType {
return BoxTypeAv1C()
}

396
internal/rtmp/tracks/read.go

@ -0,0 +1,396 @@ @@ -0,0 +1,396 @@
// Package tracks contains functions to read and write track metadata.
package tracks
import (
"bytes"
"errors"
"fmt"
"time"
gomp4 "github.com/abema/go-mp4"
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/mediacommon/pkg/codecs/av1"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/codecs/h265"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/aler9/mediamtx/internal/rtmp/h264conf"
"github.com/aler9/mediamtx/internal/rtmp/message"
)
func h265FindNALU(array []gomp4.HEVCNaluArray, typ h265.NALUType) []byte {
for _, entry := range array {
if entry.NaluType == byte(typ) && entry.NumNalus == 1 &&
h265.NALUType((entry.Nalus[0].NALUnit[0]>>1)&0b111111) == typ {
return entry.Nalus[0].NALUnit
}
}
return nil
}
func trackFromH264DecoderConfig(data []byte) (formats.Format, error) {
var conf h264conf.Conf
err := conf.Unmarshal(data)
if err != nil {
return nil, fmt.Errorf("unable to parse H264 config: %v", err)
}
return &formats.H264{
PayloadTyp: 96,
SPS: conf.SPS,
PPS: conf.PPS,
PacketizationMode: 1,
}, nil
}
func trackFromAACDecoderConfig(data []byte) (*formats.MPEG4Audio, error) {
var mpegConf mpeg4audio.Config
err := mpegConf.Unmarshal(data)
if err != nil {
return nil, err
}
return &formats.MPEG4Audio{
PayloadTyp: 96,
Config: &mpegConf,
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
}, nil
}
var errEmptyMetadata = errors.New("metadata is empty")
func readTracksFromMetadata(r *message.ReadWriter, payload []interface{}) (formats.Format, formats.Format, error) {
if len(payload) != 1 {
return nil, nil, fmt.Errorf("invalid metadata")
}
md, ok := payload[0].(flvio.AMFMap)
if !ok {
return nil, nil, fmt.Errorf("invalid metadata")
}
var videoTrack formats.Format
var audioTrack formats.Format
hasVideo, err := func() (bool, error) {
v, ok := md.GetV("videocodecid")
if !ok {
return false, nil
}
switch vt := v.(type) {
case float64:
switch vt {
case 0:
return false, nil
case message.CodecH264:
return true, nil
}
case string:
if vt == "avc1" {
return true, nil
}
}
return false, fmt.Errorf("unsupported video codec: %v", v)
}()
if err != nil {
return nil, nil, err
}
hasAudio, err := func() (bool, error) {
v, ok := md.GetV("audiocodecid")
if !ok {
return false, nil
}
switch vt := v.(type) {
case float64:
switch vt {
case 0:
return false, nil
case message.CodecMPEG2Audio:
audioTrack = &formats.MPEG2Audio{}
return true, nil
case message.CodecMPEG4Audio:
return true, nil
}
case string:
if vt == "mp4a" {
return true, nil
}
}
return false, fmt.Errorf("unsupported audio codec %v", v)
}()
if err != nil {
return nil, nil, err
}
if !hasVideo && !hasAudio {
return nil, nil, errEmptyMetadata
}
for {
if (!hasVideo || videoTrack != nil) &&
(!hasAudio || audioTrack != nil) {
return videoTrack, audioTrack, nil
}
msg, err := r.Read()
if err != nil {
return nil, nil, err
}
switch tmsg := msg.(type) {
case *message.Video:
if !hasVideo {
return nil, nil, fmt.Errorf("unexpected video packet")
}
if videoTrack == nil {
if tmsg.Type == message.VideoTypeConfig {
videoTrack, err = trackFromH264DecoderConfig(tmsg.Payload)
if err != nil {
return nil, nil, err
}
// format used by OBS < 29.1 to publish H265
} else if tmsg.Type == message.VideoTypeAU && tmsg.IsKeyFrame {
nalus, err := h264.AVCCUnmarshal(tmsg.Payload)
if err != nil {
return nil, nil, err
}
var vps []byte
var sps []byte
var pps []byte
for _, nalu := range nalus {
typ := h265.NALUType((nalu[0] >> 1) & 0b111111)
switch typ {
case h265.NALUType_VPS_NUT:
vps = nalu
case h265.NALUType_SPS_NUT:
sps = nalu
case h265.NALUType_PPS_NUT:
pps = nalu
}
}
if vps != nil && sps != nil && pps != nil {
videoTrack = &formats.H265{
PayloadTyp: 96,
VPS: vps,
SPS: sps,
PPS: pps,
}
}
}
}
case *message.ExtendedSequenceStart:
if videoTrack == nil {
switch tmsg.FourCC {
case message.FourCCHEVC:
var hvcc gomp4.HvcC
_, err := gomp4.Unmarshal(bytes.NewReader(tmsg.Config), uint64(len(tmsg.Config)), &hvcc, gomp4.Context{})
if err != nil {
return nil, nil, fmt.Errorf("invalid H265 configuration: %v", err)
}
vps := h265FindNALU(hvcc.NaluArrays, h265.NALUType_VPS_NUT)
sps := h265FindNALU(hvcc.NaluArrays, h265.NALUType_SPS_NUT)
pps := h265FindNALU(hvcc.NaluArrays, h265.NALUType_PPS_NUT)
if vps == nil || sps == nil || pps == nil {
return nil, nil, fmt.Errorf("H265 parameters are missing")
}
videoTrack = &formats.H265{
PayloadTyp: 96,
VPS: vps,
SPS: sps,
PPS: pps,
}
case message.FourCCAV1:
var av1c Av1C
_, err := gomp4.Unmarshal(bytes.NewReader(tmsg.Config), uint64(len(tmsg.Config)), &av1c, gomp4.Context{})
if err != nil {
return nil, nil, fmt.Errorf("invalid AV1 configuration: %v", err)
}
// parse sequence header and metadata contained in ConfigOBUs, but do not use them
_, err = av1.BitstreamUnmarshal(av1c.ConfigOBUs, false)
if err != nil {
return nil, nil, fmt.Errorf("invalid AV1 configuration: %v", err)
}
videoTrack = &formats.AV1{}
default: // VP9
return nil, nil, fmt.Errorf("VP9 is not supported yet")
}
}
case *message.Audio:
if !hasAudio {
return nil, nil, fmt.Errorf("unexpected audio packet")
}
if audioTrack == nil &&
tmsg.Codec == message.CodecMPEG4Audio &&
tmsg.AACType == message.AudioAACTypeConfig {
audioTrack, err = trackFromAACDecoderConfig(tmsg.Payload)
if err != nil {
return nil, nil, err
}
}
}
}
}
func readTracksFromMessages(r *message.ReadWriter, msg message.Message) (formats.Format, *formats.MPEG4Audio, error) {
var startTime *time.Duration
var videoTrack formats.Format
var audioTrack *formats.MPEG4Audio
// analyze 1 second of packets
outer:
for {
switch tmsg := msg.(type) {
case *message.Video:
if startTime == nil {
v := tmsg.DTS
startTime = &v
}
if tmsg.Type == message.VideoTypeConfig {
if videoTrack == nil {
var err error
videoTrack, err = trackFromH264DecoderConfig(tmsg.Payload)
if err != nil {
return nil, nil, err
}
// stop the analysis if both tracks are found
if videoTrack != nil && audioTrack != nil {
return videoTrack, audioTrack, nil
}
}
}
if (tmsg.DTS - *startTime) >= 1*time.Second {
break outer
}
case *message.Audio:
if startTime == nil {
v := tmsg.DTS
startTime = &v
}
if tmsg.AACType == message.AudioAACTypeConfig {
if audioTrack == nil {
var err error
audioTrack, err = trackFromAACDecoderConfig(tmsg.Payload)
if err != nil {
return nil, nil, err
}
// stop the analysis if both tracks are found
if videoTrack != nil && audioTrack != nil {
return videoTrack, audioTrack, nil
}
}
}
if (tmsg.DTS - *startTime) >= 1*time.Second {
break outer
}
}
var err error
msg, err = r.Read()
if err != nil {
return nil, nil, err
}
}
if videoTrack == nil && audioTrack == nil {
return nil, nil, fmt.Errorf("no tracks found")
}
return videoTrack, audioTrack, nil
}
// Read reads track informations.
// It returns the video track and the audio track.
func Read(r *message.ReadWriter) (formats.Format, formats.Format, error) {
msg, err := func() (message.Message, error) {
for {
msg, err := r.Read()
if err != nil {
return nil, err
}
// skip play start and data start
if cmd, ok := msg.(*message.CommandAMF0); ok && cmd.Name == "onStatus" {
continue
}
// skip RtmpSampleAccess
if data, ok := msg.(*message.DataAMF0); ok && len(data.Payload) >= 1 {
if s, ok := data.Payload[0].(string); ok && s == "|RtmpSampleAccess" {
continue
}
}
return msg, nil
}
}()
if err != nil {
return nil, nil, err
}
if data, ok := msg.(*message.DataAMF0); ok && len(data.Payload) >= 1 {
payload := data.Payload
if s, ok := payload[0].(string); ok && s == "@setDataFrame" {
payload = payload[1:]
}
if len(payload) >= 1 {
if s, ok := payload[0].(string); ok && s == "onMetaData" {
videoTrack, audioTrack, err := readTracksFromMetadata(r, payload[1:])
if err != nil {
if err == errEmptyMetadata {
msg, err := r.Read()
if err != nil {
return nil, nil, err
}
return readTracksFromMessages(r, msg)
}
return nil, nil, err
}
return videoTrack, audioTrack, nil
}
}
}
return readTracksFromMessages(r, msg)
}

484
internal/rtmp/tracks/read_test.go

@ -0,0 +1,484 @@ @@ -0,0 +1,484 @@
package tracks
import (
"bytes"
"testing"
"time"
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/stretchr/testify/require"
"github.com/aler9/mediamtx/internal/rtmp/bytecounter"
"github.com/aler9/mediamtx/internal/rtmp/h264conf"
"github.com/aler9/mediamtx/internal/rtmp/message"
)
func TestRead(t *testing.T) {
sps := []byte{
0x67, 0x64, 0x00, 0x0c, 0xac, 0x3b, 0x50, 0xb0,
0x4b, 0x42, 0x00, 0x00, 0x03, 0x00, 0x02, 0x00,
0x00, 0x03, 0x00, 0x3d, 0x08,
}
pps := []byte{
0x68, 0xee, 0x3c, 0x80,
}
for _, ca := range []struct {
name string
videoTrack formats.Format
audioTrack formats.Format
}{
{
"video+audio",
&formats.H264{
PayloadTyp: 96,
SPS: sps,
PPS: pps,
PacketizationMode: 1,
},
&formats.MPEG4Audio{
PayloadTyp: 96,
Config: &mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
},
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
},
},
{
"video",
&formats.H264{
PayloadTyp: 96,
SPS: sps,
PPS: pps,
PacketizationMode: 1,
},
nil,
},
{
"metadata without codec id",
&formats.H264{
PayloadTyp: 96,
SPS: sps,
PPS: pps,
PacketizationMode: 1,
},
&formats.MPEG4Audio{
PayloadTyp: 96,
Config: &mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
},
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
},
},
{
"missing metadata, video+audio",
&formats.H264{
PayloadTyp: 96,
SPS: sps,
PPS: pps,
PacketizationMode: 1,
},
&formats.MPEG4Audio{
PayloadTyp: 96,
Config: &mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
},
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
},
},
{
"missing metadata, audio",
nil,
&formats.MPEG4Audio{
PayloadTyp: 96,
Config: &mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
},
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
},
},
{
"obs studio pre 29.1 h265",
&formats.H265{
PayloadTyp: 96,
VPS: []byte{
0x40, 0x01, 0x0c, 0x01, 0xff, 0xff, 0x01, 0x40,
0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
0x03, 0x00, 0x00, 0x03, 0x00, 0x7b, 0xac, 0x09,
},
SPS: []byte{
0x42, 0x01, 0x01, 0x01, 0x40, 0x00, 0x00, 0x03,
0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
0x03, 0x00, 0x7b, 0xa0, 0x03, 0xc0, 0x80, 0x11,
0x07, 0xcb, 0x96, 0xb4, 0xa4, 0x25, 0x92, 0xe3,
0x01, 0x6a, 0x02, 0x02, 0x02, 0x08, 0x00, 0x00,
0x03, 0x00, 0x08, 0x00, 0x00, 0x03, 0x01, 0xe3,
0x00, 0x2e, 0xf2, 0x88, 0x00, 0x09, 0x89, 0x60,
0x00, 0x04, 0xc4, 0xb4, 0x20,
},
PPS: []byte{
0x44, 0x01, 0xc0, 0xf7, 0xc0, 0xcc, 0x90,
},
},
&formats.MPEG4Audio{
PayloadTyp: 96,
Config: &mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
},
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
},
},
} {
t.Run(ca.name, func(t *testing.T) {
var buf bytes.Buffer
bc := bytecounter.NewReadWriter(&buf)
mrw := message.NewReadWriter(bc, true)
switch ca.name {
case "video+audio":
err := mrw.Write(&message.DataAMF0{
ChunkStreamID: 4,
MessageStreamID: 1,
Payload: []interface{}{
"@setDataFrame",
"onMetaData",
flvio.AMFMap{
{
K: "videodatarate",
V: float64(0),
},
{
K: "videocodecid",
V: float64(message.CodecH264),
},
{
K: "audiodatarate",
V: float64(0),
},
{
K: "audiocodecid",
V: float64(message.CodecMPEG4Audio),
},
},
},
})
require.NoError(t, err)
buf, _ := h264conf.Conf{
SPS: sps,
PPS: pps,
}.Marshal()
err = mrw.Write(&message.Video{
ChunkStreamID: message.VideoChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecH264,
IsKeyFrame: true,
Type: message.VideoTypeConfig,
Payload: buf,
})
require.NoError(t, err)
enc, err := mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
}.Marshal()
require.NoError(t, err)
err = mrw.Write(&message.Audio{
ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,
AACType: message.AudioAACTypeConfig,
Payload: enc,
})
require.NoError(t, err)
case "video":
err := mrw.Write(&message.DataAMF0{
ChunkStreamID: 4,
MessageStreamID: 1,
Payload: []interface{}{
"@setDataFrame",
"onMetaData",
flvio.AMFMap{
{
K: "videodatarate",
V: float64(0),
},
{
K: "videocodecid",
V: float64(message.CodecH264),
},
{
K: "audiodatarate",
V: float64(0),
},
{
K: "audiocodecid",
V: float64(0),
},
},
},
})
require.NoError(t, err)
buf, _ := h264conf.Conf{
SPS: sps,
PPS: pps,
}.Marshal()
err = mrw.Write(&message.Video{
ChunkStreamID: message.VideoChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecH264,
IsKeyFrame: true,
Type: message.VideoTypeConfig,
Payload: buf,
})
require.NoError(t, err)
case "metadata without codec id":
err := mrw.Write(&message.DataAMF0{
ChunkStreamID: 4,
MessageStreamID: 1,
Payload: []interface{}{
"@setDataFrame",
"onMetaData",
flvio.AMFMap{
{
K: "width",
V: float64(2688),
},
{
K: "height",
V: float64(1520),
},
{
K: "framerate",
V: float64(0o25),
},
},
},
})
require.NoError(t, err)
buf, _ := h264conf.Conf{
SPS: sps,
PPS: pps,
}.Marshal()
err = mrw.Write(&message.Video{
ChunkStreamID: message.VideoChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecH264,
IsKeyFrame: true,
Type: message.VideoTypeConfig,
Payload: buf,
})
require.NoError(t, err)
enc, err := mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
}.Marshal()
require.NoError(t, err)
err = mrw.Write(&message.Audio{
ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,
AACType: message.AudioAACTypeConfig,
Payload: enc,
})
require.NoError(t, err)
case "missing metadata, video+audio":
buf, _ := h264conf.Conf{
SPS: sps,
PPS: pps,
}.Marshal()
err := mrw.Write(&message.Video{
ChunkStreamID: message.VideoChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecH264,
IsKeyFrame: true,
Type: message.VideoTypeConfig,
Payload: buf,
})
require.NoError(t, err)
enc, err := mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
}.Marshal()
require.NoError(t, err)
err = mrw.Write(&message.Audio{
ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,
AACType: message.AudioAACTypeConfig,
Payload: enc,
})
require.NoError(t, err)
case "missing metadata, audio":
enc, err := mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
}.Marshal()
require.NoError(t, err)
err = mrw.Write(&message.Audio{
ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,
AACType: message.AudioAACTypeConfig,
Payload: enc,
})
require.NoError(t, err)
err = mrw.Write(&message.Audio{
ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,
AACType: message.AudioAACTypeConfig,
Payload: enc,
DTS: 1 * time.Second,
})
require.NoError(t, err)
case "obs studio pre 29.1 h265":
err := mrw.Write(&message.DataAMF0{
ChunkStreamID: 4,
MessageStreamID: 1,
Payload: []interface{}{
"@setDataFrame",
"onMetaData",
flvio.AMFMap{
{
K: "videodatarate",
V: float64(0),
},
{
K: "videocodecid",
V: float64(message.CodecH264),
},
{
K: "audiodatarate",
V: float64(0),
},
{
K: "audiocodecid",
V: float64(message.CodecMPEG4Audio),
},
},
},
})
require.NoError(t, err)
avcc, err := h264.AVCCMarshal([][]byte{
{ // VPS
0x40, 0x01, 0x0c, 0x01, 0xff, 0xff, 0x01, 0x40,
0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
0x03, 0x00, 0x00, 0x03, 0x00, 0x7b, 0xac, 0x09,
},
{ // SPS
0x42, 0x01, 0x01, 0x01, 0x40, 0x00, 0x00, 0x03,
0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
0x03, 0x00, 0x7b, 0xa0, 0x03, 0xc0, 0x80, 0x11,
0x07, 0xcb, 0x96, 0xb4, 0xa4, 0x25, 0x92, 0xe3,
0x01, 0x6a, 0x02, 0x02, 0x02, 0x08, 0x00, 0x00,
0x03, 0x00, 0x08, 0x00, 0x00, 0x03, 0x01, 0xe3,
0x00, 0x2e, 0xf2, 0x88, 0x00, 0x09, 0x89, 0x60,
0x00, 0x04, 0xc4, 0xb4, 0x20,
},
{
// PPS
0x44, 0x01, 0xc0, 0xf7, 0xc0, 0xcc, 0x90,
},
})
require.NoError(t, err)
err = mrw.Write(&message.Video{
ChunkStreamID: message.VideoChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecH264,
IsKeyFrame: true,
Type: message.VideoTypeAU,
Payload: avcc,
})
require.NoError(t, err)
enc, err := mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
}.Marshal()
require.NoError(t, err)
err = mrw.Write(&message.Audio{
ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,
AACType: message.AudioAACTypeConfig,
Payload: enc,
})
require.NoError(t, err)
}
videoTrack, audioTrack, err := Read(mrw)
require.NoError(t, err)
require.Equal(t, ca.videoTrack, videoTrack)
require.Equal(t, ca.audioTrack, audioTrack)
})
}
}

107
internal/rtmp/tracks/write.go

@ -0,0 +1,107 @@ @@ -0,0 +1,107 @@
package tracks
import (
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/aler9/mediamtx/internal/rtmp/h264conf"
"github.com/aler9/mediamtx/internal/rtmp/message"
)
// Write writes track informations.
func Write(w *message.ReadWriter, videoTrack formats.Format, audioTrack formats.Format) error {
err := w.Write(&message.DataAMF0{
ChunkStreamID: 4,
MessageStreamID: 0x1000000,
Payload: []interface{}{
"@setDataFrame",
"onMetaData",
flvio.AMFMap{
{
K: "videodatarate",
V: float64(0),
},
{
K: "videocodecid",
V: func() float64 {
switch videoTrack.(type) {
case *formats.H264:
return message.CodecH264
default:
return 0
}
}(),
},
{
K: "audiodatarate",
V: float64(0),
},
{
K: "audiocodecid",
V: func() float64 {
switch audioTrack.(type) {
case *formats.MPEG2Audio:
return message.CodecMPEG2Audio
case *formats.MPEG4Audio:
return message.CodecMPEG4Audio
default:
return 0
}
}(),
},
},
},
})
if err != nil {
return err
}
if videoTrack, ok := videoTrack.(*formats.H264); ok {
// write decoder config only if SPS and PPS are available.
// if they're not available yet, they're sent later.
if sps, pps := videoTrack.SafeParams(); sps != nil && pps != nil {
buf, _ := h264conf.Conf{
SPS: sps,
PPS: pps,
}.Marshal()
err = w.Write(&message.Video{
ChunkStreamID: message.VideoChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecH264,
IsKeyFrame: true,
Type: message.VideoTypeConfig,
Payload: buf,
})
if err != nil {
return err
}
}
}
if mpeg4audioTrack, ok := audioTrack.(*formats.MPEG4Audio); ok {
enc, err := mpeg4audioTrack.Config.Marshal()
if err != nil {
return err
}
err = w.Write(&message.Audio{
ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,
AACType: message.AudioAACTypeConfig,
Payload: enc,
})
if err != nil {
return err
}
}
return nil
}

96
internal/rtmp/tracks/write_test.go

@ -0,0 +1,96 @@ @@ -0,0 +1,96 @@
package tracks
import (
"bytes"
"testing"
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/stretchr/testify/require"
"github.com/aler9/mediamtx/internal/rtmp/bytecounter"
"github.com/aler9/mediamtx/internal/rtmp/message"
)
func TestWrite(t *testing.T) {
var buf bytes.Buffer
bc := bytecounter.NewReadWriter(&buf)
mrw := message.NewReadWriter(bc, true)
videoTrack := &formats.H264{
PayloadTyp: 96,
SPS: []byte{
0x67, 0x64, 0x00, 0x0c, 0xac, 0x3b, 0x50, 0xb0,
0x4b, 0x42, 0x00, 0x00, 0x03, 0x00, 0x02, 0x00,
0x00, 0x03, 0x00, 0x3d, 0x08,
},
PPS: []byte{
0x68, 0xee, 0x3c, 0x80,
},
PacketizationMode: 1,
}
audioTrack := &formats.MPEG4Audio{
PayloadTyp: 96,
Config: &mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
},
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
}
err := Write(mrw, videoTrack, audioTrack)
require.NoError(t, err)
msg, err := mrw.Read()
require.NoError(t, err)
require.Equal(t, &message.DataAMF0{
ChunkStreamID: 4,
MessageStreamID: 0x1000000,
Payload: []interface{}{
"@setDataFrame",
"onMetaData",
flvio.AMFMap{
{K: "videodatarate", V: float64(0)},
{K: "videocodecid", V: float64(7)},
{K: "audiodatarate", V: float64(0)},
{K: "audiocodecid", V: float64(10)},
},
},
}, msg)
msg, err = mrw.Read()
require.NoError(t, err)
require.Equal(t, &message.Video{
ChunkStreamID: message.VideoChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecH264,
IsKeyFrame: true,
Type: message.VideoTypeConfig,
Payload: []byte{
0x1, 0x64, 0x0,
0xc, 0xff, 0xe1, 0x0, 0x15, 0x67, 0x64, 0x0,
0xc, 0xac, 0x3b, 0x50, 0xb0, 0x4b, 0x42, 0x0,
0x0, 0x3, 0x0, 0x2, 0x0, 0x0, 0x3, 0x0,
0x3d, 0x8, 0x1, 0x0, 0x4, 0x68, 0xee, 0x3c,
0x80,
},
}, msg)
msg, err = mrw.Read()
require.NoError(t, err)
require.Equal(t, &message.Audio{
ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,
AACType: message.AudioAACTypeConfig,
Payload: []byte{0x12, 0x10},
}, msg)
}
Loading…
Cancel
Save