Browse Source

cleanup

pull/509/head
aler9 4 years ago
parent
commit
e57177ef49
  1. 34
      internal/core/api.go
  2. 11
      internal/core/hls_remuxer.go
  3. 9
      internal/core/hls_server.go
  4. 84
      internal/core/path.go
  5. 12
      internal/core/path_manager.go
  6. 9
      internal/core/rtmp_conn.go
  7. 9
      internal/core/rtsp_session.go

34
internal/core/api.go

@ -218,6 +218,20 @@ type apiRTMPConnsKickReq struct { @@ -218,6 +218,20 @@ type apiRTMPConnsKickReq struct {
Res chan apiRTMPConnsKickRes
}
type apiPathManager interface {
OnAPIPathsList(req apiPathsListReq1) apiPathsListRes1
}
type apiRTSPServer interface {
OnAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSessionsListRes
OnAPIRTSPSessionsKick(req apiRTSPSessionsKickReq) apiRTSPSessionsKickRes
}
type apiRTMPServer interface {
OnAPIRTMPConnsList(req apiRTMPConnsListReq) apiRTMPConnsListRes
OnAPIRTMPConnsKick(req apiRTMPConnsKickReq) apiRTMPConnsKickRes
}
type apiParent interface {
Log(logger.Level, string, ...interface{})
OnAPIConfigSet(conf *conf.Conf)
@ -225,10 +239,10 @@ type apiParent interface { @@ -225,10 +239,10 @@ type apiParent interface {
type api struct {
conf *conf.Conf
pathManager *pathManager
rtspServer *rtspServer
rtspsServer *rtspServer
rtmpServer *rtmpServer
pathManager apiPathManager
rtspServer apiRTSPServer
rtspsServer apiRTSPServer
rtmpServer apiRTMPServer
parent apiParent
mutex sync.Mutex
@ -238,10 +252,10 @@ type api struct { @@ -238,10 +252,10 @@ type api struct {
func newAPI(
address string,
conf *conf.Conf,
pathManager *pathManager,
rtspServer *rtspServer,
rtspsServer *rtspServer,
rtmpServer *rtmpServer,
pathManager apiPathManager,
rtspServer apiRTSPServer,
rtspsServer apiRTSPServer,
rtmpServer apiRTMPServer,
parent apiParent,
) (*api, error) {
ln, err := net.Listen("tcp", address)
@ -506,7 +520,7 @@ func (a *api) onRTSPSessionsList(ctx *gin.Context) { @@ -506,7 +520,7 @@ func (a *api) onRTSPSessionsList(ctx *gin.Context) {
Items: make(map[string]apiRTSPSessionsListItem),
}
if a.rtspServer != nil {
if !reflect.ValueOf(a.rtspServer).IsNil() {
res := a.rtspServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{Data: &data})
if res.Err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
@ -514,7 +528,7 @@ func (a *api) onRTSPSessionsList(ctx *gin.Context) { @@ -514,7 +528,7 @@ func (a *api) onRTSPSessionsList(ctx *gin.Context) {
}
}
if a.rtspsServer != nil {
if !reflect.ValueOf(a.rtspsServer).IsNil() {
res := a.rtspsServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{Data: &data})
if res.Err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)

11
internal/core/hls_remuxer.go

@ -89,6 +89,10 @@ type hlsRemuxerTrackIDPayloadPair struct { @@ -89,6 +89,10 @@ type hlsRemuxerTrackIDPayloadPair struct {
buf []byte
}
type hlsRemuxerPathManager interface {
OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes
}
type hlsRemuxerParent interface {
Log(logger.Level, string, ...interface{})
OnRemuxerClose(*hlsRemuxer)
@ -100,9 +104,8 @@ type hlsRemuxer struct { @@ -100,9 +104,8 @@ type hlsRemuxer struct {
hlsSegmentDuration time.Duration
readBufferCount int
wg *sync.WaitGroup
stats *stats
pathName string
pathManager *pathManager
pathManager hlsRemuxerPathManager
parent hlsRemuxerParent
ctx context.Context
@ -124,9 +127,8 @@ func newHLSRemuxer( @@ -124,9 +127,8 @@ func newHLSRemuxer(
hlsSegmentDuration time.Duration,
readBufferCount int,
wg *sync.WaitGroup,
stats *stats,
pathName string,
pathManager *pathManager,
pathManager hlsRemuxerPathManager,
parent hlsRemuxerParent) *hlsRemuxer {
ctx, ctxCancel := context.WithCancel(parentCtx)
@ -136,7 +138,6 @@ func newHLSRemuxer( @@ -136,7 +138,6 @@ func newHLSRemuxer(
hlsSegmentDuration: hlsSegmentDuration,
readBufferCount: readBufferCount,
wg: wg,
stats: stats,
pathName: pathName,
pathManager: pathManager,
parent: parent,

9
internal/core/hls_server.go

@ -36,7 +36,7 @@ type hlsServer struct { @@ -36,7 +36,7 @@ type hlsServer struct {
// in
pathSourceReady chan *path
request chan hlsRemuxerRequest
connClose chan *hlsRemuxer
remuxerClose chan *hlsRemuxer
}
func newHLSServer(
@ -73,7 +73,7 @@ func newHLSServer( @@ -73,7 +73,7 @@ func newHLSServer(
remuxers: make(map[string]*hlsRemuxer),
pathSourceReady: make(chan *path),
request: make(chan hlsRemuxerRequest),
connClose: make(chan *hlsRemuxer),
remuxerClose: make(chan *hlsRemuxer),
}
s.Log(logger.Info, "listener opened on "+address)
@ -115,7 +115,7 @@ outer: @@ -115,7 +115,7 @@ outer:
r := s.findOrCreateRemuxer(req.Dir)
r.OnRequest(req)
case c := <-s.connClose:
case c := <-s.remuxerClose:
if c2, ok := s.remuxers[c.PathName()]; !ok || c2 != c {
continue
}
@ -226,7 +226,6 @@ func (s *hlsServer) findOrCreateRemuxer(pathName string) *hlsRemuxer { @@ -226,7 +226,6 @@ func (s *hlsServer) findOrCreateRemuxer(pathName string) *hlsRemuxer {
s.hlsSegmentDuration,
s.readBufferCount,
&s.wg,
s.stats,
pathName,
s.pathManager,
s)
@ -243,7 +242,7 @@ func (s *hlsServer) doRemuxerClose(c *hlsRemuxer) { @@ -243,7 +242,7 @@ func (s *hlsServer) doRemuxerClose(c *hlsRemuxer) {
// OnRemuxerClose is called by hlsRemuxer.
func (s *hlsServer) OnRemuxerClose(c *hlsRemuxer) {
select {
case s.connClose <- c:
case s.remuxerClose <- c:
case <-s.ctx.Done():
}
}

84
internal/core/path.go

@ -366,60 +366,42 @@ outer: @@ -366,60 +366,42 @@ outer:
}
case req := <-pa.describe:
pa.onDescribe(req)
pa.handleDescribe(req)
case req := <-pa.publisherRemove:
pa.onPublisherRemove(req)
pa.handlePublisherRemove(req)
if pa.source == nil && pa.conf.Regexp != nil {
break outer
}
case req := <-pa.publisherAnnounce:
pa.onPublisherAnnounce(req)
pa.handlePublisherAnnounce(req)
case req := <-pa.publisherRecord:
pa.onPublisherRecord(req)
pa.handlePublisherRecord(req)
case req := <-pa.publisherPause:
pa.onPublisherPause(req)
pa.handlePublisherPause(req)
if pa.source == nil && pa.conf.Regexp != nil {
break outer
}
case req := <-pa.readerRemove:
pa.onReaderRemove(req)
pa.handleReaderRemove(req)
case req := <-pa.readerSetupPlay:
pa.onReaderSetupPlay(req)
pa.handleReaderSetupPlay(req)
case req := <-pa.readerPlay:
pa.onReaderPlay(req)
pa.handleReaderPlay(req)
case req := <-pa.readerPause:
pa.onReaderPause(req)
pa.handleReaderPause(req)
case req := <-pa.apiPathsList:
req.Data.Items[pa.name] = apiPathsItem{
ConfName: pa.confName,
Conf: pa.conf,
Source: func() interface{} {
if pa.source == nil {
return nil
}
return pa.source.OnSourceAPIDescribe()
}(),
SourceReady: pa.sourceReady,
Readers: func() []interface{} {
ret := []interface{}{}
for r := range pa.readers {
ret = append(ret, r.OnReaderAPIDescribe())
}
return ret
}(),
}
req.Res <- apiPathsListRes2{}
pa.handleAPIPathsList(req)
case <-pa.ctx.Done():
break outer
@ -558,7 +540,7 @@ func (pa *path) sourceSetReady(tracks gortsplib.Tracks) { @@ -558,7 +540,7 @@ func (pa *path) sourceSetReady(tracks gortsplib.Tracks) {
pa.describeRequests = nil
for _, req := range pa.setupPlayRequests {
pa.onReaderSetupPlayPost(req)
pa.handleReaderSetupPlayPost(req)
}
pa.setupPlayRequests = nil
@ -646,7 +628,7 @@ func (pa *path) doPublisherRemove() { @@ -646,7 +628,7 @@ func (pa *path) doPublisherRemove() {
}
}
func (pa *path) onDescribe(req pathDescribeReq) {
func (pa *path) handleDescribe(req pathDescribeReq) {
if _, ok := pa.source.(*sourceRedirect); ok {
req.Res <- pathDescribeRes{
Redirect: pa.conf.SourceRedirect,
@ -689,14 +671,14 @@ func (pa *path) onDescribe(req pathDescribeReq) { @@ -689,14 +671,14 @@ func (pa *path) onDescribe(req pathDescribeReq) {
req.Res <- pathDescribeRes{Err: pathErrNoOnePublishing{PathName: pa.name}}
}
func (pa *path) onPublisherRemove(req pathPublisherRemoveReq) {
func (pa *path) handlePublisherRemove(req pathPublisherRemoveReq) {
if pa.source == req.Author {
pa.doPublisherRemove()
}
close(req.Res)
}
func (pa *path) onPublisherAnnounce(req pathPublisherAnnounceReq) {
func (pa *path) handlePublisherAnnounce(req pathPublisherAnnounceReq) {
if pa.source != nil {
if pa.hasStaticSource() {
req.Res <- pathPublisherAnnounceRes{Err: fmt.Errorf("path '%s' is assigned to a static source", pa.name)}
@ -718,7 +700,7 @@ func (pa *path) onPublisherAnnounce(req pathPublisherAnnounceReq) { @@ -718,7 +700,7 @@ func (pa *path) onPublisherAnnounce(req pathPublisherAnnounceReq) {
req.Res <- pathPublisherAnnounceRes{Path: pa}
}
func (pa *path) onPublisherRecord(req pathPublisherRecordReq) {
func (pa *path) handlePublisherRecord(req pathPublisherRecordReq) {
if pa.source != req.Author {
req.Res <- pathPublisherRecordRes{Err: fmt.Errorf("publisher is not assigned to this path anymore")}
return
@ -741,7 +723,7 @@ func (pa *path) onPublisherRecord(req pathPublisherRecordReq) { @@ -741,7 +723,7 @@ func (pa *path) onPublisherRecord(req pathPublisherRecordReq) {
req.Res <- pathPublisherRecordRes{Stream: pa.stream}
}
func (pa *path) onPublisherPause(req pathPublisherPauseReq) {
func (pa *path) handlePublisherPause(req pathPublisherPauseReq) {
if req.Author == pa.source && pa.sourceReady {
atomic.AddInt64(pa.stats.CountPublishers, -1)
@ -754,7 +736,7 @@ func (pa *path) onPublisherPause(req pathPublisherPauseReq) { @@ -754,7 +736,7 @@ func (pa *path) onPublisherPause(req pathPublisherPauseReq) {
close(req.Res)
}
func (pa *path) onReaderRemove(req pathReaderRemoveReq) {
func (pa *path) handleReaderRemove(req pathReaderRemoveReq) {
if _, ok := pa.readers[req.Author]; ok {
pa.doReaderRemove(req.Author)
}
@ -767,9 +749,9 @@ func (pa *path) onReaderRemove(req pathReaderRemoveReq) { @@ -767,9 +749,9 @@ func (pa *path) onReaderRemove(req pathReaderRemoveReq) {
}
}
func (pa *path) onReaderSetupPlay(req pathReaderSetupPlayReq) {
func (pa *path) handleReaderSetupPlay(req pathReaderSetupPlayReq) {
if pa.sourceReady {
pa.onReaderSetupPlayPost(req)
pa.handleReaderSetupPlayPost(req)
return
}
@ -784,7 +766,7 @@ func (pa *path) onReaderSetupPlay(req pathReaderSetupPlayReq) { @@ -784,7 +766,7 @@ func (pa *path) onReaderSetupPlay(req pathReaderSetupPlayReq) {
req.Res <- pathReaderSetupPlayRes{Err: pathErrNoOnePublishing{PathName: pa.name}}
}
func (pa *path) onReaderSetupPlayPost(req pathReaderSetupPlayReq) {
func (pa *path) handleReaderSetupPlayPost(req pathReaderSetupPlayReq) {
pa.readers[req.Author] = pathReaderStatePrePlay
if pa.isOnDemand() && pa.onDemandState == pathOnDemandStateClosing {
@ -799,7 +781,7 @@ func (pa *path) onReaderSetupPlayPost(req pathReaderSetupPlayReq) { @@ -799,7 +781,7 @@ func (pa *path) onReaderSetupPlayPost(req pathReaderSetupPlayReq) {
}
}
func (pa *path) onReaderPlay(req pathReaderPlayReq) {
func (pa *path) handleReaderPlay(req pathReaderPlayReq) {
atomic.AddInt64(pa.stats.CountReaders, 1)
pa.readers[req.Author] = pathReaderStatePlay
@ -810,7 +792,7 @@ func (pa *path) onReaderPlay(req pathReaderPlayReq) { @@ -810,7 +792,7 @@ func (pa *path) onReaderPlay(req pathReaderPlayReq) {
close(req.Res)
}
func (pa *path) onReaderPause(req pathReaderPauseReq) {
func (pa *path) handleReaderPause(req pathReaderPauseReq) {
if state, ok := pa.readers[req.Author]; ok && state == pathReaderStatePlay {
atomic.AddInt64(pa.stats.CountReaders, -1)
pa.readers[req.Author] = pathReaderStatePrePlay
@ -819,6 +801,28 @@ func (pa *path) onReaderPause(req pathReaderPauseReq) { @@ -819,6 +801,28 @@ func (pa *path) onReaderPause(req pathReaderPauseReq) {
close(req.Res)
}
func (pa *path) handleAPIPathsList(req apiPathsListReq2) {
req.Data.Items[pa.name] = apiPathsItem{
ConfName: pa.confName,
Conf: pa.conf,
Source: func() interface{} {
if pa.source == nil {
return nil
}
return pa.source.OnSourceAPIDescribe()
}(),
SourceReady: pa.sourceReady,
Readers: func() []interface{} {
ret := []interface{}{}
for r := range pa.readers {
ret = append(ret, r.OnReaderAPIDescribe())
}
return ret
}(),
}
req.Res <- apiPathsListRes2{}
}
// OnSourceStaticSetReady is called by a sourceStatic.
func (pa *path) OnSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes {
req.Res = make(chan pathSourceStaticSetReadyRes)

12
internal/core/path_manager.go

@ -13,6 +13,10 @@ import ( @@ -13,6 +13,10 @@ import (
"github.com/aler9/rtsp-simple-server/internal/logger"
)
type pathManagerHLSServer interface {
OnPathSourceReady(pa *path)
}
type pathManagerParent interface {
Log(logger.Level, string, ...interface{})
}
@ -30,7 +34,7 @@ type pathManager struct { @@ -30,7 +34,7 @@ type pathManager struct {
ctx context.Context
ctxCancel func()
wg sync.WaitGroup
hlsServer *hlsServer
hlsServer pathManagerHLSServer
paths map[string]*path
// in
@ -40,7 +44,7 @@ type pathManager struct { @@ -40,7 +44,7 @@ type pathManager struct {
describe chan pathDescribeReq
readerSetupPlay chan pathReaderSetupPlayReq
publisherAnnounce chan pathPublisherAnnounceReq
hlsServerSet chan *hlsServer
hlsServerSet chan pathManagerHLSServer
apiPathsList chan apiPathsListReq1
}
@ -74,7 +78,7 @@ func newPathManager( @@ -74,7 +78,7 @@ func newPathManager(
describe: make(chan pathDescribeReq),
readerSetupPlay: make(chan pathReaderSetupPlayReq),
publisherAnnounce: make(chan pathPublisherAnnounceReq),
hlsServerSet: make(chan *hlsServer),
hlsServerSet: make(chan pathManagerHLSServer),
apiPathsList: make(chan apiPathsListReq1),
}
@ -403,7 +407,7 @@ func (pm *pathManager) OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderS @@ -403,7 +407,7 @@ func (pm *pathManager) OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderS
}
// OnHLSServer is called by hlsServer.
func (pm *pathManager) OnHLSServer(s *hlsServer) {
func (pm *pathManager) OnHLSServer(s pathManagerHLSServer) {
select {
case pm.hlsServerSet <- s:
case <-pm.ctx.Done():

9
internal/core/rtmp_conn.go

@ -47,6 +47,11 @@ type rtmpConnTrackIDPayloadPair struct { @@ -47,6 +47,11 @@ type rtmpConnTrackIDPayloadPair struct {
buf []byte
}
type rtmpConnPathManager interface {
OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes
OnPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes
}
type rtmpConnParent interface {
Log(logger.Level, string, ...interface{})
OnConnClose(*rtmpConn)
@ -63,7 +68,7 @@ type rtmpConn struct { @@ -63,7 +68,7 @@ type rtmpConn struct {
wg *sync.WaitGroup
stats *stats
conn *rtmp.Conn
pathManager *pathManager
pathManager rtmpConnPathManager
parent rtmpConnParent
ctx context.Context
@ -86,7 +91,7 @@ func newRTMPConn( @@ -86,7 +91,7 @@ func newRTMPConn(
wg *sync.WaitGroup,
stats *stats,
nconn net.Conn,
pathManager *pathManager,
pathManager rtmpConnPathManager,
parent rtmpConnParent) *rtmpConn {
ctx, ctxCancel := context.WithCancel(parentCtx)

9
internal/core/rtsp_session.go

@ -19,6 +19,11 @@ const ( @@ -19,6 +19,11 @@ const (
pauseAfterAuthError = 2 * time.Second
)
type rtspSessionPathManager interface {
OnPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes
OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes
}
type rtspSessionParent interface {
Log(logger.Level, string, ...interface{})
}
@ -29,7 +34,7 @@ type rtspSession struct { @@ -29,7 +34,7 @@ type rtspSession struct {
id string
ss *gortsplib.ServerSession
author *gortsplib.ServerConn
pathManager *pathManager
pathManager rtspSessionPathManager
parent rtspSessionParent
path *path
@ -47,7 +52,7 @@ func newRTSPSession( @@ -47,7 +52,7 @@ func newRTSPSession(
id string,
ss *gortsplib.ServerSession,
sc *gortsplib.ServerConn,
pathManager *pathManager,
pathManager rtspSessionPathManager,
parent rtspSessionParent) *rtspSession {
s := &rtspSession{
rtspAddress: rtspAddress,

Loading…
Cancel
Save