From 8bb71ac8d8ecec62d3ca4dac410dca777a2eab21 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Sat, 5 Aug 2023 14:47:20 +0200 Subject: [PATCH] srt, udp: support reading and writing MPEG-1 audio streams (#2147) --- README.md | 8 +- go.mod | 8 +- go.sum | 16 +- internal/core/api_test.go | 3 - internal/core/hls_source.go | 4 +- internal/core/hls_source_test.go | 2 - internal/core/rtmp_conn.go | 22 +-- internal/core/rtmp_source.go | 6 +- internal/core/srt_conn.go | 137 +++++++++++------- internal/core/srt_server_test.go | 1 - internal/core/srt_source.go | 29 +++- internal/core/srt_source_test.go | 1 - internal/core/udp_source.go | 28 +++- .../{mpeg2audio.go => mpeg1audio.go} | 36 ++--- internal/formatprocessor/processor.go | 4 +- internal/rtmp/message/audio.go | 10 +- internal/rtmp/message/reader_test.go | 4 +- internal/rtmp/reader.go | 14 +- internal/rtmp/writer.go | 22 +-- 19 files changed, 218 insertions(+), 137 deletions(-) rename internal/formatprocessor/{mpeg2audio.go => mpeg1audio.go} (66%) diff --git a/README.md b/README.md index ec9bdf4e..68f154bb 100644 --- a/README.md +++ b/README.md @@ -20,8 +20,8 @@ Live streams can be published to the server with: |protocol|variants|video codecs|audio codecs| |--------|--------|------------|------------| -|[SRT clients](#srt-clients)||H265, H264|Opus, MPEG-4 Audio (AAC)| -|[SRT servers](#srt-servers)||H265, H264|Opus, MPEG-4 Audio (AAC)| +|[SRT clients](#srt-clients)||H265, H264|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| +|[SRT servers](#srt-servers)||H265, H264|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[WebRTC clients](#webrtc-clients)|Browser-based, WHIP|AV1, VP9, VP8, H264|Opus, G722, G711| |[WebRTC servers](#webrtc-servers)|WHEP|AV1, VP9, VP8, H264|Opus, G722, G711| |[RTSP clients](#rtsp-clients)|UDP, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G726, G722, G711, LPCM and any RTP-compatible codec| @@ -29,14 +29,14 @@ Live streams can be published to the server with: |[RTMP clients](#rtmp-clients)|RTMP, RTMPS, Enhanced RTMP|AV1, H265, H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[RTMP cameras and servers](#rtmp-cameras-and-servers)|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[HLS cameras and servers](#hls-cameras-and-servers)|Low-Latency HLS, MP4-based HLS, legacy HLS|H265, H264|Opus, MPEG-4 Audio (AAC)| -|[UDP/MPEG-TS](#udpmpeg-ts)|Unicast, broadcast, multicast|H265, H264|Opus, MPEG-4 Audio (AAC)| +|[UDP/MPEG-TS](#udpmpeg-ts)|Unicast, broadcast, multicast|H265, H264|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[Raspberry Pi Cameras](#raspberry-pi-cameras)||H264|| And can be read from the server with: |protocol|variants|video codecs|audio codecs| |--------|--------|------------|------------| -|[SRT](#srt)||H265, H264|Opus, MPEG-4 Audio (AAC)| +|[SRT](#srt)||H265, H264|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[WebRTC](#webrtc)|Browser-based, WHEP|AV1, VP9, VP8, H264|Opus, G722, G711| |[RTSP](#rtsp)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G726, G722, G711, LPCM and any RTP-compatible codec| |[RTMP](#rtmp)|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| diff --git a/go.mod b/go.mod index a4674072..1788c90a 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,9 @@ require ( code.cloudfoundry.org/bytefmt v0.0.0 github.com/abema/go-mp4 v0.11.0 github.com/alecthomas/kong v0.8.0 - github.com/bluenviron/gohlslib v0.3.1-0.20230730162911-eb9f86511072 - github.com/bluenviron/gortsplib/v3 v3.9.1-0.20230801115353-f296099f26d5 - github.com/bluenviron/mediacommon v0.7.1-0.20230730144331-10b74a4f6eda + github.com/bluenviron/gohlslib v0.3.1-0.20230805121442-9577da1cb068 + github.com/bluenviron/gortsplib/v3 v3.9.1-0.20230805122812-17acec3f0dea + github.com/bluenviron/mediacommon v0.7.1-0.20230805114828-bee33f3b286d github.com/datarhei/gosrt v0.5.3 github.com/fsnotify/fsnotify v1.6.0 github.com/gin-gonic/gin v1.9.1 @@ -33,7 +33,7 @@ require ( require ( github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82 // indirect github.com/asticode/go-astikit v0.30.0 // indirect - github.com/asticode/go-astits v1.11.1-0.20230727094110-0df190a2dd87 // indirect + github.com/asticode/go-astits v1.12.0 // indirect github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c // indirect github.com/bytedance/sonic v1.9.1 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect diff --git a/go.sum b/go.sum index 8f41cf2c..e675e86a 100644 --- a/go.sum +++ b/go.sum @@ -10,16 +10,16 @@ github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82 h1:9WgSzBLo3a9T github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82/go.mod h1:qsMrZCbeBf/mCLOeF16KDkPu4gktn/pOWyaq1aYQE7U= github.com/asticode/go-astikit v0.30.0 h1:DkBkRQRIxYcknlaU7W7ksNfn4gMFsB0tqMJflxkRsZA= github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= -github.com/asticode/go-astits v1.11.1-0.20230727094110-0df190a2dd87 h1:SCAqalLhgKGDghGz03yYVWr8TavHluP/i7IwshKU9yA= -github.com/asticode/go-astits v1.11.1-0.20230727094110-0df190a2dd87/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI= +github.com/asticode/go-astits v1.12.0 h1:BiefTgVEyPgEB8nT6J+Sys/uxE4H/a04SW/aedpOpPc= +github.com/asticode/go-astits v1.12.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI= github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYhJeJ2aZxADI2tGADS15AzIF8MQ8XAhT4= github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI= -github.com/bluenviron/gohlslib v0.3.1-0.20230730162911-eb9f86511072 h1:pAbC7frXsTMxP7Ck3E50hl7oFeSeD2dgc2lWjmHXztQ= -github.com/bluenviron/gohlslib v0.3.1-0.20230730162911-eb9f86511072/go.mod h1:rK4b161qErs82QqvBEl84vpi2xhdZBUT0yubXuytZ7E= -github.com/bluenviron/gortsplib/v3 v3.9.1-0.20230801115353-f296099f26d5 h1:d50XiZLfcoTWfPU/FWSk0hN8qWrqv6hqp02vBZQFnJk= -github.com/bluenviron/gortsplib/v3 v3.9.1-0.20230801115353-f296099f26d5/go.mod h1:reIJG1PKHiUwNa8NZQAyUmprtR8jeGZEnzU958rDgH4= -github.com/bluenviron/mediacommon v0.7.1-0.20230730144331-10b74a4f6eda h1:+ungCWRNDjsy/CVL1l/UjAj4vYL4+NIJQoJJWbR3Xw8= -github.com/bluenviron/mediacommon v0.7.1-0.20230730144331-10b74a4f6eda/go.mod h1:tfk0qGPhqnOxVCrElu8ct3LKQn6Cj4Tpu3zbbJBTKj4= +github.com/bluenviron/gohlslib v0.3.1-0.20230805121442-9577da1cb068 h1:mNSdk14esA9ggCP0+gDmcG9wLvb7QzHLta071nTLYe4= +github.com/bluenviron/gohlslib v0.3.1-0.20230805121442-9577da1cb068/go.mod h1:DouD/43mEaXgwbeQZdtVhzRnA7RuKFYeaVPz6tj/Rfk= +github.com/bluenviron/gortsplib/v3 v3.9.1-0.20230805122812-17acec3f0dea h1:AK6GuUoXDaGfk3HDe6qMJ/Swmj1hIxVtMuw98sa2VO8= +github.com/bluenviron/gortsplib/v3 v3.9.1-0.20230805122812-17acec3f0dea/go.mod h1:9GOyv9dKyDjMHXcrh7rytIoXWvRk+/DC0Mjq3vqEyCU= +github.com/bluenviron/mediacommon v0.7.1-0.20230805114828-bee33f3b286d h1:ye0ze/XtVq23NwgxV/PWcjtagcZXTKZq+V6ZBT0HoTY= +github.com/bluenviron/mediacommon v0.7.1-0.20230805114828-bee33f3b286d/go.mod h1:8Y0rvMJDUCgqDegYrUxG1rbumB6DV0TZ98Rw9mowIrA= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= diff --git a/internal/core/api_test.go b/internal/core/api_test.go index a3d03e66..8f3e76db 100644 --- a/internal/core/api_test.go +++ b/internal/core/api_test.go @@ -677,7 +677,6 @@ func TestAPIProtocolList(t *testing.T) { defer conn.Close() track := &mpegts.Track{ - PID: 256, Codec: &mpegts.CodecH264{}, } @@ -958,7 +957,6 @@ func TestAPIProtocolGet(t *testing.T) { defer conn.Close() track := &mpegts.Track{ - PID: 256, Codec: &mpegts.CodecH264{}, } @@ -1230,7 +1228,6 @@ func TestAPIProtocolKick(t *testing.T) { defer conn.Close() track := &mpegts.Track{ - PID: 256, Codec: &mpegts.CodecH264{}, } diff --git a/internal/core/hls_source.go b/internal/core/hls_source.go index 020d3da1..e045b7df 100644 --- a/internal/core/hls_source.go +++ b/internal/core/hls_source.go @@ -130,7 +130,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan }}, } - c.OnDataMPEG4Audio(track, func(pts time.Duration, dts time.Duration, aus [][]byte) { + c.OnDataMPEG4Audio(track, func(pts time.Duration, aus [][]byte) { stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4AudioGeneric{ BaseUnit: formatprocessor.BaseUnit{ NTP: time.Now(), @@ -149,7 +149,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan }}, } - c.OnDataOpus(track, func(pts time.Duration, dts time.Duration, packets [][]byte) { + c.OnDataOpus(track, func(pts time.Duration, packets [][]byte) { stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitOpus{ BaseUnit: formatprocessor.BaseUnit{ NTP: time.Now(), diff --git a/internal/core/hls_source_test.go b/internal/core/hls_source_test.go index 6ca98500..9e6d8877 100644 --- a/internal/core/hls_source_test.go +++ b/internal/core/hls_source_test.go @@ -20,12 +20,10 @@ import ( ) var track1 = &mpegts.Track{ - PID: 256, Codec: &mpegts.CodecH264{}, } var track2 = &mpegts.Track{ - PID: 257, Codec: &mpegts.CodecMPEG4Audio{ Config: mpeg4audio.Config{ Type: 2, diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index df0caaf5..d8061748 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -14,7 +14,7 @@ import ( "github.com/bluenviron/gortsplib/v3/pkg/media" "github.com/bluenviron/gortsplib/v3/pkg/ringbuffer" "github.com/bluenviron/mediacommon/pkg/codecs/h264" - "github.com/bluenviron/mediacommon/pkg/codecs/mpeg2audio" + "github.com/bluenviron/mediacommon/pkg/codecs/mpeg1audio" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" "github.com/google/uuid" @@ -512,16 +512,16 @@ func (c *rtmpConn) setupAudio( return audioMedia, audioFormatMPEG4AudioLATM } - var audioFormatMPEG2 *formats.MPEG2Audio - audioMedia = stream.Medias().FindFormat(&audioFormatMPEG2) + var audioFormatMPEG1 *formats.MPEG1Audio + audioMedia = stream.Medias().FindFormat(&audioFormatMPEG1) if audioMedia != nil { startPTSFilled := false var startPTS time.Duration - stream.AddReader(c, audioMedia, audioFormatMPEG2, func(unit formatprocessor.Unit) { + stream.AddReader(c, audioMedia, audioFormatMPEG1, func(unit formatprocessor.Unit) { ringBuffer.Push(func() error { - tunit := unit.(*formatprocessor.UnitMPEG2Audio) + tunit := unit.(*formatprocessor.UnitMPEG1Audio) if !startPTSFilled { startPTSFilled = true @@ -541,7 +541,7 @@ func (c *rtmpConn) setupAudio( } for _, frame := range tunit.Frames { - var h mpeg2audio.FrameHeader + var h mpeg1audio.FrameHeader err := h.Unmarshal(frame) if err != nil { return err @@ -552,7 +552,7 @@ func (c *rtmpConn) setupAudio( } c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = (*w).WriteMPEG2Audio(pts, &h, frame) + err = (*w).WriteMPEG1Audio(pts, &h, frame) if err != nil { return err } @@ -565,7 +565,7 @@ func (c *rtmpConn) setupAudio( }) }) - return audioMedia, audioFormatMPEG2 + return audioMedia, audioFormatMPEG1 } return nil, nil @@ -674,9 +674,9 @@ func (c *rtmpConn) runPublish(conn *rtmp.Conn, u *url.URL) error { }) }) - case *formats.MPEG2Audio: - r.OnDataMPEG2Audio(func(pts time.Duration, frame []byte) { - stream.WriteUnit(audioMedia, audioFormat, &formatprocessor.UnitMPEG2Audio{ + case *formats.MPEG1Audio: + r.OnDataMPEG1Audio(func(pts time.Duration, frame []byte) { + stream.WriteUnit(audioMedia, audioFormat, &formatprocessor.UnitMPEG1Audio{ BaseUnit: formatprocessor.BaseUnit{ NTP: time.Now(), }, diff --git a/internal/core/rtmp_source.go b/internal/core/rtmp_source.go index ba6283fa..fd997db8 100644 --- a/internal/core/rtmp_source.go +++ b/internal/core/rtmp_source.go @@ -160,9 +160,9 @@ func (s *rtmpSource) runReader(u *url.URL, nconn net.Conn) error { }) }) - case *formats.MPEG2Audio: - mc.OnDataMPEG2Audio(func(pts time.Duration, frame []byte) { - stream.WriteUnit(audioMedia, audioFormat, &formatprocessor.UnitMPEG2Audio{ + case *formats.MPEG1Audio: + mc.OnDataMPEG1Audio(func(pts time.Duration, frame []byte) { + stream.WriteUnit(audioMedia, audioFormat, &formatprocessor.UnitMPEG1Audio{ BaseUnit: formatprocessor.BaseUnit{ NTP: time.Now(), }, diff --git a/internal/core/srt_conn.go b/internal/core/srt_conn.go index 8dea51cc..1cbab161 100644 --- a/internal/core/srt_conn.go +++ b/internal/core/srt_conn.go @@ -308,7 +308,7 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error { }}, } - r.OnDataMPEG4Audio(track, func(pts int64, _ int64, aus [][]byte) error { + r.OnDataMPEG4Audio(track, func(pts int64, aus [][]byte) error { stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4AudioGeneric{ BaseUnit: formatprocessor.BaseUnit{ NTP: time.Now(), @@ -328,7 +328,7 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error { }}, } - r.OnDataOpus(track, func(pts int64, _ int64, packets [][]byte) error { + r.OnDataOpus(track, func(pts int64, packets [][]byte) error { stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitOpus{ BaseUnit: formatprocessor.BaseUnit{ NTP: time.Now(), @@ -338,11 +338,35 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error { }) return nil }) + + case *mpegts.CodecMPEG1Audio: + medi = &media.Media{ + Type: media.TypeAudio, + Formats: []formats.Format{&formats.MPEG1Audio{}}, + } + + r.OnDataMPEG1Audio(track, func(pts int64, frames [][]byte) error { + stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG1Audio{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, + PTS: decodeTime(pts), + Frames: frames, + }) + return nil + }) + + default: + continue } medias = append(medias, medi) } + if len(medias) == 0 { + return fmt.Errorf("no supported tracks found") + } + rres := path.startPublisher(pathStartPublisherReq{ author: c, medias: medias, @@ -410,7 +434,6 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass }() var w *mpegts.Writer - nextPID := uint16(256) var tracks []*mpegts.Track var medias media.Medias bw := bufio.NewWriterSize(sconn, srtMaxPayloadSize(c.udpMaxPayloadSize)) @@ -419,17 +442,20 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass leadingTrackInitialized := false var leadingTrackStartDTS time.Duration + addTrack := func(medi *media.Media, codec mpegts.Codec) *mpegts.Track { + track := &mpegts.Track{ + Codec: codec, + } + tracks = append(tracks, track) + medias = append(medias, medi) + return track + } + for _, medi := range res.stream.Medias() { for _, format := range medi.Formats { switch format := format.(type) { case *formats.H265: - track := &mpegts.Track{ - PID: nextPID, - Codec: &mpegts.CodecH265{}, - } - tracks = append(tracks, track) - medias = append(medias, medi) - nextPID++ + track := addTrack(medi, &mpegts.CodecH265{}) var startPTS time.Duration startPTSFilled := false @@ -493,13 +519,7 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass }) case *formats.H264: - track := &mpegts.Track{ - PID: nextPID, - Codec: &mpegts.CodecH264{}, - } - tracks = append(tracks, track) - medias = append(medias, medi) - nextPID++ + track := addTrack(medi, &mpegts.CodecH264{}) var startPTS time.Duration startPTSFilled := false @@ -563,15 +583,9 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass }) case *formats.MPEG4AudioGeneric: - track := &mpegts.Track{ - PID: nextPID, - Codec: &mpegts.CodecMPEG4Audio{ - Config: *format.Config, - }, - } - tracks = append(tracks, track) - medias = append(medias, medi) - nextPID++ + track := addTrack(medi, &mpegts.CodecMPEG4Audio{ + Config: *format.Config, + }) var startPTS time.Duration startPTSFilled := false @@ -609,15 +623,9 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass if format.Config != nil && len(format.Config.Programs) == 1 && len(format.Config.Programs[0].Layers) == 1 { - track := &mpegts.Track{ - PID: nextPID, - Codec: &mpegts.CodecMPEG4Audio{ - Config: *format.Config.Programs[0].Layers[0].AudioSpecificConfig, - }, - } - tracks = append(tracks, track) - medias = append(medias, medi) - nextPID++ + track := addTrack(medi, &mpegts.CodecMPEG4Audio{ + Config: *format.Config.Programs[0].Layers[0].AudioSpecificConfig, + }) var startPTS time.Duration startPTSFilled := false @@ -653,20 +661,14 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass } case *formats.Opus: - track := &mpegts.Track{ - PID: nextPID, - Codec: &mpegts.CodecOpus{ - ChannelCount: func() int { - if format.IsStereo { - return 2 - } - return 1 - }(), - }, - } - tracks = append(tracks, track) - medias = append(medias, medi) - nextPID++ + track := addTrack(medi, &mpegts.CodecOpus{ + ChannelCount: func() int { + if format.IsStereo { + return 2 + } + return 1 + }(), + }) var startPTS time.Duration startPTSFilled := false @@ -699,6 +701,43 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass return bw.Flush() }) }) + + case *formats.MPEG1Audio: + // TODO: check whether's a MPEG-1 or MPEG-2 stream by parsing header + // of first frame + track := addTrack(medi, &mpegts.CodecMPEG1Audio{}) + + var startPTS time.Duration + startPTSFilled := false + + res.stream.AddReader(c, medi, format, func(unit formatprocessor.Unit) { + ringBuffer.Push(func() error { + tunit := unit.(*formatprocessor.UnitMPEG1Audio) + if tunit.Frames == nil { + return nil + } + + if !startPTSFilled { + startPTS = tunit.PTS + startPTSFilled = true + } + + if leadingTrackChosen && !leadingTrackInitialized { + return nil + } + + pts := tunit.PTS + pts -= startPTS + pts -= leadingTrackStartDTS + + sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err = w.WriteMPEG1Audio(track, durationGoToMPEGTS(pts), tunit.Frames) + if err != nil { + return err + } + return bw.Flush() + }) + }) } } } diff --git a/internal/core/srt_server_test.go b/internal/core/srt_server_test.go index b3e79063..0d0a5e9d 100644 --- a/internal/core/srt_server_test.go +++ b/internal/core/srt_server_test.go @@ -28,7 +28,6 @@ func TestSRTServer(t *testing.T) { defer publisher.Close() track := &mpegts.Track{ - PID: 256, Codec: &mpegts.CodecH264{}, } diff --git a/internal/core/srt_source.go b/internal/core/srt_source.go index c926b55b..4391bf2c 100644 --- a/internal/core/srt_source.go +++ b/internal/core/srt_source.go @@ -2,6 +2,7 @@ package core import ( "context" + "fmt" "time" "github.com/bluenviron/gortsplib/v3/pkg/formats" @@ -156,7 +157,7 @@ func (s *srtSource) runReader(sconn srt.Conn) error { }}, } - r.OnDataMPEG4Audio(track, func(pts int64, _ int64, aus [][]byte) error { + r.OnDataMPEG4Audio(track, func(pts int64, aus [][]byte) error { stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4AudioGeneric{ BaseUnit: formatprocessor.BaseUnit{ NTP: time.Now(), @@ -176,7 +177,7 @@ func (s *srtSource) runReader(sconn srt.Conn) error { }}, } - r.OnDataOpus(track, func(pts int64, _ int64, packets [][]byte) error { + r.OnDataOpus(track, func(pts int64, packets [][]byte) error { stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitOpus{ BaseUnit: formatprocessor.BaseUnit{ NTP: time.Now(), @@ -186,11 +187,35 @@ func (s *srtSource) runReader(sconn srt.Conn) error { }) return nil }) + + case *mpegts.CodecMPEG1Audio: + medi = &media.Media{ + Type: media.TypeAudio, + Formats: []formats.Format{&formats.MPEG1Audio{}}, + } + + r.OnDataMPEG1Audio(track, func(pts int64, frames [][]byte) error { + stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG1Audio{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, + PTS: decodeTime(pts), + Frames: frames, + }) + return nil + }) + + default: + continue } medias = append(medias, medi) } + if len(medias) == 0 { + return fmt.Errorf("no supported tracks found") + } + res := s.parent.setReady(pathSourceStaticSetReadyReq{ medias: medias, generateRTPPackets: true, diff --git a/internal/core/srt_source_test.go b/internal/core/srt_source_test.go index ce48cfb8..d912e01b 100644 --- a/internal/core/srt_source_test.go +++ b/internal/core/srt_source_test.go @@ -38,7 +38,6 @@ func TestSRTSource(t *testing.T) { defer conn.Close() track := &mpegts.Track{ - PID: 256, Codec: &mpegts.CodecH264{}, } diff --git a/internal/core/udp_source.go b/internal/core/udp_source.go index 45bdfdce..9dc211ca 100644 --- a/internal/core/udp_source.go +++ b/internal/core/udp_source.go @@ -204,7 +204,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error { }}, } - r.OnDataMPEG4Audio(track, func(pts int64, _ int64, aus [][]byte) error { + r.OnDataMPEG4Audio(track, func(pts int64, aus [][]byte) error { stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4AudioGeneric{ BaseUnit: formatprocessor.BaseUnit{ NTP: time.Now(), @@ -224,7 +224,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error { }}, } - r.OnDataOpus(track, func(pts int64, _ int64, packets [][]byte) error { + r.OnDataOpus(track, func(pts int64, packets [][]byte) error { stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitOpus{ BaseUnit: formatprocessor.BaseUnit{ NTP: time.Now(), @@ -234,11 +234,35 @@ func (s *udpSource) runReader(pc net.PacketConn) error { }) return nil }) + + case *mpegts.CodecMPEG1Audio: + medi = &media.Media{ + Type: media.TypeAudio, + Formats: []formats.Format{&formats.MPEG1Audio{}}, + } + + r.OnDataMPEG1Audio(track, func(pts int64, frames [][]byte) error { + stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG1Audio{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, + PTS: decodeTime(pts), + Frames: frames, + }) + return nil + }) + + default: + continue } medias = append(medias, medi) } + if len(medias) == 0 { + return fmt.Errorf("no supported tracks found") + } + res := s.parent.setReady(pathSourceStaticSetReadyReq{ medias: medias, generateRTPPackets: true, diff --git a/internal/formatprocessor/mpeg2audio.go b/internal/formatprocessor/mpeg1audio.go similarity index 66% rename from internal/formatprocessor/mpeg2audio.go rename to internal/formatprocessor/mpeg1audio.go index 0aaa5e98..09d51afd 100644 --- a/internal/formatprocessor/mpeg2audio.go +++ b/internal/formatprocessor/mpeg1audio.go @@ -5,33 +5,33 @@ import ( "time" "github.com/bluenviron/gortsplib/v3/pkg/formats" - "github.com/bluenviron/gortsplib/v3/pkg/formats/rtpmpeg2audio" + "github.com/bluenviron/gortsplib/v3/pkg/formats/rtpmpeg1audio" "github.com/pion/rtp" "github.com/bluenviron/mediamtx/internal/logger" ) -// UnitMPEG2Audio is a MPEG-1/2 Audio data unit. -type UnitMPEG2Audio struct { +// UnitMPEG1Audio is a MPEG-1/2 Audio data unit. +type UnitMPEG1Audio struct { BaseUnit PTS time.Duration Frames [][]byte } -type formatProcessorMPEG2Audio struct { +type formatProcessorMPEG1Audio struct { udpMaxPayloadSize int - format *formats.MPEG2Audio - encoder *rtpmpeg2audio.Encoder - decoder *rtpmpeg2audio.Decoder + format *formats.MPEG1Audio + encoder *rtpmpeg1audio.Encoder + decoder *rtpmpeg1audio.Decoder } -func newMPEG2Audio( +func newMPEG1Audio( udpMaxPayloadSize int, - forma *formats.MPEG2Audio, + forma *formats.MPEG1Audio, generateRTPPackets bool, _ logger.Writer, -) (*formatProcessorMPEG2Audio, error) { - t := &formatProcessorMPEG2Audio{ +) (*formatProcessorMPEG1Audio, error) { + t := &formatProcessorMPEG1Audio{ udpMaxPayloadSize: udpMaxPayloadSize, format: forma, } @@ -46,15 +46,15 @@ func newMPEG2Audio( return t, nil } -func (t *formatProcessorMPEG2Audio) createEncoder() error { - t.encoder = &rtpmpeg2audio.Encoder{ +func (t *formatProcessorMPEG1Audio) createEncoder() error { + t.encoder = &rtpmpeg1audio.Encoder{ PayloadMaxSize: t.udpMaxPayloadSize - 12, } return t.encoder.Init() } -func (t *formatProcessorMPEG2Audio) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl - tunit := unit.(*UnitMPEG2Audio) +func (t *formatProcessorMPEG1Audio) Process(unit Unit, hasNonRTSPReaders bool) error { //nolint:dupl + tunit := unit.(*UnitMPEG1Audio) if tunit.RTPPackets != nil { pkt := tunit.RTPPackets[0] @@ -80,7 +80,7 @@ func (t *formatProcessorMPEG2Audio) Process(unit Unit, hasNonRTSPReaders bool) e frames, pts, err := t.decoder.Decode(pkt) if err != nil { - if err == rtpmpeg2audio.ErrNonStartingPacketAndNoPrevious || err == rtpmpeg2audio.ErrMorePacketsNeeded { + if err == rtpmpeg1audio.ErrNonStartingPacketAndNoPrevious || err == rtpmpeg1audio.ErrMorePacketsNeeded { return nil } return err @@ -104,8 +104,8 @@ func (t *formatProcessorMPEG2Audio) Process(unit Unit, hasNonRTSPReaders bool) e return nil } -func (t *formatProcessorMPEG2Audio) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { - return &UnitMPEG2Audio{ +func (t *formatProcessorMPEG1Audio) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time) Unit { + return &UnitMPEG1Audio{ BaseUnit: BaseUnit{ RTPPackets: []*rtp.Packet{pkt}, NTP: ntp, diff --git a/internal/formatprocessor/processor.go b/internal/formatprocessor/processor.go index 34f04bd5..d83c0587 100644 --- a/internal/formatprocessor/processor.go +++ b/internal/formatprocessor/processor.go @@ -46,8 +46,8 @@ func New( case *formats.AV1: return newAV1(udpMaxPayloadSize, forma, generateRTPPackets, log) - case *formats.MPEG2Audio: - return newMPEG2Audio(udpMaxPayloadSize, forma, generateRTPPackets, log) + case *formats.MPEG1Audio: + return newMPEG1Audio(udpMaxPayloadSize, forma, generateRTPPackets, log) case *formats.MPEG4AudioGeneric: return newMPEG4AudioGeneric(udpMaxPayloadSize, forma, generateRTPPackets, log) diff --git a/internal/rtmp/message/audio.go b/internal/rtmp/message/audio.go index 4db0c477..6d611403 100644 --- a/internal/rtmp/message/audio.go +++ b/internal/rtmp/message/audio.go @@ -14,7 +14,7 @@ const ( // supported audio codecs const ( - CodecMPEG2Audio = 2 + CodecMPEG1Audio = 2 CodecMPEG4Audio = 10 ) @@ -52,7 +52,7 @@ func (m *Audio) Unmarshal(raw *rawmessage.Message) error { m.Codec = raw.Body[0] >> 4 switch m.Codec { - case CodecMPEG2Audio, CodecMPEG4Audio: + case CodecMPEG1Audio, CodecMPEG4Audio: default: return fmt.Errorf("unsupported audio codec: %d", m.Codec) } @@ -61,7 +61,7 @@ func (m *Audio) Unmarshal(raw *rawmessage.Message) error { m.Depth = (raw.Body[0] >> 1) & 0x01 m.Channels = raw.Body[0] & 0x01 - if m.Codec == CodecMPEG2Audio { + if m.Codec == CodecMPEG1Audio { m.Payload = raw.Body[1:] } else { m.AACType = AudioAACType(raw.Body[1]) @@ -80,7 +80,7 @@ func (m *Audio) Unmarshal(raw *rawmessage.Message) error { // Marshal implements Message. func (m Audio) Marshal() (*rawmessage.Message, error) { var l int - if m.Codec == CodecMPEG2Audio { + if m.Codec == CodecMPEG1Audio { l = 1 + len(m.Payload) } else { l = 2 + len(m.Payload) @@ -89,7 +89,7 @@ func (m Audio) Marshal() (*rawmessage.Message, error) { body[0] = m.Codec<<4 | m.Rate<<2 | m.Depth<<1 | m.Channels - if m.Codec == CodecMPEG2Audio { + if m.Codec == CodecMPEG1Audio { copy(body[1:], m.Payload) } else { body[1] = uint8(m.AACType) diff --git a/internal/rtmp/message/reader_test.go b/internal/rtmp/message/reader_test.go index 85e2ddfb..b0e5aaa6 100644 --- a/internal/rtmp/message/reader_test.go +++ b/internal/rtmp/message/reader_test.go @@ -27,12 +27,12 @@ var readWriterCases = []struct { }, }, { - "audio mpeg2", + "audio mpeg1", &Audio{ ChunkStreamID: 7, DTS: 6013806 * time.Millisecond, MessageStreamID: 4534543, - Codec: CodecMPEG2Audio, + Codec: CodecMPEG1Audio, Rate: flvio.SOUND_44Khz, Depth: flvio.SOUND_16BIT, Channels: flvio.SOUND_STEREO, diff --git a/internal/rtmp/reader.go b/internal/rtmp/reader.go index 00af2491..5138278a 100644 --- a/internal/rtmp/reader.go +++ b/internal/rtmp/reader.go @@ -27,8 +27,8 @@ type OnDataH26xFunc func(pts time.Duration, au [][]byte) // OnDataMPEG4AudioFunc is the prototype of the callback passed to OnDataMPEG4Audio(). type OnDataMPEG4AudioFunc func(pts time.Duration, au []byte) -// OnDataMPEG2AudioFunc is the prototype of the callback passed to OnDataMPEG2Audio(). -type OnDataMPEG2AudioFunc func(pts time.Duration, frame []byte) +// OnDataMPEG1AudioFunc is the prototype of the callback passed to OnDataMPEG1Audio(). +type OnDataMPEG1AudioFunc func(pts time.Duration, frame []byte) func h265FindNALU(array []gomp4.HEVCNaluArray, typ h265.NALUType) []byte { for _, entry := range array { @@ -126,8 +126,8 @@ func tracksFromMetadata(conn *Conn, payload []interface{}) (formats.Format, form case 0: return false, nil - case message.CodecMPEG2Audio: - audioTrack = &formats.MPEG2Audio{} + case message.CodecMPEG1Audio: + audioTrack = &formats.MPEG1Audio{} return true, nil case message.CodecMPEG4Audio: @@ -340,7 +340,7 @@ outer: } if videoTrack == nil && audioTrack == nil { - return nil, nil, fmt.Errorf("no tracks found") + return nil, nil, fmt.Errorf("no supported tracks found") } return videoTrack, audioTrack, nil @@ -524,8 +524,8 @@ func (r *Reader) OnDataMPEG4Audio(cb OnDataMPEG4AudioFunc) { } } -// OnDataMPEG2Audio sets a callback that is called when MPEG-2 Audio data is received. -func (r *Reader) OnDataMPEG2Audio(cb OnDataMPEG2AudioFunc) { +// OnDataMPEG1Audio sets a callback that is called when MPEG-1 Audio data is received. +func (r *Reader) OnDataMPEG1Audio(cb OnDataMPEG1AudioFunc) { r.onDataAudio = func(msg *message.Audio) error { cb(msg.DTS, msg.Payload) return nil diff --git a/internal/rtmp/writer.go b/internal/rtmp/writer.go index 31841dc3..3d030f83 100644 --- a/internal/rtmp/writer.go +++ b/internal/rtmp/writer.go @@ -5,7 +5,7 @@ import ( "github.com/bluenviron/gortsplib/v3/pkg/formats" "github.com/bluenviron/mediacommon/pkg/codecs/h264" - "github.com/bluenviron/mediacommon/pkg/codecs/mpeg2audio" + "github.com/bluenviron/mediacommon/pkg/codecs/mpeg1audio" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" "github.com/notedit/rtmp/format/flv/flvio" @@ -13,7 +13,7 @@ import ( "github.com/bluenviron/mediamtx/internal/rtmp/message" ) -func mpeg2AudioRate(sr int) uint8 { +func mpeg1AudioRate(sr int) uint8 { switch sr { case 5500: return flvio.SOUND_5_5Khz @@ -26,8 +26,8 @@ func mpeg2AudioRate(sr int) uint8 { } } -func mpeg2AudioChannels(m mpeg2audio.ChannelMode) uint8 { - if m == mpeg2audio.ChannelModeMono { +func mpeg1AudioChannels(m mpeg1audio.ChannelMode) uint8 { + if m == mpeg1audio.ChannelModeMono { return flvio.SOUND_MONO } return flvio.SOUND_STEREO @@ -84,8 +84,8 @@ func (w *Writer) writeTracks(videoTrack formats.Format, audioTrack formats.Forma K: "audiocodecid", V: func() float64 { switch audioTrack.(type) { - case *formats.MPEG2Audio: - return message.CodecMPEG2Audio + case *formats.MPEG1Audio: + return message.CodecMPEG1Audio case *formats.MPEG4AudioGeneric, *formats.MPEG4AudioLATM: return message.CodecMPEG4Audio @@ -193,15 +193,15 @@ func (w *Writer) WriteMPEG4Audio(pts time.Duration, au []byte) error { }) } -// WriteMPEG2Audio writes MPEG-2 Audio data. -func (w *Writer) WriteMPEG2Audio(pts time.Duration, h *mpeg2audio.FrameHeader, frame []byte) error { +// WriteMPEG1Audio writes MPEG-1 Audio data. +func (w *Writer) WriteMPEG1Audio(pts time.Duration, h *mpeg1audio.FrameHeader, frame []byte) error { return w.conn.Write(&message.Audio{ ChunkStreamID: message.AudioChunkStreamID, MessageStreamID: 0x1000000, - Codec: message.CodecMPEG2Audio, - Rate: mpeg2AudioRate(h.SampleRate), + Codec: message.CodecMPEG1Audio, + Rate: mpeg1AudioRate(h.SampleRate), Depth: flvio.SOUND_16BIT, - Channels: mpeg2AudioChannels(h.ChannelMode), + Channels: mpeg1AudioChannels(h.ChannelMode), Payload: frame, DTS: pts, })