Ready-to-use SRT / WebRTC / RTSP / RTMP / LL-HLS media server and media proxy that allows to read, publish, proxy, record and playback video and audio streams.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

204 lines
3.8 KiB

package core
import (
"context"
"sync"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/rtpaac"
"github.com/aler9/gortsplib/pkg/rtph264"
"github.com/aler9/rtsp-simple-server/internal/hls"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/rtcpsenderset"
)
const (
hlsSourceRetryPause = 5 * time.Second
)
type hlsSourceParent interface {
log(logger.Level, string, ...interface{})
onSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
onSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq)
}
type hlsSource struct {
ur string
fingerprint string
wg *sync.WaitGroup
parent hlsSourceParent
ctx context.Context
ctxCancel func()
}
func newHLSSource(
parentCtx context.Context,
ur string,
fingerprint string,
wg *sync.WaitGroup,
parent hlsSourceParent) *hlsSource {
ctx, ctxCancel := context.WithCancel(parentCtx)
s := &hlsSource{
ur: ur,
fingerprint: fingerprint,
wg: wg,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
}
s.Log(logger.Info, "started")
s.wg.Add(1)
go s.run()
return s
}
func (s *hlsSource) close() {
s.Log(logger.Info, "stopped")
s.ctxCancel()
}
func (s *hlsSource) Log(level logger.Level, format string, args ...interface{}) {
s.parent.log(level, "[hls source] "+format, args...)
}
func (s *hlsSource) run() {
defer s.wg.Done()
outer:
for {
ok := s.runInner()
if !ok {
break outer
}
select {
case <-time.After(hlsSourceRetryPause):
case <-s.ctx.Done():
break outer
}
}
s.ctxCancel()
}
func (s *hlsSource) runInner() bool {
var stream *stream
var rtcpSenders *rtcpsenderset.RTCPSenderSet
var videoTrackID int
var audioTrackID int
var videoEnc *rtph264.Encoder
var audioEnc *rtpaac.Encoder
defer func() {
if stream != nil {
s.parent.onSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{source: s})
rtcpSenders.Close()
}
}()
onTracks := func(videoTrack *gortsplib.TrackH264, audioTrack *gortsplib.TrackAAC) error {
var tracks gortsplib.Tracks
if videoTrack != nil {
videoTrackID = len(tracks)
videoEnc = &rtph264.Encoder{PayloadType: 96}
videoEnc.Init()
tracks = append(tracks, videoTrack)
}
if audioTrack != nil {
audioTrackID = len(tracks)
audioEnc = &rtpaac.Encoder{
PayloadType: 97,
SampleRate: audioTrack.ClockRate(),
}
audioEnc.Init()
tracks = append(tracks, audioTrack)
}
res := s.parent.onSourceStaticSetReady(pathSourceStaticSetReadyReq{
source: s,
tracks: tracks,
})
if res.err != nil {
return res.err
}
s.Log(logger.Info, "ready")
stream = res.stream
rtcpSenders = rtcpsenderset.New(tracks, stream.writePacketRTCP)
return nil
}
onVideoData := func(pts time.Duration, nalus [][]byte) {
if stream == nil {
return
}
pkts, err := videoEnc.Encode(nalus, pts)
if err != nil {
return
}
for _, pkt := range pkts {
rtcpSenders.OnPacketRTP(videoTrackID, pkt)
stream.writePacketRTP(videoTrackID, pkt)
}
}
onAudioData := func(pts time.Duration, aus [][]byte) {
if stream == nil {
return
}
pkts, err := audioEnc.Encode(aus, pts)
if err != nil {
return
}
for _, pkt := range pkts {
rtcpSenders.OnPacketRTP(audioTrackID, pkt)
stream.writePacketRTP(audioTrackID, pkt)
}
}
c, err := hls.NewClient(
s.ur,
s.fingerprint,
onTracks,
onVideoData,
onAudioData,
s,
)
if err != nil {
s.Log(logger.Info, "ERR: %v", err)
return true
}
select {
case err := <-c.Wait():
s.Log(logger.Info, "ERR: %v", err)
return true
case <-s.ctx.Done():
c.Close()
<-c.Wait()
return false
}
}
// onSourceAPIDescribe implements source.
func (*hlsSource) onSourceAPIDescribe() interface{} {
return struct {
Type string `json:"type"`
}{"hlsSource"}
}