diff --git a/README.md b/README.md index 679e42fd..54283067 100644 --- a/README.md +++ b/README.md @@ -20,25 +20,25 @@ Live streams can be published to the server with: |protocol|variants|video codecs|audio codecs| |--------|--------|------------|------------| -|[SRT clients](#srt-clients)||H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| -|[SRT cameras and servers](#srt-cameras-and-servers)||H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| +|[SRT clients](#srt-clients)||H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3| +|[SRT cameras and servers](#srt-cameras-and-servers)||H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3| |[WebRTC clients](#webrtc-clients)|Browser-based, WHIP|AV1, VP9, VP8, H264|Opus, G722, G711| |[WebRTC servers](#webrtc-servers)|WHEP|AV1, VP9, VP8, H264|Opus, G722, G711| -|[RTSP clients](#rtsp-clients)|UDP, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G726, G722, G711, LPCM and any RTP-compatible codec| -|[RTSP cameras and servers](#rtsp-cameras-and-servers)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G726, G722, G711, LPCM and any RTP-compatible codec| +|[RTSP clients](#rtsp-clients)|UDP, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711, LPCM and any RTP-compatible codec| +|[RTSP cameras and servers](#rtsp-cameras-and-servers)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711, LPCM and any RTP-compatible codec| |[RTMP clients](#rtmp-clients)|RTMP, RTMPS, Enhanced RTMP|AV1, VP9, H265, H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[RTMP cameras and servers](#rtmp-cameras-and-servers)|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[HLS cameras and servers](#hls-cameras-and-servers)|Low-Latency HLS, MP4-based HLS, legacy HLS|AV1, VP9, H265, H264|Opus, MPEG-4 Audio (AAC)| -|[UDP/MPEG-TS](#udpmpeg-ts)|Unicast, broadcast, multicast|H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| +|[UDP/MPEG-TS](#udpmpeg-ts)|Unicast, broadcast, multicast|H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3| |[Raspberry Pi Cameras](#raspberry-pi-cameras)||H264|| And can be read from the server with: |protocol|variants|video codecs|audio codecs| |--------|--------|------------|------------| -|[SRT](#srt)||H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| +|[SRT](#srt)||H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3| |[WebRTC](#webrtc)|Browser-based, WHEP|AV1, VP9, VP8, H264|Opus, G722, G711| -|[RTSP](#rtsp)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G726, G722, G711, LPCM and any RTP-compatible codec| +|[RTSP](#rtsp)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711, LPCM and any RTP-compatible codec| |[RTMP](#rtmp)|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[HLS](#hls)|Low-Latency HLS, MP4-based HLS, legacy HLS|AV1, VP9, H265, H264|Opus, MPEG-4 Audio (AAC)| @@ -794,7 +794,7 @@ snap install vlc ##### Encrypted streams -At the moment VLC doesn't support reading encrypted RTSP streams. However, you can use a proxy like [stunnel](https://www.stunnel.org) or [nginx](https://nginx.org/) or a dedicated _MediaMTX_ instance to decrypt streams before reading them. +At the moment VLC doesn't support reading encrypted RTSP streams. However, you can use a proxy like [stunnel](https://www.stunnel.org) or [nginx](https://nginx.org/) or a local _MediaMTX_ instance to decrypt streams before reading them. #### Web browsers @@ -1156,7 +1156,7 @@ All available recording parameters are listed in the [sample configuration file] Currently the server supports recording tracks encoded with the following codecs: * Video: AV1, VP9, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video -* Audio: Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3) +* Audio: Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3 ### Forward streams to another server diff --git a/go.mod b/go.mod index cc84559a..b8b05c59 100644 --- a/go.mod +++ b/go.mod @@ -8,8 +8,8 @@ require ( github.com/alecthomas/kong v0.8.0 github.com/aler9/writerseeker v1.1.0 github.com/bluenviron/gohlslib v1.0.3 - github.com/bluenviron/gortsplib/v4 v4.1.0 - github.com/bluenviron/mediacommon v1.3.0 + github.com/bluenviron/gortsplib/v4 v4.1.1-0.20230919201539-fc2a10a4999a + github.com/bluenviron/mediacommon v1.3.1-0.20230919191723-607668055ebe github.com/datarhei/gosrt v0.5.4 github.com/fsnotify/fsnotify v1.6.0 github.com/gin-gonic/gin v1.9.1 diff --git a/go.sum b/go.sum index faec0dc6..1e6a49d5 100644 --- a/go.sum +++ b/go.sum @@ -14,10 +14,10 @@ github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYh github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI= github.com/bluenviron/gohlslib v1.0.3 h1:FMHevlIrrZ67uzCXmlTSGflsfYREEtHb8L9BDyf7lJc= github.com/bluenviron/gohlslib v1.0.3/go.mod h1:R/aIsSxLI61N0CVMjtcHqJouK6+Ddd5YIihcCr7IFIw= -github.com/bluenviron/gortsplib/v4 v4.1.0 h1:QR8/QpNpNypmhxHwumtRAOHOogHnpicgt21JkCYC2m4= -github.com/bluenviron/gortsplib/v4 v4.1.0/go.mod h1:Q6WqK0CDVGZLbUZvriXZ++j2k75eO0ZvMWNbEaNLgn0= -github.com/bluenviron/mediacommon v1.3.0 h1:2ttKdlvEXJSzHTd1+7x4TmJDTqEhLAAPP9QfdnYWo8U= -github.com/bluenviron/mediacommon v1.3.0/go.mod h1:/vlOVSebDwzdRtQONOKLua0fOSJg1tUDHpP+h9a0uqM= +github.com/bluenviron/gortsplib/v4 v4.1.1-0.20230919201539-fc2a10a4999a h1:2zaPnmdTRJyfySBbRF8mCQDfOFOL2qQHfQFrLpKeyS0= +github.com/bluenviron/gortsplib/v4 v4.1.1-0.20230919201539-fc2a10a4999a/go.mod h1:0rVtKDafUA14isZuaBTm5+X9NPqLYs/lY8JIww6+doM= +github.com/bluenviron/mediacommon v1.3.1-0.20230919191723-607668055ebe h1:8kvIJfRXvv1Za1hdArKjvd/l8WCHJF+d+oLtANdFbr8= +github.com/bluenviron/mediacommon v1.3.1-0.20230919191723-607668055ebe/go.mod h1:/vlOVSebDwzdRtQONOKLua0fOSJg1tUDHpP+h9a0uqM= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= diff --git a/internal/core/mpegts.go b/internal/core/mpegts.go new file mode 100644 index 00000000..a7cfd0e6 --- /dev/null +++ b/internal/core/mpegts.go @@ -0,0 +1,198 @@ +package core + +import ( + "fmt" + "time" + + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/pkg/formats/mpegts" + + "github.com/bluenviron/mediamtx/internal/stream" + "github.com/bluenviron/mediamtx/internal/unit" +) + +func mpegtsSetupTracks(r *mpegts.Reader, stream **stream.Stream) ([]*description.Media, error) { + var medias []*description.Media //nolint:prealloc + + var td *mpegts.TimeDecoder + decodeTime := func(t int64) time.Duration { + if td == nil { + td = mpegts.NewTimeDecoder(t) + } + return td.Decode(t) + } + + for _, track := range r.Tracks() { //nolint:dupl + var medi *description.Media + + switch codec := track.Codec.(type) { + case *mpegts.CodecH265: + medi = &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.H265{ + PayloadTyp: 96, + }}, + } + + r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { + (*stream).WriteUnit(medi, medi.Formats[0], &unit.H265{ + Base: unit.Base{ + NTP: time.Now(), + PTS: decodeTime(pts), + }, + AU: au, + }) + return nil + }) + + case *mpegts.CodecH264: + medi = &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.H264{ + PayloadTyp: 96, + PacketizationMode: 1, + }}, + } + + r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { + (*stream).WriteUnit(medi, medi.Formats[0], &unit.H264{ + Base: unit.Base{ + NTP: time.Now(), + PTS: decodeTime(pts), + }, + AU: au, + }) + return nil + }) + + case *mpegts.CodecMPEG4Video: + medi = &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.MPEG4Video{ + PayloadTyp: 96, + }}, + } + + r.OnDataMPEGxVideo(track, func(pts int64, frame []byte) error { + (*stream).WriteUnit(medi, medi.Formats[0], &unit.MPEG4Video{ + Base: unit.Base{ + NTP: time.Now(), + PTS: decodeTime(pts), + }, + Frame: frame, + }) + return nil + }) + + case *mpegts.CodecMPEG1Video: + medi = &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.MPEG1Video{}}, + } + + r.OnDataMPEGxVideo(track, func(pts int64, frame []byte) error { + (*stream).WriteUnit(medi, medi.Formats[0], &unit.MPEG1Video{ + Base: unit.Base{ + NTP: time.Now(), + PTS: decodeTime(pts), + }, + Frame: frame, + }) + return nil + }) + + case *mpegts.CodecOpus: + medi = &description.Media{ + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.Opus{ + PayloadTyp: 96, + IsStereo: (codec.ChannelCount == 2), + }}, + } + + r.OnDataOpus(track, func(pts int64, packets [][]byte) error { + (*stream).WriteUnit(medi, medi.Formats[0], &unit.Opus{ + Base: unit.Base{ + NTP: time.Now(), + PTS: decodeTime(pts), + }, + Packets: packets, + }) + return nil + }) + + case *mpegts.CodecMPEG4Audio: + medi = &description.Media{ + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.MPEG4Audio{ + PayloadTyp: 96, + SizeLength: 13, + IndexLength: 3, + IndexDeltaLength: 3, + Config: &codec.Config, + }}, + } + + r.OnDataMPEG4Audio(track, func(pts int64, aus [][]byte) error { + (*stream).WriteUnit(medi, medi.Formats[0], &unit.MPEG4AudioGeneric{ + Base: unit.Base{ + NTP: time.Now(), + PTS: decodeTime(pts), + }, + AUs: aus, + }) + return nil + }) + + case *mpegts.CodecMPEG1Audio: + medi = &description.Media{ + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.MPEG1Audio{}}, + } + + r.OnDataMPEG1Audio(track, func(pts int64, frames [][]byte) error { + (*stream).WriteUnit(medi, medi.Formats[0], &unit.MPEG1Audio{ + Base: unit.Base{ + NTP: time.Now(), + PTS: decodeTime(pts), + }, + Frames: frames, + }) + return nil + }) + + case *mpegts.CodecAC3: + medi = &description.Media{ + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.AC3{ + PayloadTyp: 96, + SampleRate: codec.SampleRate, + ChannelCount: codec.ChannelCount, + }}, + } + + r.OnDataAC3(track, func(pts int64, frame []byte) error { + (*stream).WriteUnit(medi, medi.Formats[0], &unit.AC3{ + Base: unit.Base{ + NTP: time.Now(), + PTS: decodeTime(pts), + }, + Frames: [][]byte{frame}, + }) + return nil + }) + + default: + continue + } + + medias = append(medias, medi) + } + + if len(medias) == 0 { + return nil, fmt.Errorf("no supported tracks found") + } + + return medias, nil +} diff --git a/internal/core/srt_conn.go b/internal/core/srt_conn.go index 1ae83f48..2d0dc7f9 100644 --- a/internal/core/srt_conn.go +++ b/internal/core/srt_conn.go @@ -12,6 +12,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/pkg/codecs/ac3" "github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/bluenviron/mediacommon/pkg/codecs/h265" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" @@ -262,165 +263,11 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error { decodeErrLogger.Log(logger.Warn, err.Error()) }) - var medias []*description.Media //nolint:prealloc var stream *stream.Stream - var td *mpegts.TimeDecoder - decodeTime := func(t int64) time.Duration { - if td == nil { - td = mpegts.NewTimeDecoder(t) - } - return td.Decode(t) - } - - for _, track := range r.Tracks() { //nolint:dupl - var medi *description.Media - - switch tcodec := track.Codec.(type) { - case *mpegts.CodecH265: - medi = &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.H265{ - PayloadTyp: 96, - }}, - } - - r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.H265{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - AU: au, - }) - return nil - }) - - case *mpegts.CodecH264: - medi = &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.H264{ - PayloadTyp: 96, - PacketizationMode: 1, - }}, - } - - r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.H264{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - AU: au, - }) - return nil - }) - - case *mpegts.CodecMPEG4Video: - medi = &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.MPEG4Video{ - PayloadTyp: 96, - }}, - } - - r.OnDataMPEGxVideo(track, func(pts int64, frame []byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG4Video{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - Frame: frame, - }) - return nil - }) - - case *mpegts.CodecMPEG1Video: - medi = &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.MPEG1Video{}}, - } - - r.OnDataMPEGxVideo(track, func(pts int64, frame []byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG1Video{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - Frame: frame, - }) - return nil - }) - - case *mpegts.CodecOpus: - medi = &description.Media{ - Type: description.MediaTypeAudio, - Formats: []format.Format{&format.Opus{ - PayloadTyp: 96, - IsStereo: (tcodec.ChannelCount == 2), - }}, - } - - r.OnDataOpus(track, func(pts int64, packets [][]byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.Opus{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - Packets: packets, - }) - return nil - }) - - case *mpegts.CodecMPEG4Audio: - medi = &description.Media{ - Type: description.MediaTypeAudio, - Formats: []format.Format{&format.MPEG4Audio{ - PayloadTyp: 96, - SizeLength: 13, - IndexLength: 3, - IndexDeltaLength: 3, - Config: &tcodec.Config, - }}, - } - - r.OnDataMPEG4Audio(track, func(pts int64, aus [][]byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG4AudioGeneric{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - AUs: aus, - }) - return nil - }) - - case *mpegts.CodecMPEG1Audio: - medi = &description.Media{ - Type: description.MediaTypeAudio, - Formats: []format.Format{&format.MPEG1Audio{}}, - } - - r.OnDataMPEG1Audio(track, func(pts int64, frames [][]byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG1Audio{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - Frames: frames, - }) - return nil - }) - - default: - continue - } - - medias = append(medias, medi) - } - - if len(medias) == 0 { - return fmt.Errorf("no supported tracks found") + medias, err := mpegtsSetupTracks(r, &stream) + if err != nil { + return err } rres := path.startPublisher(pathStartPublisherReq{ @@ -702,6 +549,30 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass } return bw.Flush() }) + + case *format.AC3: + track := addTrack(medi, &mpegts.CodecAC3{}) + + sampleRate := time.Duration(forma.SampleRate) + + res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error { + tunit := u.(*unit.AC3) + if tunit.Frames == nil { + return nil + } + + for i, frame := range tunit.Frames { + framePTS := tunit.PTS + time.Duration(i)*ac3.SamplesPerFrame* + time.Second/sampleRate + + sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err = w.WriteAC3(track, durationGoToMPEGTS(framePTS), frame) + if err != nil { + return err + } + } + return bw.Flush() + }) } } } diff --git a/internal/core/srt_source.go b/internal/core/srt_source.go index bfff2e3d..bbe90936 100644 --- a/internal/core/srt_source.go +++ b/internal/core/srt_source.go @@ -2,18 +2,15 @@ package core import ( "context" - "fmt" "time" "github.com/bluenviron/gortsplib/v4/pkg/description" - "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" "github.com/datarhei/gosrt" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/stream" - "github.com/bluenviron/mediamtx/internal/unit" ) type srtSourceParent interface { @@ -97,165 +94,11 @@ func (s *srtSource) runReader(sconn srt.Conn) error { decodeErrLogger.Log(logger.Warn, err.Error()) }) - var medias []*description.Media //nolint:prealloc var stream *stream.Stream - var td *mpegts.TimeDecoder - decodeTime := func(t int64) time.Duration { - if td == nil { - td = mpegts.NewTimeDecoder(t) - } - return td.Decode(t) - } - - for _, track := range r.Tracks() { //nolint:dupl - var medi *description.Media - - switch tcodec := track.Codec.(type) { - case *mpegts.CodecH265: - medi = &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.H265{ - PayloadTyp: 96, - }}, - } - - r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.H265{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - AU: au, - }) - return nil - }) - - case *mpegts.CodecH264: - medi = &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.H264{ - PayloadTyp: 96, - PacketizationMode: 1, - }}, - } - - r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.H264{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - AU: au, - }) - return nil - }) - - case *mpegts.CodecMPEG4Video: - medi = &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.MPEG4Video{ - PayloadTyp: 96, - }}, - } - - r.OnDataMPEGxVideo(track, func(pts int64, frame []byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG4Video{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - Frame: frame, - }) - return nil - }) - - case *mpegts.CodecMPEG1Video: - medi = &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.MPEG1Video{}}, - } - - r.OnDataMPEGxVideo(track, func(pts int64, frame []byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG1Video{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - Frame: frame, - }) - return nil - }) - - case *mpegts.CodecMPEG4Audio: - medi = &description.Media{ - Type: description.MediaTypeAudio, - Formats: []format.Format{&format.MPEG4Audio{ - PayloadTyp: 96, - SizeLength: 13, - IndexLength: 3, - IndexDeltaLength: 3, - Config: &tcodec.Config, - }}, - } - - r.OnDataMPEG4Audio(track, func(pts int64, aus [][]byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG4AudioGeneric{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - AUs: aus, - }) - return nil - }) - - case *mpegts.CodecOpus: - medi = &description.Media{ - Type: description.MediaTypeAudio, - Formats: []format.Format{&format.Opus{ - PayloadTyp: 96, - IsStereo: (tcodec.ChannelCount == 2), - }}, - } - - r.OnDataOpus(track, func(pts int64, packets [][]byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.Opus{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - Packets: packets, - }) - return nil - }) - - case *mpegts.CodecMPEG1Audio: - medi = &description.Media{ - Type: description.MediaTypeAudio, - Formats: []format.Format{&format.MPEG1Audio{}}, - } - - r.OnDataMPEG1Audio(track, func(pts int64, frames [][]byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG1Audio{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - Frames: frames, - }) - return nil - }) - - default: - continue - } - - medias = append(medias, medi) - } - - if len(medias) == 0 { - return fmt.Errorf("no supported tracks found") + medias, err := mpegtsSetupTracks(r, &stream) + if err != nil { + return err } res := s.parent.setReady(pathSourceStaticSetReadyReq{ diff --git a/internal/core/udp_source.go b/internal/core/udp_source.go index 6f56ae8f..b0dca1ff 100644 --- a/internal/core/udp_source.go +++ b/internal/core/udp_source.go @@ -7,14 +7,12 @@ import ( "time" "github.com/bluenviron/gortsplib/v4/pkg/description" - "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/multicast" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/stream" - "github.com/bluenviron/mediamtx/internal/unit" ) const ( @@ -129,165 +127,11 @@ func (s *udpSource) runReader(pc net.PacketConn) error { decodeErrLogger.Log(logger.Warn, err.Error()) }) - var medias []*description.Media //nolint:prealloc var stream *stream.Stream - var td *mpegts.TimeDecoder - decodeTime := func(t int64) time.Duration { - if td == nil { - td = mpegts.NewTimeDecoder(t) - } - return td.Decode(t) - } - - for _, track := range r.Tracks() { //nolint:dupl - var medi *description.Media - - switch tcodec := track.Codec.(type) { - case *mpegts.CodecH265: - medi = &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.H265{ - PayloadTyp: 96, - }}, - } - - r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.H265{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - AU: au, - }) - return nil - }) - - case *mpegts.CodecH264: - medi = &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.H264{ - PayloadTyp: 96, - PacketizationMode: 1, - }}, - } - - r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.H264{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - AU: au, - }) - return nil - }) - - case *mpegts.CodecMPEG4Video: - medi = &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.MPEG4Video{ - PayloadTyp: 96, - }}, - } - - r.OnDataMPEGxVideo(track, func(pts int64, frame []byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG4Video{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - Frame: frame, - }) - return nil - }) - - case *mpegts.CodecMPEG1Video: - medi = &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.MPEG1Video{}}, - } - - r.OnDataMPEGxVideo(track, func(pts int64, frame []byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG1Video{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - Frame: frame, - }) - return nil - }) - - case *mpegts.CodecOpus: - medi = &description.Media{ - Type: description.MediaTypeAudio, - Formats: []format.Format{&format.Opus{ - PayloadTyp: 96, - IsStereo: (tcodec.ChannelCount == 2), - }}, - } - - r.OnDataOpus(track, func(pts int64, packets [][]byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.Opus{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - Packets: packets, - }) - return nil - }) - - case *mpegts.CodecMPEG4Audio: - medi = &description.Media{ - Type: description.MediaTypeAudio, - Formats: []format.Format{&format.MPEG4Audio{ - PayloadTyp: 96, - SizeLength: 13, - IndexLength: 3, - IndexDeltaLength: 3, - Config: &tcodec.Config, - }}, - } - - r.OnDataMPEG4Audio(track, func(pts int64, aus [][]byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG4AudioGeneric{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - AUs: aus, - }) - return nil - }) - - case *mpegts.CodecMPEG1Audio: - medi = &description.Media{ - Type: description.MediaTypeAudio, - Formats: []format.Format{&format.MPEG1Audio{}}, - } - - r.OnDataMPEG1Audio(track, func(pts int64, frames [][]byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG1Audio{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - Frames: frames, - }) - return nil - }) - - default: - continue - } - - medias = append(medias, medi) - } - - if len(medias) == 0 { - return fmt.Errorf("no supported tracks found") + medias, err := mpegtsSetupTracks(r, &stream) + if err != nil { + return err } res := s.parent.setReady(pathSourceStaticSetReadyReq{ diff --git a/internal/formatprocessor/ac3.go b/internal/formatprocessor/ac3.go new file mode 100644 index 00000000..6f546f11 --- /dev/null +++ b/internal/formatprocessor/ac3.go @@ -0,0 +1,112 @@ +package formatprocessor + +import ( + "fmt" + "time" + + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/format/rtpac3" + "github.com/pion/rtp" + + "github.com/bluenviron/mediamtx/internal/unit" +) + +type formatProcessorAC3 struct { + udpMaxPayloadSize int + format *format.AC3 + encoder *rtpac3.Encoder + decoder *rtpac3.Decoder +} + +func newAC3( + udpMaxPayloadSize int, + forma *format.AC3, + generateRTPPackets bool, +) (*formatProcessorAC3, error) { + t := &formatProcessorAC3{ + udpMaxPayloadSize: udpMaxPayloadSize, + format: forma, + } + + if generateRTPPackets { + err := t.createEncoder() + if err != nil { + return nil, err + } + } + + return t, nil +} + +func (t *formatProcessorAC3) createEncoder() error { + t.encoder = &rtpac3.Encoder{ + PayloadType: t.format.PayloadTyp, + } + return t.encoder.Init() +} + +func (t *formatProcessorAC3) ProcessUnit(uu unit.Unit) error { //nolint:dupl + u := uu.(*unit.AC3) + + pkts, err := t.encoder.Encode(u.Frames) + if err != nil { + return err + } + + ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) + for _, pkt := range pkts { + pkt.Timestamp = ts + } + + u.RTPPackets = pkts + + return nil +} + +func (t *formatProcessorAC3) ProcessRTPPacket( //nolint:dupl + pkt *rtp.Packet, + ntp time.Time, + pts time.Duration, + hasNonRTSPReaders bool, +) (Unit, error) { + u := &unit.AC3{ + Base: unit.Base{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: ntp, + PTS: pts, + }, + } + + // remove padding + pkt.Header.Padding = false + pkt.PaddingSize = 0 + + if pkt.MarshalSize() > t.udpMaxPayloadSize { + return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", + pkt.MarshalSize(), t.udpMaxPayloadSize) + } + + // decode from RTP + if hasNonRTSPReaders || t.decoder != nil { + if t.decoder == nil { + var err error + t.decoder, err = t.format.CreateDecoder() + if err != nil { + return nil, err + } + } + + frames, err := t.decoder.Decode(pkt) + if err != nil { + if err == rtpac3.ErrNonStartingPacketAndNoPrevious || err == rtpac3.ErrMorePacketsNeeded { + return u, nil + } + return nil, err + } + + u.Frames = frames + } + + // route packet as is + return u, nil +} diff --git a/internal/formatprocessor/processor.go b/internal/formatprocessor/processor.go index 89fe634b..b888b43d 100644 --- a/internal/formatprocessor/processor.go +++ b/internal/formatprocessor/processor.go @@ -72,6 +72,9 @@ func New( case *format.MPEG1Audio: return newMPEG1Audio(udpMaxPayloadSize, forma, generateRTPPackets) + case *format.AC3: + return newAC3(udpMaxPayloadSize, forma, generateRTPPackets) + default: return newGeneric(udpMaxPayloadSize, forma, generateRTPPackets) } diff --git a/internal/record/agent.go b/internal/record/agent.go index a0770e59..19658e58 100644 --- a/internal/record/agent.go +++ b/internal/record/agent.go @@ -9,6 +9,7 @@ import ( "time" "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/pkg/codecs/ac3" "github.com/bluenviron/mediacommon/pkg/codecs/av1" "github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/bluenviron/mediacommon/pkg/codecs/h265" @@ -629,6 +630,8 @@ func NewAgent( } track := addTrack(codec) + parsed := false + stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.MPEG1Audio) if tunit.Frames == nil { @@ -644,12 +647,10 @@ func NewAgent( return err } - if codec.SampleRate != h.SampleRate { + if !parsed { + parsed = true codec.SampleRate = h.SampleRate - r.updateCodecs() - } - if c := mpeg1audioChannelCount(h.ChannelMode); codec.ChannelCount != c { - codec.ChannelCount = c + codec.ChannelCount = mpeg1audioChannelCount(h.ChannelMode) r.updateCodecs() } @@ -670,6 +671,72 @@ func NewAgent( return nil }) + case *format.AC3: + codec := &fmp4.CodecAC3{ + SampleRate: forma.SampleRate, + ChannelCount: forma.ChannelCount, + Fscod: 0, + Bsid: 8, + Bsmod: 0, + Acmod: 7, + LfeOn: true, + BitRateCode: 7, + } + track := addTrack(codec) + + parsed := false + + stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.AC3) + if tunit.Frames == nil { + return nil + } + + pts := tunit.PTS + + for _, frame := range tunit.Frames { + var syncInfo ac3.SyncInfo + err := syncInfo.Unmarshal(frame) + if err != nil { + return fmt.Errorf("invalid AC-3 frame: %s", err) + } + + var bsi ac3.BSI + err = bsi.Unmarshal(frame[5:]) + if err != nil { + return fmt.Errorf("invalid AC-3 frame: %s", err) + } + + if !parsed { + parsed = true + codec.SampleRate = syncInfo.SampleRate() + codec.ChannelCount = bsi.ChannelCount() + codec.Fscod = syncInfo.Fscod + codec.Bsid = bsi.Bsid + codec.Bsmod = bsi.Bsmod + codec.Acmod = bsi.Acmod + codec.LfeOn = bsi.LfeOn + codec.BitRateCode = syncInfo.Frmsizecod >> 1 + r.updateCodecs() + } + + err = track.record(&sample{ + PartSample: &fmp4.PartSample{ + Payload: frame, + }, + dts: pts, + }) + if err != nil { + return err + } + + pts += time.Duration(ac3.SamplesPerFrame) * + time.Second / time.Duration(codec.SampleRate) + } + + return nil + }) + case *format.G722: // TODO diff --git a/internal/unit/ac3.go b/internal/unit/ac3.go new file mode 100644 index 00000000..da6b94c6 --- /dev/null +++ b/internal/unit/ac3.go @@ -0,0 +1,7 @@ +package unit + +// AC3 is a AC-3 data unit. +type AC3 struct { + Base + Frames [][]byte +}