Browse Source

support publishing, reading and proxying MPEG-2 audio (MP3) tracks with RTMP (#1102) (#1736)

pull/1738/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
2d17dff3b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      README.md
  2. 4
      go.mod
  3. 8
      go.sum
  4. 2
      internal/core/hls_muxer.go
  5. 8
      internal/core/hls_source.go
  6. 3
      internal/core/hls_source_test.go
  7. 2
      internal/core/rpicamera_source.go
  8. 387
      internal/core/rtmp_conn.go
  9. 52
      internal/core/rtmp_source.go
  10. 158
      internal/core/rtsp_session.go
  11. 91
      internal/core/rtsp_source.go
  12. 4
      internal/core/stream.go
  13. 2
      internal/core/stream_format.go
  14. 8
      internal/core/udp_source.go
  15. 2
      internal/formatprocessor/generic_test.go
  16. 8
      internal/formatprocessor/h264.go
  17. 7
      internal/formatprocessor/h265.go
  18. 99
      internal/formatprocessor/mpeg2audio.go
  19. 13
      internal/formatprocessor/mpeg4audio.go
  20. 10
      internal/formatprocessor/opus.go
  21. 3
      internal/formatprocessor/processor.go
  22. 9
      internal/formatprocessor/vp8.go
  23. 9
      internal/formatprocessor/vp9.go
  24. 54
      internal/rtmp/conn.go
  25. 13
      internal/rtmp/conn_test.go
  26. 48
      internal/rtmp/message/msg_audio.go
  27. 20
      internal/rtmp/message/reader_test.go

10
README.md

@ -10,10 +10,10 @@ Live streams can be published to the server with: @@ -10,10 +10,10 @@ Live streams can be published to the server with:
|protocol|variants|codecs|
|--------|--------|------|
|RTSP clients (FFmpeg, GStreamer, etc)|UDP, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG-2 video, MPEG-2 audio, M-JPEG, MP3, MPEG-4 video, MPEG-4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec|
|RTSP servers and cameras|UDP, UDP-Multicast, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG-2 video, MPEG-2 audio, M-JPEG, MP3, MPEG-4 video, MPEG-4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec|
|RTMP clients (OBS Studio)|RTMP, RTMPS|H264, H265, MPEG-4 Audio (AAC)|
|RTMP servers and cameras|RTMP, RTMPS|H264, MPEG-4 Audio (AAC)|
|RTSP clients (FFmpeg, GStreamer, etc)|UDP, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG-2 video, M-JPEG, MPEG-4 video, MPEG-2 audio (MP3), MPEG-4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec|
|RTSP servers and cameras|UDP, UDP-Multicast, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG-2 video, M-JPEG, MPEG-4 video, MPEG-2 audio (MP3), MPEG-4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec|
|RTMP clients (OBS Studio)|RTMP, RTMPS|H264, H265, MPEG-2 audio (MP3), MPEG-4 Audio (AAC)|
|RTMP servers and cameras|RTMP, RTMPS|H264, MPEG-2 audio (MP3), MPEG-4 Audio (AAC)|
|HLS servers and cameras|Low-Latency HLS, MP4-based HLS, legacy HLS|H264, H265, MPEG-4 Audio (AAC), Opus|
|UDP/MPEG-TS streams|Unicast, broadcast, multicast|H264, H265, MPEG-4 Audio (AAC), Opus|
|Raspberry Pi Cameras||H264|
@ -22,7 +22,7 @@ And can be read from the server with: @@ -22,7 +22,7 @@ And can be read from the server with:
|protocol|variants|codecs|
|--------|--------|------|
|RTSP|UDP, UDP-Multicast, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG-2 video, MPEG-2 audio, M-JPEG, MP3, MPEG-4 video, MPEG-4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec|
|RTSP|UDP, UDP-Multicast, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG-2 video, M-JPEG, MPEG-4 video, MPEG-2 audio (MP3), MPEG-4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec|
|RTMP|RTMP, RTMPS|H264, MPEG-4 Audio (AAC)|
|HLS|Low-Latency HLS, MP4-based HLS, legacy HLS|H264, H265, MPEG-4 Audio (AAC), Opus|
|WebRTC||H264, VP8, VP9, Opus, G711, G722|

4
go.mod

@ -7,8 +7,8 @@ require ( @@ -7,8 +7,8 @@ require (
github.com/alecthomas/kong v0.7.1
github.com/asticode/go-astits v1.11.0
github.com/bluenviron/gohlslib v0.2.1
github.com/bluenviron/gortsplib/v3 v3.2.1
github.com/bluenviron/mediacommon v0.3.1
github.com/bluenviron/gortsplib/v3 v3.3.0
github.com/bluenviron/mediacommon v0.4.1
github.com/fsnotify/fsnotify v1.6.0
github.com/gin-gonic/gin v1.9.0
github.com/google/uuid v1.3.0

8
go.sum

@ -14,10 +14,10 @@ github.com/asticode/go-astits v1.11.0 h1:GTHUXht0ZXAJXsVbsLIcyfHr1Bchi4QQwMARw2Z @@ -14,10 +14,10 @@ github.com/asticode/go-astits v1.11.0 h1:GTHUXht0ZXAJXsVbsLIcyfHr1Bchi4QQwMARw2Z
github.com/asticode/go-astits v1.11.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI=
github.com/bluenviron/gohlslib v0.2.1 h1:ZDbC87oaSv6B85o5lYC+gcOISJsg1z9IX4xTbxMqJh4=
github.com/bluenviron/gohlslib v0.2.1/go.mod h1:fxwqh+twBM2Mi3AZ05nuQ7qvp8un833dFqcyykzv8bc=
github.com/bluenviron/gortsplib/v3 v3.2.1 h1:wdMocTWu1EWa9PPWb8F/S6LY2hZikgrs7zgDtnwBPO0=
github.com/bluenviron/gortsplib/v3 v3.2.1/go.mod h1:AzHdywoBckre5q9Y581xg93PVthXayVHVqYMc3hwBlk=
github.com/bluenviron/mediacommon v0.3.1 h1:C4okNqyN1Mg5CVGcGKk2tEk9Uj2hHZusHV7nqdjn1Lk=
github.com/bluenviron/mediacommon v0.3.1/go.mod h1:t0dqPsWUTchyvib0MhixIwXEgvDX4V9G+I0GzWLQRb8=
github.com/bluenviron/gortsplib/v3 v3.3.0 h1:g7hXsLSXk8Z/qiJ70zLdiWb/HTn5L59ngIAsdFREBLk=
github.com/bluenviron/gortsplib/v3 v3.3.0/go.mod h1:7p+nkw/4yyNrKxHaLwskMhDlXXHKJXqO85kpVOi+eXc=
github.com/bluenviron/mediacommon v0.4.1 h1:oiqvqwnZ0NbB+mCZjuyBtgY7cF88rZdhb/PTSNG9M+Q=
github.com/bluenviron/mediacommon v0.4.1/go.mod h1:t0dqPsWUTchyvib0MhixIwXEgvDX4V9G+I0GzWLQRb8=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.8.0 h1:ea0Xadu+sHlu7x5O3gKhRpQ1IKiMrSiHttPF0ybECuA=
github.com/bytedance/sonic v1.8.0/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=

2
internal/core/hls_muxer.go

@ -296,7 +296,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) @@ -296,7 +296,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
if medias == nil {
return fmt.Errorf(
"the stream doesn't contain any supported codec (which are currently H264, H265, MPEG4-Audio, Opus)")
"the stream doesn't contain any supported codec, which are currently H264, H265, MPEG4-Audio, Opus")
}
var muxerDirectory string

8
internal/core/hls_source.go

@ -103,7 +103,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan @@ -103,7 +103,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
}
c.OnData(track, func(pts time.Duration, unit interface{}) {
err := stream.writeData(medi, medi.Formats[0], &formatprocessor.UnitH264{
err := stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{
PTS: pts,
AU: unit.([][]byte),
NTP: time.Now(),
@ -125,7 +125,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan @@ -125,7 +125,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
}
c.OnData(track, func(pts time.Duration, unit interface{}) {
err := stream.writeData(medi, medi.Formats[0], &formatprocessor.UnitH265{
err := stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitH265{
PTS: pts,
AU: unit.([][]byte),
NTP: time.Now(),
@ -148,7 +148,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan @@ -148,7 +148,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
}
c.OnData(track, func(pts time.Duration, unit interface{}) {
err := stream.writeData(medi, medi.Formats[0], &formatprocessor.UnitMPEG4Audio{
err := stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4Audio{
PTS: pts,
AUs: [][]byte{unit.([]byte)},
NTP: time.Now(),
@ -168,7 +168,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan @@ -168,7 +168,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
}
c.OnData(track, func(pts time.Duration, unit interface{}) {
err := stream.writeData(medi, medi.Formats[0], &formatprocessor.UnitOpus{
err := stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitOpus{
PTS: pts,
Frame: unit.([]byte),
NTP: time.Now(),

3
internal/core/hls_source_test.go

@ -262,7 +262,8 @@ func TestHLSSource(t *testing.T) { @@ -262,7 +262,8 @@ func TestHLSSource(t *testing.T) {
Control: medias[1].Control,
Formats: []formats.Format{
&formats.MPEG4Audio{
PayloadTyp: 96,
PayloadTyp: 96,
ProfileLevelID: 1,
Config: &mpeg4audio.Config{
Type: 2,
SampleRate: 44100,

2
internal/core/rpicamera_source.go

@ -97,7 +97,7 @@ func (s *rpiCameraSource) run(ctx context.Context, cnf *conf.PathConf, reloadCon @@ -97,7 +97,7 @@ func (s *rpiCameraSource) run(ctx context.Context, cnf *conf.PathConf, reloadCon
stream = res.stream
}
err := stream.writeData(medi, medi.Formats[0], &formatprocessor.UnitH264{
err := stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{
PTS: dts,
AU: au,
NTP: time.Now(),

387
internal/core/rtmp_conn.go

@ -14,6 +14,7 @@ import ( @@ -14,6 +14,7 @@ import (
"github.com/bluenviron/gortsplib/v3/pkg/media"
"github.com/bluenviron/gortsplib/v3/pkg/ringbuffer"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg2audio"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
"github.com/google/uuid"
"github.com/notedit/rtmp/format/flv/flvio"
@ -39,6 +40,96 @@ func pathNameAndQuery(inURL *url.URL) (string, url.Values, string) { @@ -39,6 +40,96 @@ func pathNameAndQuery(inURL *url.URL) (string, url.Values, string) {
return pathName, ur.Query(), ur.RawQuery
}
type rtmpWriteFunc func(msg interface{}) error
func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream) rtmpWriteFunc {
switch format.(type) {
case *formats.H264:
return func(msg interface{}) error {
tmsg := msg.(*message.MsgVideo)
if tmsg.H264Type == flvio.AVC_SEQHDR {
var conf h264conf.Conf
err := conf.Unmarshal(tmsg.Payload)
if err != nil {
return fmt.Errorf("unable to parse H264 config: %v", err)
}
au := [][]byte{
conf.SPS,
conf.PPS,
}
return stream.writeUnit(medi, format, &formatprocessor.UnitH264{
PTS: tmsg.DTS + tmsg.PTSDelta,
AU: au,
NTP: time.Now(),
})
}
if tmsg.H264Type == flvio.AVC_NALU {
au, err := h264.AVCCUnmarshal(tmsg.Payload)
if err != nil {
return fmt.Errorf("unable to decode AVCC: %v", err)
}
return stream.writeUnit(medi, format, &formatprocessor.UnitH264{
PTS: tmsg.DTS + tmsg.PTSDelta,
AU: au,
NTP: time.Now(),
})
}
return nil
}
case *formats.H265:
return func(msg interface{}) error {
tmsg := msg.(*message.MsgVideo)
au, err := h264.AVCCUnmarshal(tmsg.Payload)
if err != nil {
return fmt.Errorf("unable to decode AVCC: %v", err)
}
return stream.writeUnit(medi, format, &formatprocessor.UnitH265{
PTS: tmsg.DTS + tmsg.PTSDelta,
AU: au,
NTP: time.Now(),
})
}
case *formats.MPEG2Audio:
return func(msg interface{}) error {
tmsg := msg.(*message.MsgAudio)
return stream.writeUnit(medi, format, &formatprocessor.UnitMPEG2Audio{
PTS: tmsg.DTS,
Frames: [][]byte{tmsg.Payload},
NTP: time.Now(),
})
}
case *formats.MPEG4Audio:
return func(msg interface{}) error {
tmsg := msg.(*message.MsgAudio)
if tmsg.AACType != flvio.AAC_RAW {
return nil
}
return stream.writeUnit(medi, format, &formatprocessor.UnitMPEG4Audio{
PTS: tmsg.DTS,
AUs: [][]byte{tmsg.Payload},
NTP: time.Now(),
})
}
default:
return nil
}
}
type rtmpConnState int
const (
@ -73,11 +164,10 @@ type rtmpConn struct { @@ -73,11 +164,10 @@ type rtmpConn struct {
pathManager rtmpConnPathManager
parent rtmpConnParent
ctx context.Context
ctxCancel func()
uuid uuid.UUID
created time.Time
// path *path
ctx context.Context
ctxCancel func()
uuid uuid.UUID
created time.Time
state rtmpConnState
stateMutex sync.Mutex
}
@ -252,18 +342,6 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { @@ -252,18 +342,6 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
c.state = rtmpConnStateRead
c.stateMutex.Unlock()
var videoFormat *formats.H264
videoMedia := res.stream.medias().FindFormat(&videoFormat)
videoFirstIDRFound := false
var videoStartDTS time.Duration
var audioFormat *formats.MPEG4Audio
audioMedia := res.stream.medias().FindFormat(&audioFormat)
if videoFormat == nil && audioFormat == nil {
return fmt.Errorf("the stream doesn't contain an H264 track or an AAC track")
}
ringBuffer, _ := ringbuffer.New(uint64(c.readBufferCount))
go func() {
<-ctx.Done()
@ -271,14 +349,82 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { @@ -271,14 +349,82 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
}()
var medias media.Medias
videoFirstIDRFound := false
var videoStartDTS time.Duration
videoMedia, videoFormat := c.findVideoFormat(res.stream, ringBuffer,
&videoFirstIDRFound, &videoStartDTS)
if videoMedia != nil {
medias = append(medias, videoMedia)
}
audioMedia, audioFormat := c.findAudioFormat(res.stream, ringBuffer,
videoFormat, &videoFirstIDRFound, &videoStartDTS)
if audioFormat != nil {
medias = append(medias, audioMedia)
}
if videoFormat == nil && audioFormat == nil {
return fmt.Errorf(
"the stream doesn't contain any supported codec, which are currently H264, MPEG2-Audio, MPEG4-Audio")
}
defer res.stream.readerRemove(c)
c.log(logger.Info, "is reading from path '%s', %s",
path.name, sourceMediaInfo(medias))
pathConf := path.safeConf()
if pathConf.RunOnRead != "" {
c.log(logger.Info, "runOnRead command started")
onReadCmd := externalcmd.NewCmd(
c.externalCmdPool,
pathConf.RunOnRead,
pathConf.RunOnReadRestart,
path.externalCmdEnv(),
func(co int) {
c.log(logger.Info, "runOnRead command exited with code %d", co)
})
defer func() {
onReadCmd.Close()
c.log(logger.Info, "runOnRead command stopped")
}()
}
err := c.conn.WriteTracks(videoFormat, audioFormat)
if err != nil {
return err
}
// disable read deadline
c.nconn.SetReadDeadline(time.Time{})
for {
item, ok := ringBuffer.Pull()
if !ok {
return fmt.Errorf("terminated")
}
err := item.(func() error)()
if err != nil {
return err
}
}
}
func (c *rtmpConn) findVideoFormat(stream *stream, ringBuffer *ringbuffer.RingBuffer,
videoFirstIDRFound *bool, videoStartDTS *time.Duration,
) (*media.Media, formats.Format) {
var videoFormatH264 *formats.H264
videoMedia := stream.medias().FindFormat(&videoFormatH264)
if videoFormatH264 != nil {
videoStartPTSFilled := false
var videoStartPTS time.Duration
var videoDTSExtractor *h264.DTSExtractor
res.stream.readerAdd(c, videoMedia, videoFormat, func(unit formatprocessor.Unit) {
stream.readerAdd(c, videoMedia, videoFormatH264, func(unit formatprocessor.Unit) {
ringBuffer.Push(func() error {
tunit := unit.(*formatprocessor.UnitH264)
@ -309,12 +455,12 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { @@ -309,12 +455,12 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
var dts time.Duration
// wait until we receive an IDR
if !videoFirstIDRFound {
if !*videoFirstIDRFound {
if !idrPresent {
return nil
}
videoFirstIDRFound = true
*videoFirstIDRFound = true
videoDTSExtractor = h264.NewDTSExtractor()
var err error
@ -323,9 +469,9 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { @@ -323,9 +469,9 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
return err
}
videoStartDTS = dts
*videoStartDTS = dts
dts = 0
pts -= videoStartDTS
pts -= *videoStartDTS
} else {
if !idrPresent && !nonIDRPresent {
return nil
@ -337,8 +483,8 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { @@ -337,8 +483,8 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
return err
}
dts -= videoStartDTS
pts -= videoStartDTS
dts -= *videoStartDTS
pts -= *videoStartDTS
}
avcc, err := h264.AVCCMarshal(tunit.AU)
@ -363,15 +509,24 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { @@ -363,15 +509,24 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
return nil
})
})
return videoMedia, videoFormatH264
}
if audioMedia != nil {
medias = append(medias, audioMedia)
return nil, nil
}
func (c *rtmpConn) findAudioFormat(stream *stream, ringBuffer *ringbuffer.RingBuffer,
videoFormat formats.Format, videoFirstIDRFound *bool, videoStartDTS *time.Duration,
) (*media.Media, formats.Format) {
var audioFormatMPEG4 *formats.MPEG4Audio
audioMedia := stream.medias().FindFormat(&audioFormatMPEG4)
if audioMedia != nil {
audioStartPTSFilled := false
var audioStartPTS time.Duration
res.stream.readerAdd(c, audioMedia, audioFormat, func(unit formatprocessor.Unit) {
stream.readerAdd(c, audioMedia, audioFormatMPEG4, func(unit formatprocessor.Unit) {
ringBuffer.Push(func() error {
tunit := unit.(*formatprocessor.UnitMPEG4Audio)
@ -386,11 +541,11 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { @@ -386,11 +541,11 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
pts := tunit.PTS - audioStartPTS
if videoFormat != nil {
if !videoFirstIDRFound {
if !*videoFirstIDRFound {
return nil
}
pts -= videoStartDTS
pts -= *videoStartDTS
if pts < 0 {
return nil
}
@ -401,13 +556,14 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { @@ -401,13 +556,14 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
err := c.conn.WriteMessage(&message.MsgAudio{
ChunkStreamID: message.MsgAudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,
AACType: flvio.AAC_RAW,
Payload: au,
DTS: pts + time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*
time.Second/time.Duration(audioFormat.ClockRate()),
time.Second/time.Duration(audioFormatMPEG4.ClockRate()),
})
if err != nil {
return err
@ -417,50 +573,83 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { @@ -417,50 +573,83 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
return nil
})
})
return audioMedia, audioFormatMPEG4
}
defer res.stream.readerRemove(c)
var audioFormatMPEG2 *formats.MPEG2Audio
audioMedia = stream.medias().FindFormat(&audioFormatMPEG2)
c.log(logger.Info, "is reading from path '%s', %s",
path.name, sourceMediaInfo(medias))
if audioMedia != nil {
audioStartPTSFilled := false
var audioStartPTS time.Duration
pathConf := path.safeConf()
stream.readerAdd(c, audioMedia, audioFormatMPEG2, func(unit formatprocessor.Unit) {
ringBuffer.Push(func() error {
tunit := unit.(*formatprocessor.UnitMPEG2Audio)
if pathConf.RunOnRead != "" {
c.log(logger.Info, "runOnRead command started")
onReadCmd := externalcmd.NewCmd(
c.externalCmdPool,
pathConf.RunOnRead,
pathConf.RunOnReadRestart,
path.externalCmdEnv(),
func(co int) {
c.log(logger.Info, "runOnRead command exited with code %d", co)
})
defer func() {
onReadCmd.Close()
c.log(logger.Info, "runOnRead command stopped")
}()
}
if !audioStartPTSFilled {
audioStartPTSFilled = true
audioStartPTS = tunit.PTS
}
pts := tunit.PTS - audioStartPTS
err := c.conn.WriteTracks(videoFormat, audioFormat)
if err != nil {
return err
}
if videoFormat != nil {
if !*videoFirstIDRFound {
return nil
}
// disable read deadline
c.nconn.SetReadDeadline(time.Time{})
pts -= *videoStartDTS
if pts < 0 {
return nil
}
}
for {
item, ok := ringBuffer.Pull()
if !ok {
return fmt.Errorf("terminated")
}
for _, frame := range tunit.Frames {
var h mpeg2audio.FrameHeader
err := h.Unmarshal(frame)
if err != nil {
return err
}
err := item.(func() error)()
if err != nil {
return err
}
if !(!h.MPEG2 && h.Layer == 3) {
return fmt.Errorf("RTMP only supports MPEG-1 audio layer 3")
}
channels := uint8(flvio.SOUND_STEREO)
if h.ChannelMode == mpeg2audio.ChannelModeMono {
channels = flvio.SOUND_MONO
}
msg := &message.MsgAudio{
ChunkStreamID: message.MsgAudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG2Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: channels,
Payload: frame,
DTS: pts,
}
c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err = c.conn.WriteMessage(msg)
if err != nil {
return err
}
pts += time.Duration(h.SampleCount()) *
time.Second / time.Duration(h.SampleRate)
}
return nil
})
})
return audioMedia, audioFormatMPEG2
}
return nil, nil
}
func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
@ -538,31 +727,8 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { @@ -538,31 +727,8 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
// disable write deadline to allow outgoing acknowledges
c.nconn.SetWriteDeadline(time.Time{})
var onVideoData func(time.Duration, [][]byte)
if _, ok := videoFormat.(*formats.H264); ok {
onVideoData = func(pts time.Duration, au [][]byte) {
err = rres.stream.writeData(videoMedia, videoFormat, &formatprocessor.UnitH264{
PTS: pts,
AU: au,
NTP: time.Now(),
})
if err != nil {
c.log(logger.Warn, "%v", err)
}
}
} else {
onVideoData = func(pts time.Duration, au [][]byte) {
err = rres.stream.writeData(videoMedia, videoFormat, &formatprocessor.UnitH265{
PTS: pts,
AU: au,
NTP: time.Now(),
})
if err != nil {
c.log(logger.Warn, "%v", err)
}
}
}
videoWriteFunc := getRTMPWriteFunc(videoMedia, videoFormat, rres.stream)
audioWriteFunc := getRTMPWriteFunc(audioMedia, audioFormat, rres.stream)
for {
c.nconn.SetReadDeadline(time.Now().Add(time.Duration(c.readTimeout)))
@ -577,34 +743,9 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { @@ -577,34 +743,9 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
return fmt.Errorf("received a video packet, but track is not set up")
}
if tmsg.H264Type == flvio.AVC_SEQHDR {
var conf h264conf.Conf
err = conf.Unmarshal(tmsg.Payload)
if err != nil {
return fmt.Errorf("unable to parse H264 config: %v", err)
}
au := [][]byte{
conf.SPS,
conf.PPS,
}
err := rres.stream.writeData(videoMedia, videoFormat, &formatprocessor.UnitH264{
PTS: tmsg.DTS + tmsg.PTSDelta,
AU: au,
NTP: time.Now(),
})
if err != nil {
c.log(logger.Warn, "%v", err)
}
} else if tmsg.H264Type == flvio.AVC_NALU {
au, err := h264.AVCCUnmarshal(tmsg.Payload)
if err != nil {
c.log(logger.Warn, "unable to decode AVCC: %v", err)
continue
}
onVideoData(tmsg.DTS+tmsg.PTSDelta, au)
err := videoWriteFunc(tmsg)
if err != nil {
c.log(logger.Warn, "%v", err)
}
case *message.MsgAudio:
@ -612,15 +753,9 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { @@ -612,15 +753,9 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
return fmt.Errorf("received an audio packet, but track is not set up")
}
if tmsg.AACType == flvio.AAC_RAW {
err := rres.stream.writeData(audioMedia, audioFormat, &formatprocessor.UnitMPEG4Audio{
PTS: tmsg.DTS,
AUs: [][]byte{tmsg.Payload},
NTP: time.Now(),
})
if err != nil {
c.log(logger.Warn, "%v", err)
}
err := audioWriteFunc(tmsg)
if err != nil {
c.log(logger.Warn, "%v", err)
}
}
}

52
internal/core/rtmp_source.go

@ -13,11 +13,8 @@ import ( @@ -13,11 +13,8 @@ import (
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/gortsplib/v3/pkg/media"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/aler9/mediamtx/internal/conf"
"github.com/aler9/mediamtx/internal/formatprocessor"
"github.com/aler9/mediamtx/internal/logger"
"github.com/aler9/mediamtx/internal/rtmp"
"github.com/aler9/mediamtx/internal/rtmp/message"
@ -154,6 +151,9 @@ func (s *rtmpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha @@ -154,6 +151,9 @@ func (s *rtmpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha
s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{})
}()
videoWriteFunc := getRTMPWriteFunc(videoMedia, videoFormat, res.stream)
audioWriteFunc := getRTMPWriteFunc(audioMedia, audioFormat, res.stream)
// disable write deadline to allow outgoing acknowledges
nconn.SetWriteDeadline(time.Time{})
@ -166,41 +166,23 @@ func (s *rtmpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha @@ -166,41 +166,23 @@ func (s *rtmpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha
switch tmsg := msg.(type) {
case *message.MsgVideo:
if tmsg.H264Type == flvio.AVC_NALU {
if videoFormat == nil {
return fmt.Errorf("received an H264 packet, but track is not set up")
}
au, err := h264.AVCCUnmarshal(tmsg.Payload)
if err != nil {
s.Log(logger.Warn, "unable to decode AVCC: %v", err)
continue
}
err = res.stream.writeData(videoMedia, videoFormat, &formatprocessor.UnitH264{
PTS: tmsg.DTS + tmsg.PTSDelta,
AU: au,
NTP: time.Now(),
})
if err != nil {
s.Log(logger.Warn, "%v", err)
}
if videoFormat == nil {
return fmt.Errorf("received an H264 packet, but track is not set up")
}
err := videoWriteFunc(tmsg)
if err != nil {
s.Log(logger.Warn, "%v", err)
}
case *message.MsgAudio:
if tmsg.AACType == flvio.AAC_RAW {
if audioFormat == nil {
return fmt.Errorf("received an AAC packet, but track is not set up")
}
err := res.stream.writeData(audioMedia, audioFormat, &formatprocessor.UnitMPEG4Audio{
PTS: tmsg.DTS,
AUs: [][]byte{tmsg.Payload},
NTP: time.Now(),
})
if err != nil {
s.Log(logger.Warn, "%v", err)
}
if audioFormat == nil {
return fmt.Errorf("received an AAC packet, but track is not set up")
}
err := audioWriteFunc(tmsg)
if err != nil {
s.Log(logger.Warn, "%v", err)
}
}
}

158
internal/core/rtsp_session.go

@ -11,6 +11,7 @@ import ( @@ -11,6 +11,7 @@ import (
"github.com/bluenviron/gortsplib/v3"
"github.com/bluenviron/gortsplib/v3/pkg/base"
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/gortsplib/v3/pkg/media"
"github.com/bluenviron/gortsplib/v3/pkg/url"
"github.com/google/uuid"
"github.com/pion/rtp"
@ -25,6 +26,76 @@ const ( @@ -25,6 +26,76 @@ const (
pauseAfterAuthError = 2 * time.Second
)
type rtspWriteFunc func(*rtp.Packet) error
func getRTSPWriteFunc(medi *media.Media, forma formats.Format, stream *stream) rtspWriteFunc {
switch forma.(type) {
case *formats.H264:
return func(pkt *rtp.Packet) error {
return stream.writeUnit(medi, forma, &formatprocessor.UnitH264{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
}
case *formats.H265:
return func(pkt *rtp.Packet) error {
return stream.writeUnit(medi, forma, &formatprocessor.UnitH265{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
}
case *formats.VP8:
return func(pkt *rtp.Packet) error {
return stream.writeUnit(medi, forma, &formatprocessor.UnitVP8{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
}
case *formats.VP9:
return func(pkt *rtp.Packet) error {
return stream.writeUnit(medi, forma, &formatprocessor.UnitVP9{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
}
case *formats.MPEG2Audio:
return func(pkt *rtp.Packet) error {
return stream.writeUnit(medi, forma, &formatprocessor.UnitMPEG2Audio{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
}
case *formats.MPEG4Audio:
return func(pkt *rtp.Packet) error {
return stream.writeUnit(medi, forma, &formatprocessor.UnitMPEG4Audio{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
}
case *formats.Opus:
return func(pkt *rtp.Packet) error {
return stream.writeUnit(medi, forma, &formatprocessor.UnitOpus{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
}
default:
return func(pkt *rtp.Packet) error {
return stream.writeUnit(medi, forma, &formatprocessor.UnitGeneric{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
}
}
}
type rtspSessionPathManager interface {
publisherAdd(req pathPublisherAddReq) pathPublisherAnnounceRes
readerAdd(req pathReaderAddReq) pathReaderSetupPlayRes
@ -321,87 +392,14 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R @@ -321,87 +392,14 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R
for _, medi := range s.session.AnnouncedMedias() {
for _, forma := range medi.Formats {
cmedia := medi
cformat := forma
switch forma.(type) {
case *formats.H264:
ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitH264{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
if err != nil {
s.log(logger.Warn, "%v", err)
}
})
case *formats.H265:
ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitH265{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
if err != nil {
s.log(logger.Warn, "%v", err)
}
})
case *formats.VP8:
ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitVP8{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
if err != nil {
s.log(logger.Warn, "%v", err)
}
})
case *formats.VP9:
ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitVP9{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
if err != nil {
s.log(logger.Warn, "%v", err)
}
})
writeFunc := getRTSPWriteFunc(medi, forma, s.stream)
case *formats.MPEG4Audio:
ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitMPEG4Audio{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
if err != nil {
s.log(logger.Warn, "%v", err)
}
})
case *formats.Opus:
ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitOpus{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
if err != nil {
s.log(logger.Warn, "%v", err)
}
})
default:
ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := s.stream.writeData(cmedia, cformat, &formatprocessor.UnitGeneric{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
if err != nil {
s.log(logger.Warn, "%v", err)
}
})
}
ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := writeFunc(pkt)
if err != nil {
s.log(logger.Warn, "%v", err)
}
})
}
}

91
internal/core/rtsp_source.go

@ -11,11 +11,9 @@ import ( @@ -11,11 +11,9 @@ import (
"github.com/bluenviron/gortsplib/v3"
"github.com/bluenviron/gortsplib/v3/pkg/base"
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/pion/rtp"
"github.com/aler9/mediamtx/internal/conf"
"github.com/aler9/mediamtx/internal/formatprocessor"
"github.com/aler9/mediamtx/internal/logger"
"github.com/bluenviron/gortsplib/v3/pkg/url"
)
@ -139,87 +137,14 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha @@ -139,87 +137,14 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha
for _, medi := range medias {
for _, forma := range medi.Formats {
cmedia := medi
cformat := forma
switch forma.(type) {
case *formats.H264:
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitH264{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
if err != nil {
s.Log(logger.Warn, "%v", err)
}
})
case *formats.H265:
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitH265{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
if err != nil {
s.Log(logger.Warn, "%v", err)
}
})
case *formats.VP8:
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitVP8{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
if err != nil {
s.Log(logger.Warn, "%v", err)
}
})
case *formats.VP9:
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitVP9{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
if err != nil {
s.Log(logger.Warn, "%v", err)
}
})
case *formats.MPEG4Audio:
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitMPEG4Audio{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
if err != nil {
s.Log(logger.Warn, "%v", err)
}
})
case *formats.Opus:
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitOpus{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
if err != nil {
s.Log(logger.Warn, "%v", err)
}
})
default:
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := res.stream.writeData(cmedia, cformat, &formatprocessor.UnitGeneric{
RTPPackets: []*rtp.Packet{pkt},
NTP: time.Now(),
})
if err != nil {
s.Log(logger.Warn, "%v", err)
}
})
}
writeFunc := getRTSPWriteFunc(medi, forma, res.stream)
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := writeFunc(pkt)
if err != nil {
s.Log(logger.Warn, "%v", err)
}
})
}
}

4
internal/core/stream.go

@ -60,8 +60,8 @@ func (s *stream) readerRemove(r reader) { @@ -60,8 +60,8 @@ func (s *stream) readerRemove(r reader) {
}
}
func (s *stream) writeData(medi *media.Media, forma formats.Format, data formatprocessor.Unit) error {
func (s *stream) writeUnit(medi *media.Media, forma formats.Format, data formatprocessor.Unit) error {
sm := s.smedias[medi]
sf := sm.formats[forma]
return sf.writeData(s, medi, data)
return sf.writeUnit(s, medi, data)
}

2
internal/core/stream_format.go

@ -46,7 +46,7 @@ func (sf *streamFormat) readerRemove(r reader) { @@ -46,7 +46,7 @@ func (sf *streamFormat) readerRemove(r reader) {
delete(sf.nonRTSPReaders, r)
}
func (sf *streamFormat) writeData(s *stream, medi *media.Media, data formatprocessor.Unit) error {
func (sf *streamFormat) writeUnit(s *stream, medi *media.Media, data formatprocessor.Unit) error {
sf.mutex.RLock()
defer sf.mutex.RUnlock()

8
internal/core/udp_source.go

@ -196,7 +196,7 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan @@ -196,7 +196,7 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
return
}
err = stream.writeData(medi, medi.Formats[0], &formatprocessor.UnitH264{
err = stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{
PTS: pts,
AU: au,
NTP: time.Now(),
@ -221,7 +221,7 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan @@ -221,7 +221,7 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
return
}
err = stream.writeData(medi, medi.Formats[0], &formatprocessor.UnitH265{
err = stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitH265{
PTS: pts,
AU: au,
NTP: time.Now(),
@ -256,7 +256,7 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan @@ -256,7 +256,7 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
aus[i] = pkt.AU
}
err = stream.writeData(medi, medi.Formats[0], &formatprocessor.UnitMPEG4Audio{
err = stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4Audio{
PTS: pts,
AUs: aus,
NTP: time.Now(),
@ -287,7 +287,7 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan @@ -287,7 +287,7 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
}
pos += n
err = stream.writeData(medi, medi.Formats[0], &formatprocessor.UnitOpus{
err = stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitOpus{
PTS: pts,
Frame: au.Frame,
NTP: time.Now(),

2
internal/formatprocessor/generic_test.go

@ -11,7 +11,7 @@ import ( @@ -11,7 +11,7 @@ import (
func TestGenericRemovePadding(t *testing.T) {
forma := &formats.Generic{
PayloadTyp: 96,
RTPMap: "private/90000",
RTPMa: "private/90000",
}
forma.Init()

8
internal/formatprocessor/h264.go

@ -104,7 +104,12 @@ func newH264( @@ -104,7 +104,12 @@ func newH264(
}
if allocateEncoder {
t.encoder = forma.CreateEncoder()
t.encoder = &rtph264.Encoder{
PayloadMaxSize: udpMaxPayloadSize - 12,
PayloadType: forma.PayloadTyp,
PacketizationMode: forma.PacketizationMode,
}
t.encoder.Init()
}
return t, nil
@ -280,6 +285,7 @@ func (t *formatProcessorH264) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -280,6 +285,7 @@ func (t *formatProcessorH264) Process(unit Unit, hasNonRTSPReaders bool) error {
tunit.AU = t.remuxAccessUnit(tunit.AU)
}
// encode into RTP
if len(tunit.AU) != 0 {
pkts, err := t.encoder.Encode(tunit.AU, tunit.PTS)
if err != nil {

7
internal/formatprocessor/h265.go

@ -111,7 +111,11 @@ func newH265( @@ -111,7 +111,11 @@ func newH265(
}
if allocateEncoder {
t.encoder = forma.CreateEncoder()
t.encoder = &rtph265.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: forma.PayloadTyp,
}
t.encoder.Init()
}
return t, nil
@ -303,6 +307,7 @@ func (t *formatProcessorH265) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -303,6 +307,7 @@ func (t *formatProcessorH265) Process(unit Unit, hasNonRTSPReaders bool) error {
tunit.AU = t.remuxAccessUnit(tunit.AU)
}
// encode into RTP
if len(tunit.AU) != 0 {
pkts, err := t.encoder.Encode(tunit.AU, tunit.PTS)
if err != nil {

99
internal/formatprocessor/mpeg2audio.go

@ -0,0 +1,99 @@ @@ -0,0 +1,99 @@
package formatprocessor
import (
"fmt"
"time"
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/gortsplib/v3/pkg/formats/rtpmpeg2audio"
"github.com/pion/rtp"
)
// UnitMPEG2Audio is a MPEG2-audio data unit.
type UnitMPEG2Audio struct {
RTPPackets []*rtp.Packet
NTP time.Time
PTS time.Duration
Frames [][]byte
}
// GetRTPPackets implements Unit.
func (d *UnitMPEG2Audio) GetRTPPackets() []*rtp.Packet {
return d.RTPPackets
}
// GetNTP implements Unit.
func (d *UnitMPEG2Audio) GetNTP() time.Time {
return d.NTP
}
type formatProcessorMPEG2Audio struct {
udpMaxPayloadSize int
format *formats.MPEG2Audio
encoder *rtpmpeg2audio.Encoder
decoder *rtpmpeg2audio.Decoder
}
func newMPEG2Audio(
udpMaxPayloadSize int,
forma *formats.MPEG2Audio,
allocateEncoder bool,
) (*formatProcessorMPEG2Audio, error) {
t := &formatProcessorMPEG2Audio{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
if allocateEncoder {
t.encoder = &rtpmpeg2audio.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
}
t.encoder.Init()
}
return t, nil
}
func (t *formatProcessorMPEG2Audio) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl
tunit := unit.(*UnitMPEG2Audio)
if tunit.RTPPackets != nil {
pkt := tunit.RTPPackets[0]
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP
if hasNonRTSPReaders {
if t.decoder == nil {
t.decoder = t.format.CreateDecoder()
}
frames, pts, err := t.decoder.Decode(pkt)
if err != nil {
return err
}
tunit.Frames = frames
tunit.PTS = pts
}
// route packet as is
return nil
}
// encode into RTP
pkts, err := t.encoder.Encode(tunit.Frames, tunit.PTS)
if err != nil {
return err
}
tunit.RTPPackets = pkts
return nil
}

13
internal/formatprocessor/mpeg4audio.go

@ -45,7 +45,15 @@ func newMPEG4Audio( @@ -45,7 +45,15 @@ func newMPEG4Audio(
}
if allocateEncoder {
t.encoder = forma.CreateEncoder()
t.encoder = &rtpmpeg4audio.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: forma.PayloadTyp,
SampleRate: forma.Config.SampleRate,
SizeLength: forma.SizeLength,
IndexLength: forma.IndexLength,
IndexDeltaLength: forma.IndexDeltaLength,
}
t.encoder.Init()
}
return t, nil
@ -88,11 +96,12 @@ func (t *formatProcessorMPEG4Audio) Process(unit Unit, hasNonRTSPReaders bool) e @@ -88,11 +96,12 @@ func (t *formatProcessorMPEG4Audio) Process(unit Unit, hasNonRTSPReaders bool) e
return nil
}
// encode into RTP
pkts, err := t.encoder.Encode(tunit.AUs, tunit.PTS)
if err != nil {
return err
}
tunit.RTPPackets = pkts
return nil
}

10
internal/formatprocessor/opus.go

@ -45,7 +45,12 @@ func newOpus( @@ -45,7 +45,12 @@ func newOpus(
}
if allocateEncoder {
t.encoder = forma.CreateEncoder()
t.encoder = &rtpsimpleaudio.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: forma.PayloadTyp,
SampleRate: 48000,
}
t.encoder.Init()
}
return t, nil
@ -85,11 +90,12 @@ func (t *formatProcessorOpus) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -85,11 +90,12 @@ func (t *formatProcessorOpus) Process(unit Unit, hasNonRTSPReaders bool) error {
return nil
}
// encode into RTP
pkt, err := t.encoder.Encode(tunit.Frame, tunit.PTS)
if err != nil {
return err
}
tunit.RTPPackets = []*rtp.Packet{pkt}
return nil
}

3
internal/formatprocessor/processor.go

@ -30,6 +30,9 @@ func New( @@ -30,6 +30,9 @@ func New(
case *formats.VP9:
return newVP9(udpMaxPayloadSize, forma, generateRTPPackets)
case *formats.MPEG2Audio:
return newMPEG2Audio(udpMaxPayloadSize, forma, generateRTPPackets)
case *formats.MPEG4Audio:
return newMPEG4Audio(udpMaxPayloadSize, forma, generateRTPPackets)

9
internal/formatprocessor/vp8.go

@ -45,7 +45,11 @@ func newVP8( @@ -45,7 +45,11 @@ func newVP8(
}
if allocateEncoder {
t.encoder = forma.CreateEncoder()
t.encoder = &rtpvp8.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: forma.PayloadTyp,
}
t.encoder.Init()
}
return t, nil
@ -88,11 +92,12 @@ func (t *formatProcessorVP8) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -88,11 +92,12 @@ func (t *formatProcessorVP8) Process(unit Unit, hasNonRTSPReaders bool) error {
return nil
}
// encode into RTP
pkts, err := t.encoder.Encode(tunit.Frame, tunit.PTS)
if err != nil {
return err
}
tunit.RTPPackets = pkts
return nil
}

9
internal/formatprocessor/vp9.go

@ -45,7 +45,11 @@ func newVP9( @@ -45,7 +45,11 @@ func newVP9(
}
if allocateEncoder {
t.encoder = forma.CreateEncoder()
t.encoder = &rtpvp9.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: forma.PayloadTyp,
}
t.encoder.Init()
}
return t, nil
@ -88,11 +92,12 @@ func (t *formatProcessorVP9) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -88,11 +92,12 @@ func (t *formatProcessorVP9) Process(unit Unit, hasNonRTSPReaders bool) error {
return nil
}
// encode into RTP
pkts, err := t.encoder.Encode(tunit.Frame, tunit.PTS)
if err != nil {
return err
}
tunit.RTPPackets = pkts
return nil
}

54
internal/rtmp/conn.go

@ -23,7 +23,6 @@ import ( @@ -23,7 +23,6 @@ import (
const (
codecH264 = 7
codecAAC = 10
)
func resultIsOK1(res *message.MsgCommandAMF0) bool {
@ -617,7 +616,7 @@ func trackFromAACDecoderConfig(data []byte) (*formats.MPEG4Audio, error) { @@ -617,7 +616,7 @@ func trackFromAACDecoderConfig(data []byte) (*formats.MPEG4Audio, error) {
var errEmptyMetadata = errors.New("metadata is empty")
func (c *Conn) readTracksFromMetadata(payload []interface{}) (formats.Format, *formats.MPEG4Audio, error) {
func (c *Conn) readTracksFromMetadata(payload []interface{}) (formats.Format, formats.Format, error) {
if len(payload) != 1 {
return nil, nil, fmt.Errorf("invalid metadata")
}
@ -627,6 +626,9 @@ func (c *Conn) readTracksFromMetadata(payload []interface{}) (formats.Format, *f @@ -627,6 +626,9 @@ func (c *Conn) readTracksFromMetadata(payload []interface{}) (formats.Format, *f
return nil, nil, fmt.Errorf("invalid metadata")
}
var videoTrack formats.Format
var audioTrack formats.Format
hasVideo, err := func() (bool, error) {
v, ok := md.GetV("videocodecid")
if !ok {
@ -667,7 +669,11 @@ func (c *Conn) readTracksFromMetadata(payload []interface{}) (formats.Format, *f @@ -667,7 +669,11 @@ func (c *Conn) readTracksFromMetadata(payload []interface{}) (formats.Format, *f
case 0:
return false, nil
case codecAAC:
case message.CodecMPEG2Audio:
audioTrack = &formats.MPEG2Audio{}
return true, nil
case message.CodecMPEG4Audio:
return true, nil
}
@ -687,9 +693,6 @@ func (c *Conn) readTracksFromMetadata(payload []interface{}) (formats.Format, *f @@ -687,9 +693,6 @@ func (c *Conn) readTracksFromMetadata(payload []interface{}) (formats.Format, *f
return nil, nil, errEmptyMetadata
}
var videoTrack formats.Format
var audioTrack *formats.MPEG4Audio
for {
msg, err := c.ReadMessage()
if err != nil {
@ -750,7 +753,7 @@ func (c *Conn) readTracksFromMetadata(payload []interface{}) (formats.Format, *f @@ -750,7 +753,7 @@ func (c *Conn) readTracksFromMetadata(payload []interface{}) (formats.Format, *f
}
if audioTrack == nil {
if tmsg.AACType == flvio.AVC_SEQHDR {
if tmsg.Codec == message.CodecMPEG4Audio && tmsg.AACType == flvio.AVC_SEQHDR {
audioTrack, err = trackFromAACDecoderConfig(tmsg.Payload)
if err != nil {
return nil, nil, err
@ -842,7 +845,7 @@ outer: @@ -842,7 +845,7 @@ outer:
// ReadTracks reads track informations.
// It returns the video track and the audio track.
func (c *Conn) ReadTracks() (formats.Format, *formats.MPEG4Audio, error) {
func (c *Conn) ReadTracks() (formats.Format, formats.Format, error) {
msg, err := func() (message.Message, error) {
for {
msg, err := c.ReadMessage()
@ -901,7 +904,7 @@ func (c *Conn) ReadTracks() (formats.Format, *formats.MPEG4Audio, error) { @@ -901,7 +904,7 @@ func (c *Conn) ReadTracks() (formats.Format, *formats.MPEG4Audio, error) {
}
// WriteTracks writes track informations.
func (c *Conn) WriteTracks(videoTrack *formats.H264, audioTrack *formats.MPEG4Audio) error {
func (c *Conn) WriteTracks(videoTrack formats.Format, audioTrack formats.Format) error {
err := c.WriteMessage(&message.MsgDataAMF0{
ChunkStreamID: 4,
MessageStreamID: 0x1000000,
@ -916,10 +919,13 @@ func (c *Conn) WriteTracks(videoTrack *formats.H264, audioTrack *formats.MPEG4Au @@ -916,10 +919,13 @@ func (c *Conn) WriteTracks(videoTrack *formats.H264, audioTrack *formats.MPEG4Au
{
K: "videocodecid",
V: func() float64 {
if videoTrack != nil {
switch videoTrack.(type) {
case *formats.H264:
return codecH264
default:
return 0
}
return 0
}(),
},
{
@ -929,10 +935,16 @@ func (c *Conn) WriteTracks(videoTrack *formats.H264, audioTrack *formats.MPEG4Au @@ -929,10 +935,16 @@ func (c *Conn) WriteTracks(videoTrack *formats.H264, audioTrack *formats.MPEG4Au
{
K: "audiocodecid",
V: func() float64 {
if audioTrack != nil {
return codecAAC
switch audioTrack.(type) {
case *formats.MPEG2Audio:
return message.CodecMPEG2Audio
case *formats.MPEG4Audio:
return message.CodecMPEG4Audio
default:
return 0
}
return 0
}(),
},
},
@ -942,10 +954,11 @@ func (c *Conn) WriteTracks(videoTrack *formats.H264, audioTrack *formats.MPEG4Au @@ -942,10 +954,11 @@ func (c *Conn) WriteTracks(videoTrack *formats.H264, audioTrack *formats.MPEG4Au
return err
}
// write decoder config only if SPS and PPS are available.
// if they're not available yet, they're sent later.
if videoTrack != nil {
sps, pps := videoTrack.SafeParams()
if h264Track, ok := videoTrack.(*formats.H264); ok {
sps, pps := h264Track.SafeParams()
// write decoder config only if SPS and PPS are available.
// if they're not available yet, they're sent later.
if sps != nil && pps != nil {
buf, _ := h264conf.Conf{
SPS: sps,
@ -965,8 +978,8 @@ func (c *Conn) WriteTracks(videoTrack *formats.H264, audioTrack *formats.MPEG4Au @@ -965,8 +978,8 @@ func (c *Conn) WriteTracks(videoTrack *formats.H264, audioTrack *formats.MPEG4Au
}
}
if audioTrack != nil {
enc, err := audioTrack.Config.Marshal()
if mpeg4audioTrack, ok := audioTrack.(*formats.MPEG4Audio); ok {
enc, err := mpeg4audioTrack.Config.Marshal()
if err != nil {
return err
}
@ -974,6 +987,7 @@ func (c *Conn) WriteTracks(videoTrack *formats.H264, audioTrack *formats.MPEG4Au @@ -974,6 +987,7 @@ func (c *Conn) WriteTracks(videoTrack *formats.H264, audioTrack *formats.MPEG4Au
err = c.WriteMessage(&message.MsgAudio{
ChunkStreamID: message.MsgAudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,

13
internal/rtmp/conn_test.go

@ -515,7 +515,7 @@ func TestReadTracks(t *testing.T) { @@ -515,7 +515,7 @@ func TestReadTracks(t *testing.T) {
PPS: pps,
PacketizationMode: 1,
},
(*formats.MPEG4Audio)(nil),
nil,
},
{
"metadata without codec id",
@ -801,7 +801,7 @@ func TestReadTracks(t *testing.T) { @@ -801,7 +801,7 @@ func TestReadTracks(t *testing.T) {
},
{
K: "audiocodecid",
V: float64(codecAAC),
V: float64(message.CodecMPEG4Audio),
},
},
},
@ -832,6 +832,7 @@ func TestReadTracks(t *testing.T) { @@ -832,6 +832,7 @@ func TestReadTracks(t *testing.T) {
err = mrw.Write(&message.MsgAudio{
ChunkStreamID: message.MsgAudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,
@ -932,6 +933,7 @@ func TestReadTracks(t *testing.T) { @@ -932,6 +933,7 @@ func TestReadTracks(t *testing.T) {
err = mrw.Write(&message.MsgAudio{
ChunkStreamID: message.MsgAudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,
@ -965,6 +967,7 @@ func TestReadTracks(t *testing.T) { @@ -965,6 +967,7 @@ func TestReadTracks(t *testing.T) {
err = mrw.Write(&message.MsgAudio{
ChunkStreamID: message.MsgAudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,
@ -984,6 +987,7 @@ func TestReadTracks(t *testing.T) { @@ -984,6 +987,7 @@ func TestReadTracks(t *testing.T) {
err = mrw.Write(&message.MsgAudio{
ChunkStreamID: message.MsgAudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,
@ -995,6 +999,7 @@ func TestReadTracks(t *testing.T) { @@ -995,6 +999,7 @@ func TestReadTracks(t *testing.T) {
err = mrw.Write(&message.MsgAudio{
ChunkStreamID: message.MsgAudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,
@ -1026,7 +1031,7 @@ func TestReadTracks(t *testing.T) { @@ -1026,7 +1031,7 @@ func TestReadTracks(t *testing.T) {
},
{
K: "audiocodecid",
V: float64(codecAAC),
V: float64(message.CodecMPEG4Audio),
},
},
},
@ -1075,6 +1080,7 @@ func TestReadTracks(t *testing.T) { @@ -1075,6 +1080,7 @@ func TestReadTracks(t *testing.T) {
err = mrw.Write(&message.MsgAudio{
ChunkStreamID: message.MsgAudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,
@ -1375,6 +1381,7 @@ func TestWriteTracks(t *testing.T) { @@ -1375,6 +1381,7 @@ func TestWriteTracks(t *testing.T) {
require.Equal(t, &message.MsgAudio{
ChunkStreamID: message.MsgAudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,

48
internal/rtmp/message/msg_audio.go

@ -4,15 +4,19 @@ import ( @@ -4,15 +4,19 @@ import (
"fmt"
"time"
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/aler9/mediamtx/internal/rtmp/chunk"
"github.com/aler9/mediamtx/internal/rtmp/rawmessage"
)
const (
// MsgAudioChunkStreamID is the chunk stream ID that is usually used to send MsgAudio{}
MsgAudioChunkStreamID = 6
MsgAudioChunkStreamID = 4
)
// supported audio codecs
const (
CodecMPEG2Audio = 2
CodecMPEG4Audio = 10
)
// MsgAudio is an audio message.
@ -20,10 +24,11 @@ type MsgAudio struct { @@ -20,10 +24,11 @@ type MsgAudio struct {
ChunkStreamID byte
DTS time.Duration
MessageStreamID uint32
Codec uint8
Rate uint8
Depth uint8
Channels uint8
AACType uint8
AACType uint8 // only for CodecMPEG4Audio
Payload []byte
}
@ -37,28 +42,45 @@ func (m *MsgAudio) Unmarshal(raw *rawmessage.Message) error { @@ -37,28 +42,45 @@ func (m *MsgAudio) Unmarshal(raw *rawmessage.Message) error {
return fmt.Errorf("invalid body size")
}
codec := raw.Body[0] >> 4
if codec != flvio.SOUND_AAC {
return fmt.Errorf("unsupported audio codec: %d", codec)
m.Codec = raw.Body[0] >> 4
switch m.Codec {
case CodecMPEG2Audio, CodecMPEG4Audio:
default:
return fmt.Errorf("unsupported audio codec: %d", m.Codec)
}
m.Rate = (raw.Body[0] >> 2) & 0x03
m.Depth = (raw.Body[0] >> 1) & 0x01
m.Channels = raw.Body[0] & 0x01
m.AACType = raw.Body[1]
m.Payload = raw.Body[2:]
if m.Codec == CodecMPEG2Audio {
m.Payload = raw.Body[1:]
} else {
m.AACType = raw.Body[1]
m.Payload = raw.Body[2:]
}
return nil
}
// Marshal implements Message.
func (m MsgAudio) Marshal() (*rawmessage.Message, error) {
body := make([]byte, 2+len(m.Payload))
var l int
if m.Codec == CodecMPEG2Audio {
l = 1 + len(m.Payload)
} else {
l = 2 + len(m.Payload)
}
body := make([]byte, l)
body[0] = flvio.SOUND_AAC<<4 | m.Rate<<2 | m.Depth<<1 | m.Channels
body[1] = m.AACType
body[0] = m.Codec<<4 | m.Rate<<2 | m.Depth<<1 | m.Channels
copy(body[2:], m.Payload)
if m.Codec == CodecMPEG2Audio {
copy(body[1:], m.Payload)
} else {
body[1] = m.AACType
copy(body[2:], m.Payload)
}
return &rawmessage.Message{
ChunkStreamID: m.ChunkStreamID,

20
internal/rtmp/message/reader_test.go

@ -27,11 +27,29 @@ var readWriterCases = []struct { @@ -27,11 +27,29 @@ var readWriterCases = []struct {
},
},
{
"audio",
"audio mpeg2",
&MsgAudio{
ChunkStreamID: 7,
DTS: 6013806 * time.Millisecond,
MessageStreamID: 4534543,
Codec: CodecMPEG2Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,
Payload: []byte{0x01, 0x02, 0x03, 0x04},
},
[]byte{
0x7, 0x5b, 0xc3, 0x6e, 0x0, 0x0, 0x5, 0x8, 0x0, 0x45, 0x31, 0xf, 0x2f,
0x01, 0x02, 0x03, 0x04,
},
},
{
"audio mpeg4",
&MsgAudio{
ChunkStreamID: 7,
DTS: 6013806 * time.Millisecond,
MessageStreamID: 4534543,
Codec: CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,

Loading…
Cancel
Save