8 changed files with 374 additions and 236 deletions
@ -0,0 +1,238 @@ |
|||||||
|
package hls |
||||||
|
|
||||||
|
import ( |
||||||
|
"bytes" |
||||||
|
"io" |
||||||
|
"math" |
||||||
|
"strconv" |
||||||
|
"strings" |
||||||
|
"sync" |
||||||
|
"time" |
||||||
|
|
||||||
|
"github.com/aler9/gortsplib" |
||||||
|
"github.com/aler9/gortsplib/pkg/rtpaac" |
||||||
|
|
||||||
|
"github.com/aler9/rtsp-simple-server/internal/h264" |
||||||
|
) |
||||||
|
|
||||||
|
const ( |
||||||
|
// an offset is needed to
|
||||||
|
// - avoid negative PTS values
|
||||||
|
// - avoid PTS < DTS during startup
|
||||||
|
ptsOffset = 2 * time.Second |
||||||
|
|
||||||
|
segmentMinAUCount = 100 |
||||||
|
) |
||||||
|
|
||||||
|
// Muxer is a HLS muxer.
|
||||||
|
type Muxer struct { |
||||||
|
hlsSegmentCount int |
||||||
|
hlsSegmentDuration time.Duration |
||||||
|
videoTrack *gortsplib.Track |
||||||
|
audioTrack *gortsplib.Track |
||||||
|
|
||||||
|
aacConfig rtpaac.MPEG4AudioConfig |
||||||
|
startPCR time.Time |
||||||
|
videoDTSEst *h264.DTSEstimator |
||||||
|
audioAUCount int |
||||||
|
tsCurrent *tsFile |
||||||
|
tsQueue []*tsFile |
||||||
|
tsByName map[string]*tsFile |
||||||
|
tsDeleteCount int |
||||||
|
mutex sync.RWMutex |
||||||
|
} |
||||||
|
|
||||||
|
// NewMuxer allocates a Muxer.
|
||||||
|
func NewMuxer( |
||||||
|
hlsSegmentCount int, |
||||||
|
hlsSegmentDuration time.Duration, |
||||||
|
videoTrack *gortsplib.Track, |
||||||
|
audioTrack *gortsplib.Track) (*Muxer, error) { |
||||||
|
var aacConfig rtpaac.MPEG4AudioConfig |
||||||
|
if audioTrack != nil { |
||||||
|
byts, err := audioTrack.ExtractDataAAC() |
||||||
|
if err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
|
||||||
|
err = aacConfig.Decode(byts) |
||||||
|
if err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
m := &Muxer{ |
||||||
|
hlsSegmentCount: hlsSegmentCount, |
||||||
|
hlsSegmentDuration: hlsSegmentDuration, |
||||||
|
videoTrack: videoTrack, |
||||||
|
audioTrack: audioTrack, |
||||||
|
aacConfig: aacConfig, |
||||||
|
startPCR: time.Now(), |
||||||
|
videoDTSEst: h264.NewDTSEstimator(), |
||||||
|
tsCurrent: newTSFile(videoTrack != nil, audioTrack != nil), |
||||||
|
tsByName: make(map[string]*tsFile), |
||||||
|
} |
||||||
|
|
||||||
|
m.tsByName[m.tsCurrent.name] = m.tsCurrent |
||||||
|
m.tsQueue = append(m.tsQueue, m.tsCurrent) |
||||||
|
|
||||||
|
return m, nil |
||||||
|
} |
||||||
|
|
||||||
|
// Close closes a Muxer.
|
||||||
|
func (m *Muxer) Close() { |
||||||
|
m.tsCurrent.close() |
||||||
|
} |
||||||
|
|
||||||
|
// WriteH264 writes H264 NALUs, grouped by PTS, into the muxer.
|
||||||
|
func (m *Muxer) WriteH264(pts time.Duration, nalus [][]byte) error { |
||||||
|
idrPresent := func() bool { |
||||||
|
for _, nalu := range nalus { |
||||||
|
typ := h264.NALUType(nalu[0] & 0x1F) |
||||||
|
if typ == h264.NALUTypeIDR { |
||||||
|
return true |
||||||
|
} |
||||||
|
} |
||||||
|
return false |
||||||
|
}() |
||||||
|
|
||||||
|
// skip group silently until we find one with a IDR
|
||||||
|
if !m.tsCurrent.firstPacketWritten && !idrPresent { |
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
m.mutex.Lock() |
||||||
|
defer m.mutex.Unlock() |
||||||
|
|
||||||
|
if idrPresent { |
||||||
|
if m.tsCurrent.firstPacketWritten && |
||||||
|
m.tsCurrent.duration() >= m.hlsSegmentDuration { |
||||||
|
if m.tsCurrent != nil { |
||||||
|
m.tsCurrent.close() |
||||||
|
} |
||||||
|
|
||||||
|
m.tsCurrent = newTSFile(m.videoTrack != nil, m.audioTrack != nil) |
||||||
|
|
||||||
|
m.tsByName[m.tsCurrent.name] = m.tsCurrent |
||||||
|
m.tsQueue = append(m.tsQueue, m.tsCurrent) |
||||||
|
if len(m.tsQueue) > m.hlsSegmentCount { |
||||||
|
delete(m.tsByName, m.tsQueue[0].name) |
||||||
|
m.tsQueue = m.tsQueue[1:] |
||||||
|
m.tsDeleteCount++ |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
m.tsCurrent.setPCR(time.Since(m.startPCR)) |
||||||
|
err := m.tsCurrent.writeH264( |
||||||
|
m.videoDTSEst.Feed(pts+ptsOffset), |
||||||
|
pts+ptsOffset, |
||||||
|
idrPresent, |
||||||
|
nalus) |
||||||
|
if err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
|
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
// WriteAAC writes AAC AUs, grouped by PTS, into the muxer.
|
||||||
|
func (m *Muxer) WriteAAC(pts time.Duration, aus [][]byte) error { |
||||||
|
m.mutex.Lock() |
||||||
|
defer m.mutex.Unlock() |
||||||
|
|
||||||
|
if m.videoTrack != nil { |
||||||
|
if m.tsCurrent.firstPacketWritten && |
||||||
|
m.tsCurrent.duration() >= m.hlsSegmentDuration && |
||||||
|
m.audioAUCount >= segmentMinAUCount { |
||||||
|
|
||||||
|
if m.tsCurrent != nil { |
||||||
|
m.tsCurrent.close() |
||||||
|
} |
||||||
|
|
||||||
|
m.audioAUCount = 0 |
||||||
|
m.tsCurrent = newTSFile(m.videoTrack != nil, m.audioTrack != nil) |
||||||
|
m.tsByName[m.tsCurrent.name] = m.tsCurrent |
||||||
|
m.tsQueue = append(m.tsQueue, m.tsCurrent) |
||||||
|
if len(m.tsQueue) > m.hlsSegmentCount { |
||||||
|
delete(m.tsByName, m.tsQueue[0].name) |
||||||
|
m.tsQueue = m.tsQueue[1:] |
||||||
|
m.tsDeleteCount++ |
||||||
|
} |
||||||
|
} |
||||||
|
} else { |
||||||
|
if !m.tsCurrent.firstPacketWritten { |
||||||
|
return nil |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
for i, au := range aus { |
||||||
|
auPTS := pts + time.Duration(i)*1000*time.Second/time.Duration(m.aacConfig.SampleRate) |
||||||
|
|
||||||
|
m.audioAUCount++ |
||||||
|
m.tsCurrent.setPCR(time.Since(m.startPCR)) |
||||||
|
err := m.tsCurrent.writeAAC( |
||||||
|
m.aacConfig.SampleRate, |
||||||
|
m.aacConfig.ChannelCount, |
||||||
|
auPTS+ptsOffset, |
||||||
|
au) |
||||||
|
if err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
// Playlist returns a reader to read the HLS playlist in M3U8 format.
|
||||||
|
func (m *Muxer) Playlist() io.Reader { |
||||||
|
m.mutex.RLock() |
||||||
|
defer m.mutex.RUnlock() |
||||||
|
|
||||||
|
if len(m.tsQueue) == 0 { |
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
cnt := "#EXTM3U\n" |
||||||
|
cnt += "#EXT-X-VERSION:3\n" |
||||||
|
cnt += "#EXT-X-ALLOW-CACHE:NO\n" |
||||||
|
|
||||||
|
targetDuration := func() uint { |
||||||
|
ret := uint(math.Ceil(m.hlsSegmentDuration.Seconds())) |
||||||
|
|
||||||
|
// EXTINF, when rounded to the nearest integer, must be <= EXT-X-TARGETDURATION
|
||||||
|
for _, f := range m.tsQueue { |
||||||
|
v2 := uint(math.Round(f.duration().Seconds())) |
||||||
|
if v2 > ret { |
||||||
|
ret = v2 |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
return ret |
||||||
|
}() |
||||||
|
cnt += "#EXT-X-TARGETDURATION:" + strconv.FormatUint(uint64(targetDuration), 10) + "\n" |
||||||
|
|
||||||
|
cnt += "#EXT-X-MEDIA-SEQUENCE:" + strconv.FormatInt(int64(m.tsDeleteCount), 10) + "\n" |
||||||
|
|
||||||
|
for _, f := range m.tsQueue { |
||||||
|
cnt += "#EXTINF:" + strconv.FormatFloat(f.duration().Seconds(), 'f', -1, 64) + ",\n" |
||||||
|
cnt += f.name + ".ts\n" |
||||||
|
} |
||||||
|
|
||||||
|
return bytes.NewReader([]byte(cnt)) |
||||||
|
} |
||||||
|
|
||||||
|
// TSFile returns a reader to read a given MPEG-TS file.
|
||||||
|
func (m *Muxer) TSFile(fname string) io.Reader { |
||||||
|
base := strings.TrimSuffix(fname, ".ts") |
||||||
|
|
||||||
|
m.mutex.RLock() |
||||||
|
f, ok := m.tsByName[base] |
||||||
|
m.mutex.RUnlock() |
||||||
|
|
||||||
|
if !ok { |
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
return f.newReader() |
||||||
|
} |
@ -0,0 +1,54 @@ |
|||||||
|
package hls |
||||||
|
|
||||||
|
import ( |
||||||
|
"io/ioutil" |
||||||
|
"testing" |
||||||
|
"time" |
||||||
|
|
||||||
|
"github.com/aler9/gortsplib" |
||||||
|
"github.com/stretchr/testify/require" |
||||||
|
) |
||||||
|
|
||||||
|
func TestMuxer(t *testing.T) { |
||||||
|
videoTrack, err := gortsplib.NewTrackH264(96, []byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04}) |
||||||
|
require.NoError(t, err) |
||||||
|
|
||||||
|
audioTrack, err := gortsplib.NewTrackAAC(97, []byte{17, 144}) |
||||||
|
require.NoError(t, err) |
||||||
|
|
||||||
|
m, err := NewMuxer(3, 5*time.Second, videoTrack, audioTrack) |
||||||
|
require.NoError(t, err) |
||||||
|
defer m.Close() |
||||||
|
|
||||||
|
// group without IDR
|
||||||
|
err = m.WriteH264(1*time.Second, [][]byte{ |
||||||
|
{0x06}, |
||||||
|
{0x07}, |
||||||
|
}) |
||||||
|
require.NoError(t, err) |
||||||
|
|
||||||
|
// group with IDR
|
||||||
|
err = m.WriteH264(2*time.Second, [][]byte{ |
||||||
|
{0x05}, |
||||||
|
{0x06}, |
||||||
|
}) |
||||||
|
require.NoError(t, err) |
||||||
|
|
||||||
|
err = m.WriteAAC(3*time.Second, [][]byte{ |
||||||
|
{0x01, 0x02, 0x03, 0x04}, |
||||||
|
{0x05, 0x06, 0x07, 0x08}, |
||||||
|
}) |
||||||
|
require.NoError(t, err) |
||||||
|
|
||||||
|
// group without IDR
|
||||||
|
err = m.WriteH264(4*time.Second, [][]byte{ |
||||||
|
{0x06}, |
||||||
|
{0x07}, |
||||||
|
}) |
||||||
|
require.NoError(t, err) |
||||||
|
|
||||||
|
byts, err := ioutil.ReadAll(m.Playlist()) |
||||||
|
require.NoError(t, err) |
||||||
|
|
||||||
|
require.Regexp(t, `^#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-ALLOW-CACHE:NO\n#EXT-X-TARGETDURATION:5\n#EXT-X-MEDIA-SEQUENCE:0\n#EXTINF:2,\n[0-9]+\.ts\n$`, string(byts)) |
||||||
|
} |
Loading…
Reference in new issue