Browse Source

support reading MP4A-LATM-encoded AAC with RTMP and HLS (#1694) (#1898)

pull/1900/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
681a00347d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      go.mod
  2. 4
      go.sum
  3. 59
      internal/core/hls_muxer.go
  4. 2
      internal/core/hls_source.go
  5. 80
      internal/core/rtmp_conn.go
  6. 2
      internal/core/udp_source.go
  7. 42
      internal/core/webrtc_outgoing_track.go
  8. 19
      internal/formatprocessor/av1.go
  9. 39
      internal/formatprocessor/h264.go
  10. 4
      internal/formatprocessor/h264_test.go
  11. 38
      internal/formatprocessor/h265.go
  12. 4
      internal/formatprocessor/h265_test.go
  13. 19
      internal/formatprocessor/mpeg2audio.go
  14. 55
      internal/formatprocessor/mpeg4audio_generic.go
  15. 124
      internal/formatprocessor/mpeg4audio_latm.go
  16. 23
      internal/formatprocessor/opus.go
  17. 7
      internal/formatprocessor/processor.go
  18. 21
      internal/formatprocessor/vp8.go
  19. 21
      internal/formatprocessor/vp9.go
  20. 17
      internal/rtmp/tracks/write.go

2
go.mod

@ -8,7 +8,7 @@ require ( @@ -8,7 +8,7 @@ require (
github.com/alecthomas/kong v0.7.1
github.com/asticode/go-astits v1.11.0
github.com/bluenviron/gohlslib v0.2.5
github.com/bluenviron/gortsplib/v3 v3.6.6
github.com/bluenviron/gortsplib/v3 v3.7.0
github.com/bluenviron/mediacommon v0.5.1
github.com/fsnotify/fsnotify v1.6.0
github.com/gin-gonic/gin v1.9.1

4
go.sum

@ -12,8 +12,8 @@ github.com/asticode/go-astits v1.11.0 h1:GTHUXht0ZXAJXsVbsLIcyfHr1Bchi4QQwMARw2Z @@ -12,8 +12,8 @@ github.com/asticode/go-astits v1.11.0 h1:GTHUXht0ZXAJXsVbsLIcyfHr1Bchi4QQwMARw2Z
github.com/asticode/go-astits v1.11.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI=
github.com/bluenviron/gohlslib v0.2.5 h1:M+uWOLjOa7hirUITLndf9bYarA8F7HR1m7+5dG/3WVI=
github.com/bluenviron/gohlslib v0.2.5/go.mod h1:IvhV5h+92FljHy/xHlcOZm09rDeqg1RR88MjCTBx67Y=
github.com/bluenviron/gortsplib/v3 v3.6.6 h1:/BPCjKUWS1lSyjxpHlsSpnEY1NGh2GuegikragY5pWY=
github.com/bluenviron/gortsplib/v3 v3.6.6/go.mod h1:2lvENNScNyPBPQalK9DjZNmfhXFh5JFRz6jb/gJPTOM=
github.com/bluenviron/gortsplib/v3 v3.7.0 h1:gpepAA0Vf17+V5cw4epPySzc6O1hHazidXisGHtHCLY=
github.com/bluenviron/gortsplib/v3 v3.7.0/go.mod h1:d6F2mJqC2qXrav6Xzg7kdnqYFUYobrhXjERQlWH+eRs=
github.com/bluenviron/mediacommon v0.5.1 h1:eNLc3SQp1pOe5lwsFgVUzZwf/NPkQbWGnGpeS8EQMeU=
github.com/bluenviron/mediacommon v0.5.1/go.mod h1:t0dqPsWUTchyvib0MhixIwXEgvDX4V9G+I0GzWLQRb8=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=

59
internal/core/hls_muxer.go

@ -421,16 +421,16 @@ func (m *hlsMuxer) createVideoTrack(stream *stream) (*media.Media, *gohlslib.Tra @@ -421,16 +421,16 @@ func (m *hlsMuxer) createVideoTrack(stream *stream) (*media.Media, *gohlslib.Tra
}
func (m *hlsMuxer) createAudioTrack(stream *stream) (*media.Media, *gohlslib.Track) {
var audioFormatMPEG4Audio *formats.MPEG4Audio
audioMedia := stream.medias().FindFormat(&audioFormatMPEG4Audio)
var audioFormatMPEG4AudioGeneric *formats.MPEG4AudioGeneric
audioMedia := stream.medias().FindFormat(&audioFormatMPEG4AudioGeneric)
if audioFormatMPEG4Audio != nil {
if audioMedia != nil {
audioStartPTSFilled := false
var audioStartPTS time.Duration
stream.readerAdd(m, audioMedia, audioFormatMPEG4Audio, func(unit formatprocessor.Unit) {
stream.readerAdd(m, audioMedia, audioFormatMPEG4AudioGeneric, func(unit formatprocessor.Unit) {
m.ringBuffer.Push(func() error {
tunit := unit.(*formatprocessor.UnitMPEG4Audio)
tunit := unit.(*formatprocessor.UnitMPEG4AudioGeneric)
if tunit.AUs == nil {
return nil
@ -446,7 +446,7 @@ func (m *hlsMuxer) createAudioTrack(stream *stream) (*media.Media, *gohlslib.Tra @@ -446,7 +446,7 @@ func (m *hlsMuxer) createAudioTrack(stream *stream) (*media.Media, *gohlslib.Tra
err := m.muxer.WriteAudio(
tunit.NTP,
pts+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*
time.Second/time.Duration(audioFormatMPEG4Audio.ClockRate()),
time.Second/time.Duration(audioFormatMPEG4AudioGeneric.ClockRate()),
au)
if err != nil {
return fmt.Errorf("muxer error: %v", err)
@ -459,7 +459,50 @@ func (m *hlsMuxer) createAudioTrack(stream *stream) (*media.Media, *gohlslib.Tra @@ -459,7 +459,50 @@ func (m *hlsMuxer) createAudioTrack(stream *stream) (*media.Media, *gohlslib.Tra
return audioMedia, &gohlslib.Track{
Codec: &codecs.MPEG4Audio{
Config: *audioFormatMPEG4Audio.Config,
Config: *audioFormatMPEG4AudioGeneric.Config,
},
}
}
var audioFormatMPEG4AudioLATM *formats.MPEG4AudioLATM
audioMedia = stream.medias().FindFormat(&audioFormatMPEG4AudioLATM)
if audioMedia != nil &&
audioFormatMPEG4AudioLATM.Config != nil &&
len(audioFormatMPEG4AudioLATM.Config.Programs) == 1 &&
len(audioFormatMPEG4AudioLATM.Config.Programs[0].Layers) == 1 {
audioStartPTSFilled := false
var audioStartPTS time.Duration
stream.readerAdd(m, audioMedia, audioFormatMPEG4AudioLATM, func(unit formatprocessor.Unit) {
m.ringBuffer.Push(func() error {
tunit := unit.(*formatprocessor.UnitMPEG4AudioLATM)
if tunit.AU == nil {
return nil
}
if !audioStartPTSFilled {
audioStartPTSFilled = true
audioStartPTS = tunit.PTS
}
pts := tunit.PTS - audioStartPTS
err := m.muxer.WriteAudio(
tunit.NTP,
pts,
tunit.AU)
if err != nil {
return fmt.Errorf("muxer error: %v", err)
}
return nil
})
})
return audioMedia, &gohlslib.Track{
Codec: &codecs.MPEG4Audio{
Config: *audioFormatMPEG4AudioLATM.Config.Programs[0].Layers[0].AudioSpecificConfig,
},
}
}
@ -467,7 +510,7 @@ func (m *hlsMuxer) createAudioTrack(stream *stream) (*media.Media, *gohlslib.Tra @@ -467,7 +510,7 @@ func (m *hlsMuxer) createAudioTrack(stream *stream) (*media.Media, *gohlslib.Tra
var audioFormatOpus *formats.Opus
audioMedia = stream.medias().FindFormat(&audioFormatOpus)
if audioFormatOpus != nil {
if audioMedia != nil {
audioStartPTSFilled := false
var audioStartPTS time.Duration

2
internal/core/hls_source.go

@ -142,7 +142,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan @@ -142,7 +142,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
}
c.OnData(track, func(pts time.Duration, unit interface{}) {
stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4Audio{
stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4AudioGeneric{
PTS: pts,
AUs: [][]byte{unit.([]byte)},
NTP: time.Now(),

80
internal/core/rtmp_conn.go

@ -163,7 +163,7 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream) @@ -163,7 +163,7 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream)
tmsg := msg.(*message.Audio)
if tmsg.AACType == message.AudioAACTypeAU {
stream.writeUnit(medi, format, &formatprocessor.UnitMPEG4Audio{
stream.writeUnit(medi, format, &formatprocessor.UnitMPEG4AudioGeneric{
PTS: tmsg.DTS,
AUs: [][]byte{tmsg.Payload},
NTP: time.Now(),
@ -558,19 +558,23 @@ func (c *rtmpConn) findVideoFormat(stream *stream, ringBuffer *ringbuffer.RingBu @@ -558,19 +558,23 @@ func (c *rtmpConn) findVideoFormat(stream *stream, ringBuffer *ringbuffer.RingBu
return nil, nil
}
func (c *rtmpConn) findAudioFormat(stream *stream, ringBuffer *ringbuffer.RingBuffer,
videoFormat formats.Format, videoFirstIDRFound *bool, videoStartDTS *time.Duration,
func (c *rtmpConn) findAudioFormat(
stream *stream,
ringBuffer *ringbuffer.RingBuffer,
videoFormat formats.Format,
videoFirstIDRFound *bool,
videoStartDTS *time.Duration,
) (*media.Media, formats.Format) {
var audioFormatMPEG4 *formats.MPEG4Audio
audioMedia := stream.medias().FindFormat(&audioFormatMPEG4)
var audioFormatMPEG4Generic *formats.MPEG4AudioGeneric
audioMedia := stream.medias().FindFormat(&audioFormatMPEG4Generic)
if audioMedia != nil {
audioStartPTSFilled := false
var audioStartPTS time.Duration
stream.readerAdd(c, audioMedia, audioFormatMPEG4, func(unit formatprocessor.Unit) {
stream.readerAdd(c, audioMedia, audioFormatMPEG4Generic, func(unit formatprocessor.Unit) {
ringBuffer.Push(func() error {
tunit := unit.(*formatprocessor.UnitMPEG4Audio)
tunit := unit.(*formatprocessor.UnitMPEG4AudioGeneric)
if tunit.AUs == nil {
return nil
@ -605,7 +609,7 @@ func (c *rtmpConn) findAudioFormat(stream *stream, ringBuffer *ringbuffer.RingBu @@ -605,7 +609,7 @@ func (c *rtmpConn) findAudioFormat(stream *stream, ringBuffer *ringbuffer.RingBu
AACType: message.AudioAACTypeAU,
Payload: au,
DTS: pts + time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*
time.Second/time.Duration(audioFormatMPEG4.ClockRate()),
time.Second/time.Duration(audioFormatMPEG4Generic.ClockRate()),
})
if err != nil {
return err
@ -616,7 +620,65 @@ func (c *rtmpConn) findAudioFormat(stream *stream, ringBuffer *ringbuffer.RingBu @@ -616,7 +620,65 @@ func (c *rtmpConn) findAudioFormat(stream *stream, ringBuffer *ringbuffer.RingBu
})
})
return audioMedia, audioFormatMPEG4
return audioMedia, audioFormatMPEG4Generic
}
var audioFormatMPEG4AudioLATM *formats.MPEG4AudioLATM
audioMedia = stream.medias().FindFormat(&audioFormatMPEG4AudioLATM)
if audioMedia != nil &&
audioFormatMPEG4AudioLATM.Config != nil &&
len(audioFormatMPEG4AudioLATM.Config.Programs) == 1 &&
len(audioFormatMPEG4AudioLATM.Config.Programs[0].Layers) == 1 {
audioStartPTSFilled := false
var audioStartPTS time.Duration
stream.readerAdd(c, audioMedia, audioFormatMPEG4AudioLATM, func(unit formatprocessor.Unit) {
ringBuffer.Push(func() error {
tunit := unit.(*formatprocessor.UnitMPEG4AudioLATM)
if tunit.AU == nil {
return nil
}
if !audioStartPTSFilled {
audioStartPTSFilled = true
audioStartPTS = tunit.PTS
}
pts := tunit.PTS - audioStartPTS
if videoFormat != nil {
if !*videoFirstIDRFound {
return nil
}
pts -= *videoStartDTS
if pts < 0 {
return nil
}
}
c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err := c.conn.WriteMessage(&message.Audio{
ChunkStreamID: message.AudioChunkStreamID,
MessageStreamID: 0x1000000,
Codec: message.CodecMPEG4Audio,
Rate: flvio.SOUND_44Khz,
Depth: flvio.SOUND_16BIT,
Channels: flvio.SOUND_STEREO,
AACType: message.AudioAACTypeAU,
Payload: tunit.AU,
DTS: pts,
})
if err != nil {
return err
}
return nil
})
})
return audioMedia, audioFormatMPEG4AudioLATM
}
var audioFormatMPEG2 *formats.MPEG2Audio

2
internal/core/udp_source.go

@ -267,7 +267,7 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, _ chan *conf.Pa @@ -267,7 +267,7 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, _ chan *conf.Pa
aus[i] = pkt.AU
}
stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4Audio{
stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4AudioGeneric{
PTS: pts,
AUs: aus,
NTP: time.Now(),

42
internal/core/webrtc_outgoing_track.go

@ -27,9 +27,9 @@ type webRTCOutgoingTrack struct { @@ -27,9 +27,9 @@ type webRTCOutgoingTrack struct {
func newWebRTCOutgoingTrackVideo(medias media.Medias) (*webRTCOutgoingTrack, error) {
var av1Format *formats.AV1
av1Media := medias.FindFormat(&av1Format)
videoMedia := medias.FindFormat(&av1Format)
if av1Format != nil {
if videoMedia != nil {
webRTCTrak, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeAV1,
@ -49,7 +49,7 @@ func newWebRTCOutgoingTrackVideo(medias media.Medias) (*webRTCOutgoingTrack, err @@ -49,7 +49,7 @@ func newWebRTCOutgoingTrackVideo(medias media.Medias) (*webRTCOutgoingTrack, err
encoder.Init()
return &webRTCOutgoingTrack{
media: av1Media,
media: videoMedia,
format: av1Format,
track: webRTCTrak,
cb: func(unit formatprocessor.Unit) error {
@ -74,9 +74,9 @@ func newWebRTCOutgoingTrackVideo(medias media.Medias) (*webRTCOutgoingTrack, err @@ -74,9 +74,9 @@ func newWebRTCOutgoingTrackVideo(medias media.Medias) (*webRTCOutgoingTrack, err
}
var vp9Format *formats.VP9
vp9Media := medias.FindFormat(&vp9Format)
videoMedia = medias.FindFormat(&vp9Format)
if vp9Format != nil {
if videoMedia != nil {
webRTCTrak, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP9,
@ -96,7 +96,7 @@ func newWebRTCOutgoingTrackVideo(medias media.Medias) (*webRTCOutgoingTrack, err @@ -96,7 +96,7 @@ func newWebRTCOutgoingTrackVideo(medias media.Medias) (*webRTCOutgoingTrack, err
encoder.Init()
return &webRTCOutgoingTrack{
media: vp9Media,
media: videoMedia,
format: vp9Format,
track: webRTCTrak,
cb: func(unit formatprocessor.Unit) error {
@ -121,9 +121,9 @@ func newWebRTCOutgoingTrackVideo(medias media.Medias) (*webRTCOutgoingTrack, err @@ -121,9 +121,9 @@ func newWebRTCOutgoingTrackVideo(medias media.Medias) (*webRTCOutgoingTrack, err
}
var vp8Format *formats.VP8
vp8Media := medias.FindFormat(&vp8Format)
videoMedia = medias.FindFormat(&vp8Format)
if vp8Format != nil {
if videoMedia != nil {
webRTCTrak, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP8,
@ -143,7 +143,7 @@ func newWebRTCOutgoingTrackVideo(medias media.Medias) (*webRTCOutgoingTrack, err @@ -143,7 +143,7 @@ func newWebRTCOutgoingTrackVideo(medias media.Medias) (*webRTCOutgoingTrack, err
encoder.Init()
return &webRTCOutgoingTrack{
media: vp8Media,
media: videoMedia,
format: vp8Format,
track: webRTCTrak,
cb: func(unit formatprocessor.Unit) error {
@ -168,9 +168,9 @@ func newWebRTCOutgoingTrackVideo(medias media.Medias) (*webRTCOutgoingTrack, err @@ -168,9 +168,9 @@ func newWebRTCOutgoingTrackVideo(medias media.Medias) (*webRTCOutgoingTrack, err
}
var h264Format *formats.H264
h264Media := medias.FindFormat(&h264Format)
videoMedia = medias.FindFormat(&h264Format)
if h264Format != nil {
if videoMedia != nil {
webRTCTrak, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeH264,
@ -193,7 +193,7 @@ func newWebRTCOutgoingTrackVideo(medias media.Medias) (*webRTCOutgoingTrack, err @@ -193,7 +193,7 @@ func newWebRTCOutgoingTrackVideo(medias media.Medias) (*webRTCOutgoingTrack, err
firstNALUReceived := false
return &webRTCOutgoingTrack{
media: h264Media,
media: videoMedia,
format: h264Format,
track: webRTCTrak,
cb: func(unit formatprocessor.Unit) error {
@ -232,9 +232,9 @@ func newWebRTCOutgoingTrackVideo(medias media.Medias) (*webRTCOutgoingTrack, err @@ -232,9 +232,9 @@ func newWebRTCOutgoingTrackVideo(medias media.Medias) (*webRTCOutgoingTrack, err
func newWebRTCOutgoingTrackAudio(medias media.Medias) (*webRTCOutgoingTrack, error) {
var opusFormat *formats.Opus
opusMedia := medias.FindFormat(&opusFormat)
audioMedia := medias.FindFormat(&opusFormat)
if opusFormat != nil {
if audioMedia != nil {
webRTCTrak, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeOpus,
@ -249,7 +249,7 @@ func newWebRTCOutgoingTrackAudio(medias media.Medias) (*webRTCOutgoingTrack, err @@ -249,7 +249,7 @@ func newWebRTCOutgoingTrackAudio(medias media.Medias) (*webRTCOutgoingTrack, err
}
return &webRTCOutgoingTrack{
media: opusMedia,
media: audioMedia,
format: opusFormat,
track: webRTCTrak,
cb: func(unit formatprocessor.Unit) error {
@ -263,9 +263,9 @@ func newWebRTCOutgoingTrackAudio(medias media.Medias) (*webRTCOutgoingTrack, err @@ -263,9 +263,9 @@ func newWebRTCOutgoingTrackAudio(medias media.Medias) (*webRTCOutgoingTrack, err
}
var g722Format *formats.G722
g722Media := medias.FindFormat(&g722Format)
audioMedia = medias.FindFormat(&g722Format)
if g722Format != nil {
if audioMedia != nil {
webRTCTrak, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeG722,
@ -279,7 +279,7 @@ func newWebRTCOutgoingTrackAudio(medias media.Medias) (*webRTCOutgoingTrack, err @@ -279,7 +279,7 @@ func newWebRTCOutgoingTrackAudio(medias media.Medias) (*webRTCOutgoingTrack, err
}
return &webRTCOutgoingTrack{
media: g722Media,
media: audioMedia,
format: g722Format,
track: webRTCTrak,
cb: func(unit formatprocessor.Unit) error {
@ -293,9 +293,9 @@ func newWebRTCOutgoingTrackAudio(medias media.Medias) (*webRTCOutgoingTrack, err @@ -293,9 +293,9 @@ func newWebRTCOutgoingTrackAudio(medias media.Medias) (*webRTCOutgoingTrack, err
}
var g711Format *formats.G711
g711Media := medias.FindFormat(&g711Format)
audioMedia = medias.FindFormat(&g711Format)
if g711Format != nil {
if audioMedia != nil {
var mtyp string
if g711Format.MULaw {
mtyp = webrtc.MimeTypePCMU
@ -316,7 +316,7 @@ func newWebRTCOutgoingTrackAudio(medias media.Medias) (*webRTCOutgoingTrack, err @@ -316,7 +316,7 @@ func newWebRTCOutgoingTrackAudio(medias media.Medias) (*webRTCOutgoingTrack, err
}
return &webRTCOutgoingTrack{
media: g711Media,
media: audioMedia,
format: g711Format,
track: webRTCTrak,
cb: func(unit formatprocessor.Unit) error {

19
internal/formatprocessor/av1.go

@ -54,15 +54,22 @@ func newAV1( @@ -54,15 +54,22 @@ func newAV1(
}
if generateRTPPackets {
t.encoder = &rtpav1.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
err := t.createEncoder()
if err != nil {
return nil, err
}
t.encoder.Init()
}
return t, nil
}
func (t *formatProcessorAV1) createEncoder() error {
t.encoder = &rtpav1.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
}
return t.encoder.Init()
}
func (t *formatProcessorAV1) checkKeyFrameInterval(ntp time.Time, isKeyFrame bool) {
if !t.lastKeyFrameTimeReceived || isKeyFrame {
t.lastKeyFrameTimeReceived = true
@ -99,7 +106,11 @@ func (t *formatProcessorAV1) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -99,7 +106,11 @@ func (t *formatProcessorAV1) Process(unit Unit, hasNonRTSPReaders bool) error {
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
t.decoder = t.format.CreateDecoder()
var err error
t.decoder, err = t.format.CreateDecoder2()
if err != nil {
return err
}
}
// DecodeUntilMarker() is necessary, otherwise Encode() generates partial groups

39
internal/formatprocessor/h264.go

@ -111,17 +111,29 @@ func newH264( @@ -111,17 +111,29 @@ func newH264(
}
if generateRTPPackets {
t.encoder = &rtph264.Encoder{
PayloadMaxSize: udpMaxPayloadSize - 12,
PayloadType: forma.PayloadTyp,
PacketizationMode: forma.PacketizationMode,
err := t.createEncoder(nil, nil, nil)
if err != nil {
return nil, err
}
t.encoder.Init()
}
return t, nil
}
func (t *formatProcessorH264) createEncoder(
ssrc *uint32, initialSequenceNumber *uint16, initialTimestamp *uint32,
) error {
t.encoder = &rtph264.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: t.format.PayloadTyp,
SSRC: ssrc,
InitialSequenceNumber: initialSequenceNumber,
InitialTimestamp: initialTimestamp,
PacketizationMode: t.format.PacketizationMode,
}
return t.encoder.Init()
}
func (t *formatProcessorH264) updateTrackParametersFromRTPPacket(pkt *rtp.Packet) {
sps, pps := rtpH264ExtractSPSPPS(pkt)
update := false
@ -263,22 +275,21 @@ func (t *formatProcessorH264) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -263,22 +275,21 @@ func (t *formatProcessorH264) Process(unit Unit, hasNonRTSPReaders bool) error {
v1 := pkt.SSRC
v2 := pkt.SequenceNumber
v3 := pkt.Timestamp
t.encoder = &rtph264.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: pkt.PayloadType,
SSRC: &v1,
InitialSequenceNumber: &v2,
InitialTimestamp: &v3,
PacketizationMode: t.format.PacketizationMode,
err := t.createEncoder(&v1, &v2, &v3)
if err != nil {
return err
}
t.encoder.Init()
}
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil || t.encoder != nil {
if t.decoder == nil {
t.decoder = t.format.CreateDecoder()
var err error
t.decoder, err = t.format.CreateDecoder2()
if err != nil {
return err
}
}
if t.encoder != nil {

4
internal/formatprocessor/h264_test.go

@ -31,10 +31,12 @@ func TestH264DynamicParams(t *testing.T) { @@ -31,10 +31,12 @@ func TestH264DynamicParams(t *testing.T) {
p, err := New(1472, forma, false, nil)
require.NoError(t, err)
enc := forma.CreateEncoder()
enc, err := forma.CreateEncoder2()
require.NoError(t, err)
pkts, err := enc.Encode([][]byte{{byte(h264.NALUTypeIDR)}}, 0)
require.NoError(t, err)
data := &UnitH264{RTPPackets: []*rtp.Packet{pkts[0]}}
p.Process(data, true)

38
internal/formatprocessor/h265.go

@ -118,16 +118,29 @@ func newH265( @@ -118,16 +118,29 @@ func newH265(
}
if generateRTPPackets {
t.encoder = &rtph265.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: forma.PayloadTyp,
err := t.createEncoder(nil, nil, nil)
if err != nil {
return nil, err
}
t.encoder.Init()
}
return t, nil
}
func (t *formatProcessorH265) createEncoder(
ssrc *uint32, initialSequenceNumber *uint16, initialTimestamp *uint32,
) error {
t.encoder = &rtph265.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: t.format.PayloadTyp,
SSRC: ssrc,
InitialSequenceNumber: initialSequenceNumber,
InitialTimestamp: initialTimestamp,
MaxDONDiff: t.format.MaxDONDiff,
}
return t.encoder.Init()
}
func (t *formatProcessorH265) updateTrackParametersFromRTPPacket(pkt *rtp.Packet) {
vps, sps, pps := rtpH265ExtractVPSSPSPPS(pkt)
update := false
@ -284,22 +297,21 @@ func (t *formatProcessorH265) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -284,22 +297,21 @@ func (t *formatProcessorH265) Process(unit Unit, hasNonRTSPReaders bool) error {
v1 := pkt.SSRC
v2 := pkt.SequenceNumber
v3 := pkt.Timestamp
t.encoder = &rtph265.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: pkt.PayloadType,
SSRC: &v1,
InitialSequenceNumber: &v2,
InitialTimestamp: &v3,
MaxDONDiff: t.format.MaxDONDiff,
err := t.createEncoder(&v1, &v2, &v3)
if err != nil {
return err
}
t.encoder.Init()
}
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil || t.encoder != nil {
if t.decoder == nil {
t.decoder = t.format.CreateDecoder()
var err error
t.decoder, err = t.format.CreateDecoder2()
if err != nil {
return err
}
}
if t.encoder != nil {

4
internal/formatprocessor/h265_test.go

@ -19,10 +19,12 @@ func TestH265DynamicParams(t *testing.T) { @@ -19,10 +19,12 @@ func TestH265DynamicParams(t *testing.T) {
p, err := New(1472, forma, false, nil)
require.NoError(t, err)
enc := forma.CreateEncoder()
enc, err := forma.CreateEncoder2()
require.NoError(t, err)
pkts, err := enc.Encode([][]byte{{byte(h265.NALUType_CRA_NUT) << 1, 0}}, 0)
require.NoError(t, err)
data := &UnitH265{RTPPackets: []*rtp.Packet{pkts[0]}}
p.Process(data, true)

19
internal/formatprocessor/mpeg2audio.go

@ -48,15 +48,22 @@ func newMPEG2Audio( @@ -48,15 +48,22 @@ func newMPEG2Audio(
}
if generateRTPPackets {
t.encoder = &rtpmpeg2audio.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
err := t.createEncoder()
if err != nil {
return nil, err
}
t.encoder.Init()
}
return t, nil
}
func (t *formatProcessorMPEG2Audio) createEncoder() error {
t.encoder = &rtpmpeg2audio.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
}
return t.encoder.Init()
}
func (t *formatProcessorMPEG2Audio) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl
tunit := unit.(*UnitMPEG2Audio)
@ -75,7 +82,11 @@ func (t *formatProcessorMPEG2Audio) Process(unit Unit, hasNonRTSPReaders bool) e @@ -75,7 +82,11 @@ func (t *formatProcessorMPEG2Audio) Process(unit Unit, hasNonRTSPReaders bool) e
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
t.decoder = t.format.CreateDecoder()
var err error
t.decoder, err = t.format.CreateDecoder2()
if err != nil {
return err
}
}
frames, pts, err := t.decoder.Decode(pkt)

55
internal/formatprocessor/mpeg4audio.go → internal/formatprocessor/mpeg4audio_generic.go

@ -11,8 +11,8 @@ import ( @@ -11,8 +11,8 @@ import (
"github.com/bluenviron/mediamtx/internal/logger"
)
// UnitMPEG4Audio is a MPEG-4 Audio data unit.
type UnitMPEG4Audio struct {
// UnitMPEG4AudioGeneric is a MPEG-4 Audio data unit.
type UnitMPEG4AudioGeneric struct {
RTPPackets []*rtp.Packet
NTP time.Time
PTS time.Duration
@ -20,50 +20,57 @@ type UnitMPEG4Audio struct { @@ -20,50 +20,57 @@ type UnitMPEG4Audio struct {
}
// GetRTPPackets implements Unit.
func (d *UnitMPEG4Audio) GetRTPPackets() []*rtp.Packet {
func (d *UnitMPEG4AudioGeneric) GetRTPPackets() []*rtp.Packet {
return d.RTPPackets
}
// GetNTP implements Unit.
func (d *UnitMPEG4Audio) GetNTP() time.Time {
func (d *UnitMPEG4AudioGeneric) GetNTP() time.Time {
return d.NTP
}
type formatProcessorMPEG4Audio struct {
type formatProcessorMPEG4AudioGeneric struct {
udpMaxPayloadSize int
format *formats.MPEG4Audio
encoder *rtpmpeg4audio.Encoder
decoder *rtpmpeg4audio.Decoder
}
func newMPEG4Audio(
func newMPEG4AudioGeneric(
udpMaxPayloadSize int,
forma *formats.MPEG4Audio,
generateRTPPackets bool,
_ logger.Writer,
) (*formatProcessorMPEG4Audio, error) {
t := &formatProcessorMPEG4Audio{
) (*formatProcessorMPEG4AudioGeneric, error) {
t := &formatProcessorMPEG4AudioGeneric{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
if generateRTPPackets {
t.encoder = &rtpmpeg4audio.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: forma.PayloadTyp,
SampleRate: forma.Config.SampleRate,
SizeLength: forma.SizeLength,
IndexLength: forma.IndexLength,
IndexDeltaLength: forma.IndexDeltaLength,
err := t.createEncoder()
if err != nil {
return nil, err
}
t.encoder.Init()
}
return t, nil
}
func (t *formatProcessorMPEG4Audio) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl
tunit := unit.(*UnitMPEG4Audio)
func (t *formatProcessorMPEG4AudioGeneric) createEncoder() error {
t.encoder = &rtpmpeg4audio.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: t.format.PayloadTyp,
SampleRate: t.format.Config.SampleRate,
SizeLength: t.format.SizeLength,
IndexLength: t.format.IndexLength,
IndexDeltaLength: t.format.IndexDeltaLength,
}
return t.encoder.Init()
}
func (t *formatProcessorMPEG4AudioGeneric) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl
tunit := unit.(*UnitMPEG4AudioGeneric)
if tunit.RTPPackets != nil {
pkt := tunit.RTPPackets[0]
@ -78,9 +85,13 @@ func (t *formatProcessorMPEG4Audio) Process(unit Unit, hasNonRTSPReaders bool) e @@ -78,9 +85,13 @@ func (t *formatProcessorMPEG4Audio) Process(unit Unit, hasNonRTSPReaders bool) e
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if hasNonRTSPReaders || t.decoder != nil || true {
if t.decoder == nil {
t.decoder = t.format.CreateDecoder()
var err error
t.decoder, err = t.format.CreateDecoder2()
if err != nil {
return err
}
}
aus, pts, err := t.decoder.Decode(pkt)
@ -109,8 +120,8 @@ func (t *formatProcessorMPEG4Audio) Process(unit Unit, hasNonRTSPReaders bool) e @@ -109,8 +120,8 @@ func (t *formatProcessorMPEG4Audio) Process(unit Unit, hasNonRTSPReaders bool) e
return nil
}
func (t *formatProcessorMPEG4Audio) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit {
return &UnitMPEG4Audio{
func (t *formatProcessorMPEG4AudioGeneric) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit {
return &UnitMPEG4AudioGeneric{
RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
}

124
internal/formatprocessor/mpeg4audio_latm.go

@ -0,0 +1,124 @@ @@ -0,0 +1,124 @@
package formatprocessor
import (
"fmt"
"time"
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/gortsplib/v3/pkg/formats/rtpmpeg4audiolatm"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/logger"
)
// UnitMPEG4AudioLATM is a MPEG-4 Audio data unit.
type UnitMPEG4AudioLATM struct {
RTPPackets []*rtp.Packet
NTP time.Time
PTS time.Duration
AU []byte
}
// GetRTPPackets implements Unit.
func (d *UnitMPEG4AudioLATM) GetRTPPackets() []*rtp.Packet {
return d.RTPPackets
}
// GetNTP implements Unit.
func (d *UnitMPEG4AudioLATM) GetNTP() time.Time {
return d.NTP
}
type formatProcessorMPEG4AudioLATM struct {
udpMaxPayloadSize int
format *formats.MPEG4AudioLATM
encoder *rtpmpeg4audiolatm.Encoder
decoder *rtpmpeg4audiolatm.Decoder
}
func newMPEG4AudioLATM(
udpMaxPayloadSize int,
forma *formats.MPEG4AudioLATM,
generateRTPPackets bool,
_ logger.Writer,
) (*formatProcessorMPEG4AudioLATM, error) {
t := &formatProcessorMPEG4AudioLATM{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
if generateRTPPackets {
err := t.createEncoder()
if err != nil {
return nil, err
}
}
return t, nil
}
func (t *formatProcessorMPEG4AudioLATM) createEncoder() error {
t.encoder = &rtpmpeg4audiolatm.Encoder{
PayloadType: t.format.PayloadTyp,
Config: t.format.Config,
}
return t.encoder.Init()
}
func (t *formatProcessorMPEG4AudioLATM) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl
tunit := unit.(*UnitMPEG4AudioLATM)
if tunit.RTPPackets != nil {
pkt := tunit.RTPPackets[0]
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
var err error
t.decoder, err = t.format.CreateDecoder2()
if err != nil {
return err
}
}
au, pts, err := t.decoder.Decode(pkt)
if err != nil {
if err == rtpmpeg4audiolatm.ErrMorePacketsNeeded {
return nil
}
return err
}
tunit.AU = au
tunit.PTS = pts
}
// route packet as is
return nil
}
// encode into RTP
pkts, err := t.encoder.Encode(tunit.AU, tunit.PTS)
if err != nil {
return err
}
tunit.RTPPackets = pkts
return nil
}
func (t *formatProcessorMPEG4AudioLATM) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit {
return &UnitMPEG4AudioLATM{
RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
}
}

23
internal/formatprocessor/opus.go

@ -48,17 +48,24 @@ func newOpus( @@ -48,17 +48,24 @@ func newOpus(
}
if generateRTPPackets {
t.encoder = &rtpsimpleaudio.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: forma.PayloadTyp,
SampleRate: 48000,
err := t.createEncoder()
if err != nil {
return nil, err
}
t.encoder.Init()
}
return t, nil
}
func (t *formatProcessorOpus) createEncoder() error {
t.encoder = &rtpsimpleaudio.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: t.format.PayloadTyp,
SampleRate: 48000,
}
return t.encoder.Init()
}
func (t *formatProcessorOpus) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl
tunit := unit.(*UnitOpus)
@ -77,7 +84,11 @@ func (t *formatProcessorOpus) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -77,7 +84,11 @@ func (t *formatProcessorOpus) Process(unit Unit, hasNonRTSPReaders bool) error {
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
t.decoder = t.format.CreateDecoder()
var err error
t.decoder, err = t.format.CreateDecoder2()
if err != nil {
return err
}
}
frame, pts, err := t.decoder.Decode(pkt)

7
internal/formatprocessor/processor.go

@ -49,8 +49,11 @@ func New( @@ -49,8 +49,11 @@ func New(
case *formats.MPEG2Audio:
return newMPEG2Audio(udpMaxPayloadSize, forma, generateRTPPackets, log)
case *formats.MPEG4Audio:
return newMPEG4Audio(udpMaxPayloadSize, forma, generateRTPPackets, log)
case *formats.MPEG4AudioGeneric:
return newMPEG4AudioGeneric(udpMaxPayloadSize, forma, generateRTPPackets, log)
case *formats.MPEG4AudioLATM:
return newMPEG4AudioLATM(udpMaxPayloadSize, forma, generateRTPPackets, log)
case *formats.Opus:
return newOpus(udpMaxPayloadSize, forma, generateRTPPackets, log)

21
internal/formatprocessor/vp8.go

@ -48,16 +48,23 @@ func newVP8( @@ -48,16 +48,23 @@ func newVP8(
}
if generateRTPPackets {
t.encoder = &rtpvp8.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: forma.PayloadTyp,
err := t.createEncoder()
if err != nil {
return nil, err
}
t.encoder.Init()
}
return t, nil
}
func (t *formatProcessorVP8) createEncoder() error {
t.encoder = &rtpvp8.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: t.format.PayloadTyp,
}
return t.encoder.Init()
}
func (t *formatProcessorVP8) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl
tunit := unit.(*UnitVP8)
@ -76,7 +83,11 @@ func (t *formatProcessorVP8) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -76,7 +83,11 @@ func (t *formatProcessorVP8) Process(unit Unit, hasNonRTSPReaders bool) error {
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
t.decoder = t.format.CreateDecoder()
var err error
t.decoder, err = t.format.CreateDecoder2()
if err != nil {
return err
}
}
frame, pts, err := t.decoder.Decode(pkt)

21
internal/formatprocessor/vp9.go

@ -48,16 +48,23 @@ func newVP9( @@ -48,16 +48,23 @@ func newVP9(
}
if generateRTPPackets {
t.encoder = &rtpvp9.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: forma.PayloadTyp,
err := t.createEncoder()
if err != nil {
return nil, err
}
t.encoder.Init()
}
return t, nil
}
func (t *formatProcessorVP9) createEncoder() error {
t.encoder = &rtpvp9.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
PayloadType: t.format.PayloadTyp,
}
return t.encoder.Init()
}
func (t *formatProcessorVP9) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl
tunit := unit.(*UnitVP9)
@ -76,7 +83,11 @@ func (t *formatProcessorVP9) Process(unit Unit, hasNonRTSPReaders bool) error { @@ -76,7 +83,11 @@ func (t *formatProcessorVP9) Process(unit Unit, hasNonRTSPReaders bool) error {
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
t.decoder = t.format.CreateDecoder()
var err error
t.decoder, err = t.format.CreateDecoder2()
if err != nil {
return err
}
}
frame, pts, err := t.decoder.Decode(pkt)

17
internal/rtmp/tracks/write.go

@ -2,6 +2,7 @@ package tracks @@ -2,6 +2,7 @@ package tracks
import (
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/bluenviron/mediamtx/internal/rtmp/h264conf"
@ -44,7 +45,7 @@ func Write(w *message.ReadWriter, videoTrack formats.Format, audioTrack formats. @@ -44,7 +45,7 @@ func Write(w *message.ReadWriter, videoTrack formats.Format, audioTrack formats.
case *formats.MPEG2Audio:
return message.CodecMPEG2Audio
case *formats.MPEG4Audio:
case *formats.MPEG4AudioGeneric, *formats.MPEG4AudioLATM:
return message.CodecMPEG4Audio
default:
@ -82,8 +83,18 @@ func Write(w *message.ReadWriter, videoTrack formats.Format, audioTrack formats. @@ -82,8 +83,18 @@ func Write(w *message.ReadWriter, videoTrack formats.Format, audioTrack formats.
}
}
if mpeg4audioTrack, ok := audioTrack.(*formats.MPEG4Audio); ok {
enc, err := mpeg4audioTrack.Config.Marshal()
var audioConfig *mpeg4audio.AudioSpecificConfig
switch track := audioTrack.(type) {
case *formats.MPEG4Audio:
audioConfig = track.Config
case *formats.MPEG4AudioLATM:
audioConfig = track.Config.Programs[0].Layers[0].AudioSpecificConfig
}
if audioConfig != nil {
enc, err := audioConfig.Marshal()
if err != nil {
return err
}

Loading…
Cancel
Save