Browse Source

hls muxer: avoid infinite loop when hlsAlwaysRemux is true

when hlsAlwaysRemux is true and a muxer fails, add a pause between its
recreation, in order to avoid infinite loops
pull/1377/head
aler9 2 years ago
parent
commit
7420ef1ce4
  1. 120
      internal/core/hls_muxer.go
  2. 5
      internal/core/hls_server.go

120
internal/core/hls_muxer.go

@ -25,8 +25,9 @@ import ( @@ -25,8 +25,9 @@ import (
)
const (
closeCheckPeriod = 1 * time.Second
closeAfterInactivity = 60 * time.Second
closeCheckPeriod = 1 * time.Second
closeAfterInactivity = 60 * time.Second
hlsMuxerRecreatePause = 10 * time.Second
)
//go:embed hls_index.html
@ -57,11 +58,12 @@ type hlsMuxer struct { @@ -57,11 +58,12 @@ type hlsMuxer struct {
name string
remoteAddr string
externalAuthenticationURL string
hlsVariant conf.HLSVariant
hlsSegmentCount int
hlsSegmentDuration conf.StringDuration
hlsPartDuration conf.StringDuration
hlsSegmentMaxSize conf.StringSize
alwaysRemux bool
variant conf.HLSVariant
segmentCount int
segmentDuration conf.StringDuration
partDuration conf.StringDuration
segmentMaxSize conf.StringSize
readBufferCount int
wg *sync.WaitGroup
pathName string
@ -88,11 +90,12 @@ func newHLSMuxer( @@ -88,11 +90,12 @@ func newHLSMuxer(
name string,
remoteAddr string,
externalAuthenticationURL string,
hlsVariant conf.HLSVariant,
hlsSegmentCount int,
hlsSegmentDuration conf.StringDuration,
hlsPartDuration conf.StringDuration,
hlsSegmentMaxSize conf.StringSize,
alwaysRemux bool,
variant conf.HLSVariant,
segmentCount int,
segmentDuration conf.StringDuration,
partDuration conf.StringDuration,
segmentMaxSize conf.StringSize,
readBufferCount int,
req *hlsMuxerRequest,
wg *sync.WaitGroup,
@ -106,11 +109,12 @@ func newHLSMuxer( @@ -106,11 +109,12 @@ func newHLSMuxer(
name: name,
remoteAddr: remoteAddr,
externalAuthenticationURL: externalAuthenticationURL,
hlsVariant: hlsVariant,
hlsSegmentCount: hlsSegmentCount,
hlsSegmentDuration: hlsSegmentDuration,
hlsPartDuration: hlsPartDuration,
hlsSegmentMaxSize: hlsSegmentMaxSize,
alwaysRemux: alwaysRemux,
variant: variant,
segmentCount: segmentCount,
segmentDuration: segmentDuration,
partDuration: partDuration,
segmentMaxSize: segmentMaxSize,
readBufferCount: readBufferCount,
wg: wg,
pathName: pathName,
@ -161,21 +165,34 @@ func (m *hlsMuxer) PathName() string { @@ -161,21 +165,34 @@ func (m *hlsMuxer) PathName() string {
func (m *hlsMuxer) run() {
defer m.wg.Done()
innerCtx, innerCtxCancel := context.WithCancel(context.Background())
innerReady := make(chan struct{})
innerErr := make(chan error)
go func() {
innerErr <- m.runInner(innerCtx, innerReady)
}()
err := func() error {
var innerReady chan struct{}
var innerErr chan error
var innerCtx context.Context
var innerCtxCancel func()
createInner := func() {
innerReady = make(chan struct{})
innerErr = make(chan error)
innerCtx, innerCtxCancel = context.WithCancel(context.Background())
go func() {
innerErr <- m.runInner(innerCtx, innerReady)
}()
}
isReady := false
createInner()
isReady := false
isRecreating := false
recreateTimer := newEmptyTimer()
err := func() error {
for {
select {
case <-m.ctx.Done():
innerCtxCancel()
<-innerErr
if !isRecreating {
innerCtxCancel()
<-innerErr
}
return errors.New("terminated")
case req := <-m.chRequest:
@ -208,13 +225,34 @@ func (m *hlsMuxer) run() { @@ -208,13 +225,34 @@ func (m *hlsMuxer) run() {
case err := <-innerErr:
innerCtxCancel()
return err
if m.alwaysRemux {
m.log(logger.Info, "ERR: %v", err)
m.clearQueuedRequests()
isReady = false
isRecreating = true
recreateTimer = time.NewTimer(hlsMuxerRecreatePause)
} else {
return err
}
case <-recreateTimer.C:
isRecreating = false
createInner()
}
}
}()
m.ctxCancel()
m.clearQueuedRequests()
m.parent.muxerClose(m)
m.log(logger.Info, "destroyed (%v)", err)
}
func (m *hlsMuxer) clearQueuedRequests() {
for _, req := range m.requests {
req.res <- hlsMuxerResponse{
muxer: m,
@ -223,10 +261,6 @@ func (m *hlsMuxer) run() { @@ -223,10 +261,6 @@ func (m *hlsMuxer) run() {
},
}
}
m.parent.muxerClose(m)
m.log(logger.Info, "destroyed (%v)", err)
}
func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error {
@ -267,11 +301,11 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) @@ -267,11 +301,11 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
var err error
m.muxer, err = hls.NewMuxer(
hls.MuxerVariant(m.hlsVariant),
m.hlsSegmentCount,
time.Duration(m.hlsSegmentDuration),
time.Duration(m.hlsPartDuration),
uint64(m.hlsSegmentMaxSize),
hls.MuxerVariant(m.variant),
m.segmentCount,
time.Duration(m.segmentDuration),
time.Duration(m.partDuration),
uint64(m.segmentMaxSize),
videoFormat,
audioFormat,
)
@ -296,11 +330,13 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) @@ -296,11 +330,13 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
for {
select {
case <-closeCheckTicker.C:
t := time.Unix(0, atomic.LoadInt64(m.lastRequestTime))
if m.remoteAddr != "" && time.Since(t) >= closeAfterInactivity {
m.ringBuffer.Close()
<-writerDone
return fmt.Errorf("not used anymore")
if m.remoteAddr != "" {
t := time.Unix(0, atomic.LoadInt64(m.lastRequestTime))
if time.Since(t) >= closeAfterInactivity {
m.ringBuffer.Close()
<-writerDone
return fmt.Errorf("not used anymore")
}
}
case err := <-writerDone:

5
internal/core/hls_server.go

@ -225,10 +225,6 @@ outer: @@ -225,10 +225,6 @@ outer:
}
delete(s.muxers, c.PathName())
if s.alwaysRemux && c.remoteAddr == "" {
s.findOrCreateMuxer(c.PathName(), "", nil)
}
case req := <-s.chAPIMuxerList:
muxers := make(map[string]*hlsMuxer)
@ -339,6 +335,7 @@ func (s *hlsServer) findOrCreateMuxer(pathName string, remoteAddr string, req *h @@ -339,6 +335,7 @@ func (s *hlsServer) findOrCreateMuxer(pathName string, remoteAddr string, req *h
pathName,
remoteAddr,
s.externalAuthenticationURL,
s.alwaysRemux,
s.variant,
s.segmentCount,
s.segmentDuration,

Loading…
Cancel
Save