Browse Source

fix multiple freezes

pull/483/head
aler9 5 years ago
parent
commit
fd27ed941e
  1. 22
      internal/core/path.go
  2. 52
      internal/core/path_manager.go

22
internal/core/path.go

@ -100,6 +100,7 @@ type pathPublisherRemoveReq struct { @@ -100,6 +100,7 @@ type pathPublisherRemoveReq struct {
}
type pathDescribeRes struct {
Path *path
Stream *gortsplib.ServerStream
Redirect string
Err error
@ -881,12 +882,13 @@ func (pa *path) OnSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) { @@ -881,12 +882,13 @@ func (pa *path) OnSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) {
}
}
// OnDescribe is called by pathManager (asynchronous).
func (pa *path) OnDescribe(req pathDescribeReq) {
// OnDescribe is called by a reader or publisher through pathManager.
func (pa *path) OnDescribe(req pathDescribeReq) pathDescribeRes {
select {
case pa.describe <- req:
return <-req.Res
case <-pa.ctx.Done():
req.Res <- pathDescribeRes{Err: fmt.Errorf("terminated")}
return pathDescribeRes{Err: fmt.Errorf("terminated")}
}
}
@ -900,12 +902,13 @@ func (pa *path) OnPublisherRemove(req pathPublisherRemoveReq) { @@ -900,12 +902,13 @@ func (pa *path) OnPublisherRemove(req pathPublisherRemoveReq) {
}
}
// OnPublisherAnnounce is called by pathManager (asynchronous).
func (pa *path) OnPublisherAnnounce(req pathPublisherAnnounceReq) {
// OnPublisherAnnounce is called by a publisher through pathManager.
func (pa *path) OnPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes {
select {
case pa.publisherAnnounce <- req:
return <-req.Res
case <-pa.ctx.Done():
req.Res <- pathPublisherAnnounceRes{Err: fmt.Errorf("terminated")}
return pathPublisherAnnounceRes{Err: fmt.Errorf("terminated")}
}
}
@ -940,12 +943,13 @@ func (pa *path) OnReaderRemove(req pathReaderRemoveReq) { @@ -940,12 +943,13 @@ func (pa *path) OnReaderRemove(req pathReaderRemoveReq) {
}
}
// OnReaderSetupPlay is called by pathManager (asynchronous).
func (pa *path) OnReaderSetupPlay(req pathReaderSetupPlayReq) {
// OnReaderSetupPlay is called by a reader through pathManager.
func (pa *path) OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes {
select {
case pa.readerSetupPlay <- req:
return <-req.Res
case <-pa.ctx.Done():
req.Res <- pathReaderSetupPlayRes{Err: fmt.Errorf("terminated")}
return pathReaderSetupPlayRes{Err: fmt.Errorf("terminated")}
}
}

52
internal/core/path_manager.go

@ -76,7 +76,11 @@ func newPathManager( @@ -76,7 +76,11 @@ func newPathManager(
hlsServerSet: make(chan *hlsServer),
}
pm.createPaths()
for pathName, pathConf := range pm.pathConfs {
if pathConf.Regexp == nil {
pm.createPath(pathName, pathConf, pathName)
}
}
pm.wg.Add(1)
go pm.run()
@ -131,8 +135,12 @@ outer: @@ -131,8 +135,12 @@ outer:
}
}
// add paths
pm.createPaths()
// add new paths
for pathName, pathConf := range pm.pathConfs {
if _, ok := pm.paths[pathName]; !ok && pathConf.Regexp == nil {
pm.createPath(pathName, pathConf, pathName)
}
}
case pa := <-pm.pathClose:
if pmpa, ok := pm.paths[pa.Name()]; !ok || pmpa != pa {
@ -171,7 +179,7 @@ outer: @@ -171,7 +179,7 @@ outer:
pm.createPath(pathName, pathConf, req.PathName)
}
pm.paths[req.PathName].OnDescribe(req)
req.Res <- pathDescribeRes{Path: pm.paths[req.PathName]}
case req := <-pm.readerSetupPlay:
pathName, pathConf, err := pm.findPathConf(req.PathName)
@ -198,7 +206,7 @@ outer: @@ -198,7 +206,7 @@ outer:
pm.createPath(pathName, pathConf, req.PathName)
}
pm.paths[req.PathName].OnReaderSetupPlay(req)
req.Res <- pathReaderSetupPlayRes{Path: pm.paths[req.PathName]}
case req := <-pm.publisherAnnounce:
pathName, pathConf, err := pm.findPathConf(req.PathName)
@ -225,7 +233,7 @@ outer: @@ -225,7 +233,7 @@ outer:
pm.createPath(pathName, pathConf, req.PathName)
}
pm.paths[req.PathName].OnPublisherAnnounce(req)
req.Res <- pathPublisherAnnounceRes{Path: pm.paths[req.PathName]}
case s := <-pm.hlsServerSet:
pm.hlsServer = s
@ -254,14 +262,6 @@ func (pm *pathManager) createPath(confName string, conf *conf.PathConf, name str @@ -254,14 +262,6 @@ func (pm *pathManager) createPath(confName string, conf *conf.PathConf, name str
pm)
}
func (pm *pathManager) createPaths() {
for pathName, pathConf := range pm.pathConfs {
if _, ok := pm.paths[pathName]; !ok && pathConf.Regexp == nil {
pm.createPath(pathName, pathConf, pathName)
}
}
}
func (pm *pathManager) findPathConf(name string) (string, *conf.PathConf, error) {
err := conf.CheckPathName(name)
if err != nil {
@ -343,7 +343,13 @@ func (pm *pathManager) OnDescribe(req pathDescribeReq) pathDescribeRes { @@ -343,7 +343,13 @@ func (pm *pathManager) OnDescribe(req pathDescribeReq) pathDescribeRes {
req.Res = make(chan pathDescribeRes)
select {
case pm.describe <- req:
return <-req.Res
res := <-req.Res
if res.Err != nil {
return res
}
return res.Path.OnDescribe(req)
case <-pm.ctx.Done():
return pathDescribeRes{Err: fmt.Errorf("terminated")}
}
@ -354,7 +360,13 @@ func (pm *pathManager) OnPublisherAnnounce(req pathPublisherAnnounceReq) pathPub @@ -354,7 +360,13 @@ func (pm *pathManager) OnPublisherAnnounce(req pathPublisherAnnounceReq) pathPub
req.Res = make(chan pathPublisherAnnounceRes)
select {
case pm.publisherAnnounce <- req:
return <-req.Res
res := <-req.Res
if res.Err != nil {
return res
}
return res.Path.OnPublisherAnnounce(req)
case <-pm.ctx.Done():
return pathPublisherAnnounceRes{Err: fmt.Errorf("terminated")}
}
@ -365,7 +377,13 @@ func (pm *pathManager) OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderS @@ -365,7 +377,13 @@ func (pm *pathManager) OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderS
req.Res = make(chan pathReaderSetupPlayRes)
select {
case pm.readerSetupPlay <- req:
return <-req.Res
res := <-req.Res
if res.Err != nil {
return res
}
return res.Path.OnReaderSetupPlay(req)
case <-pm.ctx.Done():
return pathReaderSetupPlayRes{Err: fmt.Errorf("terminated")}
}

Loading…
Cancel
Save