21 changed files with 1018 additions and 799 deletions
@ -1,114 +0,0 @@
@@ -1,114 +0,0 @@
|
||||
package hls |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"time" |
||||
|
||||
"github.com/aler9/gortsplib" |
||||
"github.com/aler9/gortsplib/pkg/mpeg4audio" |
||||
) |
||||
|
||||
type clientAudioProcessorData struct { |
||||
data []byte |
||||
pts time.Duration |
||||
} |
||||
|
||||
type clientAudioProcessor struct { |
||||
ctx context.Context |
||||
onTrack func(*gortsplib.TrackMPEG4Audio) error |
||||
onData func(time.Duration, []byte) |
||||
|
||||
trackInitialized bool |
||||
queue chan clientAudioProcessorData |
||||
clockStartRTC time.Time |
||||
} |
||||
|
||||
func newClientAudioProcessor( |
||||
ctx context.Context, |
||||
onTrack func(*gortsplib.TrackMPEG4Audio) error, |
||||
onData func(time.Duration, []byte), |
||||
) *clientAudioProcessor { |
||||
p := &clientAudioProcessor{ |
||||
ctx: ctx, |
||||
onTrack: onTrack, |
||||
onData: onData, |
||||
queue: make(chan clientAudioProcessorData, clientQueueSize), |
||||
} |
||||
|
||||
return p |
||||
} |
||||
|
||||
func (p *clientAudioProcessor) run() error { |
||||
for { |
||||
select { |
||||
case item := <-p.queue: |
||||
err := p.doProcess(item.data, item.pts) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
case <-p.ctx.Done(): |
||||
return nil |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (p *clientAudioProcessor) doProcess( |
||||
data []byte, |
||||
pts time.Duration, |
||||
) error { |
||||
elapsed := time.Since(p.clockStartRTC) |
||||
if pts > elapsed { |
||||
select { |
||||
case <-p.ctx.Done(): |
||||
return fmt.Errorf("terminated") |
||||
case <-time.After(pts - elapsed): |
||||
} |
||||
} |
||||
|
||||
var adtsPkts mpeg4audio.ADTSPackets |
||||
err := adtsPkts.Unmarshal(data) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
for i, pkt := range adtsPkts { |
||||
if !p.trackInitialized { |
||||
p.trackInitialized = true |
||||
|
||||
track := &gortsplib.TrackMPEG4Audio{ |
||||
PayloadType: 96, |
||||
Config: &mpeg4audio.Config{ |
||||
Type: pkt.Type, |
||||
SampleRate: pkt.SampleRate, |
||||
ChannelCount: pkt.ChannelCount, |
||||
}, |
||||
SizeLength: 13, |
||||
IndexLength: 3, |
||||
IndexDeltaLength: 3, |
||||
} |
||||
|
||||
err = p.onTrack(track) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
} |
||||
|
||||
p.onData( |
||||
pts+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*time.Second/time.Duration(pkt.SampleRate), |
||||
pkt.AU) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (p *clientAudioProcessor) process( |
||||
data []byte, |
||||
pts time.Duration, |
||||
) { |
||||
select { |
||||
case p.queue <- clientAudioProcessorData{data, pts}: |
||||
case <-p.ctx.Done(): |
||||
} |
||||
} |
@ -0,0 +1,278 @@
@@ -0,0 +1,278 @@
|
||||
package hls |
||||
|
||||
import ( |
||||
"context" |
||||
"crypto/sha256" |
||||
"crypto/tls" |
||||
"encoding/hex" |
||||
"fmt" |
||||
"io" |
||||
"net/http" |
||||
"net/url" |
||||
"strings" |
||||
"time" |
||||
|
||||
"github.com/aler9/gortsplib" |
||||
"github.com/grafov/m3u8" |
||||
|
||||
"github.com/aler9/rtsp-simple-server/internal/logger" |
||||
) |
||||
|
||||
type clientDownloader struct { |
||||
primaryPlaylistURL *url.URL |
||||
segmentQueue *clientSegmentQueue |
||||
logger ClientLogger |
||||
onTracks func(*gortsplib.TrackH264, *gortsplib.TrackMPEG4Audio) error |
||||
onVideoData func(time.Duration, [][]byte) |
||||
onAudioData func(time.Duration, []byte) |
||||
rp *clientRoutinePool |
||||
|
||||
streamPlaylistURL *url.URL |
||||
downloadedSegmentURIs []string |
||||
httpClient *http.Client |
||||
lastDownloadTime time.Time |
||||
firstPlaylistReceived bool |
||||
} |
||||
|
||||
func newClientDownloader( |
||||
primaryPlaylistURL *url.URL, |
||||
fingerprint string, |
||||
segmentQueue *clientSegmentQueue, |
||||
logger ClientLogger, |
||||
onTracks func(*gortsplib.TrackH264, *gortsplib.TrackMPEG4Audio) error, |
||||
onVideoData func(time.Duration, [][]byte), |
||||
onAudioData func(time.Duration, []byte), |
||||
rp *clientRoutinePool, |
||||
) *clientDownloader { |
||||
var tlsConfig *tls.Config |
||||
if fingerprint != "" { |
||||
tlsConfig = &tls.Config{ |
||||
InsecureSkipVerify: true, |
||||
VerifyConnection: func(cs tls.ConnectionState) error { |
||||
h := sha256.New() |
||||
h.Write(cs.PeerCertificates[0].Raw) |
||||
hstr := hex.EncodeToString(h.Sum(nil)) |
||||
fingerprintLower := strings.ToLower(fingerprint) |
||||
|
||||
if hstr != fingerprintLower { |
||||
return fmt.Errorf("server fingerprint do not match: expected %s, got %s", |
||||
fingerprintLower, hstr) |
||||
} |
||||
|
||||
return nil |
||||
}, |
||||
} |
||||
} |
||||
|
||||
return &clientDownloader{ |
||||
primaryPlaylistURL: primaryPlaylistURL, |
||||
segmentQueue: segmentQueue, |
||||
logger: logger, |
||||
onTracks: onTracks, |
||||
onVideoData: onVideoData, |
||||
onAudioData: onAudioData, |
||||
rp: rp, |
||||
httpClient: &http.Client{ |
||||
Transport: &http.Transport{ |
||||
TLSClientConfig: tlsConfig, |
||||
}, |
||||
}, |
||||
} |
||||
} |
||||
|
||||
func (d *clientDownloader) run(ctx context.Context) error { |
||||
for { |
||||
ok := d.segmentQueue.waitUntilSizeIsBelow(ctx, clientMinSegmentsBeforeDownloading) |
||||
if !ok { |
||||
return fmt.Errorf("terminated") |
||||
} |
||||
|
||||
_, err := d.fillSegmentQueue(ctx) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (d *clientDownloader) fillSegmentQueue(ctx context.Context) (bool, error) { |
||||
minTime := d.lastDownloadTime.Add(clientMinDownloadPause) |
||||
now := time.Now() |
||||
if now.Before(minTime) { |
||||
select { |
||||
case <-time.After(minTime.Sub(now)): |
||||
case <-ctx.Done(): |
||||
return false, fmt.Errorf("terminated") |
||||
} |
||||
} |
||||
|
||||
d.lastDownloadTime = now |
||||
|
||||
pl, err := func() (*m3u8.MediaPlaylist, error) { |
||||
if d.streamPlaylistURL == nil { |
||||
return d.downloadPrimaryPlaylist(ctx) |
||||
} |
||||
return d.downloadStreamPlaylist(ctx) |
||||
}() |
||||
if err != nil { |
||||
return false, err |
||||
} |
||||
|
||||
if !d.firstPlaylistReceived { |
||||
d.firstPlaylistReceived = true |
||||
|
||||
if pl.Map != nil && pl.Map.URI != "" { |
||||
return false, fmt.Errorf("fMP4 streams are not supported yet") |
||||
} |
||||
|
||||
proc := newClientProcessorMPEGTS( |
||||
d.segmentQueue, |
||||
d.logger, |
||||
d.rp, |
||||
d.onTracks, |
||||
d.onVideoData, |
||||
d.onAudioData, |
||||
) |
||||
d.rp.add(proc.run) |
||||
} |
||||
|
||||
added := false |
||||
|
||||
for _, seg := range pl.Segments { |
||||
if seg == nil { |
||||
break |
||||
} |
||||
|
||||
if !d.segmentWasDownloaded(seg.URI) { |
||||
d.downloadedSegmentURIs = append(d.downloadedSegmentURIs, seg.URI) |
||||
byts, err := d.downloadSegment(ctx, seg.URI) |
||||
if err != nil { |
||||
return false, err |
||||
} |
||||
|
||||
d.segmentQueue.push(byts) |
||||
added = true |
||||
} |
||||
} |
||||
|
||||
return added, nil |
||||
} |
||||
|
||||
func (d *clientDownloader) segmentWasDownloaded(ur string) bool { |
||||
for _, q := range d.downloadedSegmentURIs { |
||||
if q == ur { |
||||
return true |
||||
} |
||||
} |
||||
return false |
||||
} |
||||
|
||||
func (d *clientDownloader) downloadPrimaryPlaylist(ctx context.Context) (*m3u8.MediaPlaylist, error) { |
||||
d.logger.Log(logger.Debug, "downloading primary playlist %s", d.primaryPlaylistURL) |
||||
|
||||
pl, err := d.downloadPlaylist(ctx, d.primaryPlaylistURL) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
switch plt := pl.(type) { |
||||
case *m3u8.MediaPlaylist: |
||||
d.streamPlaylistURL = d.primaryPlaylistURL |
||||
return plt, nil |
||||
|
||||
case *m3u8.MasterPlaylist: |
||||
// choose the variant with the highest bandwidth
|
||||
var chosenVariant *m3u8.Variant |
||||
for _, v := range plt.Variants { |
||||
if chosenVariant == nil || |
||||
v.VariantParams.Bandwidth > chosenVariant.VariantParams.Bandwidth { |
||||
chosenVariant = v |
||||
} |
||||
} |
||||
|
||||
if chosenVariant == nil { |
||||
return nil, fmt.Errorf("no variants found") |
||||
} |
||||
|
||||
u, err := clientURLAbsolute(d.primaryPlaylistURL, chosenVariant.URI) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
d.streamPlaylistURL = u |
||||
|
||||
return d.downloadStreamPlaylist(ctx) |
||||
|
||||
default: |
||||
return nil, fmt.Errorf("invalid playlist") |
||||
} |
||||
} |
||||
|
||||
func (d *clientDownloader) downloadStreamPlaylist(ctx context.Context) (*m3u8.MediaPlaylist, error) { |
||||
d.logger.Log(logger.Debug, "downloading stream playlist %s", d.streamPlaylistURL.String()) |
||||
|
||||
pl, err := d.downloadPlaylist(ctx, d.streamPlaylistURL) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
plt, ok := pl.(*m3u8.MediaPlaylist) |
||||
if !ok { |
||||
return nil, fmt.Errorf("invalid playlist") |
||||
} |
||||
|
||||
return plt, nil |
||||
} |
||||
|
||||
func (d *clientDownloader) downloadPlaylist(ctx context.Context, ur *url.URL) (m3u8.Playlist, error) { |
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, ur.String(), nil) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
res, err := d.httpClient.Do(req) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
defer res.Body.Close() |
||||
|
||||
if res.StatusCode != http.StatusOK { |
||||
return nil, fmt.Errorf("bad status code: %d", res.StatusCode) |
||||
} |
||||
|
||||
pl, _, err := m3u8.DecodeFrom(res.Body, true) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return pl, nil |
||||
} |
||||
|
||||
func (d *clientDownloader) downloadSegment(ctx context.Context, segmentURI string) ([]byte, error) { |
||||
u, err := clientURLAbsolute(d.streamPlaylistURL, segmentURI) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
d.logger.Log(logger.Debug, "downloading segment %s", u) |
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
res, err := d.httpClient.Do(req) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
defer res.Body.Close() |
||||
|
||||
if res.StatusCode != http.StatusOK { |
||||
return nil, fmt.Errorf("bad status code: %d", res.StatusCode) |
||||
} |
||||
|
||||
byts, err := io.ReadAll(res.Body) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return byts, nil |
||||
} |
@ -0,0 +1,294 @@
@@ -0,0 +1,294 @@
|
||||
package hls |
||||
|
||||
import ( |
||||
"bytes" |
||||
"context" |
||||
"fmt" |
||||
"strings" |
||||
"time" |
||||
|
||||
"github.com/aler9/gortsplib" |
||||
"github.com/aler9/gortsplib/pkg/h264" |
||||
"github.com/aler9/gortsplib/pkg/mpeg4audio" |
||||
"github.com/asticode/go-astits" |
||||
|
||||
"github.com/aler9/rtsp-simple-server/internal/hls/mpegtstimedec" |
||||
"github.com/aler9/rtsp-simple-server/internal/logger" |
||||
) |
||||
|
||||
type clientProcessorMPEGTS struct { |
||||
segmentQueue *clientSegmentQueue |
||||
logger ClientLogger |
||||
rp *clientRoutinePool |
||||
onTracks func(*gortsplib.TrackH264, *gortsplib.TrackMPEG4Audio) error |
||||
onVideoData func(time.Duration, [][]byte) |
||||
onAudioData func(time.Duration, []byte) |
||||
|
||||
tracksParsed bool |
||||
clockInitialized bool |
||||
timeDec *mpegtstimedec.Decoder |
||||
startDTS time.Duration |
||||
videoPID *uint16 |
||||
audioPID *uint16 |
||||
videoTrack *gortsplib.TrackH264 |
||||
audioTrack *gortsplib.TrackMPEG4Audio |
||||
videoProc *clientProcessorMPEGTSTrack |
||||
audioProc *clientProcessorMPEGTSTrack |
||||
} |
||||
|
||||
func newClientProcessorMPEGTS( |
||||
segmentQueue *clientSegmentQueue, |
||||
logger ClientLogger, |
||||
rp *clientRoutinePool, |
||||
onTracks func(*gortsplib.TrackH264, *gortsplib.TrackMPEG4Audio) error, |
||||
onVideoData func(time.Duration, [][]byte), |
||||
onAudioData func(time.Duration, []byte), |
||||
) *clientProcessorMPEGTS { |
||||
return &clientProcessorMPEGTS{ |
||||
segmentQueue: segmentQueue, |
||||
logger: logger, |
||||
rp: rp, |
||||
timeDec: mpegtstimedec.New(), |
||||
onTracks: onTracks, |
||||
onVideoData: onVideoData, |
||||
onAudioData: onAudioData, |
||||
} |
||||
} |
||||
|
||||
func (p *clientProcessorMPEGTS) run(ctx context.Context) error { |
||||
for { |
||||
seg, ok := p.segmentQueue.pull(ctx) |
||||
if !ok { |
||||
return fmt.Errorf("terminated") |
||||
} |
||||
|
||||
err := p.processSegment(ctx, seg) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (p *clientProcessorMPEGTS) processSegment(ctx context.Context, byts []byte) error { |
||||
p.logger.Log(logger.Debug, "processing segment") |
||||
|
||||
dem := astits.NewDemuxer(context.Background(), bytes.NewReader(byts)) |
||||
|
||||
if !p.tracksParsed { |
||||
p.tracksParsed = true |
||||
|
||||
err := p.parseTracks(dem) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
// rewind demuxer in order to read again the audio packet that was used to create the track
|
||||
if p.audioTrack != nil { |
||||
dem = astits.NewDemuxer(context.Background(), bytes.NewReader(byts)) |
||||
} |
||||
} |
||||
|
||||
for { |
||||
data, err := dem.NextData() |
||||
if err != nil { |
||||
if err == astits.ErrNoMorePackets { |
||||
return nil |
||||
} |
||||
if strings.HasPrefix(err.Error(), "astits: parsing PES data failed") { |
||||
continue |
||||
} |
||||
return err |
||||
} |
||||
|
||||
if data.PES == nil { |
||||
continue |
||||
} |
||||
|
||||
if data.PES.Header.OptionalHeader == nil || |
||||
data.PES.Header.OptionalHeader.PTSDTSIndicator == astits.PTSDTSIndicatorNoPTSOrDTS || |
||||
data.PES.Header.OptionalHeader.PTSDTSIndicator == astits.PTSDTSIndicatorIsForbidden { |
||||
return fmt.Errorf("PTS is missing") |
||||
} |
||||
|
||||
pts := p.timeDec.Decode(data.PES.Header.OptionalHeader.PTS.Base) |
||||
|
||||
if p.videoPID != nil && data.PID == *p.videoPID { |
||||
var dts time.Duration |
||||
if data.PES.Header.OptionalHeader.PTSDTSIndicator == astits.PTSDTSIndicatorBothPresent { |
||||
diff := time.Duration((data.PES.Header.OptionalHeader.PTS.Base- |
||||
data.PES.Header.OptionalHeader.DTS.Base)&0x1FFFFFFFF) * |
||||
time.Second / 90000 |
||||
dts = pts - diff |
||||
} else { |
||||
dts = pts |
||||
} |
||||
|
||||
if !p.clockInitialized { |
||||
p.clockInitialized = true |
||||
p.startDTS = dts |
||||
now := time.Now() |
||||
p.initializeTrackProcs(now) |
||||
} |
||||
|
||||
pts -= p.startDTS |
||||
dts -= p.startDTS |
||||
|
||||
p.videoProc.push(ctx, &clientProcessorMPEGTSTrackEntryVideo{ |
||||
data: data.PES.Data, |
||||
pts: pts, |
||||
dts: dts, |
||||
}) |
||||
} else if p.audioPID != nil && data.PID == *p.audioPID { |
||||
if !p.clockInitialized { |
||||
p.clockInitialized = true |
||||
p.startDTS = pts |
||||
now := time.Now() |
||||
p.initializeTrackProcs(now) |
||||
} |
||||
|
||||
pts -= p.startDTS |
||||
|
||||
p.audioProc.push(ctx, &clientProcessorMPEGTSTrackEntryAudio{ |
||||
data: data.PES.Data, |
||||
pts: pts, |
||||
}) |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (p *clientProcessorMPEGTS) parseTracks(dem *astits.Demuxer) error { |
||||
// find and parse PMT
|
||||
for { |
||||
data, err := dem.NextData() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if data.PMT != nil { |
||||
for _, e := range data.PMT.ElementaryStreams { |
||||
switch e.StreamType { |
||||
case astits.StreamTypeH264Video: |
||||
if p.videoPID != nil { |
||||
return fmt.Errorf("multiple video/audio tracks are not supported") |
||||
} |
||||
|
||||
v := e.ElementaryPID |
||||
p.videoPID = &v |
||||
|
||||
case astits.StreamTypeAACAudio: |
||||
if p.audioPID != nil { |
||||
return fmt.Errorf("multiple video/audio tracks are not supported") |
||||
} |
||||
|
||||
v := e.ElementaryPID |
||||
p.audioPID = &v |
||||
} |
||||
} |
||||
break |
||||
} |
||||
} |
||||
|
||||
if p.videoPID == nil && p.audioPID == nil { |
||||
return fmt.Errorf("stream doesn't contain tracks with supported codecs (H264 or AAC)") |
||||
} |
||||
|
||||
if p.videoPID != nil { |
||||
p.videoTrack = &gortsplib.TrackH264{ |
||||
PayloadType: 96, |
||||
} |
||||
|
||||
if p.audioPID == nil { |
||||
err := p.onTracks(p.videoTrack, nil) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
} |
||||
} |
||||
|
||||
// find and parse first audio packet
|
||||
if p.audioPID != nil { |
||||
for { |
||||
data, err := dem.NextData() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if data.PES == nil || data.PID != *p.audioPID { |
||||
continue |
||||
} |
||||
|
||||
var adtsPkts mpeg4audio.ADTSPackets |
||||
err = adtsPkts.Unmarshal(data.PES.Data) |
||||
if err != nil { |
||||
return fmt.Errorf("unable to decode ADTS: %s", err) |
||||
} |
||||
|
||||
pkt := adtsPkts[0] |
||||
p.audioTrack = &gortsplib.TrackMPEG4Audio{ |
||||
PayloadType: 96, |
||||
Config: &mpeg4audio.Config{ |
||||
Type: pkt.Type, |
||||
SampleRate: pkt.SampleRate, |
||||
ChannelCount: pkt.ChannelCount, |
||||
}, |
||||
SizeLength: 13, |
||||
IndexLength: 3, |
||||
IndexDeltaLength: 3, |
||||
} |
||||
|
||||
err = p.onTracks(p.videoTrack, p.audioTrack) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
break |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (p *clientProcessorMPEGTS) initializeTrackProcs(clockStartRTC time.Time) { |
||||
if p.videoTrack != nil { |
||||
p.videoProc = newClientProcessorMPEGTSTrack( |
||||
clockStartRTC, |
||||
func(e clientProcessorMPEGTSTrackEntry) error { |
||||
vd := e.(*clientProcessorMPEGTSTrackEntryVideo) |
||||
|
||||
nalus, err := h264.AnnexBUnmarshal(vd.data) |
||||
if err != nil { |
||||
p.logger.Log(logger.Warn, "unable to decode Annex-B: %s", err) |
||||
return nil |
||||
} |
||||
|
||||
p.onVideoData(vd.pts, nalus) |
||||
return nil |
||||
}, |
||||
) |
||||
p.rp.add(p.videoProc.run) |
||||
} |
||||
|
||||
if p.audioTrack != nil { |
||||
p.audioProc = newClientProcessorMPEGTSTrack( |
||||
clockStartRTC, |
||||
func(e clientProcessorMPEGTSTrackEntry) error { |
||||
ad := e.(*clientProcessorMPEGTSTrackEntryAudio) |
||||
|
||||
var adtsPkts mpeg4audio.ADTSPackets |
||||
err := adtsPkts.Unmarshal(ad.data) |
||||
if err != nil { |
||||
return fmt.Errorf("unable to decode ADTS: %s", err) |
||||
} |
||||
|
||||
for i, pkt := range adtsPkts { |
||||
p.onAudioData( |
||||
ad.pts+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*time.Second/time.Duration(pkt.SampleRate), |
||||
pkt.AU) |
||||
} |
||||
|
||||
return nil |
||||
}, |
||||
) |
||||
p.rp.add(p.audioProc.run) |
||||
} |
||||
} |
@ -0,0 +1,83 @@
@@ -0,0 +1,83 @@
|
||||
package hls |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"time" |
||||
) |
||||
|
||||
type clientProcessorMPEGTSTrackEntry interface { |
||||
DTS() time.Duration |
||||
} |
||||
|
||||
type clientProcessorMPEGTSTrackEntryVideo struct { |
||||
data []byte |
||||
pts time.Duration |
||||
dts time.Duration |
||||
} |
||||
|
||||
func (e clientProcessorMPEGTSTrackEntryVideo) DTS() time.Duration { |
||||
return e.dts |
||||
} |
||||
|
||||
type clientProcessorMPEGTSTrackEntryAudio struct { |
||||
data []byte |
||||
pts time.Duration |
||||
} |
||||
|
||||
func (e clientProcessorMPEGTSTrackEntryAudio) DTS() time.Duration { |
||||
return e.pts |
||||
} |
||||
|
||||
type clientProcessorMPEGTSTrack struct { |
||||
clockStartRTC time.Time |
||||
onEntry func(e clientProcessorMPEGTSTrackEntry) error |
||||
|
||||
queue chan clientProcessorMPEGTSTrackEntry |
||||
} |
||||
|
||||
func newClientProcessorMPEGTSTrack( |
||||
clockStartRTC time.Time, |
||||
onEntry func(e clientProcessorMPEGTSTrackEntry) error, |
||||
) *clientProcessorMPEGTSTrack { |
||||
return &clientProcessorMPEGTSTrack{ |
||||
clockStartRTC: clockStartRTC, |
||||
onEntry: onEntry, |
||||
queue: make(chan clientProcessorMPEGTSTrackEntry, clientQueueSize), |
||||
} |
||||
} |
||||
|
||||
func (t *clientProcessorMPEGTSTrack) run(ctx context.Context) error { |
||||
for { |
||||
select { |
||||
case entry := <-t.queue: |
||||
err := t.processEntry(ctx, entry) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
case <-ctx.Done(): |
||||
return nil |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (t *clientProcessorMPEGTSTrack) processEntry(ctx context.Context, entry clientProcessorMPEGTSTrackEntry) error { |
||||
elapsed := time.Since(t.clockStartRTC) |
||||
if entry.DTS() > elapsed { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return fmt.Errorf("terminated") |
||||
case <-time.After(entry.DTS() - elapsed): |
||||
} |
||||
} |
||||
|
||||
return t.onEntry(entry) |
||||
} |
||||
|
||||
func (t *clientProcessorMPEGTSTrack) push(ctx context.Context, entry clientProcessorMPEGTSTrackEntry) { |
||||
select { |
||||
case t.queue <- entry: |
||||
case <-ctx.Done(): |
||||
} |
||||
} |
@ -0,0 +1,44 @@
@@ -0,0 +1,44 @@
|
||||
package hls |
||||
|
||||
import ( |
||||
"context" |
||||
"sync" |
||||
) |
||||
|
||||
type clientRoutinePool struct { |
||||
ctx context.Context |
||||
ctxCancel func() |
||||
wg sync.WaitGroup |
||||
|
||||
err chan error |
||||
} |
||||
|
||||
func newClientRoutinePool() *clientRoutinePool { |
||||
ctx, ctxCancel := context.WithCancel(context.Background()) |
||||
|
||||
return &clientRoutinePool{ |
||||
ctx: ctx, |
||||
ctxCancel: ctxCancel, |
||||
err: make(chan error), |
||||
} |
||||
} |
||||
|
||||
func (rp *clientRoutinePool) close() { |
||||
rp.ctxCancel() |
||||
rp.wg.Wait() |
||||
} |
||||
|
||||
func (rp *clientRoutinePool) errorChan() chan error { |
||||
return rp.err |
||||
} |
||||
|
||||
func (rp *clientRoutinePool) add(cb func(context.Context) error) { |
||||
rp.wg.Add(1) |
||||
go func() { |
||||
defer rp.wg.Done() |
||||
select { |
||||
case rp.err <- cb(rp.ctx): |
||||
case <-rp.ctx.Done(): |
||||
} |
||||
}() |
||||
} |
@ -1,104 +0,0 @@
@@ -1,104 +0,0 @@
|
||||
package hls |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"time" |
||||
|
||||
"github.com/aler9/gortsplib" |
||||
"github.com/aler9/gortsplib/pkg/h264" |
||||
|
||||
"github.com/aler9/rtsp-simple-server/internal/logger" |
||||
) |
||||
|
||||
type clientVideoProcessorData struct { |
||||
data []byte |
||||
pts time.Duration |
||||
dts time.Duration |
||||
} |
||||
|
||||
type clientVideoProcessor struct { |
||||
ctx context.Context |
||||
onTrack func(*gortsplib.TrackH264) error |
||||
onData func(time.Duration, [][]byte) |
||||
logger ClientLogger |
||||
|
||||
queue chan clientVideoProcessorData |
||||
clockStartRTC time.Time |
||||
} |
||||
|
||||
func newClientVideoProcessor( |
||||
ctx context.Context, |
||||
onTrack func(*gortsplib.TrackH264) error, |
||||
onData func(time.Duration, [][]byte), |
||||
logger ClientLogger, |
||||
) *clientVideoProcessor { |
||||
p := &clientVideoProcessor{ |
||||
ctx: ctx, |
||||
onTrack: onTrack, |
||||
onData: onData, |
||||
logger: logger, |
||||
queue: make(chan clientVideoProcessorData, clientQueueSize), |
||||
} |
||||
|
||||
return p |
||||
} |
||||
|
||||
func (p *clientVideoProcessor) run() error { |
||||
track := &gortsplib.TrackH264{ |
||||
PayloadType: 96, |
||||
} |
||||
|
||||
err := p.onTrack(track) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
for { |
||||
select { |
||||
case item := <-p.queue: |
||||
err := p.doProcess(item.data, item.pts, item.dts) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
case <-p.ctx.Done(): |
||||
return nil |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (p *clientVideoProcessor) doProcess( |
||||
data []byte, |
||||
pts time.Duration, |
||||
dts time.Duration, |
||||
) error { |
||||
elapsed := time.Since(p.clockStartRTC) |
||||
if dts > elapsed { |
||||
select { |
||||
case <-p.ctx.Done(): |
||||
return fmt.Errorf("terminated") |
||||
case <-time.After(dts - elapsed): |
||||
} |
||||
} |
||||
|
||||
nalus, err := h264.AnnexBUnmarshal(data) |
||||
if err != nil { |
||||
p.logger.Log(logger.Warn, "unable to unmarshal Annex-B: %s", err) |
||||
return nil |
||||
} |
||||
|
||||
p.onData(pts, nalus) |
||||
return nil |
||||
} |
||||
|
||||
func (p *clientVideoProcessor) process( |
||||
data []byte, |
||||
pts time.Duration, |
||||
dts time.Duration, |
||||
) { |
||||
select { |
||||
case p.queue <- clientVideoProcessorData{data, pts, dts}: |
||||
case <-p.ctx.Done(): |
||||
} |
||||
} |
@ -0,0 +1,2 @@
@@ -0,0 +1,2 @@
|
||||
// Package fmp4 contains a fMP4 reader and writer.
|
||||
package fmp4 |
@ -0,0 +1,104 @@
@@ -0,0 +1,104 @@
|
||||
package fmp4 |
||||
|
||||
import ( |
||||
"bytes" |
||||
"fmt" |
||||
|
||||
gomp4 "github.com/abema/go-mp4" |
||||
"github.com/aler9/gortsplib" |
||||
) |
||||
|
||||
type initReadState int |
||||
|
||||
const ( |
||||
waitingTrak initReadState = iota |
||||
waitingCodec |
||||
waitingAVCC |
||||
) |
||||
|
||||
// InitRead reads a FMP4 initialization file.
|
||||
func InitRead(byts []byte) (*gortsplib.TrackH264, *gortsplib.TrackMPEG4Audio, error) { |
||||
state := waitingTrak |
||||
var videoTrack *gortsplib.TrackH264 |
||||
var audioTrack *gortsplib.TrackMPEG4Audio |
||||
|
||||
_, err := gomp4.ReadBoxStructure(bytes.NewReader(byts), func(h *gomp4.ReadHandle) (interface{}, error) { |
||||
switch h.BoxInfo.Type.String() { |
||||
case "trak": |
||||
if state != waitingTrak { |
||||
return nil, fmt.Errorf("parse error") |
||||
} |
||||
state = waitingCodec |
||||
|
||||
case "avc1": |
||||
if state != waitingCodec { |
||||
return nil, fmt.Errorf("parse error") |
||||
} |
||||
|
||||
if videoTrack != nil { |
||||
return nil, fmt.Errorf("multiple video tracks are not supported") |
||||
} |
||||
|
||||
state = waitingAVCC |
||||
|
||||
case "avcC": |
||||
if state != waitingAVCC { |
||||
return nil, fmt.Errorf("parse error") |
||||
} |
||||
|
||||
box, _, err := h.ReadPayload() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
conf := box.(*gomp4.AVCDecoderConfiguration) |
||||
|
||||
if len(conf.SequenceParameterSets) > 1 { |
||||
return nil, fmt.Errorf("multiple SPS are not supported") |
||||
} |
||||
|
||||
var sps []byte |
||||
if len(conf.SequenceParameterSets) == 1 { |
||||
sps = conf.SequenceParameterSets[0].NALUnit |
||||
} |
||||
|
||||
if len(conf.PictureParameterSets) > 1 { |
||||
return nil, fmt.Errorf("multiple PPS are not supported") |
||||
} |
||||
|
||||
var pps []byte |
||||
if len(conf.PictureParameterSets) == 1 { |
||||
pps = conf.PictureParameterSets[0].NALUnit |
||||
} |
||||
|
||||
videoTrack = &gortsplib.TrackH264{ |
||||
PayloadType: 96, |
||||
SPS: sps, |
||||
PPS: pps, |
||||
} |
||||
|
||||
state = waitingTrak |
||||
|
||||
case "mp4a": |
||||
if state != waitingCodec { |
||||
return nil, fmt.Errorf("parse error") |
||||
} |
||||
|
||||
if audioTrack != nil { |
||||
return nil, fmt.Errorf("multiple audio tracks are not supported") |
||||
} |
||||
|
||||
return nil, fmt.Errorf("TODO: MP4a") |
||||
} |
||||
|
||||
return h.Expand() |
||||
}) |
||||
if err != nil { |
||||
return nil, nil, err |
||||
} |
||||
|
||||
if state != waitingTrak { |
||||
return nil, nil, fmt.Errorf("parse error") |
||||
} |
||||
|
||||
return videoTrack, audioTrack, nil |
||||
} |
@ -0,0 +1,96 @@
@@ -0,0 +1,96 @@
|
||||
package fmp4 |
||||
|
||||
import ( |
||||
"bytes" |
||||
"fmt" |
||||
|
||||
gomp4 "github.com/abema/go-mp4" |
||||
) |
||||
|
||||
type partReadState int |
||||
|
||||
const ( |
||||
waitingTraf partReadState = iota |
||||
waitingTfhd |
||||
waitingTfdt |
||||
waitingTrun |
||||
) |
||||
|
||||
// PartRead reads a FMP4 part file.
|
||||
func PartRead( |
||||
byts []byte, |
||||
cb func(), |
||||
) error { |
||||
state := waitingTraf |
||||
var trackID uint32 |
||||
var baseTime uint64 |
||||
var entries []gomp4.TrunEntry |
||||
|
||||
_, err := gomp4.ReadBoxStructure(bytes.NewReader(byts), func(h *gomp4.ReadHandle) (interface{}, error) { |
||||
switch h.BoxInfo.Type.String() { |
||||
case "traf": |
||||
if state != waitingTraf { |
||||
return nil, fmt.Errorf("decode error") |
||||
} |
||||
state = waitingTfhd |
||||
|
||||
case "tfhd": |
||||
if state != waitingTfhd { |
||||
return nil, fmt.Errorf("decode error") |
||||
} |
||||
|
||||
box, _, err := h.ReadPayload() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
trackID = box.(*gomp4.Tfhd).TrackID |
||||
|
||||
state = waitingTfdt |
||||
|
||||
case "tfdt": |
||||
if state != waitingTfdt { |
||||
return nil, fmt.Errorf("decode error") |
||||
} |
||||
|
||||
box, _, err := h.ReadPayload() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
t := box.(*gomp4.Tfdt) |
||||
|
||||
if t.FullBox.Version != 1 { |
||||
return nil, fmt.Errorf("unsupported tfdt version") |
||||
} |
||||
|
||||
baseTime = t.BaseMediaDecodeTimeV1 |
||||
state = waitingTrun |
||||
|
||||
case "trun": |
||||
if state != waitingTrun { |
||||
return nil, fmt.Errorf("decode error") |
||||
} |
||||
|
||||
box, _, err := h.ReadPayload() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
t := box.(*gomp4.Trun) |
||||
|
||||
entries = t.Entries |
||||
state = waitingTraf |
||||
} |
||||
|
||||
return h.Expand() |
||||
}) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if state != waitingTraf { |
||||
return fmt.Errorf("parse error") |
||||
} |
||||
|
||||
fmt.Println("TODO", trackID, baseTime, entries) |
||||
|
||||
return nil |
||||
} |
Loading…
Reference in new issue