From 4bafa4ea9b565f810f070d45439c6a4ca85fc595 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Tue, 13 Dec 2022 21:26:35 +0100 Subject: [PATCH] add dedicated processors for H265 and Opus --- go.mod | 2 +- go.sum | 4 +- internal/core/data.go | 43 ------- internal/core/formatprocessor.go | 6 + internal/core/formatprocessor_generic.go | 15 +++ internal/core/formatprocessor_h264.go | 17 +++ internal/core/formatprocessor_h265.go | 121 ++++++++++++++++++++ internal/core/formatprocessor_mpeg4audio.go | 17 +++ internal/core/formatprocessor_opus.go | 93 +++++++++++++++ internal/core/rtsp_session.go | 22 ++++ internal/core/rtsp_source.go | 22 ++++ 11 files changed, 316 insertions(+), 46 deletions(-) create mode 100644 internal/core/formatprocessor_h265.go create mode 100644 internal/core/formatprocessor_opus.go diff --git a/go.mod b/go.mod index 637fa017..ad938ceb 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.18 require ( code.cloudfoundry.org/bytefmt v0.0.0-20211005130812-5bb3c17173e5 github.com/abema/go-mp4 v0.8.0 - github.com/aler9/gortsplib/v2 v2.0.0-20221213180201-60596c32d1bf + github.com/aler9/gortsplib/v2 v2.0.0-20221213201904-04d1de717768 github.com/asticode/go-astits v1.10.1-0.20220319093903-4abe66a9b757 github.com/fsnotify/fsnotify v1.4.9 github.com/gin-gonic/gin v1.8.1 diff --git a/go.sum b/go.sum index 4be08248..a6ddc739 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib/v2 v2.0.0-20221213180201-60596c32d1bf h1:kPSTROCTa8pyQ13LzDgcA2mYfBZKU8ma6KVi/mwWux4= -github.com/aler9/gortsplib/v2 v2.0.0-20221213180201-60596c32d1bf/go.mod h1:zJ+fWtakOMN6cKV169EMNVBLPTITArrJKu/fyM+dov8= +github.com/aler9/gortsplib/v2 v2.0.0-20221213201904-04d1de717768 h1:Q7Qm3HjO8bYo0P2dPmKXYbfvpqhA9ZJuJ+TqN6UcdqI= +github.com/aler9/gortsplib/v2 v2.0.0-20221213201904-04d1de717768/go.mod h1:zJ+fWtakOMN6cKV169EMNVBLPTITArrJKu/fyM+dov8= github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82 h1:9WgSzBLo3a9ToSVV7sRTBYZ1GGOZUpq4+5H3SN0UZq4= github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82/go.mod h1:qsMrZCbeBf/mCLOeF16KDkPu4gktn/pOWyaq1aYQE7U= github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8= diff --git a/internal/core/data.go b/internal/core/data.go index fa5d6055..5a65dc93 100644 --- a/internal/core/data.go +++ b/internal/core/data.go @@ -11,46 +11,3 @@ type data interface { getRTPPackets() []*rtp.Packet getNTP() time.Time } - -type dataGeneric struct { - rtpPackets []*rtp.Packet - ntp time.Time -} - -func (d *dataGeneric) getRTPPackets() []*rtp.Packet { - return d.rtpPackets -} - -func (d *dataGeneric) getNTP() time.Time { - return d.ntp -} - -type dataH264 struct { - rtpPackets []*rtp.Packet - ntp time.Time - pts time.Duration - nalus [][]byte -} - -func (d *dataH264) getRTPPackets() []*rtp.Packet { - return d.rtpPackets -} - -func (d *dataH264) getNTP() time.Time { - return d.ntp -} - -type dataMPEG4Audio struct { - rtpPackets []*rtp.Packet - ntp time.Time - pts time.Duration - aus [][]byte -} - -func (d *dataMPEG4Audio) getRTPPackets() []*rtp.Packet { - return d.rtpPackets -} - -func (d *dataMPEG4Audio) getNTP() time.Time { - return d.ntp -} diff --git a/internal/core/formatprocessor.go b/internal/core/formatprocessor.go index 1d74ea75..92386ed0 100644 --- a/internal/core/formatprocessor.go +++ b/internal/core/formatprocessor.go @@ -13,9 +13,15 @@ func newFormatProcessor(forma format.Format, generateRTPPackets bool) (formatPro case *format.H264: return newFormatProcessorH264(forma, generateRTPPackets) + case *format.H265: + return newFormatProcessorH265(forma, generateRTPPackets) + case *format.MPEG4Audio: return newFormatProcessorMPEG4Audio(forma, generateRTPPackets) + case *format.Opus: + return newFormatProcessorOpus(forma, generateRTPPackets) + default: return newFormatProcessorGeneric(forma, generateRTPPackets) } diff --git a/internal/core/formatprocessor_generic.go b/internal/core/formatprocessor_generic.go index 884a1a3d..062291bd 100644 --- a/internal/core/formatprocessor_generic.go +++ b/internal/core/formatprocessor_generic.go @@ -2,8 +2,10 @@ package core import ( "fmt" + "time" "github.com/aler9/gortsplib/v2/pkg/format" + "github.com/pion/rtp" ) const ( @@ -11,6 +13,19 @@ const ( maxPacketSize = 1472 ) +type dataGeneric struct { + rtpPackets []*rtp.Packet + ntp time.Time +} + +func (d *dataGeneric) getRTPPackets() []*rtp.Packet { + return d.rtpPackets +} + +func (d *dataGeneric) getNTP() time.Time { + return d.ntp +} + type formatProcessorGeneric struct{} func newFormatProcessorGeneric(forma format.Format, generateRTPPackets bool) (*formatProcessorGeneric, error) { diff --git a/internal/core/formatprocessor_h264.go b/internal/core/formatprocessor_h264.go index 109f7b43..00cf0d07 100644 --- a/internal/core/formatprocessor_h264.go +++ b/internal/core/formatprocessor_h264.go @@ -2,6 +2,7 @@ package core import ( "bytes" + "time" "github.com/aler9/gortsplib/v2/pkg/format" "github.com/aler9/gortsplib/v2/pkg/formatdecenc/rtph264" @@ -9,6 +10,22 @@ import ( "github.com/pion/rtp" ) +type dataH264 struct { + rtpPackets []*rtp.Packet + ntp time.Time + pts time.Duration + nalus [][]byte +} + +func (d *dataH264) getRTPPackets() []*rtp.Packet { + return d.rtpPackets +} + +func (d *dataH264) getNTP() time.Time { + return d.ntp +} + +// extract SPS and PPS without decoding RTP packets func rtpH264ExtractSPSPPS(pkt *rtp.Packet) ([]byte, []byte) { if len(pkt.Payload) == 0 { return nil, nil diff --git a/internal/core/formatprocessor_h265.go b/internal/core/formatprocessor_h265.go new file mode 100644 index 00000000..e16dfe12 --- /dev/null +++ b/internal/core/formatprocessor_h265.go @@ -0,0 +1,121 @@ +package core + +import ( + "fmt" + "time" + + "github.com/aler9/gortsplib/v2/pkg/format" + "github.com/aler9/gortsplib/v2/pkg/formatdecenc/rtph265" + "github.com/pion/rtp" +) + +type dataH265 struct { + rtpPackets []*rtp.Packet + ntp time.Time + pts time.Duration + nalus [][]byte +} + +func (d *dataH265) getRTPPackets() []*rtp.Packet { + return d.rtpPackets +} + +func (d *dataH265) getNTP() time.Time { + return d.ntp +} + +type formatProcessorH265 struct { + format *format.H265 + + encoder *rtph265.Encoder + decoder *rtph265.Decoder +} + +func newFormatProcessorH265( + forma *format.H265, + allocateEncoder bool, +) (*formatProcessorH265, error) { + t := &formatProcessorH265{ + format: forma, + } + + if allocateEncoder { + t.encoder = forma.CreateEncoder() + } + + return t, nil +} + +func (t *formatProcessorH265) updateTrackParametersFromRTPPacket(pkt *rtp.Packet) { + // TODO: extract VPS, SPS, PPS and set them into the track +} + +func (t *formatProcessorH265) updateTrackParametersFromNALUs(nalus [][]byte) { + // TODO: extract VPS, SPS, PPS and set them into the track +} + +func (t *formatProcessorH265) remuxNALUs(nalus [][]byte) [][]byte { + // TODO: add VPS, SPS, PPS before IDRs + return nalus +} + +func (t *formatProcessorH265) generateRTPPackets(tdata *dataH265) error { + pkts, err := t.encoder.Encode(tdata.nalus, tdata.pts) + if err != nil { + return err + } + + tdata.rtpPackets = pkts + return nil +} + +func (t *formatProcessorH265) process(dat data, hasNonRTSPReaders bool) error { + tdata := dat.(*dataH265) + + if tdata.rtpPackets != nil { + pkt := tdata.rtpPackets[0] + t.updateTrackParametersFromRTPPacket(pkt) + + if t.encoder == nil { + // remove padding + pkt.Header.Padding = false + pkt.PaddingSize = 0 + + // TODO: re-encode if oversized instead of printing errors + if pkt.MarshalSize() > maxPacketSize { + return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", + pkt.MarshalSize(), maxPacketSize) + } + } + + // decode from RTP + if hasNonRTSPReaders || t.encoder != nil { + if t.decoder == nil { + t.decoder = t.format.CreateDecoder() + } + + nalus, pts, err := t.decoder.Decode(pkt) + if err != nil { + if err == rtph265.ErrNonStartingPacketAndNoPrevious || err == rtph265.ErrMorePacketsNeeded { + return nil + } + return err + } + + tdata.nalus = nalus + tdata.pts = pts + + tdata.nalus = t.remuxNALUs(tdata.nalus) + } + + // route packet as is + if t.encoder == nil { + return nil + } + } else { + t.updateTrackParametersFromNALUs(tdata.nalus) + tdata.nalus = t.remuxNALUs(tdata.nalus) + } + + return t.generateRTPPackets(tdata) +} diff --git a/internal/core/formatprocessor_mpeg4audio.go b/internal/core/formatprocessor_mpeg4audio.go index c2534d6c..ef80770c 100644 --- a/internal/core/formatprocessor_mpeg4audio.go +++ b/internal/core/formatprocessor_mpeg4audio.go @@ -2,11 +2,28 @@ package core import ( "fmt" + "time" "github.com/aler9/gortsplib/v2/pkg/format" "github.com/aler9/gortsplib/v2/pkg/formatdecenc/rtpmpeg4audio" + "github.com/pion/rtp" ) +type dataMPEG4Audio struct { + rtpPackets []*rtp.Packet + ntp time.Time + pts time.Duration + aus [][]byte +} + +func (d *dataMPEG4Audio) getRTPPackets() []*rtp.Packet { + return d.rtpPackets +} + +func (d *dataMPEG4Audio) getNTP() time.Time { + return d.ntp +} + type formatProcessorMPEG4Audio struct { format *format.MPEG4Audio encoder *rtpmpeg4audio.Encoder diff --git a/internal/core/formatprocessor_opus.go b/internal/core/formatprocessor_opus.go new file mode 100644 index 00000000..99209246 --- /dev/null +++ b/internal/core/formatprocessor_opus.go @@ -0,0 +1,93 @@ +package core + +import ( + "fmt" + "time" + + "github.com/aler9/gortsplib/v2/pkg/format" + "github.com/aler9/gortsplib/v2/pkg/formatdecenc/rtpsimpleaudio" + "github.com/pion/rtp" +) + +type dataOpus struct { + rtpPackets []*rtp.Packet + ntp time.Time + pts time.Duration + au []byte +} + +func (d *dataOpus) getRTPPackets() []*rtp.Packet { + return d.rtpPackets +} + +func (d *dataOpus) getNTP() time.Time { + return d.ntp +} + +type formatProcessorOpus struct { + format *format.Opus + encoder *rtpsimpleaudio.Encoder + decoder *rtpsimpleaudio.Decoder +} + +func newFormatProcessorOpus( + forma *format.Opus, + allocateEncoder bool, +) (*formatProcessorOpus, error) { + t := &formatProcessorOpus{ + format: forma, + } + + if allocateEncoder { + t.encoder = forma.CreateEncoder() + } + + return t, nil +} + +func (t *formatProcessorOpus) generateRTPPackets(tdata *dataOpus) error { + pkt, err := t.encoder.Encode(tdata.au, tdata.pts) + if err != nil { + return err + } + + tdata.rtpPackets = []*rtp.Packet{pkt} + return nil +} + +func (t *formatProcessorOpus) process(dat data, hasNonRTSPReaders bool) error { + tdata := dat.(*dataOpus) + + if tdata.rtpPackets != nil { + pkt := tdata.rtpPackets[0] + + // remove padding + pkt.Header.Padding = false + pkt.PaddingSize = 0 + + if pkt.MarshalSize() > maxPacketSize { + return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", + pkt.MarshalSize(), maxPacketSize) + } + + // decode from RTP + if hasNonRTSPReaders { + if t.decoder == nil { + t.decoder = t.format.CreateDecoder() + } + + au, pts, err := t.decoder.Decode(pkt) + if err != nil { + return err + } + + tdata.au = au + tdata.pts = pts + } + + // route packet as is + return nil + } + + return t.generateRTPPackets(tdata) +} diff --git a/internal/core/rtsp_session.go b/internal/core/rtsp_session.go index 1dc257dd..ae30125f 100644 --- a/internal/core/rtsp_session.go +++ b/internal/core/rtsp_session.go @@ -318,6 +318,17 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R } }) + case *format.H265: + ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { + err := s.stream.writeData(cmedia, cformat, &dataH265{ + rtpPackets: []*rtp.Packet{pkt}, + ntp: time.Now(), + }) + if err != nil { + s.log(logger.Warn, "%v", err) + } + }) + case *format.MPEG4Audio: ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { err := s.stream.writeData(cmedia, cformat, &dataMPEG4Audio{ @@ -329,6 +340,17 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R } }) + case *format.Opus: + ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { + err := s.stream.writeData(cmedia, cformat, &dataOpus{ + rtpPackets: []*rtp.Packet{pkt}, + ntp: time.Now(), + }) + if err != nil { + s.log(logger.Warn, "%v", err) + } + }) + default: ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { err := s.stream.writeData(cmedia, cformat, &dataGeneric{ diff --git a/internal/core/rtsp_source.go b/internal/core/rtsp_source.go index ae1d31d0..1ba14d51 100644 --- a/internal/core/rtsp_source.go +++ b/internal/core/rtsp_source.go @@ -159,6 +159,17 @@ func (s *rtspSource) run(ctx context.Context) error { } }) + case *format.H265: + c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { + err := res.stream.writeData(cmedia, cformat, &dataH265{ + rtpPackets: []*rtp.Packet{pkt}, + ntp: time.Now(), + }) + if err != nil { + s.Log(logger.Warn, "%v", err) + } + }) + case *format.MPEG4Audio: c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { err := res.stream.writeData(cmedia, cformat, &dataMPEG4Audio{ @@ -170,6 +181,17 @@ func (s *rtspSource) run(ctx context.Context) error { } }) + case *format.Opus: + c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { + err := res.stream.writeData(cmedia, cformat, &dataOpus{ + rtpPackets: []*rtp.Packet{pkt}, + ntp: time.Now(), + }) + if err != nil { + s.Log(logger.Warn, "%v", err) + } + }) + default: c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { err := res.stream.writeData(cmedia, cformat, &dataGeneric{