From 2ed1aa3d1172e84c1cbf03331e07b87f7f3b8327 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Thu, 2 Jun 2022 12:42:59 +0200 Subject: [PATCH] hls muxer, rtmp server: extract DTS from samples --- internal/core/rtmp_conn.go | 30 +++++++++--- internal/hls/muxer_variant_mpegts_segment.go | 16 ++++--- .../hls/muxer_variant_mpegts_segmenter.go | 46 +++++++++++++------ 3 files changed, 65 insertions(+), 27 deletions(-) diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index e646df02..90441d0a 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -27,6 +27,7 @@ import ( const ( rtmpConnPauseAfterAuthError = 2 * time.Second + rtmpPTSDTSOffset = 400 * time.Millisecond // 2 samples @ 5fps ) func pathNameAndQuery(inURL *url.URL) (string, url.Values, string) { @@ -330,7 +331,8 @@ func (c *rtmpConn) runRead(ctx context.Context) error { var videoInitialPTS *time.Duration videoFirstIDRFound := false var videoFirstIDRPTS time.Duration - var videoDTSEst *h264.DTSEstimator + videoDTSExtractor := h264.NewDTSExtractor() + var videoSPS *h264.SPS for { item, ok := c.ringBuffer.Pull() @@ -361,16 +363,18 @@ func (c *rtmpConn) runRead(ctx context.Context) error { videoFirstIDRFound = true videoFirstIDRPTS = pts - videoDTSEst = h264.NewDTSEstimator() } if h264.IDRPresent(data.h264NALUs) { + sps := videoTrack.SPS() + pps := videoTrack.PPS() + codec := nh264.Codec{ SPS: map[int][]byte{ - 0: videoTrack.SPS(), + 0: sps, }, PPS: map[int][]byte{ - 0: videoTrack.PPS(), + 0: pps, }, } b := make([]byte, 128) @@ -385,6 +389,13 @@ func (c *rtmpConn) runRead(ctx context.Context) error { if err != nil { return err } + + var psps h264.SPS + err := psps.Unmarshal(sps) + if err != nil { + return err + } + videoSPS = &psps } avcc, err := h264.AVCCEncode(data.h264NALUs) @@ -393,14 +404,18 @@ func (c *rtmpConn) runRead(ctx context.Context) error { } pts -= videoFirstIDRPTS - dts := videoDTSEst.Feed(pts) + dts, err := videoDTSExtractor.Extract( + data.h264NALUs, h264.IDRPresent(data.h264NALUs), pts, videoSPS) + if err != nil { + return err + } c.conn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) err = c.conn.WritePacket(av.Packet{ Type: av.H264, Data: avcc, Time: dts, - CTime: pts - dts, + CTime: rtmpPTSDTSOffset + pts - dts, }) if err != nil { return err @@ -428,7 +443,8 @@ func (c *rtmpConn) runRead(ctx context.Context) error { err := c.conn.WritePacket(av.Packet{ Type: av.AAC, Data: au, - Time: pts + time.Duration(i)*aac.SamplesPerAccessUnit*time.Second/time.Duration(audioTrack.ClockRate()), + Time: rtmpPTSDTSOffset + pts + + time.Duration(i)*aac.SamplesPerAccessUnit*time.Second/time.Duration(audioTrack.ClockRate()), }) if err != nil { return err diff --git a/internal/hls/muxer_variant_mpegts_segment.go b/internal/hls/muxer_variant_mpegts_segment.go index bde62618..ec1e0f92 100644 --- a/internal/hls/muxer_variant_mpegts_segment.go +++ b/internal/hls/muxer_variant_mpegts_segment.go @@ -14,8 +14,8 @@ import ( ) const ( - // an offset between PCR and PTS/DTS is needed to avoid PCR > PTS - pcrOffset = 500 * time.Millisecond + mpegtsPCROffset = 400 * time.Millisecond // 2 samples @ 5fps + mpegtsPTSDTSOffset = 400 * time.Millisecond // 2 samples @ 5fps ) type muxerVariantMPEGTSSegment struct { @@ -110,13 +110,15 @@ func (t *muxerVariantMPEGTSSegment) writeH264( MarkerBits: 2, } + pts += mpegtsPTSDTSOffset + if dts == pts { oh.PTSDTSIndicator = astits.PTSDTSIndicatorOnlyPTS - oh.PTS = &astits.ClockReference{Base: int64((pts + pcrOffset).Seconds() * 90000)} + oh.PTS = &astits.ClockReference{Base: int64((pts + mpegtsPCROffset).Seconds() * 90000)} } else { oh.PTSDTSIndicator = astits.PTSDTSIndicatorBothPresent - oh.DTS = &astits.ClockReference{Base: int64((dts + pcrOffset).Seconds() * 90000)} - oh.PTS = &astits.ClockReference{Base: int64((pts + pcrOffset).Seconds() * 90000)} + oh.DTS = &astits.ClockReference{Base: int64((dts + mpegtsPCROffset).Seconds() * 90000)} + oh.PTS = &astits.ClockReference{Base: int64((pts + mpegtsPCROffset).Seconds() * 90000)} } _, err = t.writeData(&astits.MuxerData{ @@ -180,6 +182,8 @@ func (t *muxerVariantMPEGTSSegment) writeAAC( t.pcrSendCounter-- } + pts += mpegtsPTSDTSOffset + _, err = t.writeData(&astits.MuxerData{ PID: 257, AdaptationField: af, @@ -188,7 +192,7 @@ func (t *muxerVariantMPEGTSSegment) writeAAC( OptionalHeader: &astits.PESOptionalHeader{ MarkerBits: 2, PTSDTSIndicator: astits.PTSDTSIndicatorOnlyPTS, - PTS: &astits.ClockReference{Base: int64((pts + pcrOffset).Seconds() * 90000)}, + PTS: &astits.ClockReference{Base: int64((pts + mpegtsPCROffset).Seconds() * 90000)}, }, PacketLength: uint16(len(enc) + 8), StreamID: 192, // audio diff --git a/internal/hls/muxer_variant_mpegts_segmenter.go b/internal/hls/muxer_variant_mpegts_segmenter.go index 40d3cb25..8536e763 100644 --- a/internal/hls/muxer_variant_mpegts_segmenter.go +++ b/internal/hls/muxer_variant_mpegts_segmenter.go @@ -26,11 +26,12 @@ type muxerVariantMPEGTSSegmenter struct { audioTrack *gortsplib.TrackAAC onSegmentReady func(*muxerVariantMPEGTSSegment) - writer *astits.Muxer - currentSegment *muxerVariantMPEGTSSegment - videoDTSEst *h264.DTSEstimator - startPCR time.Time - startPTS time.Duration + writer *astits.Muxer + currentSegment *muxerVariantMPEGTSSegment + videoSPS *h264.SPS + videoDTSExtractor *h264.DTSExtractor + startPCR time.Time + startPTS time.Duration } func newMuxerVariantMPEGTSSegmenter( @@ -41,11 +42,12 @@ func newMuxerVariantMPEGTSSegmenter( onSegmentReady func(*muxerVariantMPEGTSSegment), ) *muxerVariantMPEGTSSegmenter { m := &muxerVariantMPEGTSSegmenter{ - segmentDuration: segmentDuration, - segmentMaxSize: segmentMaxSize, - videoTrack: videoTrack, - audioTrack: audioTrack, - onSegmentReady: onSegmentReady, + segmentDuration: segmentDuration, + segmentMaxSize: segmentMaxSize, + videoTrack: videoTrack, + audioTrack: audioTrack, + onSegmentReady: onSegmentReady, + videoDTSExtractor: h264.NewDTSExtractor(), } m.writer = astits.NewMuxer( @@ -91,7 +93,6 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(pts time.Duration, nalus [][]byt m.currentSegment = newMuxerVariantMPEGTSSegment(now, m.segmentMaxSize, m.videoTrack, m.audioTrack, m.writer.WriteData) m.startPCR = now - m.videoDTSEst = h264.NewDTSEstimator() m.startPTS = pts pts = 0 } else { @@ -108,10 +109,27 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(pts time.Duration, nalus [][]byt } } - dts := m.videoDTSEst.Feed(pts) + if idrPresent { + sps := m.videoTrack.SPS() + var psps h264.SPS + err := psps.Unmarshal(sps) + if err != nil { + return err + } + m.videoSPS = &psps + } + + dts, err := m.videoDTSExtractor.Extract(nalus, idrPresent, pts, m.videoSPS) + if err != nil { + return err + } - err := m.currentSegment.writeH264(now.Sub(m.startPCR), dts, - pts, idrPresent, nalus) + err = m.currentSegment.writeH264( + now.Sub(m.startPCR), + dts, + pts, + idrPresent, + nalus) if err != nil { if m.currentSegment.buf.Len() > 0 { m.onSegmentReady(m.currentSegment)