Browse Source

hls: when alwaysRemux is true, prevent clients from creating muxers (#3015)

pull/3016/head
Alessandro Ros 1 year ago committed by GitHub
parent
commit
c2883f2ce9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 22
      internal/servers/hls/http_server.go
  2. 36
      internal/servers/hls/muxer.go
  3. 52
      internal/servers/hls/server.go

22
internal/servers/hls/http_server.go

@ -191,10 +191,24 @@ func (s *httpServer) onRequest(ctx *gin.Context) {
ctx.Writer.Write(hlsIndex) ctx.Writer.Write(hlsIndex)
default: default:
s.parent.handleRequest(muxerHandleRequestReq{ mux, err := s.parent.getMuxer(serverGetMuxerReq{
path: dir, path: dir,
file: fname, remoteAddr: httpp.RemoteAddr(ctx),
ctx: ctx,
}) })
if err != nil {
s.Log(logger.Error, err.Error())
ctx.Writer.WriteHeader(http.StatusInternalServerError)
return
}
err = mux.processRequest(muxerProcessRequestReq{})
if err != nil {
s.Log(logger.Error, err.Error())
ctx.Writer.WriteHeader(http.StatusInternalServerError)
return
}
ctx.Request.URL.Path = fname
mux.handleRequest(ctx)
} }
} }

36
internal/servers/hls/muxer.go

@ -51,11 +51,8 @@ func (w *responseWriterWithCounter) Write(p []byte) (int, error) {
return n, err return n, err
} }
type muxerHandleRequestReq struct { type muxerProcessRequestReq struct {
path string res chan error
file string
ctx *gin.Context
res chan *muxer
} }
type muxer struct { type muxer struct {
@ -81,11 +78,11 @@ type muxer struct {
writer *asyncwriter.Writer writer *asyncwriter.Writer
lastRequestTime *int64 lastRequestTime *int64
muxer *gohlslib.Muxer muxer *gohlslib.Muxer
requests []*muxerHandleRequestReq requests []*muxerProcessRequestReq
bytesSent *uint64 bytesSent *uint64
// in // in
chRequest chan *muxerHandleRequestReq chProcessRequest chan muxerProcessRequestReq
} }
func (m *muxer) initialize() { func (m *muxer) initialize() {
@ -96,7 +93,7 @@ func (m *muxer) initialize() {
m.created = time.Now() m.created = time.Now()
m.lastRequestTime = int64Ptr(time.Now().UnixNano()) m.lastRequestTime = int64Ptr(time.Now().UnixNano())
m.bytesSent = new(uint64) m.bytesSent = new(uint64)
m.chRequest = make(chan *muxerHandleRequestReq) m.chProcessRequest = make(chan muxerProcessRequestReq)
m.Log(logger.Info, "created %s", func() string { m.Log(logger.Info, "created %s", func() string {
if m.remoteAddr == "" { if m.remoteAddr == "" {
@ -156,22 +153,22 @@ func (m *muxer) run() {
} }
return errors.New("terminated") return errors.New("terminated")
case req := <-m.chRequest: case req := <-m.chProcessRequest:
switch { switch {
case isRecreating: case isRecreating:
req.res <- nil req.res <- fmt.Errorf("recreating")
case isReady: case isReady:
req.res <- m req.res <- nil
default: default:
m.requests = append(m.requests, req) m.requests = append(m.requests, &req)
} }
case <-innerReady: case <-innerReady:
isReady = true isReady = true
for _, req := range m.requests { for _, req := range m.requests {
req.res <- m req.res <- nil
} }
m.requests = nil m.requests = nil
@ -206,7 +203,7 @@ func (m *muxer) run() {
func (m *muxer) clearQueuedRequests() { func (m *muxer) clearQueuedRequests() {
for _, req := range m.requests { for _, req := range m.requests {
req.res <- nil req.res <- fmt.Errorf("terminated")
} }
m.requests = nil m.requests = nil
} }
@ -479,12 +476,15 @@ func (m *muxer) handleRequest(ctx *gin.Context) {
m.muxer.Handle(w, ctx.Request) m.muxer.Handle(w, ctx.Request)
} }
// processRequest is called by hlsserver.Server (forwarded from ServeHTTP). func (m *muxer) processRequest(req muxerProcessRequestReq) error {
func (m *muxer) processRequest(req *muxerHandleRequestReq) { req.res = make(chan error)
select { select {
case m.chRequest <- req: case m.chProcessRequest <- req:
return <-req.res
case <-m.ctx.Done(): case <-m.ctx.Done():
req.res <- nil return fmt.Errorf("terminated")
} }
} }

52
internal/servers/hls/server.go

@ -11,12 +11,22 @@ import (
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/httpp"
) )
// ErrMuxerNotFound is returned when a muxer is not found. // ErrMuxerNotFound is returned when a muxer is not found.
var ErrMuxerNotFound = errors.New("muxer not found") var ErrMuxerNotFound = errors.New("muxer not found")
type serverGetMuxerRes struct {
muxer *muxer
err error
}
type serverGetMuxerReq struct {
path string
remoteAddr string
res chan serverGetMuxerRes
}
type serverAPIMuxersListRes struct { type serverAPIMuxersListRes struct {
data *defs.APIHLSMuxerList data *defs.APIHLSMuxerList
err error err error
@ -68,12 +78,12 @@ type Server struct {
muxers map[string]*muxer muxers map[string]*muxer
// in // in
chPathReady chan defs.Path chPathReady chan defs.Path
chPathNotReady chan defs.Path chPathNotReady chan defs.Path
chHandleRequest chan muxerHandleRequestReq chGetMuxer chan serverGetMuxerReq
chCloseMuxer chan *muxer chCloseMuxer chan *muxer
chAPIMuxerList chan serverAPIMuxersListReq chAPIMuxerList chan serverAPIMuxersListReq
chAPIMuxerGet chan serverAPIMuxersGetReq chAPIMuxerGet chan serverAPIMuxersGetReq
} }
// Initialize initializes the server. // Initialize initializes the server.
@ -85,7 +95,7 @@ func (s *Server) Initialize() error {
s.muxers = make(map[string]*muxer) s.muxers = make(map[string]*muxer)
s.chPathReady = make(chan defs.Path) s.chPathReady = make(chan defs.Path)
s.chPathNotReady = make(chan defs.Path) s.chPathNotReady = make(chan defs.Path)
s.chHandleRequest = make(chan muxerHandleRequestReq) s.chGetMuxer = make(chan serverGetMuxerReq)
s.chCloseMuxer = make(chan *muxer) s.chCloseMuxer = make(chan *muxer)
s.chAPIMuxerList = make(chan serverAPIMuxersListReq) s.chAPIMuxerList = make(chan serverAPIMuxersListReq)
s.chAPIMuxerGet = make(chan serverAPIMuxersGetReq) s.chAPIMuxerGet = make(chan serverAPIMuxersGetReq)
@ -147,15 +157,15 @@ outer:
delete(s.muxers, pa.Name()) delete(s.muxers, pa.Name())
} }
case req := <-s.chHandleRequest: case req := <-s.chGetMuxer:
r, ok := s.muxers[req.path] mux, ok := s.muxers[req.path]
switch { switch {
case ok: case ok:
r.processRequest(&req) req.res <- serverGetMuxerRes{muxer: mux}
case !s.AlwaysRemux:
req.res <- serverGetMuxerRes{muxer: s.createMuxer(req.path, req.remoteAddr)}
default: default:
r := s.createMuxer(req.path, httpp.RemoteAddr(req.ctx)) req.res <- serverGetMuxerRes{err: fmt.Errorf("muxer is waiting to be created")}
r.processRequest(&req)
} }
case c := <-s.chCloseMuxer: case c := <-s.chCloseMuxer:
@ -230,18 +240,16 @@ func (s *Server) closeMuxer(c *muxer) {
} }
} }
func (s *Server) handleRequest(req muxerHandleRequestReq) { func (s *Server) getMuxer(req serverGetMuxerReq) (*muxer, error) {
req.res = make(chan *muxer) req.res = make(chan serverGetMuxerRes)
select { select {
case s.chHandleRequest <- req: case s.chGetMuxer <- req:
muxer := <-req.res res := <-req.res
if muxer != nil { return res.muxer, res.err
req.ctx.Request.URL.Path = req.file
muxer.handleRequest(req.ctx)
}
case <-s.ctx.Done(): case <-s.ctx.Done():
return nil, fmt.Errorf("terminated")
} }
} }

Loading…
Cancel
Save