Browse Source

generate RTP packets after H264 remuxing

Previously, RTP packets coming from sources other than RTSP (that
actually are RTMP and HLS) were generated before the H264 remuxing, and
that leaded to invalid streams, expecially when sourceOnDemand is true
and the stream has invalid or dynamic SPS/PPS.
pull/1057/head
aler9 3 years ago
parent
commit
3606472e82
  1. 19
      internal/core/data.go
  2. 6
      internal/core/hls_muxer.go
  3. 66
      internal/core/hls_source.go
  4. 6
      internal/core/hls_source_test.go
  5. 68
      internal/core/path.go
  6. 103
      internal/core/rtmp_conn.go
  7. 68
      internal/core/rtmp_source.go
  8. 11
      internal/core/rtsp_session.go
  9. 11
      internal/core/rtsp_source.go
  10. 78
      internal/core/stream.go
  11. 27
      internal/core/streamtrack.go
  12. 19
      internal/core/streamtrack_generic.go
  13. 140
      internal/core/streamtrack_h264.go
  14. 58
      internal/core/streamtrack_mpeg4audio.go

19
internal/core/data.go

@ -6,10 +6,21 @@ import ( @@ -6,10 +6,21 @@ import (
"github.com/pion/rtp"
)
// data is the data unit routed across the server.
// it must contain one or more of the following:
// - a single RTP packet
// - a group of H264 NALUs (grouped by timestamp)
// - a single AAC AU
type data struct {
trackID int
rtp *rtp.Packet
trackID int
rtpPacket *rtp.Packet
// timing
ptsEqualsDTS bool
h264NALUs [][]byte
h264PTS time.Duration
pts time.Duration
h264NALUs [][]byte
mpeg4AudioAU []byte
}

6
internal/core/hls_muxer.go

@ -366,17 +366,17 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) @@ -366,17 +366,17 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
}
if videoInitialPTS == nil {
v := data.h264PTS
v := data.pts
videoInitialPTS = &v
}
pts := data.h264PTS - *videoInitialPTS
pts := data.pts - *videoInitialPTS
err = m.muxer.WriteH264(pts, data.h264NALUs)
if err != nil {
return fmt.Errorf("muxer error: %v", err)
}
} else if audioTrack != nil && data.trackID == audioTrackID {
aus, pts, err := aacDecoder.Decode(data.rtp)
aus, pts, err := aacDecoder.Decode(data.rtpPacket)
if err != nil {
if err != rtpmpeg4audio.ErrMorePacketsNeeded {
m.log(logger.Warn, "unable to decode audio track: %v", err)

66
internal/core/hls_source.go

@ -6,8 +6,6 @@ import ( @@ -6,8 +6,6 @@ import (
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/h264"
"github.com/aler9/gortsplib/pkg/rtph264"
"github.com/aler9/gortsplib/pkg/rtpmpeg4audio"
"github.com/aler9/rtsp-simple-server/internal/hls"
"github.com/aler9/rtsp-simple-server/internal/logger"
@ -46,8 +44,6 @@ func (s *hlsSource) run(ctx context.Context) error { @@ -46,8 +44,6 @@ func (s *hlsSource) run(ctx context.Context) error {
var stream *stream
var videoTrackID int
var audioTrackID int
var videoEnc *rtph264.Encoder
var audioEnc *rtpmpeg4audio.Encoder
defer func() {
if stream != nil {
@ -60,25 +56,18 @@ func (s *hlsSource) run(ctx context.Context) error { @@ -60,25 +56,18 @@ func (s *hlsSource) run(ctx context.Context) error {
if videoTrack != nil {
videoTrackID = len(tracks)
videoEnc = &rtph264.Encoder{PayloadType: 96}
videoEnc.Init()
tracks = append(tracks, videoTrack)
}
if audioTrack != nil {
audioTrackID = len(tracks)
audioEnc = &rtpmpeg4audio.Encoder{
PayloadType: 96,
SampleRate: audioTrack.ClockRate(),
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
}
audioEnc.Init()
tracks = append(tracks, audioTrack)
}
res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{tracks: tracks})
res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{
tracks: tracks,
generateRTPPackets: true,
})
if res.err != nil {
return res.err
}
@ -94,29 +83,12 @@ func (s *hlsSource) run(ctx context.Context) error { @@ -94,29 +83,12 @@ func (s *hlsSource) run(ctx context.Context) error {
return
}
pkts, err := videoEnc.Encode(nalus, pts)
if err != nil {
return
}
lastPkt := len(pkts) - 1
for i, pkt := range pkts {
if i != lastPkt {
stream.writeData(&data{
trackID: videoTrackID,
rtp: pkt,
ptsEqualsDTS: false,
})
} else {
stream.writeData(&data{
trackID: videoTrackID,
rtp: pkt,
ptsEqualsDTS: h264.IDRPresent(nalus),
h264NALUs: nalus,
h264PTS: pts,
})
}
}
stream.writeData(&data{
trackID: videoTrackID,
ptsEqualsDTS: h264.IDRPresent(nalus),
pts: pts,
h264NALUs: nalus,
})
}
onAudioData := func(pts time.Duration, au []byte) {
@ -124,18 +96,12 @@ func (s *hlsSource) run(ctx context.Context) error { @@ -124,18 +96,12 @@ func (s *hlsSource) run(ctx context.Context) error {
return
}
pkts, err := audioEnc.Encode([][]byte{au}, pts)
if err != nil {
return
}
for _, pkt := range pkts {
stream.writeData(&data{
trackID: audioTrackID,
rtp: pkt,
ptsEqualsDTS: true,
})
}
stream.writeData(&data{
trackID: audioTrackID,
ptsEqualsDTS: true,
pts: pts,
mpeg4AudioAU: au,
})
}
c, err := hls.NewClient(

6
internal/core/hls_source_test.go

@ -135,7 +135,11 @@ func TestHLSSource(t *testing.T) { @@ -135,7 +135,11 @@ func TestHLSSource(t *testing.T) {
c := gortsplib.Client{
OnPacketRTP: func(ctx *gortsplib.ClientOnPacketRTPCtx) {
require.Equal(t, []byte{0x05}, ctx.Packet.Payload)
require.Equal(t, [][]byte{
{0x07, 0x01, 0x02, 0x03},
{0x08},
{0x05},
}, ctx.H264NALUs)
close(frameRecv)
},
}

68
internal/core/path.go

@ -92,8 +92,9 @@ type pathSourceStaticSetReadyRes struct { @@ -92,8 +92,9 @@ type pathSourceStaticSetReadyRes struct {
}
type pathSourceStaticSetReadyReq struct {
tracks gortsplib.Tracks
res chan pathSourceStaticSetReadyRes
tracks gortsplib.Tracks
generateRTPPackets bool
res chan pathSourceStaticSetReadyRes
}
type pathSourceStaticSetNotReadyReq struct {
@ -160,9 +161,10 @@ type pathPublisherRecordRes struct { @@ -160,9 +161,10 @@ type pathPublisherRecordRes struct {
}
type pathPublisherRecordReq struct {
author publisher
tracks gortsplib.Tracks
res chan pathPublisherRecordRes
author publisher
tracks gortsplib.Tracks
generateRTPPackets bool
res chan pathPublisherRecordRes
}
type pathReaderPauseReq struct {
@ -431,29 +433,32 @@ func (pa *path) run() { @@ -431,29 +433,32 @@ func (pa *path) run() {
}
case req := <-pa.chSourceStaticSetReady:
pa.sourceSetReady(req.tracks)
if pa.hasOnDemandStaticSource() {
pa.onDemandStaticSourceReadyTimer.Stop()
pa.onDemandStaticSourceReadyTimer = newEmptyTimer()
pa.onDemandStaticSourceScheduleClose()
err := pa.sourceSetReady(req.tracks, req.generateRTPPackets)
if err != nil {
req.res <- pathSourceStaticSetReadyRes{err: err}
} else {
if pa.hasOnDemandStaticSource() {
pa.onDemandStaticSourceReadyTimer.Stop()
pa.onDemandStaticSourceReadyTimer = newEmptyTimer()
pa.onDemandStaticSourceScheduleClose()
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{
stream: pa.stream,
}
}
pa.describeRequestsOnHold = nil
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{
stream: pa.stream,
for _, req := range pa.setupPlayRequestsOnHold {
pa.handleReaderSetupPlayPost(req)
}
pa.setupPlayRequestsOnHold = nil
}
pa.describeRequestsOnHold = nil
for _, req := range pa.setupPlayRequestsOnHold {
pa.handleReaderSetupPlayPost(req)
}
pa.setupPlayRequestsOnHold = nil
req.res <- pathSourceStaticSetReadyRes{stream: pa.stream}
}
req.res <- pathSourceStaticSetReadyRes{stream: pa.stream}
case req := <-pa.chSourceStaticSetNotReady:
pa.sourceSetNotReady()
@ -657,9 +662,14 @@ func (pa *path) onDemandPublisherStop() { @@ -657,9 +662,14 @@ func (pa *path) onDemandPublisherStop() {
}
}
func (pa *path) sourceSetReady(tracks gortsplib.Tracks) {
func (pa *path) sourceSetReady(tracks gortsplib.Tracks, generateRTPPackets bool) error {
stream, err := newStream(tracks, generateRTPPackets)
if err != nil {
return err
}
pa.stream = stream
pa.sourceReady = true
pa.stream = newStream(tracks)
if pa.conf.RunOnReady != "" {
pa.log(logger.Info, "runOnReady command started")
@ -674,6 +684,8 @@ func (pa *path) sourceSetReady(tracks gortsplib.Tracks) { @@ -674,6 +684,8 @@ func (pa *path) sourceSetReady(tracks gortsplib.Tracks) {
}
pa.parent.pathSourceReady(pa)
return nil
}
func (pa *path) sourceSetNotReady() {
@ -808,9 +820,13 @@ func (pa *path) handlePublisherRecord(req pathPublisherRecordReq) { @@ -808,9 +820,13 @@ func (pa *path) handlePublisherRecord(req pathPublisherRecordReq) {
return
}
req.author.onPublisherAccepted(len(req.tracks))
err := pa.sourceSetReady(req.tracks, req.generateRTPPackets)
if err != nil {
req.res <- pathPublisherRecordRes{err: err}
return
}
pa.sourceSetReady(req.tracks)
req.author.onPublisherAccepted(len(req.tracks))
if pa.hasOnDemandPublisher() {
pa.onDemandPublisherReadyTimer.Stop()

103
internal/core/rtmp_conn.go

@ -14,7 +14,6 @@ import ( @@ -14,7 +14,6 @@ import (
"github.com/aler9/gortsplib/pkg/h264"
"github.com/aler9/gortsplib/pkg/mpeg4audio"
"github.com/aler9/gortsplib/pkg/ringbuffer"
"github.com/aler9/gortsplib/pkg/rtph264"
"github.com/aler9/gortsplib/pkg/rtpmpeg4audio"
"github.com/notedit/rtmp/format/flv/flvio"
@ -346,11 +345,11 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { @@ -346,11 +345,11 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
// while audio is decoded in this routine:
// we have to sync their PTS.
if videoInitialPTS == nil {
v := data.h264PTS
v := data.pts
videoInitialPTS = &v
}
pts := data.h264PTS - *videoInitialPTS
pts := data.pts - *videoInitialPTS
idrPresent := false
nonIDRPresent := false
@ -442,7 +441,7 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { @@ -442,7 +441,7 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
return err
}
} else if audioTrack != nil && data.trackID == audioTrackID {
aus, pts, err := aacDecoder.Decode(data.rtp)
aus, pts, err := aacDecoder.Decode(data.rtpPacket)
if err != nil {
if err != rtpmpeg4audio.ErrMorePacketsNeeded {
c.log(logger.Warn, "unable to decode audio track: %v", err)
@ -491,24 +490,12 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { @@ -491,24 +490,12 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
videoTrackID := -1
audioTrackID := -1
var h264Encoder *rtph264.Encoder
if videoTrack != nil {
h264Encoder = &rtph264.Encoder{PayloadType: 96}
h264Encoder.Init()
videoTrackID = len(tracks)
tracks = append(tracks, videoTrack)
}
var aacEncoder *rtpmpeg4audio.Encoder
if audioTrack != nil {
aacEncoder = &rtpmpeg4audio.Encoder{
PayloadType: 96,
SampleRate: audioTrack.ClockRate(),
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
}
aacEncoder.Init()
audioTrackID = len(tracks)
tracks = append(tracks, audioTrack)
}
@ -550,8 +537,9 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { @@ -550,8 +537,9 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
c.nconn.SetWriteDeadline(time.Time{})
rres := c.path.publisherRecord(pathPublisherRecordReq{
author: c,
tracks: tracks,
author: c,
tracks: tracks,
generateRTPPackets: true,
})
if rres.err != nil {
return rres.err
@ -573,35 +561,17 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { @@ -573,35 +561,17 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
return fmt.Errorf("unable to parse H264 config: %v", err)
}
pts := tmsg.DTS + tmsg.PTSDelta
nalus := [][]byte{
conf.SPS,
conf.PPS,
}
pkts, err := h264Encoder.Encode(nalus, pts)
if err != nil {
return fmt.Errorf("error while encoding H264: %v", err)
}
lastPkt := len(pkts) - 1
for i, pkt := range pkts {
if i != lastPkt {
rres.stream.writeData(&data{
trackID: videoTrackID,
rtp: pkt,
ptsEqualsDTS: false,
})
} else {
rres.stream.writeData(&data{
trackID: videoTrackID,
rtp: pkt,
ptsEqualsDTS: false,
h264NALUs: nalus,
h264PTS: pts,
})
}
}
rres.stream.writeData(&data{
trackID: videoTrackID,
ptsEqualsDTS: false,
pts: tmsg.DTS + tmsg.PTSDelta,
h264NALUs: nalus,
})
} else if tmsg.H264Type == flvio.AVC_NALU {
if videoTrack == nil {
return fmt.Errorf("received an H264 packet, but track is not set up")
@ -632,31 +602,12 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { @@ -632,31 +602,12 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
}
}
pts := tmsg.DTS + tmsg.PTSDelta
pkts, err := h264Encoder.Encode(validNALUs, pts)
if err != nil {
return fmt.Errorf("error while encoding H264: %v", err)
}
lastPkt := len(pkts) - 1
for i, pkt := range pkts {
if i != lastPkt {
rres.stream.writeData(&data{
trackID: videoTrackID,
rtp: pkt,
ptsEqualsDTS: false,
})
} else {
rres.stream.writeData(&data{
trackID: videoTrackID,
rtp: pkt,
ptsEqualsDTS: h264.IDRPresent(validNALUs),
h264NALUs: validNALUs,
h264PTS: pts,
})
}
}
rres.stream.writeData(&data{
trackID: videoTrackID,
ptsEqualsDTS: h264.IDRPresent(validNALUs),
pts: tmsg.DTS + tmsg.PTSDelta,
h264NALUs: validNALUs,
})
}
case *message.MsgAudio:
@ -665,18 +616,12 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { @@ -665,18 +616,12 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
return fmt.Errorf("received an AAC packet, but track is not set up")
}
pkts, err := aacEncoder.Encode([][]byte{tmsg.Payload}, tmsg.DTS)
if err != nil {
return fmt.Errorf("error while encoding AAC: %v", err)
}
for _, pkt := range pkts {
rres.stream.writeData(&data{
trackID: audioTrackID,
rtp: pkt,
ptsEqualsDTS: true,
})
}
rres.stream.writeData(&data{
trackID: audioTrackID,
ptsEqualsDTS: true,
pts: tmsg.DTS,
mpeg4AudioAU: tmsg.Payload,
})
}
}
}

68
internal/core/rtmp_source.go

@ -9,8 +9,6 @@ import ( @@ -9,8 +9,6 @@ import (
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/h264"
"github.com/aler9/gortsplib/pkg/rtph264"
"github.com/aler9/gortsplib/pkg/rtpmpeg4audio"
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/aler9/rtsp-simple-server/internal/conf"
@ -97,29 +95,20 @@ func (s *rtmpSource) run(ctx context.Context) error { @@ -97,29 +95,20 @@ func (s *rtmpSource) run(ctx context.Context) error {
videoTrackID := -1
audioTrackID := -1
var h264Encoder *rtph264.Encoder
if videoTrack != nil {
h264Encoder = &rtph264.Encoder{PayloadType: 96}
h264Encoder.Init()
videoTrackID = len(tracks)
tracks = append(tracks, videoTrack)
}
var aacEncoder *rtpmpeg4audio.Encoder
if audioTrack != nil {
aacEncoder = &rtpmpeg4audio.Encoder{
PayloadType: 96,
SampleRate: audioTrack.ClockRate(),
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
}
aacEncoder.Init()
audioTrackID = len(tracks)
tracks = append(tracks, audioTrack)
}
res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{tracks: tracks})
res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{
tracks: tracks,
generateRTPPackets: true,
})
if res.err != nil {
return res.err
}
@ -149,31 +138,12 @@ func (s *rtmpSource) run(ctx context.Context) error { @@ -149,31 +138,12 @@ func (s *rtmpSource) run(ctx context.Context) error {
return fmt.Errorf("unable to decode AVCC: %v", err)
}
pts := tmsg.DTS + tmsg.PTSDelta
pkts, err := h264Encoder.Encode(nalus, pts)
if err != nil {
return fmt.Errorf("error while encoding H264: %v", err)
}
lastPkt := len(pkts) - 1
for i, pkt := range pkts {
if i != lastPkt {
res.stream.writeData(&data{
trackID: videoTrackID,
rtp: pkt,
ptsEqualsDTS: false,
})
} else {
res.stream.writeData(&data{
trackID: videoTrackID,
rtp: pkt,
ptsEqualsDTS: h264.IDRPresent(nalus),
h264NALUs: nalus,
h264PTS: pts,
})
}
}
res.stream.writeData(&data{
trackID: videoTrackID,
ptsEqualsDTS: h264.IDRPresent(nalus),
pts: tmsg.DTS + tmsg.PTSDelta,
h264NALUs: nalus,
})
}
case *message.MsgAudio:
@ -182,18 +152,12 @@ func (s *rtmpSource) run(ctx context.Context) error { @@ -182,18 +152,12 @@ func (s *rtmpSource) run(ctx context.Context) error {
return fmt.Errorf("received an AAC packet, but track is not set up")
}
pkts, err := aacEncoder.Encode([][]byte{tmsg.Payload}, tmsg.DTS)
if err != nil {
return fmt.Errorf("error while encoding AAC: %v", err)
}
for _, pkt := range pkts {
res.stream.writeData(&data{
trackID: audioTrackID,
rtp: pkt,
ptsEqualsDTS: true,
})
}
res.stream.writeData(&data{
trackID: audioTrackID,
ptsEqualsDTS: true,
pts: tmsg.DTS,
mpeg4AudioAU: tmsg.Payload,
})
}
}
}

11
internal/core/rtsp_session.go

@ -273,8 +273,9 @@ func (s *rtspSession) onPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo @@ -273,8 +273,9 @@ func (s *rtspSession) onPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo
// onRecord is called by rtspServer.
func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) {
res := s.path.publisherRecord(pathPublisherRecordReq{
author: s,
tracks: s.announcedTracks,
author: s,
tracks: s.announcedTracks,
generateRTPPackets: false,
})
if res.err != nil {
return &base.Response{
@ -391,15 +392,15 @@ func (s *rtspSession) onPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx) { @@ -391,15 +392,15 @@ func (s *rtspSession) onPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx) {
if ctx.H264NALUs != nil {
s.stream.writeData(&data{
trackID: ctx.TrackID,
rtp: ctx.Packet,
rtpPacket: ctx.Packet,
ptsEqualsDTS: ctx.PTSEqualsDTS,
pts: ctx.H264PTS,
h264NALUs: append([][]byte(nil), ctx.H264NALUs...),
h264PTS: ctx.H264PTS,
})
} else {
s.stream.writeData(&data{
trackID: ctx.TrackID,
rtp: ctx.Packet,
rtpPacket: ctx.Packet,
ptsEqualsDTS: ctx.PTSEqualsDTS,
})
}

11
internal/core/rtsp_source.go

@ -125,7 +125,10 @@ func (s *rtspSource) run(ctx context.Context) error { @@ -125,7 +125,10 @@ func (s *rtspSource) run(ctx context.Context) error {
}
}
res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{tracks: tracks})
res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{
tracks: tracks,
generateRTPPackets: false,
})
if res.err != nil {
return res.err
}
@ -140,15 +143,15 @@ func (s *rtspSource) run(ctx context.Context) error { @@ -140,15 +143,15 @@ func (s *rtspSource) run(ctx context.Context) error {
if ctx.H264NALUs != nil {
res.stream.writeData(&data{
trackID: ctx.TrackID,
rtp: ctx.Packet,
rtpPacket: ctx.Packet,
ptsEqualsDTS: ctx.PTSEqualsDTS,
pts: ctx.H264PTS,
h264NALUs: append([][]byte(nil), ctx.H264NALUs...),
h264PTS: ctx.H264PTS,
})
} else {
res.stream.writeData(&data{
trackID: ctx.TrackID,
rtp: ctx.Packet,
rtpPacket: ctx.Packet,
ptsEqualsDTS: ctx.PTSEqualsDTS,
})
}

78
internal/core/stream.go

@ -1,11 +1,9 @@ @@ -1,11 +1,9 @@
package core
import (
"bytes"
"sync"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/h264"
)
type streamNonRTSPReadersMap struct {
@ -37,7 +35,7 @@ func (m *streamNonRTSPReadersMap) remove(r reader) { @@ -37,7 +35,7 @@ func (m *streamNonRTSPReadersMap) remove(r reader) {
delete(m.ma, r)
}
func (m *streamNonRTSPReadersMap) forwardPacketRTP(data *data) {
func (m *streamNonRTSPReadersMap) writeData(data *data) {
m.mutex.RLock()
defer m.mutex.RUnlock()
@ -49,14 +47,26 @@ func (m *streamNonRTSPReadersMap) forwardPacketRTP(data *data) { @@ -49,14 +47,26 @@ func (m *streamNonRTSPReadersMap) forwardPacketRTP(data *data) {
type stream struct {
nonRTSPReaders *streamNonRTSPReadersMap
rtspStream *gortsplib.ServerStream
streamTracks []streamTrack
}
func newStream(tracks gortsplib.Tracks) *stream {
func newStream(tracks gortsplib.Tracks, generateRTPPackets bool) (*stream, error) {
s := &stream{
nonRTSPReaders: newStreamNonRTSPReadersMap(),
rtspStream: gortsplib.NewServerStream(tracks),
}
return s
s.streamTracks = make([]streamTrack, len(s.rtspStream.Tracks()))
for i, track := range s.rtspStream.Tracks() {
var err error
s.streamTracks[i], err = newStreamTrack(track, generateRTPPackets, s.writeDataInner)
if err != nil {
return nil, err
}
}
return s, nil
}
func (s *stream) close() {
@ -80,62 +90,14 @@ func (s *stream) readerRemove(r reader) { @@ -80,62 +90,14 @@ func (s *stream) readerRemove(r reader) {
}
}
func (s *stream) updateH264TrackParameters(h264track *gortsplib.TrackH264, nalus [][]byte) {
for _, nalu := range nalus {
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeSPS:
if !bytes.Equal(nalu, h264track.SafeSPS()) {
h264track.SafeSetSPS(append([]byte(nil), nalu...))
}
case h264.NALUTypePPS:
if !bytes.Equal(nalu, h264track.SafePPS()) {
h264track.SafeSetPPS(append([]byte(nil), nalu...))
}
}
}
}
// remux is needed to
// - fix corrupted streams
// - make streams compatible with all protocols
func (s *stream) remuxH264NALUs(h264track *gortsplib.TrackH264, data *data) {
var filteredNALUs [][]byte //nolint:prealloc
for _, nalu := range data.h264NALUs {
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeSPS, h264.NALUTypePPS:
// remove since they're automatically added before every IDR
continue
case h264.NALUTypeAccessUnitDelimiter:
// remove since it is not needed
continue
case h264.NALUTypeIDR:
// add SPS and PPS before every IDR
filteredNALUs = append(filteredNALUs, h264track.SafeSPS(), h264track.SafePPS())
}
filteredNALUs = append(filteredNALUs, nalu)
}
data.h264NALUs = filteredNALUs
}
func (s *stream) writeData(data *data) {
track := s.rtspStream.Tracks()[data.trackID]
if h264track, ok := track.(*gortsplib.TrackH264); ok {
s.updateH264TrackParameters(h264track, data.h264NALUs)
s.remuxH264NALUs(h264track, data)
}
s.streamTracks[data.trackID].writeData(data)
}
func (s *stream) writeDataInner(data *data) {
// forward to RTSP readers
s.rtspStream.WritePacketRTP(data.trackID, data.rtp, data.ptsEqualsDTS)
s.rtspStream.WritePacketRTP(data.trackID, data.rtpPacket, data.ptsEqualsDTS)
// forward to non-RTSP readers
s.nonRTSPReaders.forwardPacketRTP(data)
s.nonRTSPReaders.writeData(data)
}

27
internal/core/streamtrack.go

@ -0,0 +1,27 @@ @@ -0,0 +1,27 @@
package core
import (
"fmt"
"github.com/aler9/gortsplib"
)
type streamTrack interface {
writeData(*data)
}
func newStreamTrack(track gortsplib.Track, generateRTPPackets bool, writeDataInner func(*data)) (streamTrack, error) {
switch ttrack := track.(type) {
case *gortsplib.TrackH264:
return newStreamTrackH264(ttrack, generateRTPPackets, writeDataInner), nil
case *gortsplib.TrackMPEG4Audio:
return newStreamTrackMPEG4Audio(ttrack, generateRTPPackets, writeDataInner), nil
default:
if generateRTPPackets {
return nil, fmt.Errorf("we don't know how to generate RTP packets of track %+v", track)
}
return newStreamTrackGeneric(track, writeDataInner), nil
}
}

19
internal/core/streamtrack_generic.go

@ -0,0 +1,19 @@ @@ -0,0 +1,19 @@
package core
import (
"github.com/aler9/gortsplib"
)
type streamTrackGeneric struct {
writeDataInner func(*data)
}
func newStreamTrackGeneric(track gortsplib.Track, writeDataInner func(*data)) *streamTrackGeneric {
return &streamTrackGeneric{
writeDataInner: writeDataInner,
}
}
func (t *streamTrackGeneric) writeData(data *data) {
t.writeDataInner(data)
}

140
internal/core/streamtrack_h264.go

@ -0,0 +1,140 @@ @@ -0,0 +1,140 @@
package core
import (
"bytes"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/h264"
"github.com/aler9/gortsplib/pkg/rtph264"
)
type streamTrackH264 struct {
track *gortsplib.TrackH264
writeDataInner func(*data)
rtpEncoder *rtph264.Encoder
}
func newStreamTrackH264(
track *gortsplib.TrackH264,
generateRTPPackets bool,
writeDataInner func(*data),
) *streamTrackH264 {
t := &streamTrackH264{
track: track,
writeDataInner: writeDataInner,
}
if generateRTPPackets {
t.rtpEncoder = &rtph264.Encoder{PayloadType: 96}
t.rtpEncoder.Init()
}
return t
}
func (t *streamTrackH264) updateTrackParameters(nalus [][]byte) {
for _, nalu := range nalus {
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeSPS:
if !bytes.Equal(nalu, t.track.SafeSPS()) {
t.track.SafeSetSPS(append([]byte(nil), nalu...))
}
case h264.NALUTypePPS:
if !bytes.Equal(nalu, t.track.SafePPS()) {
t.track.SafeSetPPS(append([]byte(nil), nalu...))
}
}
}
}
// remux is needed to
// - fix corrupted streams
// - make streams compatible with all protocols
func (t *streamTrackH264) remuxNALUs(dat *data) {
n := 0
for _, nalu := range dat.h264NALUs {
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeSPS, h264.NALUTypePPS:
continue
case h264.NALUTypeAccessUnitDelimiter:
continue
case h264.NALUTypeIDR:
n += 2
}
n++
}
filteredNALUs := make([][]byte, n)
i := 0
for _, nalu := range dat.h264NALUs {
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeSPS, h264.NALUTypePPS:
// remove since they're automatically added before every IDR
continue
case h264.NALUTypeAccessUnitDelimiter:
// remove since it is not needed
continue
case h264.NALUTypeIDR:
// add SPS and PPS before every IDR
filteredNALUs[i] = t.track.SafeSPS()
i++
filteredNALUs[i] = t.track.SafePPS()
i++
}
filteredNALUs[i] = nalu
i++
}
dat.h264NALUs = filteredNALUs
}
func (t *streamTrackH264) generateRTPPackets(dat *data) {
// if remuxNALUs() returned nil, do not write any data
if dat.h264NALUs == nil {
return
}
pkts, err := t.rtpEncoder.Encode(dat.h264NALUs, dat.pts)
if err != nil {
return
}
lastPkt := len(pkts) - 1
for i, pkt := range pkts {
if i != lastPkt {
t.writeDataInner(&data{
trackID: dat.trackID,
rtpPacket: pkt,
})
} else {
t.writeDataInner(&data{
trackID: dat.trackID,
rtpPacket: pkt,
ptsEqualsDTS: dat.ptsEqualsDTS,
pts: dat.pts,
h264NALUs: dat.h264NALUs,
})
}
}
}
func (t *streamTrackH264) writeData(dat *data) {
t.updateTrackParameters(dat.h264NALUs)
t.remuxNALUs(dat)
if dat.rtpPacket != nil {
t.writeDataInner(dat)
} else {
t.generateRTPPackets(dat)
}
}

58
internal/core/streamtrack_mpeg4audio.go

@ -0,0 +1,58 @@ @@ -0,0 +1,58 @@
package core
import (
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/rtpmpeg4audio"
)
type streamTrackMPEG4Audio struct {
writeDataInner func(*data)
rtpEncoder *rtpmpeg4audio.Encoder
}
func newStreamTrackMPEG4Audio(
track *gortsplib.TrackMPEG4Audio,
generateRTPPackets bool,
writeDataInner func(*data),
) *streamTrackMPEG4Audio {
t := &streamTrackMPEG4Audio{
writeDataInner: writeDataInner,
}
if generateRTPPackets {
t.rtpEncoder = &rtpmpeg4audio.Encoder{
PayloadType: 96,
SampleRate: track.ClockRate(),
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
}
t.rtpEncoder.Init()
}
return t
}
func (t *streamTrackMPEG4Audio) generateRTPPackets(dat *data) {
pkts, err := t.rtpEncoder.Encode([][]byte{dat.mpeg4AudioAU}, dat.pts)
if err != nil {
return
}
for _, pkt := range pkts {
t.writeDataInner(&data{
trackID: dat.trackID,
rtpPacket: pkt,
ptsEqualsDTS: true,
})
}
}
func (t *streamTrackMPEG4Audio) writeData(dat *data) {
if dat.rtpPacket != nil {
t.writeDataInner(dat)
} else {
t.generateRTPPackets(dat)
}
}
Loading…
Cancel
Save