Browse Source

hls muxer, rtmp server: extract DTS from samples

pull/1003/head
aler9 3 years ago
parent
commit
2ed1aa3d11
  1. 30
      internal/core/rtmp_conn.go
  2. 16
      internal/hls/muxer_variant_mpegts_segment.go
  3. 28
      internal/hls/muxer_variant_mpegts_segmenter.go

30
internal/core/rtmp_conn.go

@ -27,6 +27,7 @@ import ( @@ -27,6 +27,7 @@ import (
const (
rtmpConnPauseAfterAuthError = 2 * time.Second
rtmpPTSDTSOffset = 400 * time.Millisecond // 2 samples @ 5fps
)
func pathNameAndQuery(inURL *url.URL) (string, url.Values, string) {
@ -330,7 +331,8 @@ func (c *rtmpConn) runRead(ctx context.Context) error { @@ -330,7 +331,8 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
var videoInitialPTS *time.Duration
videoFirstIDRFound := false
var videoFirstIDRPTS time.Duration
var videoDTSEst *h264.DTSEstimator
videoDTSExtractor := h264.NewDTSExtractor()
var videoSPS *h264.SPS
for {
item, ok := c.ringBuffer.Pull()
@ -361,16 +363,18 @@ func (c *rtmpConn) runRead(ctx context.Context) error { @@ -361,16 +363,18 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
videoFirstIDRFound = true
videoFirstIDRPTS = pts
videoDTSEst = h264.NewDTSEstimator()
}
if h264.IDRPresent(data.h264NALUs) {
sps := videoTrack.SPS()
pps := videoTrack.PPS()
codec := nh264.Codec{
SPS: map[int][]byte{
0: videoTrack.SPS(),
0: sps,
},
PPS: map[int][]byte{
0: videoTrack.PPS(),
0: pps,
},
}
b := make([]byte, 128)
@ -385,6 +389,13 @@ func (c *rtmpConn) runRead(ctx context.Context) error { @@ -385,6 +389,13 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
if err != nil {
return err
}
var psps h264.SPS
err := psps.Unmarshal(sps)
if err != nil {
return err
}
videoSPS = &psps
}
avcc, err := h264.AVCCEncode(data.h264NALUs)
@ -393,14 +404,18 @@ func (c *rtmpConn) runRead(ctx context.Context) error { @@ -393,14 +404,18 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
}
pts -= videoFirstIDRPTS
dts := videoDTSEst.Feed(pts)
dts, err := videoDTSExtractor.Extract(
data.h264NALUs, h264.IDRPresent(data.h264NALUs), pts, videoSPS)
if err != nil {
return err
}
c.conn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err = c.conn.WritePacket(av.Packet{
Type: av.H264,
Data: avcc,
Time: dts,
CTime: pts - dts,
CTime: rtmpPTSDTSOffset + pts - dts,
})
if err != nil {
return err
@ -428,7 +443,8 @@ func (c *rtmpConn) runRead(ctx context.Context) error { @@ -428,7 +443,8 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
err := c.conn.WritePacket(av.Packet{
Type: av.AAC,
Data: au,
Time: pts + time.Duration(i)*aac.SamplesPerAccessUnit*time.Second/time.Duration(audioTrack.ClockRate()),
Time: rtmpPTSDTSOffset + pts +
time.Duration(i)*aac.SamplesPerAccessUnit*time.Second/time.Duration(audioTrack.ClockRate()),
})
if err != nil {
return err

16
internal/hls/muxer_variant_mpegts_segment.go

@ -14,8 +14,8 @@ import ( @@ -14,8 +14,8 @@ import (
)
const (
// an offset between PCR and PTS/DTS is needed to avoid PCR > PTS
pcrOffset = 500 * time.Millisecond
mpegtsPCROffset = 400 * time.Millisecond // 2 samples @ 5fps
mpegtsPTSDTSOffset = 400 * time.Millisecond // 2 samples @ 5fps
)
type muxerVariantMPEGTSSegment struct {
@ -110,13 +110,15 @@ func (t *muxerVariantMPEGTSSegment) writeH264( @@ -110,13 +110,15 @@ func (t *muxerVariantMPEGTSSegment) writeH264(
MarkerBits: 2,
}
pts += mpegtsPTSDTSOffset
if dts == pts {
oh.PTSDTSIndicator = astits.PTSDTSIndicatorOnlyPTS
oh.PTS = &astits.ClockReference{Base: int64((pts + pcrOffset).Seconds() * 90000)}
oh.PTS = &astits.ClockReference{Base: int64((pts + mpegtsPCROffset).Seconds() * 90000)}
} else {
oh.PTSDTSIndicator = astits.PTSDTSIndicatorBothPresent
oh.DTS = &astits.ClockReference{Base: int64((dts + pcrOffset).Seconds() * 90000)}
oh.PTS = &astits.ClockReference{Base: int64((pts + pcrOffset).Seconds() * 90000)}
oh.DTS = &astits.ClockReference{Base: int64((dts + mpegtsPCROffset).Seconds() * 90000)}
oh.PTS = &astits.ClockReference{Base: int64((pts + mpegtsPCROffset).Seconds() * 90000)}
}
_, err = t.writeData(&astits.MuxerData{
@ -180,6 +182,8 @@ func (t *muxerVariantMPEGTSSegment) writeAAC( @@ -180,6 +182,8 @@ func (t *muxerVariantMPEGTSSegment) writeAAC(
t.pcrSendCounter--
}
pts += mpegtsPTSDTSOffset
_, err = t.writeData(&astits.MuxerData{
PID: 257,
AdaptationField: af,
@ -188,7 +192,7 @@ func (t *muxerVariantMPEGTSSegment) writeAAC( @@ -188,7 +192,7 @@ func (t *muxerVariantMPEGTSSegment) writeAAC(
OptionalHeader: &astits.PESOptionalHeader{
MarkerBits: 2,
PTSDTSIndicator: astits.PTSDTSIndicatorOnlyPTS,
PTS: &astits.ClockReference{Base: int64((pts + pcrOffset).Seconds() * 90000)},
PTS: &astits.ClockReference{Base: int64((pts + mpegtsPCROffset).Seconds() * 90000)},
},
PacketLength: uint16(len(enc) + 8),
StreamID: 192, // audio

28
internal/hls/muxer_variant_mpegts_segmenter.go

@ -28,7 +28,8 @@ type muxerVariantMPEGTSSegmenter struct { @@ -28,7 +28,8 @@ type muxerVariantMPEGTSSegmenter struct {
writer *astits.Muxer
currentSegment *muxerVariantMPEGTSSegment
videoDTSEst *h264.DTSEstimator
videoSPS *h264.SPS
videoDTSExtractor *h264.DTSExtractor
startPCR time.Time
startPTS time.Duration
}
@ -46,6 +47,7 @@ func newMuxerVariantMPEGTSSegmenter( @@ -46,6 +47,7 @@ func newMuxerVariantMPEGTSSegmenter(
videoTrack: videoTrack,
audioTrack: audioTrack,
onSegmentReady: onSegmentReady,
videoDTSExtractor: h264.NewDTSExtractor(),
}
m.writer = astits.NewMuxer(
@ -91,7 +93,6 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(pts time.Duration, nalus [][]byt @@ -91,7 +93,6 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(pts time.Duration, nalus [][]byt
m.currentSegment = newMuxerVariantMPEGTSSegment(now, m.segmentMaxSize,
m.videoTrack, m.audioTrack, m.writer.WriteData)
m.startPCR = now
m.videoDTSEst = h264.NewDTSEstimator()
m.startPTS = pts
pts = 0
} else {
@ -108,10 +109,27 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(pts time.Duration, nalus [][]byt @@ -108,10 +109,27 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(pts time.Duration, nalus [][]byt
}
}
dts := m.videoDTSEst.Feed(pts)
if idrPresent {
sps := m.videoTrack.SPS()
var psps h264.SPS
err := psps.Unmarshal(sps)
if err != nil {
return err
}
m.videoSPS = &psps
}
dts, err := m.videoDTSExtractor.Extract(nalus, idrPresent, pts, m.videoSPS)
if err != nil {
return err
}
err := m.currentSegment.writeH264(now.Sub(m.startPCR), dts,
pts, idrPresent, nalus)
err = m.currentSegment.writeH264(
now.Sub(m.startPCR),
dts,
pts,
idrPresent,
nalus)
if err != nil {
if m.currentSegment.buf.Len() > 0 {
m.onSegmentReady(m.currentSegment)

Loading…
Cancel
Save