Browse Source

hls: move mpegts writer into dedicated folder

pull/1128/head
aler9 3 years ago
parent
commit
5ad2ea8924
  1. 200
      internal/hls/mpegts/writer.go
  2. 6
      internal/hls/muxer_variant_fmp4_part.go
  3. 6
      internal/hls/muxer_variant_fmp4_segment.go
  4. 150
      internal/hls/muxer_variant_mpegts_segment.go
  5. 52
      internal/hls/muxer_variant_mpegts_segmenter.go

200
internal/hls/mpegts/writer.go

@ -0,0 +1,200 @@ @@ -0,0 +1,200 @@
package mpegts
import (
"bytes"
"context"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/h264"
"github.com/aler9/gortsplib/pkg/mpeg4audio"
"github.com/asticode/go-astits"
)
const (
pcrOffset = 400 * time.Millisecond // 2 samples @ 5fps
)
type writerFunc func(p []byte) (int, error)
func (f writerFunc) Write(p []byte) (int, error) {
return f(p)
}
// Writer is a MPEG-TS writer.
type Writer struct {
videoTrack *gortsplib.TrackH264
audioTrack *gortsplib.TrackMPEG4Audio
buf *bytes.Buffer
inner *astits.Muxer
pcrCounter int
}
// NewWriter allocates a Writer.
func NewWriter(
videoTrack *gortsplib.TrackH264,
audioTrack *gortsplib.TrackMPEG4Audio,
) *Writer {
w := &Writer{
videoTrack: videoTrack,
audioTrack: audioTrack,
buf: bytes.NewBuffer(nil),
}
w.inner = astits.NewMuxer(
context.Background(),
writerFunc(func(p []byte) (int, error) {
return w.buf.Write(p)
}))
if videoTrack != nil {
w.inner.AddElementaryStream(astits.PMTElementaryStream{
ElementaryPID: 256,
StreamType: astits.StreamTypeH264Video,
})
}
if audioTrack != nil {
w.inner.AddElementaryStream(astits.PMTElementaryStream{
ElementaryPID: 257,
StreamType: astits.StreamTypeAACAudio,
})
}
if videoTrack != nil {
w.inner.SetPCRPID(256)
} else {
w.inner.SetPCRPID(257)
}
// WriteTable() is not necessary
// since it's called automatically when WriteData() is called with
// * PID == PCRPID
// * AdaptationField != nil
// * RandomAccessIndicator = true
return w
}
// GenerateSegment generates a MPEG-TS segment.
func (w *Writer) GenerateSegment() []byte {
w.pcrCounter = 0
ret := w.buf.Bytes()
w.buf = bytes.NewBuffer(nil)
return ret
}
// WriteH264 writes a group of H264 NALUs.
func (w *Writer) WriteH264(
pcr time.Duration,
dts time.Duration,
pts time.Duration,
idrPresent bool,
nalus [][]byte,
) error {
// prepend an AUD. This is required by video.js and iOS
nalus = append([][]byte{{byte(h264.NALUTypeAccessUnitDelimiter), 240}}, nalus...)
enc, err := h264.AnnexBMarshal(nalus)
if err != nil {
return err
}
var af *astits.PacketAdaptationField
if idrPresent {
af = &astits.PacketAdaptationField{}
af.RandomAccessIndicator = true
}
// send PCR once in a while
if w.pcrCounter == 0 {
if af == nil {
af = &astits.PacketAdaptationField{}
}
af.HasPCR = true
af.PCR = &astits.ClockReference{Base: int64(pcr.Seconds() * 90000)}
w.pcrCounter = 3
}
w.pcrCounter--
oh := &astits.PESOptionalHeader{
MarkerBits: 2,
}
if dts == pts {
oh.PTSDTSIndicator = astits.PTSDTSIndicatorOnlyPTS
oh.PTS = &astits.ClockReference{Base: int64((pts + pcrOffset).Seconds() * 90000)}
} else {
oh.PTSDTSIndicator = astits.PTSDTSIndicatorBothPresent
oh.DTS = &astits.ClockReference{Base: int64((dts + pcrOffset).Seconds() * 90000)}
oh.PTS = &astits.ClockReference{Base: int64((pts + pcrOffset).Seconds() * 90000)}
}
_, err = w.inner.WriteData(&astits.MuxerData{
PID: 256,
AdaptationField: af,
PES: &astits.PESData{
Header: &astits.PESHeader{
OptionalHeader: oh,
StreamID: 224, // video
},
Data: enc,
},
})
return err
}
// WriteAAC writes an AAC AU.
func (w *Writer) WriteAAC(
pcr time.Duration,
pts time.Duration,
au []byte,
) error {
pkts := mpeg4audio.ADTSPackets{
{
Type: w.audioTrack.Config.Type,
SampleRate: w.audioTrack.Config.SampleRate,
ChannelCount: w.audioTrack.Config.ChannelCount,
AU: au,
},
}
enc, err := pkts.Marshal()
if err != nil {
return err
}
af := &astits.PacketAdaptationField{
RandomAccessIndicator: true,
}
if w.videoTrack == nil {
// send PCR once in a while
if w.pcrCounter == 0 {
af.HasPCR = true
af.PCR = &astits.ClockReference{Base: int64(pcr.Seconds() * 90000)}
w.pcrCounter = 3
}
w.pcrCounter--
}
_, err = w.inner.WriteData(&astits.MuxerData{
PID: 257,
AdaptationField: af,
PES: &astits.PESData{
Header: &astits.PESHeader{
OptionalHeader: &astits.PESOptionalHeader{
MarkerBits: 2,
PTSDTSIndicator: astits.PTSDTSIndicatorOnlyPTS,
PTS: &astits.ClockReference{Base: int64((pts + pcrOffset).Seconds() * 90000)},
},
PacketLength: uint16(len(enc) + 8),
StreamID: 192, // audio
},
Data: enc,
},
})
return err
}

6
internal/hls/muxer_variant_fmp4_part.go

@ -24,7 +24,7 @@ type muxerVariantFMP4Part struct { @@ -24,7 +24,7 @@ type muxerVariantFMP4Part struct {
isIndependent bool
videoSamples []*fmp4.VideoSample
audioSamples []*fmp4.AudioSample
renderedContent []byte
content []byte
renderedDuration time.Duration
}
@ -51,7 +51,7 @@ func (p *muxerVariantFMP4Part) name() string { @@ -51,7 +51,7 @@ func (p *muxerVariantFMP4Part) name() string {
}
func (p *muxerVariantFMP4Part) reader() io.Reader {
return bytes.NewReader(p.renderedContent)
return bytes.NewReader(p.content)
}
func (p *muxerVariantFMP4Part) duration() time.Duration {
@ -73,7 +73,7 @@ func (p *muxerVariantFMP4Part) duration() time.Duration { @@ -73,7 +73,7 @@ func (p *muxerVariantFMP4Part) duration() time.Duration {
func (p *muxerVariantFMP4Part) finalize() error {
if len(p.videoSamples) > 0 || len(p.audioSamples) > 0 {
var err error
p.renderedContent, err = fmp4.GeneratePart(
p.content, err = fmp4.GeneratePart(
p.videoTrack,
p.audioTrack,
p.videoSamples,

6
internal/hls/muxer_variant_fmp4_segment.go

@ -26,11 +26,11 @@ func (mbr *partsReader) Read(p []byte) (int, error) { @@ -26,11 +26,11 @@ func (mbr *partsReader) Read(p []byte) (int, error) {
return n, io.EOF
}
copied := copy(p[n:], mbr.parts[mbr.curPart].renderedContent[mbr.curPos:])
copied := copy(p[n:], mbr.parts[mbr.curPart].content[mbr.curPos:])
mbr.curPos += copied
n += copied
if mbr.curPos == len(mbr.parts[mbr.curPart].renderedContent) {
if mbr.curPos == len(mbr.parts[mbr.curPart].content) {
mbr.curPart++
mbr.curPos = 0
}
@ -111,7 +111,7 @@ func (s *muxerVariantFMP4Segment) finalize( @@ -111,7 +111,7 @@ func (s *muxerVariantFMP4Segment) finalize(
return err
}
if s.currentPart.renderedContent != nil {
if s.currentPart.content != nil {
s.onPartFinalized(s.currentPart)
s.parts = append(s.parts, s.currentPart)
}

150
internal/hls/muxer_variant_mpegts_segment.go

@ -8,28 +8,23 @@ import ( @@ -8,28 +8,23 @@ import (
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/h264"
"github.com/aler9/gortsplib/pkg/mpeg4audio"
"github.com/asticode/go-astits"
)
const (
mpegtsPCROffset = 400 * time.Millisecond // 2 samples @ 5fps
"github.com/aler9/rtsp-simple-server/internal/hls/mpegts"
)
type muxerVariantMPEGTSSegment struct {
segmentMaxSize uint64
videoTrack *gortsplib.TrackH264
audioTrack *gortsplib.TrackMPEG4Audio
writeData func(*astits.MuxerData) (int, error)
startTime time.Time
name string
buf bytes.Buffer
startDTS *time.Duration
endDTS time.Duration
pcrSendCounter int
audioAUCount int
writer *mpegts.Writer
size uint64
startTime time.Time
name string
startDTS *time.Duration
endDTS time.Duration
audioAUCount int
content []byte
}
func newMuxerVariantMPEGTSSegment(
@ -37,22 +32,17 @@ func newMuxerVariantMPEGTSSegment( @@ -37,22 +32,17 @@ func newMuxerVariantMPEGTSSegment(
segmentMaxSize uint64,
videoTrack *gortsplib.TrackH264,
audioTrack *gortsplib.TrackMPEG4Audio,
writeData func(*astits.MuxerData) (int, error),
writer *mpegts.Writer,
) *muxerVariantMPEGTSSegment {
t := &muxerVariantMPEGTSSegment{
segmentMaxSize: segmentMaxSize,
videoTrack: videoTrack,
audioTrack: audioTrack,
writeData: writeData,
writer: writer,
startTime: startTime,
name: strconv.FormatInt(startTime.Unix(), 10),
}
// WriteTable() is called automatically when WriteData() is called with
// - PID == PCRPID
// - AdaptationField != nil
// - RandomAccessIndicator = true
return t
}
@ -60,16 +50,13 @@ func (t *muxerVariantMPEGTSSegment) duration() time.Duration { @@ -60,16 +50,13 @@ func (t *muxerVariantMPEGTSSegment) duration() time.Duration {
return t.endDTS - *t.startDTS
}
func (t *muxerVariantMPEGTSSegment) write(p []byte) (int, error) {
if uint64(len(p)+t.buf.Len()) > t.segmentMaxSize {
return 0, fmt.Errorf("reached maximum segment size")
}
return t.buf.Write(p)
func (t *muxerVariantMPEGTSSegment) reader() io.Reader {
return bytes.NewReader(t.content)
}
func (t *muxerVariantMPEGTSSegment) reader() io.Reader {
return bytes.NewReader(t.buf.Bytes())
func (t *muxerVariantMPEGTSSegment) finalize(endDTS time.Duration) {
t.endDTS = endDTS
t.content = t.writer.GenerateSegment()
}
func (t *muxerVariantMPEGTSSegment) writeH264(
@ -79,56 +66,16 @@ func (t *muxerVariantMPEGTSSegment) writeH264( @@ -79,56 +66,16 @@ func (t *muxerVariantMPEGTSSegment) writeH264(
idrPresent bool,
nalus [][]byte,
) error {
// prepend an AUD. This is required by video.js and iOS
nalus = append([][]byte{{byte(h264.NALUTypeAccessUnitDelimiter), 240}}, nalus...)
enc, err := h264.AnnexBMarshal(nalus)
if err != nil {
return err
}
var af *astits.PacketAdaptationField
if idrPresent {
af = &astits.PacketAdaptationField{}
af.RandomAccessIndicator = true
}
// send PCR once in a while
if t.pcrSendCounter == 0 {
if af == nil {
af = &astits.PacketAdaptationField{}
}
af.HasPCR = true
af.PCR = &astits.ClockReference{Base: int64(pcr.Seconds() * 90000)}
t.pcrSendCounter = 3
size := uint64(0)
for _, nalu := range nalus {
size += uint64(len(nalu))
}
t.pcrSendCounter--
oh := &astits.PESOptionalHeader{
MarkerBits: 2,
}
if dts == pts {
oh.PTSDTSIndicator = astits.PTSDTSIndicatorOnlyPTS
oh.PTS = &astits.ClockReference{Base: int64((pts + mpegtsPCROffset).Seconds() * 90000)}
} else {
oh.PTSDTSIndicator = astits.PTSDTSIndicatorBothPresent
oh.DTS = &astits.ClockReference{Base: int64((dts + mpegtsPCROffset).Seconds() * 90000)}
oh.PTS = &astits.ClockReference{Base: int64((pts + mpegtsPCROffset).Seconds() * 90000)}
if (t.size + size) > t.segmentMaxSize {
return fmt.Errorf("reached maximum segment size")
}
t.size += size
_, err = t.writeData(&astits.MuxerData{
PID: 256,
AdaptationField: af,
PES: &astits.PESData{
Header: &astits.PESHeader{
OptionalHeader: oh,
StreamID: 224, // video
},
Data: enc,
},
})
err := t.writer.WriteH264(pcr, dts, pts, idrPresent, nalus)
if err != nil {
return err
}
@ -136,7 +83,6 @@ func (t *muxerVariantMPEGTSSegment) writeH264( @@ -136,7 +83,6 @@ func (t *muxerVariantMPEGTSSegment) writeH264(
if t.startDTS == nil {
t.startDTS = &dts
}
t.endDTS = dts
return nil
@ -147,50 +93,13 @@ func (t *muxerVariantMPEGTSSegment) writeAAC( @@ -147,50 +93,13 @@ func (t *muxerVariantMPEGTSSegment) writeAAC(
pts time.Duration,
au []byte,
) error {
pkts := mpeg4audio.ADTSPackets{
{
Type: t.audioTrack.Config.Type,
SampleRate: t.audioTrack.Config.SampleRate,
ChannelCount: t.audioTrack.Config.ChannelCount,
AU: au,
},
size := uint64(len(au))
if (t.size + size) > t.segmentMaxSize {
return fmt.Errorf("reached maximum segment size")
}
t.size += size
enc, err := pkts.Marshal()
if err != nil {
return err
}
af := &astits.PacketAdaptationField{
RandomAccessIndicator: true,
}
if t.videoTrack == nil {
// send PCR once in a while
if t.pcrSendCounter == 0 {
af.HasPCR = true
af.PCR = &astits.ClockReference{Base: int64(pcr.Seconds() * 90000)}
t.pcrSendCounter = 3
}
t.pcrSendCounter--
}
_, err = t.writeData(&astits.MuxerData{
PID: 257,
AdaptationField: af,
PES: &astits.PESData{
Header: &astits.PESHeader{
OptionalHeader: &astits.PESOptionalHeader{
MarkerBits: 2,
PTSDTSIndicator: astits.PTSDTSIndicatorOnlyPTS,
PTS: &astits.ClockReference{Base: int64((pts + mpegtsPCROffset).Seconds() * 90000)},
},
PacketLength: uint16(len(enc) + 8),
StreamID: 192, // audio
},
Data: enc,
},
})
err := t.writer.WriteAAC(pcr, pts, au)
if err != nil {
return err
}
@ -201,7 +110,6 @@ func (t *muxerVariantMPEGTSSegment) writeAAC( @@ -201,7 +110,6 @@ func (t *muxerVariantMPEGTSSegment) writeAAC(
if t.startDTS == nil {
t.startDTS = &pts
}
t.endDTS = pts
}

52
internal/hls/muxer_variant_mpegts_segmenter.go

@ -1,24 +1,18 @@ @@ -1,24 +1,18 @@
package hls
import (
"context"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/h264"
"github.com/asticode/go-astits"
"github.com/aler9/rtsp-simple-server/internal/hls/mpegts"
)
const (
mpegtsSegmentMinAUCount = 100
)
type writerFunc func(p []byte) (int, error)
func (f writerFunc) Write(p []byte) (int, error) {
return f(p)
}
type muxerVariantMPEGTSSegmenter struct {
segmentDuration time.Duration
segmentMaxSize uint64
@ -26,7 +20,7 @@ type muxerVariantMPEGTSSegmenter struct { @@ -26,7 +20,7 @@ type muxerVariantMPEGTSSegmenter struct {
audioTrack *gortsplib.TrackMPEG4Audio
onSegmentReady func(*muxerVariantMPEGTSSegment)
writer *astits.Muxer
writer *mpegts.Writer
currentSegment *muxerVariantMPEGTSSegment
videoDTSExtractor *h264.DTSExtractor
startPCR time.Time
@ -48,31 +42,9 @@ func newMuxerVariantMPEGTSSegmenter( @@ -48,31 +42,9 @@ func newMuxerVariantMPEGTSSegmenter(
onSegmentReady: onSegmentReady,
}
m.writer = astits.NewMuxer(
context.Background(),
writerFunc(func(p []byte) (int, error) {
return m.currentSegment.write(p)
}))
if videoTrack != nil {
m.writer.AddElementaryStream(astits.PMTElementaryStream{
ElementaryPID: 256,
StreamType: astits.StreamTypeH264Video,
})
}
if audioTrack != nil {
m.writer.AddElementaryStream(astits.PMTElementaryStream{
ElementaryPID: 257,
StreamType: astits.StreamTypeAACAudio,
})
}
if videoTrack != nil {
m.writer.SetPCRPID(256)
} else {
m.writer.SetPCRPID(257)
}
m.writer = mpegts.NewWriter(
videoTrack,
audioTrack)
return m
}
@ -117,7 +89,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(pts time.Duration, nalus [][]byt @@ -117,7 +89,7 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(pts time.Duration, nalus [][]byt
// create first segment
m.currentSegment = newMuxerVariantMPEGTSSegment(now, m.segmentMaxSize,
m.videoTrack, m.audioTrack, m.writer.WriteData)
m.videoTrack, m.audioTrack, m.writer)
} else {
if !idrPresent && !nonIDRPresent {
return nil
@ -135,10 +107,10 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(pts time.Duration, nalus [][]byt @@ -135,10 +107,10 @@ func (m *muxerVariantMPEGTSSegmenter) writeH264(pts time.Duration, nalus [][]byt
// switch segment
if idrPresent &&
(dts-*m.currentSegment.startDTS) >= m.segmentDuration {
m.currentSegment.endDTS = dts
m.currentSegment.finalize(dts)
m.onSegmentReady(m.currentSegment)
m.currentSegment = newMuxerVariantMPEGTSSegment(now, m.segmentMaxSize,
m.videoTrack, m.audioTrack, m.writer.WriteData)
m.videoTrack, m.audioTrack, m.writer)
}
}
@ -166,17 +138,17 @@ func (m *muxerVariantMPEGTSSegmenter) writeAAC(pts time.Duration, au []byte) err @@ -166,17 +138,17 @@ func (m *muxerVariantMPEGTSSegmenter) writeAAC(pts time.Duration, au []byte) err
// create first segment
m.currentSegment = newMuxerVariantMPEGTSSegment(now, m.segmentMaxSize,
m.videoTrack, m.audioTrack, m.writer.WriteData)
m.videoTrack, m.audioTrack, m.writer)
} else {
pts -= m.startDTS
// switch segment
if m.currentSegment.audioAUCount >= mpegtsSegmentMinAUCount &&
(pts-*m.currentSegment.startDTS) >= m.segmentDuration {
m.currentSegment.endDTS = pts
m.currentSegment.finalize(pts)
m.onSegmentReady(m.currentSegment)
m.currentSegment = newMuxerVariantMPEGTSSegment(now, m.segmentMaxSize,
m.videoTrack, m.audioTrack, m.writer.WriteData)
m.videoTrack, m.audioTrack, m.writer)
}
}
} else {

Loading…
Cancel
Save