Browse Source

produce same absolute time in RTSP and HLS (#1249)

* add a NTP timestamp to each data unit
* use that NTP timestamp in all protocols
pull/1301/head
Alessandro Ros 3 years ago committed by GitHub
parent
commit
e605727c78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      internal/core/data.go
  2. 4
      internal/core/hls_muxer.go
  3. 2
      internal/core/hls_source.go
  4. 1
      internal/core/rpicamera_source.go
  5. 3
      internal/core/rtmp_conn.go
  6. 2
      internal/core/rtmp_source.go
  7. 3
      internal/core/rtsp_session.go
  8. 3
      internal/core/rtsp_source.go
  9. 2
      internal/core/stream.go
  10. 8
      internal/hls/muxer.go
  11. 4
      internal/hls/muxer_variant.go
  12. 8
      internal/hls/muxer_variant_fmp4.go
  13. 20
      internal/hls/muxer_variant_fmp4_segmenter.go
  14. 8
      internal/hls/muxer_variant_mpegts.go
  15. 20
      internal/hls/muxer_variant_mpegts_segmenter.go

16
internal/core/data.go

@ -10,11 +10,13 @@ import ( @@ -10,11 +10,13 @@ import (
type data interface {
getTrackID() int
getRTPPackets() []*rtp.Packet
getNTP() time.Time
}
type dataGeneric struct {
trackID int
rtpPackets []*rtp.Packet
ntp time.Time
}
func (d *dataGeneric) getTrackID() int {
@ -25,9 +27,14 @@ func (d *dataGeneric) getRTPPackets() []*rtp.Packet { @@ -25,9 +27,14 @@ func (d *dataGeneric) getRTPPackets() []*rtp.Packet {
return d.rtpPackets
}
func (d *dataGeneric) getNTP() time.Time {
return d.ntp
}
type dataH264 struct {
trackID int
rtpPackets []*rtp.Packet
ntp time.Time
pts time.Duration
nalus [][]byte
}
@ -40,9 +47,14 @@ func (d *dataH264) getRTPPackets() []*rtp.Packet { @@ -40,9 +47,14 @@ func (d *dataH264) getRTPPackets() []*rtp.Packet {
return d.rtpPackets
}
func (d *dataH264) getNTP() time.Time {
return d.ntp
}
type dataMPEG4Audio struct {
trackID int
rtpPackets []*rtp.Packet
ntp time.Time
pts time.Duration
aus [][]byte
}
@ -54,3 +66,7 @@ func (d *dataMPEG4Audio) getTrackID() int { @@ -54,3 +66,7 @@ func (d *dataMPEG4Audio) getTrackID() int {
func (d *dataMPEG4Audio) getRTPPackets() []*rtp.Packet {
return d.rtpPackets
}
func (d *dataMPEG4Audio) getNTP() time.Time {
return d.ntp
}

4
internal/core/hls_muxer.go

@ -433,7 +433,7 @@ func (m *hlsMuxer) runWriter( @@ -433,7 +433,7 @@ func (m *hlsMuxer) runWriter(
}
pts := tdata.pts - videoStartPTS
err := m.muxer.WriteH264(time.Now(), pts, tdata.nalus)
err := m.muxer.WriteH264(tdata.ntp, pts, tdata.nalus)
if err != nil {
return fmt.Errorf("muxer error: %v", err)
}
@ -452,7 +452,7 @@ func (m *hlsMuxer) runWriter( @@ -452,7 +452,7 @@ func (m *hlsMuxer) runWriter(
for i, au := range tdata.aus {
err := m.muxer.WriteAAC(
time.Now(),
tdata.ntp,
pts+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*
time.Second/time.Duration(audioTrack.ClockRate()),
au)

2
internal/core/hls_source.go

@ -82,6 +82,7 @@ func (s *hlsSource) run(ctx context.Context) error { @@ -82,6 +82,7 @@ func (s *hlsSource) run(ctx context.Context) error {
trackID: videoTrackID,
pts: pts,
nalus: nalus,
ntp: time.Now(),
})
if err != nil {
s.Log(logger.Warn, "%v", err)
@ -93,6 +94,7 @@ func (s *hlsSource) run(ctx context.Context) error { @@ -93,6 +94,7 @@ func (s *hlsSource) run(ctx context.Context) error {
trackID: audioTrackID,
pts: pts,
aus: [][]byte{au},
ntp: time.Now(),
})
if err != nil {
s.Log(logger.Warn, "%v", err)

1
internal/core/rpicamera_source.go

@ -62,6 +62,7 @@ func (s *rpiCameraSource) run(ctx context.Context) error { @@ -62,6 +62,7 @@ func (s *rpiCameraSource) run(ctx context.Context) error {
trackID: 0,
pts: dts,
nalus: nalus,
ntp: time.Now(),
})
if err != nil {
s.Log(logger.Warn, "%v", err)

3
internal/core/rtmp_conn.go

@ -562,6 +562,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { @@ -562,6 +562,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
trackID: videoTrackID,
pts: tmsg.DTS + tmsg.PTSDelta,
nalus: nalus,
ntp: time.Now(),
})
if err != nil {
c.log(logger.Warn, "%v", err)
@ -600,6 +601,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { @@ -600,6 +601,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
trackID: videoTrackID,
pts: tmsg.DTS + tmsg.PTSDelta,
nalus: validNALUs,
ntp: time.Now(),
})
if err != nil {
c.log(logger.Warn, "%v", err)
@ -616,6 +618,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { @@ -616,6 +618,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
trackID: audioTrackID,
pts: tmsg.DTS,
aus: [][]byte{tmsg.Payload},
ntp: time.Now(),
})
if err != nil {
c.log(logger.Warn, "%v", err)

2
internal/core/rtmp_source.go

@ -174,6 +174,7 @@ func (s *rtmpSource) run(ctx context.Context) error { @@ -174,6 +174,7 @@ func (s *rtmpSource) run(ctx context.Context) error {
trackID: videoTrackID,
pts: tmsg.DTS + tmsg.PTSDelta,
nalus: nalus,
ntp: time.Now(),
})
if err != nil {
s.Log(logger.Warn, "%v", err)
@ -190,6 +191,7 @@ func (s *rtmpSource) run(ctx context.Context) error { @@ -190,6 +191,7 @@ func (s *rtmpSource) run(ctx context.Context) error {
trackID: audioTrackID,
pts: tmsg.DTS,
aus: [][]byte{tmsg.Payload},
ntp: time.Now(),
})
if err != nil {
s.Log(logger.Warn, "%v", err)

3
internal/core/rtsp_session.go

@ -386,18 +386,21 @@ func (s *rtspSession) onPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx) { @@ -386,18 +386,21 @@ func (s *rtspSession) onPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx) {
err = s.stream.writeData(&dataH264{
trackID: ctx.TrackID,
rtpPackets: []*rtp.Packet{ctx.Packet},
ntp: time.Now(),
})
case *gortsplib.TrackMPEG4Audio:
err = s.stream.writeData(&dataMPEG4Audio{
trackID: ctx.TrackID,
rtpPackets: []*rtp.Packet{ctx.Packet},
ntp: time.Now(),
})
default:
err = s.stream.writeData(&dataGeneric{
trackID: ctx.TrackID,
rtpPackets: []*rtp.Packet{ctx.Packet},
ntp: time.Now(),
})
}

3
internal/core/rtsp_source.go

@ -151,18 +151,21 @@ func (s *rtspSource) run(ctx context.Context) error { @@ -151,18 +151,21 @@ func (s *rtspSource) run(ctx context.Context) error {
err = res.stream.writeData(&dataH264{
trackID: ctx.TrackID,
rtpPackets: []*rtp.Packet{ctx.Packet},
ntp: time.Now(),
})
case *gortsplib.TrackMPEG4Audio:
err = res.stream.writeData(&dataMPEG4Audio{
trackID: ctx.TrackID,
rtpPackets: []*rtp.Packet{ctx.Packet},
ntp: time.Now(),
})
default:
err = res.stream.writeData(&dataGeneric{
trackID: ctx.TrackID,
rtpPackets: []*rtp.Packet{ctx.Packet},
ntp: time.Now(),
})
}

2
internal/core/stream.go

@ -112,7 +112,7 @@ func (s *stream) writeData(data data) error { @@ -112,7 +112,7 @@ func (s *stream) writeData(data data) error {
// forward RTP packets to RTSP readers
for _, pkt := range data.getRTPPackets() {
atomic.AddUint64(s.bytesReceived, uint64(pkt.MarshalSize()))
s.rtspStream.WritePacketRTP(data.getTrackID(), pkt)
s.rtspStream.WritePacketRTPWithNTP(data.getTrackID(), pkt, data.getNTP())
}
// forward data to non-RTSP readers

8
internal/hls/muxer.go

@ -77,13 +77,13 @@ func (m *Muxer) Close() { @@ -77,13 +77,13 @@ func (m *Muxer) Close() {
}
// WriteH264 writes H264 NALUs, grouped by timestamp.
func (m *Muxer) WriteH264(now time.Time, pts time.Duration, nalus [][]byte) error {
return m.variant.writeH264(now, pts, nalus)
func (m *Muxer) WriteH264(ntp time.Time, pts time.Duration, nalus [][]byte) error {
return m.variant.writeH264(ntp, pts, nalus)
}
// WriteAAC writes AAC AUs, grouped by timestamp.
func (m *Muxer) WriteAAC(now time.Time, pts time.Duration, au []byte) error {
return m.variant.writeAAC(now, pts, au)
func (m *Muxer) WriteAAC(ntp time.Time, pts time.Duration, au []byte) error {
return m.variant.writeAAC(ntp, pts, au)
}
// File returns a file reader.

4
internal/hls/muxer_variant.go

@ -16,7 +16,7 @@ const ( @@ -16,7 +16,7 @@ const (
type muxerVariant interface {
close()
writeH264(now time.Time, pts time.Duration, nalus [][]byte) error
writeAAC(now time.Time, pts time.Duration, au []byte) error
writeH264(ntp time.Time, pts time.Duration, nalus [][]byte) error
writeAAC(ntp time.Time, pts time.Duration, au []byte) error
file(name string, msn string, part string, skip string) *MuxerFileResponse
}

8
internal/hls/muxer_variant_fmp4.go

@ -63,12 +63,12 @@ func (v *muxerVariantFMP4) close() { @@ -63,12 +63,12 @@ func (v *muxerVariantFMP4) close() {
v.playlist.close()
}
func (v *muxerVariantFMP4) writeH264(now time.Time, pts time.Duration, nalus [][]byte) error {
return v.segmenter.writeH264(now, pts, nalus)
func (v *muxerVariantFMP4) writeH264(ntp time.Time, pts time.Duration, nalus [][]byte) error {
return v.segmenter.writeH264(ntp, pts, nalus)
}
func (v *muxerVariantFMP4) writeAAC(now time.Time, pts time.Duration, au []byte) error {
return v.segmenter.writeAAC(now, pts, au)
func (v *muxerVariantFMP4) writeAAC(ntp time.Time, pts time.Duration, au []byte) error {
return v.segmenter.writeAAC(ntp, pts, au)
}
func (v *muxerVariantFMP4) file(name string, msn string, part string, skip string) *MuxerFileResponse {

20
internal/hls/muxer_variant_fmp4_segmenter.go

@ -48,11 +48,13 @@ func findCompatiblePartDuration( @@ -48,11 +48,13 @@ func findCompatiblePartDuration(
type augmentedVideoSample struct {
fmp4.PartSample
dts time.Duration
ntp time.Time
}
type augmentedAudioSample struct {
fmp4.PartSample
dts time.Duration
ntp time.Time
}
type muxerVariantFMP4Segmenter struct {
@ -138,7 +140,7 @@ func (m *muxerVariantFMP4Segmenter) adjustPartDuration(du time.Duration) { @@ -138,7 +140,7 @@ func (m *muxerVariantFMP4Segmenter) adjustPartDuration(du time.Duration) {
}
}
func (m *muxerVariantFMP4Segmenter) writeH264(now time.Time, pts time.Duration, nalus [][]byte) error {
func (m *muxerVariantFMP4Segmenter) writeH264(ntp time.Time, pts time.Duration, nalus [][]byte) error {
idrPresent := false
nonIDRPresent := false
@ -157,11 +159,11 @@ func (m *muxerVariantFMP4Segmenter) writeH264(now time.Time, pts time.Duration, @@ -157,11 +159,11 @@ func (m *muxerVariantFMP4Segmenter) writeH264(now time.Time, pts time.Duration,
return nil
}
return m.writeH264Entry(now, pts, nalus, idrPresent)
return m.writeH264Entry(ntp, pts, nalus, idrPresent)
}
func (m *muxerVariantFMP4Segmenter) writeH264Entry(
now time.Time,
ntp time.Time,
pts time.Duration,
nalus [][]byte,
idrPresent bool,
@ -210,6 +212,7 @@ func (m *muxerVariantFMP4Segmenter) writeH264Entry( @@ -210,6 +212,7 @@ func (m *muxerVariantFMP4Segmenter) writeH264Entry(
Payload: avcc,
},
dts: dts,
ntp: ntp,
}
// put samples into a queue in order to
@ -226,7 +229,7 @@ func (m *muxerVariantFMP4Segmenter) writeH264Entry( @@ -226,7 +229,7 @@ func (m *muxerVariantFMP4Segmenter) writeH264Entry(
m.currentSegment = newMuxerVariantFMP4Segment(
m.lowLatency,
m.genSegmentID(),
now,
sample.ntp,
sample.dts,
m.segmentMaxSize,
m.videoTrack,
@ -261,7 +264,7 @@ func (m *muxerVariantFMP4Segmenter) writeH264Entry( @@ -261,7 +264,7 @@ func (m *muxerVariantFMP4Segmenter) writeH264Entry(
m.currentSegment = newMuxerVariantFMP4Segment(
m.lowLatency,
m.genSegmentID(),
now,
m.nextVideoSample.ntp,
m.nextVideoSample.dts,
m.segmentMaxSize,
m.videoTrack,
@ -282,7 +285,7 @@ func (m *muxerVariantFMP4Segmenter) writeH264Entry( @@ -282,7 +285,7 @@ func (m *muxerVariantFMP4Segmenter) writeH264Entry(
return nil
}
func (m *muxerVariantFMP4Segmenter) writeAAC(now time.Time, dts time.Duration, au []byte) error {
func (m *muxerVariantFMP4Segmenter) writeAAC(ntp time.Time, dts time.Duration, au []byte) error {
if m.videoTrack != nil {
// wait for the video track
if !m.videoFirstIDRReceived {
@ -300,6 +303,7 @@ func (m *muxerVariantFMP4Segmenter) writeAAC(now time.Time, dts time.Duration, a @@ -300,6 +303,7 @@ func (m *muxerVariantFMP4Segmenter) writeAAC(now time.Time, dts time.Duration, a
Payload: au,
},
dts: dts,
ntp: ntp,
}
// put samples into a queue in order to compute the sample duration
@ -315,7 +319,7 @@ func (m *muxerVariantFMP4Segmenter) writeAAC(now time.Time, dts time.Duration, a @@ -315,7 +319,7 @@ func (m *muxerVariantFMP4Segmenter) writeAAC(now time.Time, dts time.Duration, a
m.currentSegment = newMuxerVariantFMP4Segment(
m.lowLatency,
m.genSegmentID(),
now,
sample.ntp,
sample.dts,
m.segmentMaxSize,
m.videoTrack,
@ -350,7 +354,7 @@ func (m *muxerVariantFMP4Segmenter) writeAAC(now time.Time, dts time.Duration, a @@ -350,7 +354,7 @@ func (m *muxerVariantFMP4Segmenter) writeAAC(now time.Time, dts time.Duration, a
m.currentSegment = newMuxerVariantFMP4Segment(
m.lowLatency,
m.genSegmentID(),
now,
m.nextAudioSample.ntp,
m.nextAudioSample.dts,
m.segmentMaxSize,
m.videoTrack,

8
internal/hls/muxer_variant_mpegts.go

@ -39,12 +39,12 @@ func (v *muxerVariantMPEGTS) close() { @@ -39,12 +39,12 @@ func (v *muxerVariantMPEGTS) close() {
v.playlist.close()
}
func (v *muxerVariantMPEGTS) writeH264(now time.Time, pts time.Duration, nalus [][]byte) error {
return v.segmenter.writeH264(now, pts, nalus)
func (v *muxerVariantMPEGTS) writeH264(ntp time.Time, pts time.Duration, nalus [][]byte) error {
return v.segmenter.writeH264(ntp, pts, nalus)
}
func (v *muxerVariantMPEGTS) writeAAC(now time.Time, pts time.Duration, au []byte) error {
return v.segmenter.writeAAC(now, pts, au)
func (v *muxerVariantMPEGTS) writeAAC(ntp time.Time, pts time.Duration, au []byte) error {
return v.segmenter.writeAAC(ntp, pts, au)
}
func (v *muxerVariantMPEGTS) file(name string, msn string, part string, skip string) *MuxerFileResponse {

20
internal/hls/muxer_variant_mpegts_segmenter.go

@ -56,7 +56,7 @@ func (m *muxerVariantMPEGTSSegmenter) genSegmentID() uint64 { @@ -56,7 +56,7 @@ func (m *muxerVariantMPEGTSSegmenter) genSegmentID() uint64 {
return id
}
func (m *muxerVariantMPEGTSSegmenter) writeH264(now time.Time, pts time.Duration, nalus [][]byte) error {
func (m *muxerVariantMPEGTSSegmenter) writeH264(ntp time.Time, pts time.Duration, nalus [][]byte) error {
idrPresent := false
nonIDRPresent := false
@ -87,7 +87,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(now time.Time, pts time.Duration @@ -87,7 +87,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(now time.Time, pts time.Duration
return err
}
m.startPCR = now
m.startPCR = ntp
m.startDTS = dts
dts = 0
pts -= m.startDTS
@ -95,7 +95,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(now time.Time, pts time.Duration @@ -95,7 +95,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(now time.Time, pts time.Duration
// create first segment
m.currentSegment = newMuxerVariantMPEGTSSegment(
m.genSegmentID(),
now,
ntp,
m.segmentMaxSize,
m.videoTrack,
m.audioTrack,
@ -121,7 +121,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(now time.Time, pts time.Duration @@ -121,7 +121,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(now time.Time, pts time.Duration
m.onSegmentReady(m.currentSegment)
m.currentSegment = newMuxerVariantMPEGTSSegment(
m.genSegmentID(),
now,
ntp,
m.segmentMaxSize,
m.videoTrack,
m.audioTrack,
@ -130,7 +130,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(now time.Time, pts time.Duration @@ -130,7 +130,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(now time.Time, pts time.Duration
}
err := m.currentSegment.writeH264(
now.Sub(m.startPCR),
ntp.Sub(m.startPCR),
dts,
pts,
idrPresent,
@ -142,17 +142,17 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(now time.Time, pts time.Duration @@ -142,17 +142,17 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(now time.Time, pts time.Duration
return nil
}
func (m *muxerVariantMPEGTSSegmenter) writeAAC(now time.Time, pts time.Duration, au []byte) error {
func (m *muxerVariantMPEGTSSegmenter) writeAAC(ntp time.Time, pts time.Duration, au []byte) error {
if m.videoTrack == nil {
if m.currentSegment == nil {
m.startPCR = now
m.startPCR = ntp
m.startDTS = pts
pts = 0
// create first segment
m.currentSegment = newMuxerVariantMPEGTSSegment(
m.genSegmentID(),
now,
ntp,
m.segmentMaxSize,
m.videoTrack,
m.audioTrack,
@ -167,7 +167,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeAAC(now time.Time, pts time.Duration, @@ -167,7 +167,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeAAC(now time.Time, pts time.Duration,
m.onSegmentReady(m.currentSegment)
m.currentSegment = newMuxerVariantMPEGTSSegment(
m.genSegmentID(),
now,
ntp,
m.segmentMaxSize,
m.videoTrack,
m.audioTrack,
@ -183,7 +183,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeAAC(now time.Time, pts time.Duration, @@ -183,7 +183,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeAAC(now time.Time, pts time.Duration,
pts -= m.startDTS
}
err := m.currentSegment.writeAAC(now.Sub(m.startPCR), pts, au)
err := m.currentSegment.writeAAC(ntp.Sub(m.startPCR), pts, au)
if err != nil {
return err
}

Loading…
Cancel
Save