Browse Source

speed up hls remuxing by linking together pathManager and hlsServer

pull/483/head
aler9 5 years ago
parent
commit
82e65230fd
  1. 47
      internal/core/core.go
  2. 12
      internal/core/hls_remuxer.go
  3. 12
      internal/core/hls_server.go
  4. 37
      internal/core/path_manager.go
  5. 10
      internal/core/rtmp_conn.go
  6. 8
      internal/core/rtmp_server.go
  7. 8
      internal/core/rtsp_conn.go
  8. 10
      internal/core/rtsp_server.go
  9. 10
      internal/core/rtsp_session.go

47
internal/core/core.go

@ -28,16 +28,13 @@ type Core struct { @@ -28,16 +28,13 @@ type Core struct {
logger *logger.Logger
metrics *metrics
pprof *pprof
pathMan *pathManager
pathManager *pathManager
rtspServerPlain *rtspServer
rtspServerTLS *rtspServer
rtmpServer *rtmpServer
hlsServer *hlsServer
confWatcher *confwatcher.ConfWatcher
// in
pathSourceReady chan *path
// out
done chan struct{}
}
@ -65,11 +62,10 @@ func New(args []string) (*Core, bool) { @@ -65,11 +62,10 @@ func New(args []string) (*Core, bool) {
ctx, ctxCancel := context.WithCancel(context.Background())
p := &Core{
ctx: ctx,
ctxCancel: ctxCancel,
confPath: *argConfPath,
pathSourceReady: make(chan *path),
done: make(chan struct{}),
ctx: ctx,
ctxCancel: ctxCancel,
confPath: *argConfPath,
done: make(chan struct{}),
}
var err error
@ -140,11 +136,6 @@ outer: @@ -140,11 +136,6 @@ outer:
break outer
}
case pa := <-p.pathSourceReady:
if p.hlsServer != nil {
p.hlsServer.OnPathSourceReady(pa)
}
case <-p.ctx.Done():
break outer
}
@ -206,8 +197,8 @@ func (p *Core) createResources(initial bool) error { @@ -206,8 +197,8 @@ func (p *Core) createResources(initial bool) error {
}
}
if p.pathMan == nil {
p.pathMan = newPathManager(
if p.pathManager == nil {
p.pathManager = newPathManager(
p.ctx,
p.conf.RTSPAddress,
p.conf.ReadTimeout,
@ -248,7 +239,7 @@ func (p *Core) createResources(initial bool) error { @@ -248,7 +239,7 @@ func (p *Core) createResources(initial bool) error {
p.conf.RunOnConnect,
p.conf.RunOnConnectRestart,
p.stats,
p.pathMan,
p.pathManager,
p)
if err != nil {
return err
@ -282,7 +273,7 @@ func (p *Core) createResources(initial bool) error { @@ -282,7 +273,7 @@ func (p *Core) createResources(initial bool) error {
p.conf.RunOnConnect,
p.conf.RunOnConnectRestart,
p.stats,
p.pathMan,
p.pathManager,
p)
if err != nil {
return err
@ -302,7 +293,7 @@ func (p *Core) createResources(initial bool) error { @@ -302,7 +293,7 @@ func (p *Core) createResources(initial bool) error {
p.conf.RunOnConnect,
p.conf.RunOnConnectRestart,
p.stats,
p.pathMan,
p.pathManager,
p)
if err != nil {
return err
@ -321,7 +312,7 @@ func (p *Core) createResources(initial bool) error { @@ -321,7 +312,7 @@ func (p *Core) createResources(initial bool) error {
p.conf.HLSAllowOrigin,
p.conf.ReadBufferCount,
p.stats,
p.pathMan,
p.pathManager,
p)
if err != nil {
return err
@ -372,7 +363,7 @@ func (p *Core) closeResources(newConf *conf.Conf) { @@ -372,7 +363,7 @@ func (p *Core) closeResources(newConf *conf.Conf) {
closeStats {
closePathMan = true
} else if !reflect.DeepEqual(newConf.Paths, p.conf.Paths) {
p.pathMan.OnConfReload(newConf.Paths)
p.pathManager.OnConfReload(newConf.Paths)
}
closeServerPlain := false
@ -456,9 +447,9 @@ func (p *Core) closeResources(newConf *conf.Conf) { @@ -456,9 +447,9 @@ func (p *Core) closeResources(newConf *conf.Conf) {
p.rtspServerPlain = nil
}
if closePathMan && p.pathMan != nil {
p.pathMan.close()
p.pathMan = nil
if closePathMan && p.pathManager != nil {
p.pathManager.close()
p.pathManager = nil
}
if closeServerHLS && p.hlsServer != nil {
@ -504,11 +495,3 @@ func (p *Core) reloadConf() error { @@ -504,11 +495,3 @@ func (p *Core) reloadConf() error {
p.conf = newConf
return p.createResources(false)
}
// OnPathSourceReady is called by pathManager.
func (p *Core) OnPathSourceReady(pa *path) {
select {
case p.pathSourceReady <- pa:
case <-p.done:
}
}

12
internal/core/hls_remuxer.go

@ -89,10 +89,6 @@ type hlsRemuxerTrackIDPayloadPair struct { @@ -89,10 +89,6 @@ type hlsRemuxerTrackIDPayloadPair struct {
buf []byte
}
type hlsRemuxerPathMan interface {
OnReadPublisherSetupPlay(readPublisherSetupPlayReq)
}
type hlsRemuxerParent interface {
Log(logger.Level, string, ...interface{})
OnRemuxerClose(*hlsRemuxer)
@ -106,7 +102,7 @@ type hlsRemuxer struct { @@ -106,7 +102,7 @@ type hlsRemuxer struct {
wg *sync.WaitGroup
stats *stats
pathName string
pathMan hlsRemuxerPathMan
pathManager *pathManager
parent hlsRemuxerParent
ctx context.Context
@ -129,7 +125,7 @@ func newHLSRemuxer( @@ -129,7 +125,7 @@ func newHLSRemuxer(
wg *sync.WaitGroup,
stats *stats,
pathName string,
pathMan hlsRemuxerPathMan,
pathManager *pathManager,
parent hlsRemuxerParent) *hlsRemuxer {
ctx, ctxCancel := context.WithCancel(parentCtx)
@ -141,7 +137,7 @@ func newHLSRemuxer( @@ -141,7 +137,7 @@ func newHLSRemuxer(
wg: wg,
stats: stats,
pathName: pathName,
pathMan: pathMan,
pathManager: pathManager,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
@ -221,7 +217,7 @@ func (r *hlsRemuxer) run() { @@ -221,7 +217,7 @@ func (r *hlsRemuxer) run() {
func (r *hlsRemuxer) runInner(innerCtx context.Context) error {
pres := make(chan readPublisherSetupPlayRes)
r.pathMan.OnReadPublisherSetupPlay(readPublisherSetupPlayReq{
r.pathManager.OnReadPublisherSetupPlay(readPublisherSetupPlayReq{
Author: r,
PathName: r.pathName,
IP: nil,

12
internal/core/hls_server.go

@ -24,7 +24,7 @@ type hlsServer struct { @@ -24,7 +24,7 @@ type hlsServer struct {
hlsAllowOrigin string
readBufferCount int
stats *stats
pathMan *pathManager
pathManager *pathManager
parent hlsServerParent
ctx context.Context
@ -48,7 +48,7 @@ func newHLSServer( @@ -48,7 +48,7 @@ func newHLSServer(
hlsAllowOrigin string,
readBufferCount int,
stats *stats,
pathMan *pathManager,
pathManager *pathManager,
parent hlsServerParent,
) (*hlsServer, error) {
ln, err := net.Listen("tcp", address)
@ -65,7 +65,7 @@ func newHLSServer( @@ -65,7 +65,7 @@ func newHLSServer(
hlsAllowOrigin: hlsAllowOrigin,
readBufferCount: readBufferCount,
stats: stats,
pathMan: pathMan,
pathManager: pathManager,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
@ -81,6 +81,8 @@ func newHLSServer( @@ -81,6 +81,8 @@ func newHLSServer(
s.wg.Add(1)
go s.run()
s.pathManager.OnHLSServer(s)
return s, nil
}
@ -130,6 +132,8 @@ outer: @@ -130,6 +132,8 @@ outer:
}
hs.Shutdown(context.Background())
s.pathManager.OnHLSServer(nil)
}
// ServeHTTP implements http.Handler.
@ -215,7 +219,7 @@ func (s *hlsServer) createRemuxer(pathName string) *hlsRemuxer { @@ -215,7 +219,7 @@ func (s *hlsServer) createRemuxer(pathName string) *hlsRemuxer {
&s.wg,
s.stats,
pathName,
s.pathMan,
s.pathManager,
s)
s.remuxers[pathName] = r
}

37
internal/core/path_manager.go

@ -16,7 +16,6 @@ import ( @@ -16,7 +16,6 @@ import (
type pathManagerParent interface {
Log(logger.Level, string, ...interface{})
OnPathSourceReady(*path)
}
type pathManager struct {
@ -33,14 +32,17 @@ type pathManager struct { @@ -33,14 +32,17 @@ type pathManager struct {
ctx context.Context
ctxCancel func()
wg sync.WaitGroup
hlsServer *hlsServer
paths map[string]*path
// in
confReload chan map[string]*conf.PathConf
pathClose chan *path
rpDescribe chan readPublisherDescribeReq
rpSetupPlay chan readPublisherSetupPlayReq
rpAnnounce chan readPublisherAnnounceReq
confReload chan map[string]*conf.PathConf
pathClose chan *path
pathSourceReady chan *path
rpDescribe chan readPublisherDescribeReq
rpSetupPlay chan readPublisherSetupPlayReq
rpAnnounce chan readPublisherAnnounceReq
hlsServerSet chan *hlsServer
}
func newPathManager(
@ -71,9 +73,11 @@ func newPathManager( @@ -71,9 +73,11 @@ func newPathManager(
paths: make(map[string]*path),
confReload: make(chan map[string]*conf.PathConf),
pathClose: make(chan *path),
pathSourceReady: make(chan *path),
rpDescribe: make(chan readPublisherDescribeReq),
rpSetupPlay: make(chan readPublisherSetupPlayReq),
rpAnnounce: make(chan readPublisherAnnounceReq),
hlsServerSet: make(chan *hlsServer),
}
pm.createPaths()
@ -141,6 +145,11 @@ outer: @@ -141,6 +145,11 @@ outer:
delete(pm.paths, pa.Name())
pa.Close()
case pa := <-pm.pathSourceReady:
if pm.hlsServer != nil {
pm.hlsServer.OnPathSourceReady(pa)
}
case req := <-pm.rpDescribe:
pathName, pathConf, err := pm.findPathConf(req.PathName)
if err != nil {
@ -222,6 +231,9 @@ outer: @@ -222,6 +231,9 @@ outer:
pm.paths[req.PathName].OnPathManAnnounce(req)
case s := <-pm.hlsServerSet:
pm.hlsServer = s
case <-pm.ctx.Done():
break outer
}
@ -285,7 +297,10 @@ func (pm *pathManager) OnConfReload(pathConfs map[string]*conf.PathConf) { @@ -285,7 +297,10 @@ func (pm *pathManager) OnConfReload(pathConfs map[string]*conf.PathConf) {
// OnPathSourceReady is called by path.
func (pm *pathManager) OnPathSourceReady(pa *path) {
pm.parent.OnPathSourceReady(pa)
select {
case pm.pathSourceReady <- pa:
case <-pm.ctx.Done():
}
}
// OnPathClose is called by path.
@ -323,6 +338,14 @@ func (pm *pathManager) OnReadPublisherSetupPlay(req readPublisherSetupPlayReq) { @@ -323,6 +338,14 @@ func (pm *pathManager) OnReadPublisherSetupPlay(req readPublisherSetupPlayReq) {
}
}
// OnHLSServer is called by hlsServer.
func (pm *pathManager) OnHLSServer(s *hlsServer) {
select {
case pm.hlsServerSet <- s:
case <-pm.ctx.Done():
}
}
func (pm *pathManager) authenticate(
ip net.IP,
validateCredentials func(authMethods []headers.AuthMethod, pathUser string, pathPass string) error,

10
internal/core/rtmp_conn.go

@ -68,7 +68,7 @@ type rtmpConn struct { @@ -68,7 +68,7 @@ type rtmpConn struct {
wg *sync.WaitGroup
stats *stats
conn *rtmp.Conn
pathMan rtmpConnPathMan
pathManager rtmpConnPathMan
parent rtmpConnParent
ctx context.Context
@ -88,7 +88,7 @@ func newRTMPConn( @@ -88,7 +88,7 @@ func newRTMPConn(
wg *sync.WaitGroup,
stats *stats,
nconn net.Conn,
pathMan rtmpConnPathMan,
pathManager rtmpConnPathMan,
parent rtmpConnParent) *rtmpConn {
ctx, ctxCancel := context.WithCancel(parentCtx)
@ -102,7 +102,7 @@ func newRTMPConn( @@ -102,7 +102,7 @@ func newRTMPConn(
wg: wg,
stats: stats,
conn: rtmp.NewServerConn(nconn),
pathMan: pathMan,
pathManager: pathManager,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
@ -208,7 +208,7 @@ func (c *rtmpConn) runRead(ctx context.Context) error { @@ -208,7 +208,7 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
pathName, query := pathNameAndQuery(c.conn.URL())
sres := make(chan readPublisherSetupPlayRes)
c.pathMan.OnReadPublisherSetupPlay(readPublisherSetupPlayReq{
c.pathManager.OnReadPublisherSetupPlay(readPublisherSetupPlayReq{
Author: c,
PathName: pathName,
IP: c.ip(),
@ -406,7 +406,7 @@ func (c *rtmpConn) runPublish(ctx context.Context) error { @@ -406,7 +406,7 @@ func (c *rtmpConn) runPublish(ctx context.Context) error {
pathName, query := pathNameAndQuery(c.conn.URL())
resc := make(chan readPublisherAnnounceRes)
c.pathMan.OnReadPublisherAnnounce(readPublisherAnnounceReq{
c.pathManager.OnReadPublisherAnnounce(readPublisherAnnounceReq{
Author: c,
PathName: pathName,
Tracks: tracks,

8
internal/core/rtmp_server.go

@ -21,7 +21,7 @@ type rtmpServer struct { @@ -21,7 +21,7 @@ type rtmpServer struct {
runOnConnect string
runOnConnectRestart bool
stats *stats
pathMan *pathManager
pathManager *pathManager
parent rtmpServerParent
ctx context.Context
@ -44,7 +44,7 @@ func newRTMPServer( @@ -44,7 +44,7 @@ func newRTMPServer(
runOnConnect string,
runOnConnectRestart bool,
stats *stats,
pathMan *pathManager,
pathManager *pathManager,
parent rtmpServerParent) (*rtmpServer, error) {
l, err := net.Listen("tcp", address)
if err != nil {
@ -61,7 +61,7 @@ func newRTMPServer( @@ -61,7 +61,7 @@ func newRTMPServer(
runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart,
stats: stats,
pathMan: pathMan,
pathManager: pathManager,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
@ -135,7 +135,7 @@ outer: @@ -135,7 +135,7 @@ outer:
&s.wg,
s.stats,
nconn,
s.pathMan,
s.pathManager,
s)
s.conns[c] = struct{}{}

8
internal/core/rtsp_conn.go

@ -43,7 +43,7 @@ type rtspConn struct { @@ -43,7 +43,7 @@ type rtspConn struct {
readTimeout time.Duration
runOnConnect string
runOnConnectRestart bool
pathMan rtspConnPathMan
pathManager rtspConnPathMan
stats *stats
conn *gortsplib.ServerConn
parent rtspConnParent
@ -60,7 +60,7 @@ func newRTSPConn( @@ -60,7 +60,7 @@ func newRTSPConn(
readTimeout time.Duration,
runOnConnect string,
runOnConnectRestart bool,
pathMan rtspConnPathMan,
pathManager rtspConnPathMan,
stats *stats,
conn *gortsplib.ServerConn,
parent rtspConnParent) *rtspConn {
@ -69,7 +69,7 @@ func newRTSPConn( @@ -69,7 +69,7 @@ func newRTSPConn(
readTimeout: readTimeout,
runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart,
pathMan: pathMan,
pathManager: pathManager,
stats: stats,
conn: conn,
parent: parent,
@ -127,7 +127,7 @@ func (c *rtspConn) OnResponse(res *base.Response) { @@ -127,7 +127,7 @@ func (c *rtspConn) OnResponse(res *base.Response) {
// OnDescribe is called by rtspServer.
func (c *rtspConn) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
resc := make(chan readPublisherDescribeRes)
c.pathMan.OnReadPublisherDescribe(readPublisherDescribeReq{
c.pathManager.OnReadPublisherDescribe(readPublisherDescribeReq{
PathName: ctx.Path,
URL: ctx.Req.URL,
IP: c.ip(),

10
internal/core/rtsp_server.go

@ -52,7 +52,7 @@ type rtspServer struct { @@ -52,7 +52,7 @@ type rtspServer struct {
runOnConnect string
runOnConnectRestart bool
stats *stats
pathMan *pathManager
pathManager *pathManager
parent rtspServerParent
ctx context.Context
@ -86,7 +86,7 @@ func newRTSPServer( @@ -86,7 +86,7 @@ func newRTSPServer(
runOnConnect string,
runOnConnectRestart bool,
stats *stats,
pathMan *pathManager,
pathManager *pathManager,
parent rtspServerParent) (*rtspServer, error) {
ctx, ctxCancel := context.WithCancel(parentCtx)
@ -96,7 +96,7 @@ func newRTSPServer( @@ -96,7 +96,7 @@ func newRTSPServer(
rtspAddress: rtspAddress,
protocols: protocols,
stats: stats,
pathMan: pathMan,
pathManager: pathManager,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
@ -206,7 +206,7 @@ func (s *rtspServer) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) { @@ -206,7 +206,7 @@ func (s *rtspServer) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
s.readTimeout,
s.runOnConnect,
s.runOnConnectRestart,
s.pathMan,
s.pathManager,
s.stats,
ctx.Conn,
s)
@ -258,7 +258,7 @@ func (s *rtspServer) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) @@ -258,7 +258,7 @@ func (s *rtspServer) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx)
visualID,
ctx.Session,
ctx.Conn,
s.pathMan,
s.pathManager,
s)
s.sessions[ctx.Session] = se

10
internal/core/rtsp_session.go

@ -33,7 +33,7 @@ type rtspSession struct { @@ -33,7 +33,7 @@ type rtspSession struct {
protocols map[conf.Protocol]struct{}
visualID string
ss *gortsplib.ServerSession
pathMan rtspSessionPathMan
pathManager rtspSessionPathMan
parent rtspSessionParent
path readPublisherPath
@ -47,14 +47,14 @@ func newRTSPSession( @@ -47,14 +47,14 @@ func newRTSPSession(
visualID string,
ss *gortsplib.ServerSession,
sc *gortsplib.ServerConn,
pathMan rtspSessionPathMan,
pathManager rtspSessionPathMan,
parent rtspSessionParent) *rtspSession {
s := &rtspSession{
rtspAddress: rtspAddress,
protocols: protocols,
visualID: visualID,
ss: ss,
pathMan: pathMan,
pathManager: pathManager,
parent: parent,
}
@ -117,7 +117,7 @@ func (s *rtspSession) log(level logger.Level, format string, args ...interface{} @@ -117,7 +117,7 @@ func (s *rtspSession) log(level logger.Level, format string, args ...interface{}
// OnAnnounce is called by rtspServer.
func (s *rtspSession) OnAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) {
resc := make(chan readPublisherAnnounceRes)
s.pathMan.OnReadPublisherAnnounce(readPublisherAnnounceReq{
s.pathManager.OnReadPublisherAnnounce(readPublisherAnnounceReq{
Author: s,
PathName: ctx.Path,
Tracks: ctx.Tracks,
@ -179,7 +179,7 @@ func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt @@ -179,7 +179,7 @@ func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
switch s.ss.State() {
case gortsplib.ServerSessionStateInitial, gortsplib.ServerSessionStatePrePlay: // play
resc := make(chan readPublisherSetupPlayRes)
s.pathMan.OnReadPublisherSetupPlay(readPublisherSetupPlayReq{
s.pathManager.OnReadPublisherSetupPlay(readPublisherSetupPlayReq{
Author: s,
PathName: ctx.Path,
IP: ctx.Conn.NetConn().RemoteAddr().(*net.TCPAddr).IP,

Loading…
Cancel
Save