Browse Source

move trackID into data

pull/923/head
aler9 3 years ago
parent
commit
58b2e7d24f
  1. 1
      internal/core/data.go
  2. 23
      internal/core/hls_muxer.go
  3. 9
      internal/core/hls_source.go
  4. 2
      internal/core/publisher.go
  5. 2
      internal/core/reader.go
  6. 46
      internal/core/rtmp_conn.go
  7. 9
      internal/core/rtmp_source.go
  8. 8
      internal/core/rtsp_session.go
  9. 6
      internal/core/rtsp_source.go
  10. 12
      internal/core/stream.go

1
internal/core/data.go

@ -7,6 +7,7 @@ import (
) )
type data struct { type data struct {
trackID int
rtp *rtp.Packet rtp *rtp.Packet
ptsEqualsDTS bool ptsEqualsDTS bool
h264NALUs [][]byte h264NALUs [][]byte

23
internal/core/hls_muxer.go

@ -105,11 +105,6 @@ type hlsMuxerRequest struct {
res chan hlsMuxerResponse res chan hlsMuxerResponse
} }
type hlsMuxerTrackIDDataPair struct {
trackID int
data *data
}
type hlsMuxerPathManager interface { type hlsMuxerPathManager interface {
onReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes onReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes
} }
@ -333,24 +328,24 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
go func() { go func() {
writerDone <- func() error { writerDone <- func() error {
for { for {
data, ok := m.ringBuffer.Pull() item, ok := m.ringBuffer.Pull()
if !ok { if !ok {
return fmt.Errorf("terminated") return fmt.Errorf("terminated")
} }
pair := data.(hlsMuxerTrackIDDataPair) data := item.(*data)
if videoTrack != nil && pair.trackID == videoTrackID { if videoTrack != nil && data.trackID == videoTrackID {
if pair.data.h264NALUs == nil { if data.h264NALUs == nil {
continue continue
} }
err = m.muxer.WriteH264(pair.data.h264PTS, pair.data.h264NALUs) err = m.muxer.WriteH264(data.h264PTS, data.h264NALUs)
if err != nil { if err != nil {
m.log(logger.Warn, "unable to write segment: %v", err) m.log(logger.Warn, "unable to write segment: %v", err)
continue continue
} }
} else if audioTrack != nil && pair.trackID == audioTrackID { } else if audioTrack != nil && data.trackID == audioTrackID {
aus, pts, err := aacDecoder.Decode(pair.data.rtp) aus, pts, err := aacDecoder.Decode(data.rtp)
if err != nil { if err != nil {
if err != rtpaac.ErrMorePacketsNeeded { if err != rtpaac.ErrMorePacketsNeeded {
m.log(logger.Warn, "unable to decode audio track: %v", err) m.log(logger.Warn, "unable to decode audio track: %v", err)
@ -527,8 +522,8 @@ func (m *hlsMuxer) onReaderAccepted() {
} }
// onReaderData implements reader. // onReaderData implements reader.
func (m *hlsMuxer) onReaderData(trackID int, data *data) { func (m *hlsMuxer) onReaderData(data *data) {
m.ringBuffer.Push(hlsMuxerTrackIDDataPair{trackID, data}) m.ringBuffer.Push(data)
} }
// onReaderAPIDescribe implements reader. // onReaderAPIDescribe implements reader.

9
internal/core/hls_source.go

@ -150,12 +150,14 @@ func (s *hlsSource) runInner() bool {
lastPkt := len(pkts) - 1 lastPkt := len(pkts) - 1
for i, pkt := range pkts { for i, pkt := range pkts {
if i != lastPkt { if i != lastPkt {
stream.writeData(videoTrackID, &data{ stream.writeData(&data{
trackID: videoTrackID,
rtp: pkt, rtp: pkt,
ptsEqualsDTS: false, ptsEqualsDTS: false,
}) })
} else { } else {
stream.writeData(videoTrackID, &data{ stream.writeData(&data{
trackID: videoTrackID,
rtp: pkt, rtp: pkt,
ptsEqualsDTS: h264.IDRPresent(nalus), ptsEqualsDTS: h264.IDRPresent(nalus),
h264NALUs: nalus, h264NALUs: nalus,
@ -176,7 +178,8 @@ func (s *hlsSource) runInner() bool {
} }
for _, pkt := range pkts { for _, pkt := range pkts {
stream.writeData(audioTrackID, &data{ stream.writeData(&data{
trackID: audioTrackID,
rtp: pkt, rtp: pkt,
ptsEqualsDTS: true, ptsEqualsDTS: true,
}) })

2
internal/core/publisher.go

@ -1,6 +1,6 @@
package core package core
// publisher is an entity that can publish a stream dynamically. // publisher is an entity that can publish a stream.
type publisher interface { type publisher interface {
source source
close() close()

2
internal/core/reader.go

@ -4,6 +4,6 @@ package core
type reader interface { type reader interface {
close() close()
onReaderAccepted() onReaderAccepted()
onReaderData(int, *data) onReaderData(*data)
onReaderAPIDescribe() interface{} onReaderAPIDescribe() interface{}
} }

46
internal/core/rtmp_conn.go

@ -44,11 +44,6 @@ const (
rtmpConnStatePublish rtmpConnStatePublish
) )
type rtmpConnTrackIDDataPair struct {
trackID int
data *data
}
type rtmpConnPathManager interface { type rtmpConnPathManager interface {
onReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes onReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes
onPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes onPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes
@ -331,14 +326,14 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
videoFirstIDRFound := false videoFirstIDRFound := false
for { for {
data, ok := c.ringBuffer.Pull() item, ok := c.ringBuffer.Pull()
if !ok { if !ok {
return fmt.Errorf("terminated") return fmt.Errorf("terminated")
} }
pair := data.(rtmpConnTrackIDDataPair) data := item.(*data)
if videoTrack != nil && pair.trackID == videoTrackID { if videoTrack != nil && data.trackID == videoTrackID {
if pair.data.h264NALUs == nil { if data.h264NALUs == nil {
continue continue
} }
@ -346,35 +341,35 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
// wait until we receive an IDR // wait until we receive an IDR
if !videoFirstIDRFound { if !videoFirstIDRFound {
if !h264.IDRPresent(pair.data.h264NALUs) { if !h264.IDRPresent(data.h264NALUs) {
continue continue
} }
videoFirstIDRFound = true videoFirstIDRFound = true
videoStartPTS = pair.data.h264PTS videoStartPTS = data.h264PTS
videoDTSEst = h264.NewDTSEstimator() videoDTSEst = h264.NewDTSEstimator()
} }
data, err := h264.EncodeAVCC(pair.data.h264NALUs) avcc, err := h264.EncodeAVCC(data.h264NALUs)
if err != nil { if err != nil {
return err return err
} }
pts := pair.data.h264PTS - videoStartPTS pts := data.h264PTS - videoStartPTS
dts := videoDTSEst.Feed(pts) dts := videoDTSEst.Feed(pts)
c.conn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) c.conn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err = c.conn.WritePacket(av.Packet{ err = c.conn.WritePacket(av.Packet{
Type: av.H264, Type: av.H264,
Data: data, Data: avcc,
Time: dts, Time: dts,
CTime: pts - dts, CTime: pts - dts,
}) })
if err != nil { if err != nil {
return err return err
} }
} else if audioTrack != nil && pair.trackID == audioTrackID { } else if audioTrack != nil && data.trackID == audioTrackID {
aus, pts, err := aacDecoder.Decode(pair.data.rtp) aus, pts, err := aacDecoder.Decode(data.rtp)
if err != nil { if err != nil {
if err != rtpaac.ErrMorePacketsNeeded { if err != rtpaac.ErrMorePacketsNeeded {
c.log(logger.Warn, "unable to decode audio track: %v", err) c.log(logger.Warn, "unable to decode audio track: %v", err)
@ -510,12 +505,14 @@ func (c *rtmpConn) runPublish(ctx context.Context) error {
lastPkt := len(pkts) - 1 lastPkt := len(pkts) - 1
for i, pkt := range pkts { for i, pkt := range pkts {
if i != lastPkt { if i != lastPkt {
rres.stream.writeData(videoTrackID, &data{ rres.stream.writeData(&data{
trackID: videoTrackID,
rtp: pkt, rtp: pkt,
ptsEqualsDTS: false, ptsEqualsDTS: false,
}) })
} else { } else {
rres.stream.writeData(videoTrackID, &data{ rres.stream.writeData(&data{
trackID: videoTrackID,
rtp: pkt, rtp: pkt,
ptsEqualsDTS: h264.IDRPresent(nalus), ptsEqualsDTS: h264.IDRPresent(nalus),
h264NALUs: nalus, h264NALUs: nalus,
@ -544,12 +541,14 @@ func (c *rtmpConn) runPublish(ctx context.Context) error {
lastPkt := len(pkts) - 1 lastPkt := len(pkts) - 1
for i, pkt := range pkts { for i, pkt := range pkts {
if i != lastPkt { if i != lastPkt {
rres.stream.writeData(videoTrackID, &data{ rres.stream.writeData(&data{
trackID: videoTrackID,
rtp: pkt, rtp: pkt,
ptsEqualsDTS: false, ptsEqualsDTS: false,
}) })
} else { } else {
rres.stream.writeData(videoTrackID, &data{ rres.stream.writeData(&data{
trackID: videoTrackID,
rtp: pkt, rtp: pkt,
ptsEqualsDTS: h264.IDRPresent(nalus), ptsEqualsDTS: h264.IDRPresent(nalus),
h264NALUs: nalus, h264NALUs: nalus,
@ -569,7 +568,8 @@ func (c *rtmpConn) runPublish(ctx context.Context) error {
} }
for _, pkt := range pkts { for _, pkt := range pkts {
rres.stream.writeData(audioTrackID, &data{ rres.stream.writeData(&data{
trackID: audioTrackID,
rtp: pkt, rtp: pkt,
ptsEqualsDTS: true, ptsEqualsDTS: true,
}) })
@ -630,8 +630,8 @@ func (c *rtmpConn) onReaderAccepted() {
} }
// onReaderData implements reader. // onReaderData implements reader.
func (c *rtmpConn) onReaderData(trackID int, data *data) { func (c *rtmpConn) onReaderData(data *data) {
c.ringBuffer.Push(rtmpConnTrackIDDataPair{trackID, data}) c.ringBuffer.Push(data)
} }
// onReaderAPIDescribe implements reader. // onReaderAPIDescribe implements reader.

9
internal/core/rtmp_source.go

@ -194,12 +194,14 @@ func (s *rtmpSource) runInner() bool {
lastPkt := len(pkts) - 1 lastPkt := len(pkts) - 1
for i, pkt := range pkts { for i, pkt := range pkts {
if i != lastPkt { if i != lastPkt {
res.stream.writeData(videoTrackID, &data{ res.stream.writeData(&data{
trackID: videoTrackID,
rtp: pkt, rtp: pkt,
ptsEqualsDTS: false, ptsEqualsDTS: false,
}) })
} else { } else {
res.stream.writeData(videoTrackID, &data{ res.stream.writeData(&data{
trackID: videoTrackID,
rtp: pkt, rtp: pkt,
ptsEqualsDTS: h264.IDRPresent(nalus), ptsEqualsDTS: h264.IDRPresent(nalus),
h264NALUs: nalus, h264NALUs: nalus,
@ -219,7 +221,8 @@ func (s *rtmpSource) runInner() bool {
} }
for _, pkt := range pkts { for _, pkt := range pkts {
res.stream.writeData(audioTrackID, &data{ res.stream.writeData(&data{
trackID: audioTrackID,
rtp: pkt, rtp: pkt,
ptsEqualsDTS: true, ptsEqualsDTS: true,
}) })

8
internal/core/rtsp_session.go

@ -342,7 +342,7 @@ func (s *rtspSession) onReaderAccepted() {
} }
// onReaderData implements reader. // onReaderData implements reader.
func (s *rtspSession) onReaderData(trackID int, data *data) { func (s *rtspSession) onReaderData(data *data) {
// packets are routed to the session by gortsplib.ServerStream. // packets are routed to the session by gortsplib.ServerStream.
} }
@ -393,14 +393,16 @@ func (s *rtspSession) onPublisherAccepted(tracksLen int) {
// onPacketRTP is called by rtspServer. // onPacketRTP is called by rtspServer.
func (s *rtspSession) onPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx) { func (s *rtspSession) onPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx) {
if ctx.H264NALUs != nil { if ctx.H264NALUs != nil {
s.stream.writeData(ctx.TrackID, &data{ s.stream.writeData(&data{
trackID: ctx.TrackID,
rtp: ctx.Packet, rtp: ctx.Packet,
ptsEqualsDTS: ctx.PTSEqualsDTS, ptsEqualsDTS: ctx.PTSEqualsDTS,
h264NALUs: append([][]byte(nil), ctx.H264NALUs...), h264NALUs: append([][]byte(nil), ctx.H264NALUs...),
h264PTS: ctx.H264PTS, h264PTS: ctx.H264PTS,
}) })
} else { } else {
s.stream.writeData(ctx.TrackID, &data{ s.stream.writeData(&data{
trackID: ctx.TrackID,
rtp: ctx.Packet, rtp: ctx.Packet,
ptsEqualsDTS: ctx.PTSEqualsDTS, ptsEqualsDTS: ctx.PTSEqualsDTS,
}) })

6
internal/core/rtsp_source.go

@ -199,14 +199,16 @@ func (s *rtspSource) runInner() bool {
c.OnPacketRTP = func(ctx *gortsplib.ClientOnPacketRTPCtx) { c.OnPacketRTP = func(ctx *gortsplib.ClientOnPacketRTPCtx) {
if ctx.H264NALUs != nil { if ctx.H264NALUs != nil {
res.stream.writeData(ctx.TrackID, &data{ res.stream.writeData(&data{
trackID: ctx.TrackID,
rtp: ctx.Packet, rtp: ctx.Packet,
ptsEqualsDTS: ctx.PTSEqualsDTS, ptsEqualsDTS: ctx.PTSEqualsDTS,
h264NALUs: append([][]byte(nil), ctx.H264NALUs...), h264NALUs: append([][]byte(nil), ctx.H264NALUs...),
h264PTS: ctx.H264PTS, h264PTS: ctx.H264PTS,
}) })
} else { } else {
res.stream.writeData(ctx.TrackID, &data{ res.stream.writeData(&data{
trackID: ctx.TrackID,
rtp: ctx.Packet, rtp: ctx.Packet,
ptsEqualsDTS: ctx.PTSEqualsDTS, ptsEqualsDTS: ctx.PTSEqualsDTS,
}) })

12
internal/core/stream.go

@ -37,12 +37,12 @@ func (m *streamNonRTSPReadersMap) remove(r reader) {
delete(m.ma, r) delete(m.ma, r)
} }
func (m *streamNonRTSPReadersMap) forwardPacketRTP(trackID int, data *data) { func (m *streamNonRTSPReadersMap) forwardPacketRTP(data *data) {
m.mutex.RLock() m.mutex.RLock()
defer m.mutex.RUnlock() defer m.mutex.RUnlock()
for c := range m.ma { for c := range m.ma {
c.onReaderData(trackID, data) c.onReaderData(data)
} }
} }
@ -126,16 +126,16 @@ func (s *stream) remuxH264NALUs(h264track *gortsplib.TrackH264, data *data) {
data.h264NALUs = filteredNALUs data.h264NALUs = filteredNALUs
} }
func (s *stream) writeData(trackID int, data *data) { func (s *stream) writeData(data *data) {
track := s.rtspStream.Tracks()[trackID] track := s.rtspStream.Tracks()[data.trackID]
if h264track, ok := track.(*gortsplib.TrackH264); ok { if h264track, ok := track.(*gortsplib.TrackH264); ok {
s.updateH264TrackParameters(h264track, data.h264NALUs) s.updateH264TrackParameters(h264track, data.h264NALUs)
s.remuxH264NALUs(h264track, data) s.remuxH264NALUs(h264track, data)
} }
// forward to RTSP readers // forward to RTSP readers
s.rtspStream.WritePacketRTP(trackID, data.rtp, data.ptsEqualsDTS) s.rtspStream.WritePacketRTP(data.trackID, data.rtp, data.ptsEqualsDTS)
// forward to non-RTSP readers // forward to non-RTSP readers
s.nonRTSPReaders.forwardPacketRTP(trackID, data) s.nonRTSPReaders.forwardPacketRTP(data)
} }

Loading…
Cancel
Save