Browse Source

add dedicated processors for H265 and Opus

pull/1242/head
aler9 2 years ago
parent
commit
4bafa4ea9b
  1. 2
      go.mod
  2. 4
      go.sum
  3. 43
      internal/core/data.go
  4. 6
      internal/core/formatprocessor.go
  5. 15
      internal/core/formatprocessor_generic.go
  6. 17
      internal/core/formatprocessor_h264.go
  7. 121
      internal/core/formatprocessor_h265.go
  8. 17
      internal/core/formatprocessor_mpeg4audio.go
  9. 93
      internal/core/formatprocessor_opus.go
  10. 22
      internal/core/rtsp_session.go
  11. 22
      internal/core/rtsp_source.go

2
go.mod

@ -5,7 +5,7 @@ go 1.18 @@ -5,7 +5,7 @@ go 1.18
require (
code.cloudfoundry.org/bytefmt v0.0.0-20211005130812-5bb3c17173e5
github.com/abema/go-mp4 v0.8.0
github.com/aler9/gortsplib/v2 v2.0.0-20221213180201-60596c32d1bf
github.com/aler9/gortsplib/v2 v2.0.0-20221213201904-04d1de717768
github.com/asticode/go-astits v1.10.1-0.20220319093903-4abe66a9b757
github.com/fsnotify/fsnotify v1.4.9
github.com/gin-gonic/gin v1.8.1

4
go.sum

@ -6,8 +6,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo @@ -6,8 +6,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib/v2 v2.0.0-20221213180201-60596c32d1bf h1:kPSTROCTa8pyQ13LzDgcA2mYfBZKU8ma6KVi/mwWux4=
github.com/aler9/gortsplib/v2 v2.0.0-20221213180201-60596c32d1bf/go.mod h1:zJ+fWtakOMN6cKV169EMNVBLPTITArrJKu/fyM+dov8=
github.com/aler9/gortsplib/v2 v2.0.0-20221213201904-04d1de717768 h1:Q7Qm3HjO8bYo0P2dPmKXYbfvpqhA9ZJuJ+TqN6UcdqI=
github.com/aler9/gortsplib/v2 v2.0.0-20221213201904-04d1de717768/go.mod h1:zJ+fWtakOMN6cKV169EMNVBLPTITArrJKu/fyM+dov8=
github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82 h1:9WgSzBLo3a9ToSVV7sRTBYZ1GGOZUpq4+5H3SN0UZq4=
github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82/go.mod h1:qsMrZCbeBf/mCLOeF16KDkPu4gktn/pOWyaq1aYQE7U=
github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8=

43
internal/core/data.go

@ -11,46 +11,3 @@ type data interface { @@ -11,46 +11,3 @@ type data interface {
getRTPPackets() []*rtp.Packet
getNTP() time.Time
}
type dataGeneric struct {
rtpPackets []*rtp.Packet
ntp time.Time
}
func (d *dataGeneric) getRTPPackets() []*rtp.Packet {
return d.rtpPackets
}
func (d *dataGeneric) getNTP() time.Time {
return d.ntp
}
type dataH264 struct {
rtpPackets []*rtp.Packet
ntp time.Time
pts time.Duration
nalus [][]byte
}
func (d *dataH264) getRTPPackets() []*rtp.Packet {
return d.rtpPackets
}
func (d *dataH264) getNTP() time.Time {
return d.ntp
}
type dataMPEG4Audio struct {
rtpPackets []*rtp.Packet
ntp time.Time
pts time.Duration
aus [][]byte
}
func (d *dataMPEG4Audio) getRTPPackets() []*rtp.Packet {
return d.rtpPackets
}
func (d *dataMPEG4Audio) getNTP() time.Time {
return d.ntp
}

6
internal/core/formatprocessor.go

@ -13,9 +13,15 @@ func newFormatProcessor(forma format.Format, generateRTPPackets bool) (formatPro @@ -13,9 +13,15 @@ func newFormatProcessor(forma format.Format, generateRTPPackets bool) (formatPro
case *format.H264:
return newFormatProcessorH264(forma, generateRTPPackets)
case *format.H265:
return newFormatProcessorH265(forma, generateRTPPackets)
case *format.MPEG4Audio:
return newFormatProcessorMPEG4Audio(forma, generateRTPPackets)
case *format.Opus:
return newFormatProcessorOpus(forma, generateRTPPackets)
default:
return newFormatProcessorGeneric(forma, generateRTPPackets)
}

15
internal/core/formatprocessor_generic.go

@ -2,8 +2,10 @@ package core @@ -2,8 +2,10 @@ package core
import (
"fmt"
"time"
"github.com/aler9/gortsplib/v2/pkg/format"
"github.com/pion/rtp"
)
const (
@ -11,6 +13,19 @@ const ( @@ -11,6 +13,19 @@ const (
maxPacketSize = 1472
)
type dataGeneric struct {
rtpPackets []*rtp.Packet
ntp time.Time
}
func (d *dataGeneric) getRTPPackets() []*rtp.Packet {
return d.rtpPackets
}
func (d *dataGeneric) getNTP() time.Time {
return d.ntp
}
type formatProcessorGeneric struct{}
func newFormatProcessorGeneric(forma format.Format, generateRTPPackets bool) (*formatProcessorGeneric, error) {

17
internal/core/formatprocessor_h264.go

@ -2,6 +2,7 @@ package core @@ -2,6 +2,7 @@ package core
import (
"bytes"
"time"
"github.com/aler9/gortsplib/v2/pkg/format"
"github.com/aler9/gortsplib/v2/pkg/formatdecenc/rtph264"
@ -9,6 +10,22 @@ import ( @@ -9,6 +10,22 @@ import (
"github.com/pion/rtp"
)
type dataH264 struct {
rtpPackets []*rtp.Packet
ntp time.Time
pts time.Duration
nalus [][]byte
}
func (d *dataH264) getRTPPackets() []*rtp.Packet {
return d.rtpPackets
}
func (d *dataH264) getNTP() time.Time {
return d.ntp
}
// extract SPS and PPS without decoding RTP packets
func rtpH264ExtractSPSPPS(pkt *rtp.Packet) ([]byte, []byte) {
if len(pkt.Payload) == 0 {
return nil, nil

121
internal/core/formatprocessor_h265.go

@ -0,0 +1,121 @@ @@ -0,0 +1,121 @@
package core
import (
"fmt"
"time"
"github.com/aler9/gortsplib/v2/pkg/format"
"github.com/aler9/gortsplib/v2/pkg/formatdecenc/rtph265"
"github.com/pion/rtp"
)
type dataH265 struct {
rtpPackets []*rtp.Packet
ntp time.Time
pts time.Duration
nalus [][]byte
}
func (d *dataH265) getRTPPackets() []*rtp.Packet {
return d.rtpPackets
}
func (d *dataH265) getNTP() time.Time {
return d.ntp
}
type formatProcessorH265 struct {
format *format.H265
encoder *rtph265.Encoder
decoder *rtph265.Decoder
}
func newFormatProcessorH265(
forma *format.H265,
allocateEncoder bool,
) (*formatProcessorH265, error) {
t := &formatProcessorH265{
format: forma,
}
if allocateEncoder {
t.encoder = forma.CreateEncoder()
}
return t, nil
}
func (t *formatProcessorH265) updateTrackParametersFromRTPPacket(pkt *rtp.Packet) {
// TODO: extract VPS, SPS, PPS and set them into the track
}
func (t *formatProcessorH265) updateTrackParametersFromNALUs(nalus [][]byte) {
// TODO: extract VPS, SPS, PPS and set them into the track
}
func (t *formatProcessorH265) remuxNALUs(nalus [][]byte) [][]byte {
// TODO: add VPS, SPS, PPS before IDRs
return nalus
}
func (t *formatProcessorH265) generateRTPPackets(tdata *dataH265) error {
pkts, err := t.encoder.Encode(tdata.nalus, tdata.pts)
if err != nil {
return err
}
tdata.rtpPackets = pkts
return nil
}
func (t *formatProcessorH265) process(dat data, hasNonRTSPReaders bool) error {
tdata := dat.(*dataH265)
if tdata.rtpPackets != nil {
pkt := tdata.rtpPackets[0]
t.updateTrackParametersFromRTPPacket(pkt)
if t.encoder == nil {
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
// TODO: re-encode if oversized instead of printing errors
if pkt.MarshalSize() > maxPacketSize {
return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), maxPacketSize)
}
}
// decode from RTP
if hasNonRTSPReaders || t.encoder != nil {
if t.decoder == nil {
t.decoder = t.format.CreateDecoder()
}
nalus, pts, err := t.decoder.Decode(pkt)
if err != nil {
if err == rtph265.ErrNonStartingPacketAndNoPrevious || err == rtph265.ErrMorePacketsNeeded {
return nil
}
return err
}
tdata.nalus = nalus
tdata.pts = pts
tdata.nalus = t.remuxNALUs(tdata.nalus)
}
// route packet as is
if t.encoder == nil {
return nil
}
} else {
t.updateTrackParametersFromNALUs(tdata.nalus)
tdata.nalus = t.remuxNALUs(tdata.nalus)
}
return t.generateRTPPackets(tdata)
}

17
internal/core/formatprocessor_mpeg4audio.go

@ -2,11 +2,28 @@ package core @@ -2,11 +2,28 @@ package core
import (
"fmt"
"time"
"github.com/aler9/gortsplib/v2/pkg/format"
"github.com/aler9/gortsplib/v2/pkg/formatdecenc/rtpmpeg4audio"
"github.com/pion/rtp"
)
type dataMPEG4Audio struct {
rtpPackets []*rtp.Packet
ntp time.Time
pts time.Duration
aus [][]byte
}
func (d *dataMPEG4Audio) getRTPPackets() []*rtp.Packet {
return d.rtpPackets
}
func (d *dataMPEG4Audio) getNTP() time.Time {
return d.ntp
}
type formatProcessorMPEG4Audio struct {
format *format.MPEG4Audio
encoder *rtpmpeg4audio.Encoder

93
internal/core/formatprocessor_opus.go

@ -0,0 +1,93 @@ @@ -0,0 +1,93 @@
package core
import (
"fmt"
"time"
"github.com/aler9/gortsplib/v2/pkg/format"
"github.com/aler9/gortsplib/v2/pkg/formatdecenc/rtpsimpleaudio"
"github.com/pion/rtp"
)
type dataOpus struct {
rtpPackets []*rtp.Packet
ntp time.Time
pts time.Duration
au []byte
}
func (d *dataOpus) getRTPPackets() []*rtp.Packet {
return d.rtpPackets
}
func (d *dataOpus) getNTP() time.Time {
return d.ntp
}
type formatProcessorOpus struct {
format *format.Opus
encoder *rtpsimpleaudio.Encoder
decoder *rtpsimpleaudio.Decoder
}
func newFormatProcessorOpus(
forma *format.Opus,
allocateEncoder bool,
) (*formatProcessorOpus, error) {
t := &formatProcessorOpus{
format: forma,
}
if allocateEncoder {
t.encoder = forma.CreateEncoder()
}
return t, nil
}
func (t *formatProcessorOpus) generateRTPPackets(tdata *dataOpus) error {
pkt, err := t.encoder.Encode(tdata.au, tdata.pts)
if err != nil {
return err
}
tdata.rtpPackets = []*rtp.Packet{pkt}
return nil
}
func (t *formatProcessorOpus) process(dat data, hasNonRTSPReaders bool) error {
tdata := dat.(*dataOpus)
if tdata.rtpPackets != nil {
pkt := tdata.rtpPackets[0]
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > maxPacketSize {
return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), maxPacketSize)
}
// decode from RTP
if hasNonRTSPReaders {
if t.decoder == nil {
t.decoder = t.format.CreateDecoder()
}
au, pts, err := t.decoder.Decode(pkt)
if err != nil {
return err
}
tdata.au = au
tdata.pts = pts
}
// route packet as is
return nil
}
return t.generateRTPPackets(tdata)
}

22
internal/core/rtsp_session.go

@ -318,6 +318,17 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R @@ -318,6 +318,17 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R
}
})
case *format.H265:
ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := s.stream.writeData(cmedia, cformat, &dataH265{
rtpPackets: []*rtp.Packet{pkt},
ntp: time.Now(),
})
if err != nil {
s.log(logger.Warn, "%v", err)
}
})
case *format.MPEG4Audio:
ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := s.stream.writeData(cmedia, cformat, &dataMPEG4Audio{
@ -329,6 +340,17 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R @@ -329,6 +340,17 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R
}
})
case *format.Opus:
ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := s.stream.writeData(cmedia, cformat, &dataOpus{
rtpPackets: []*rtp.Packet{pkt},
ntp: time.Now(),
})
if err != nil {
s.log(logger.Warn, "%v", err)
}
})
default:
ctx.Session.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := s.stream.writeData(cmedia, cformat, &dataGeneric{

22
internal/core/rtsp_source.go

@ -159,6 +159,17 @@ func (s *rtspSource) run(ctx context.Context) error { @@ -159,6 +159,17 @@ func (s *rtspSource) run(ctx context.Context) error {
}
})
case *format.H265:
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := res.stream.writeData(cmedia, cformat, &dataH265{
rtpPackets: []*rtp.Packet{pkt},
ntp: time.Now(),
})
if err != nil {
s.Log(logger.Warn, "%v", err)
}
})
case *format.MPEG4Audio:
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := res.stream.writeData(cmedia, cformat, &dataMPEG4Audio{
@ -170,6 +181,17 @@ func (s *rtspSource) run(ctx context.Context) error { @@ -170,6 +181,17 @@ func (s *rtspSource) run(ctx context.Context) error {
}
})
case *format.Opus:
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := res.stream.writeData(cmedia, cformat, &dataOpus{
rtpPackets: []*rtp.Packet{pkt},
ntp: time.Now(),
})
if err != nil {
s.Log(logger.Warn, "%v", err)
}
})
default:
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
err := res.stream.writeData(cmedia, cformat, &dataGeneric{

Loading…
Cancel
Save