diff --git a/apidocs/openapi.yaml b/apidocs/openapi.yaml index 4b909bb0..fc5ac288 100644 --- a/apidocs/openapi.yaml +++ b/apidocs/openapi.yaml @@ -307,6 +307,12 @@ components: type: string enum: [idle, read, publish] + HLSMuxer: + type: object + properties: + lastRequest: + type: string + PathsList: type: object properties: @@ -339,6 +345,14 @@ components: additionalProperties: $ref: '#/components/schemas/RTMPConn' + HLSMuxersList: + type: object + properties: + items: + type: object + additionalProperties: + $ref: '#/components/schemas/HLSMuxer' + paths: /v1/config/get: get: @@ -573,3 +587,20 @@ paths: description: invalid request. '500': description: internal server error. + + /v1/hlsmuxers/list: + get: + operationId: hlsMuxersList + summary: returns all active HLS muxers. + description: '' + responses: + '200': + description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/HLSMuxersList' + '400': + description: invalid request. + '500': + description: internal server error. diff --git a/internal/core/api.go b/internal/core/api.go index 8ef9b887..dba2530e 100644 --- a/internal/core/api.go +++ b/internal/core/api.go @@ -165,7 +165,7 @@ func loadConfPathData(ctx *gin.Context) (interface{}, error) { return in, err } -type apiPathsItem struct { +type apiPathsListItem struct { ConfName string `json:"confName"` Conf *conf.PathConf `json:"conf"` Source interface{} `json:"source"` @@ -174,20 +174,20 @@ type apiPathsItem struct { } type apiPathsListData struct { - Items map[string]apiPathsItem `json:"items"` + Items map[string]apiPathsListItem `json:"items"` } -type apiPathsListRes1 struct { +type apiPathsListRes struct { Data *apiPathsListData Paths map[string]*path Err error } -type apiPathsListReq1 struct { - Res chan apiPathsListRes1 +type apiPathsListReq struct { + Res chan apiPathsListRes } -type apiPathsListReq2 struct { +type apiPathsListSubReq struct { Data *apiPathsListData Res chan struct{} } @@ -243,8 +243,31 @@ type apiRTMPConnsKickReq struct { Res chan apiRTMPConnsKickRes } +type apiHLSMuxersListItem struct { + LastRequest string `json:"lastRequest"` +} + +type apiHLSMuxersListData struct { + Items map[string]apiHLSMuxersListItem `json:"items"` +} + +type apiHLSMuxersListRes struct { + Data *apiHLSMuxersListData + Muxers map[string]*hlsMuxer + Err error +} + +type apiHLSMuxersListReq struct { + Res chan apiHLSMuxersListRes +} + +type apiHLSMuxersListSubReq struct { + Data *apiHLSMuxersListData + Res chan struct{} +} + type apiPathManager interface { - onAPIPathsList(req apiPathsListReq1) apiPathsListRes1 + onAPIPathsList(req apiPathsListReq) apiPathsListRes } type apiRTSPServer interface { @@ -257,6 +280,10 @@ type apiRTMPServer interface { onAPIRTMPConnsKick(req apiRTMPConnsKickReq) apiRTMPConnsKickRes } +type apiHLSServer interface { + onAPIHLSMuxersList(req apiHLSMuxersListReq) apiHLSMuxersListRes +} + type apiParent interface { Log(logger.Level, string, ...interface{}) onAPIConfigSet(conf *conf.Conf) @@ -268,6 +295,7 @@ type api struct { rtspServer apiRTSPServer rtspsServer apiRTSPServer rtmpServer apiRTMPServer + hlsServer apiHLSServer parent apiParent mutex sync.Mutex @@ -281,6 +309,7 @@ func newAPI( rtspServer apiRTSPServer, rtspsServer apiRTSPServer, rtmpServer apiRTMPServer, + hlsServer apiHLSServer, parent apiParent, ) (*api, error) { ln, err := net.Listen("tcp", address) @@ -294,6 +323,7 @@ func newAPI( rtspServer: rtspServer, rtspsServer: rtspsServer, rtmpServer: rtmpServer, + hlsServer: hlsServer, parent: parent, } @@ -312,6 +342,7 @@ func newAPI( group.POST("/v1/rtspssessions/kick/:id", a.onRTSPSSessionsKick) group.GET("/v1/rtmpconns/list", a.onRTMPConnsList) group.POST("/v1/rtmpconns/kick/:id", a.onRTMPConnsKick) + group.GET("/v1/hlsmuxers/list", a.onHLSMuxersList) a.s = &http.Server{Handler: router} @@ -510,7 +541,7 @@ func (a *api) onConfigPathsDelete(ctx *gin.Context) { } func (a *api) onPathsList(ctx *gin.Context) { - res := a.pathManager.onAPIPathsList(apiPathsListReq1{}) + res := a.pathManager.onAPIPathsList(apiPathsListReq{}) if res.Err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return @@ -598,13 +629,6 @@ func (a *api) onRTMPConnsList(ctx *gin.Context) { ctx.JSON(http.StatusOK, res.Data) } -// onConfReload is called by core. -func (a *api) onConfReload(conf *conf.Conf) { - a.mutex.Lock() - defer a.mutex.Unlock() - a.conf = conf -} - func (a *api) onRTMPConnsKick(ctx *gin.Context) { if interfaceIsEmpty(a.rtmpServer) { ctx.AbortWithStatus(http.StatusNotFound) @@ -621,3 +645,25 @@ func (a *api) onRTMPConnsKick(ctx *gin.Context) { ctx.Status(http.StatusOK) } + +func (a *api) onHLSMuxersList(ctx *gin.Context) { + if interfaceIsEmpty(a.hlsServer) { + ctx.AbortWithStatus(http.StatusNotFound) + return + } + + res := a.hlsServer.onAPIHLSMuxersList(apiHLSMuxersListReq{}) + if res.Err != nil { + ctx.AbortWithStatus(http.StatusInternalServerError) + return + } + + ctx.JSON(http.StatusOK, res.Data) +} + +// onConfReload is called by core. +func (a *api) onConfReload(conf *conf.Conf) { + a.mutex.Lock() + defer a.mutex.Unlock() + a.conf = conf +} diff --git a/internal/core/api_test.go b/internal/core/api_test.go index dde4ee57..a6f1e45d 100644 --- a/internal/core/api_test.go +++ b/internal/core/api_test.go @@ -224,12 +224,20 @@ func TestAPIList(t *testing.T) { "rtsp", "rtsps", "rtmp", + "hls", } { t.Run(ca, func(t *testing.T) { - p, ok := newInstance("api: yes\n" + - "encryption: optional\n" + - "serverCert: " + serverCertFpath + "\n" + - "serverKey: " + serverKeyFpath + "\n") + conf := "api: yes\n" + + if ca == "rtsps" { + conf += "protocols: [tcp]\n" + conf += "encryption: strict\n" + } + + conf += "serverCert: " + serverCertFpath + "\n" + conf += "serverKey: " + serverKeyFpath + "\n" + + p, ok := newInstance(conf) require.Equal(t, true, ok) defer p.close() @@ -261,34 +269,66 @@ func TestAPIList(t *testing.T) { }) require.NoError(t, err) defer cnt1.close() + + case "hls": + source, err := gortsplib.DialPublish("rtsp://localhost:8554/mypath", + gortsplib.Tracks{track}) + require.NoError(t, err) + defer source.Close() + + func() { + res, err := http.Get("http://localhost:8888/mypath/index.m3u8") + require.NoError(t, err) + defer res.Body.Close() + require.Equal(t, 200, res.StatusCode) + }() } - var pa string switch ca { - case "rtsp": - pa = "rtspsessions" + case "rtsp", "rtsps", "rtmp": + var pa string + switch ca { + case "rtsp": + pa = "rtspsessions" + + case "rtsps": + pa = "rtspssessions" + + case "rtmp": + pa = "rtmpconns" + } + + var out struct { + Items map[string]struct { + State string `json:"state"` + } `json:"items"` + } + err = httpRequest(http.MethodGet, "http://localhost:9997/v1/"+pa+"/list", nil, &out) + require.NoError(t, err) - case "rtsps": - pa = "rtspssessions" + var firstID string + for k := range out.Items { + firstID = k + } - case "rtmp": - pa = "rtmpconns" - } + require.Equal(t, "publish", out.Items[firstID].State) - var out struct { - Items map[string]struct { - State string `json:"state"` - } `json:"items"` - } - err = httpRequest(http.MethodGet, "http://localhost:9997/v1/"+pa+"/list", nil, &out) - require.NoError(t, err) + case "hls": + var out struct { + Items map[string]struct { + LastRequest string `json:"lastRequest"` + } `json:"items"` + } + err = httpRequest(http.MethodGet, "http://localhost:9997/v1/hlsmuxers/list", nil, &out) + require.NoError(t, err) - var firstID string - for k := range out.Items { - firstID = k - } + var firstID string + for k := range out.Items { + firstID = k + } - require.Equal(t, "publish", out.Items[firstID].State) + require.NotEqual(t, "", out.Items[firstID].LastRequest) + } }) } } diff --git a/internal/core/core.go b/internal/core/core.go index cd18c319..81b2702d 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -344,6 +344,7 @@ func (p *Core) createResources(initial bool) error { p.rtspServer, p.rtspsServer, p.rtmpServer, + p.hlsServer, p) if err != nil { return err @@ -468,7 +469,8 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { closePathManager || closeRTSPServer || closeRTSPSServer || - closeRTMPServer { + closeRTMPServer || + closeHLSServer { closeAPI = true } diff --git a/internal/core/core_test.go b/internal/core/core_test.go index 113d6f53..fd9d1e8b 100644 --- a/internal/core/core_test.go +++ b/internal/core/core_test.go @@ -188,7 +188,7 @@ func TestCorePathAutoDeletion(t *testing.T) { } }() - res := p.pathManager.onAPIPathsList(apiPathsListReq1{}) + res := p.pathManager.onAPIPathsList(apiPathsListReq{}) require.NoError(t, res.Err) require.Equal(t, 0, len(res.Data.Items)) diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index 1aa47ee7..5b991178 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -122,6 +122,7 @@ type hlsMuxerParent interface { } type hlsMuxer struct { + name string hlsAlwaysRemux bool hlsSegmentCount int hlsSegmentDuration conf.StringDuration @@ -140,11 +141,13 @@ type hlsMuxer struct { requests []hlsMuxerRequest // in - request chan hlsMuxerRequest + request chan hlsMuxerRequest + apiHLSMuxersList chan apiHLSMuxersListSubReq } func newHLSMuxer( parentCtx context.Context, + name string, hlsAlwaysRemux bool, hlsSegmentCount int, hlsSegmentDuration conf.StringDuration, @@ -156,6 +159,7 @@ func newHLSMuxer( ctx, ctxCancel := context.WithCancel(parentCtx) m := &hlsMuxer{ + name: name, hlsAlwaysRemux: hlsAlwaysRemux, hlsSegmentCount: hlsSegmentCount, hlsSegmentDuration: hlsSegmentDuration, @@ -170,7 +174,8 @@ func newHLSMuxer( v := time.Now().Unix() return &v }(), - request: make(chan hlsMuxerRequest), + request: make(chan hlsMuxerRequest), + apiHLSMuxersList: make(chan apiHLSMuxersListSubReq), } m.log(logger.Info, "opened") @@ -221,6 +226,12 @@ func (m *hlsMuxer) run() { m.requests = append(m.requests, req) } + case req := <-m.apiHLSMuxersList: + req.Data.Items[m.name] = apiHLSMuxersListItem{ + LastRequest: time.Unix(atomic.LoadInt64(m.lastRequestTime), 0).String(), + } + close(req.Res) + case <-innerReady: isReady = true for _, req := range m.requests { @@ -499,3 +510,14 @@ func (m *hlsMuxer) onReaderAPIDescribe() interface{} { Type string `json:"type"` }{"hlsMuxer"} } + +// onAPIHLSMuxersList is called by api. +func (m *hlsMuxer) onAPIHLSMuxersList(req apiHLSMuxersListSubReq) { + req.Res = make(chan struct{}) + select { + case m.apiHLSMuxersList <- req: + <-req.Res + + case <-m.ctx.Done(): + } +} diff --git a/internal/core/hls_server.go b/internal/core/hls_server.go index c4edf3a5..d8a6f066 100644 --- a/internal/core/hls_server.go +++ b/internal/core/hls_server.go @@ -2,6 +2,7 @@ package core import ( "context" + "fmt" "io" "net" "net/http" @@ -36,9 +37,10 @@ type hlsServer struct { muxers map[string]*hlsMuxer // in - pathSourceReady chan *path - request chan hlsMuxerRequest - muxerClose chan *hlsMuxer + pathSourceReady chan *path + request chan hlsMuxerRequest + muxerClose chan *hlsMuxer + apiHLSMuxersList chan apiHLSMuxersListReq } func newHLSServer( @@ -74,6 +76,7 @@ func newHLSServer( pathSourceReady: make(chan *path), request: make(chan hlsMuxerRequest), muxerClose: make(chan *hlsMuxer), + apiHLSMuxersList: make(chan apiHLSMuxersListReq), } s.log(logger.Info, "listener opened on "+address) @@ -124,6 +127,17 @@ outer: } delete(s.muxers, c.PathName()) + case req := <-s.apiHLSMuxersList: + muxers := make(map[string]*hlsMuxer) + + for name, m := range s.muxers { + muxers[name] = m + } + + req.Res <- apiHLSMuxersListRes{ + Muxers: muxers, + } + case <-s.ctx.Done(): break outer } @@ -219,6 +233,7 @@ func (s *hlsServer) findOrCreateMuxer(pathName string) *hlsMuxer { if !ok { r = newHLSMuxer( s.ctx, + pathName, s.hlsAlwaysRemux, s.hlsSegmentCount, s.hlsSegmentDuration, @@ -247,3 +262,25 @@ func (s *hlsServer) onPathSourceReady(pa *path) { case <-s.ctx.Done(): } } + +// onAPIHLSMuxersList is called by api. +func (s *hlsServer) onAPIHLSMuxersList(req apiHLSMuxersListReq) apiHLSMuxersListRes { + req.Res = make(chan apiHLSMuxersListRes) + select { + case s.apiHLSMuxersList <- req: + res := <-req.Res + + res.Data = &apiHLSMuxersListData{ + Items: make(map[string]apiHLSMuxersListItem), + } + + for _, pa := range res.Muxers { + pa.onAPIHLSMuxersList(apiHLSMuxersListSubReq{Data: res.Data}) + } + + return res + + case <-s.ctx.Done(): + return apiHLSMuxersListRes{Err: fmt.Errorf("terminated")} + } +} diff --git a/internal/core/metrics.go b/internal/core/metrics.go index 3548401b..262ea543 100644 --- a/internal/core/metrics.go +++ b/internal/core/metrics.go @@ -18,7 +18,7 @@ func formatMetric(key string, value int64) string { } type metricsPathManager interface { - onAPIPathsList(req apiPathsListReq1) apiPathsListRes1 + onAPIPathsList(req apiPathsListReq) apiPathsListRes } type metricsRTSPServer interface { @@ -83,7 +83,7 @@ func (m *metrics) run() { func (m *metrics) onMetrics(ctx *gin.Context) { out := "" - res := m.pathManager.onAPIPathsList(apiPathsListReq1{}) + res := m.pathManager.onAPIPathsList(apiPathsListReq{}) if res.Err == nil { for name, p := range res.Data.Items { if p.SourceReady { diff --git a/internal/core/path.go b/internal/core/path.go index 0fe63f41..d6b0d354 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -218,7 +218,7 @@ type path struct { readerSetupPlay chan pathReaderSetupPlayReq readerPlay chan pathReaderPlayReq readerPause chan pathReaderPauseReq - apiPathsList chan apiPathsListReq2 + apiPathsList chan apiPathsListSubReq } func newPath( @@ -262,7 +262,7 @@ func newPath( readerSetupPlay: make(chan pathReaderSetupPlayReq), readerPlay: make(chan pathReaderPlayReq), readerPause: make(chan pathReaderPauseReq), - apiPathsList: make(chan apiPathsListReq2), + apiPathsList: make(chan apiPathsListSubReq), } pa.log(logger.Info, "opened") @@ -832,8 +832,8 @@ func (pa *path) handleReaderPause(req pathReaderPauseReq) { close(req.Res) } -func (pa *path) handleAPIPathsList(req apiPathsListReq2) { - req.Data.Items[pa.name] = apiPathsItem{ +func (pa *path) handleAPIPathsList(req apiPathsListSubReq) { + req.Data.Items[pa.name] = apiPathsListItem{ ConfName: pa.confName, Conf: pa.conf, Source: func() interface{} { @@ -967,7 +967,7 @@ func (pa *path) onReaderPause(req pathReaderPauseReq) { } // onAPIPathsList is called by api. -func (pa *path) onAPIPathsList(req apiPathsListReq2) { +func (pa *path) onAPIPathsList(req apiPathsListSubReq) { req.Res = make(chan struct{}) select { case pa.apiPathsList <- req: diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index 8153d657..542fc4cb 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -44,7 +44,7 @@ type pathManager struct { readerSetupPlay chan pathReaderSetupPlayReq publisherAnnounce chan pathPublisherAnnounceReq hlsServerSet chan pathManagerHLSServer - apiPathsList chan apiPathsListReq1 + apiPathsList chan apiPathsListReq } func newPathManager( @@ -78,7 +78,7 @@ func newPathManager( readerSetupPlay: make(chan pathReaderSetupPlayReq), publisherAnnounce: make(chan pathPublisherAnnounceReq), hlsServerSet: make(chan pathManagerHLSServer), - apiPathsList: make(chan apiPathsListReq1), + apiPathsList: make(chan apiPathsListReq), } for pathName, pathConf := range pm.pathConfs { @@ -254,7 +254,7 @@ outer: paths[name] = pa } - req.Res <- apiPathsListRes1{ + req.Res <- apiPathsListRes{ Paths: paths, } @@ -421,23 +421,23 @@ func (pm *pathManager) onHLSServerSet(s pathManagerHLSServer) { } // onAPIPathsList is called by api. -func (pm *pathManager) onAPIPathsList(req apiPathsListReq1) apiPathsListRes1 { - req.Res = make(chan apiPathsListRes1) +func (pm *pathManager) onAPIPathsList(req apiPathsListReq) apiPathsListRes { + req.Res = make(chan apiPathsListRes) select { case pm.apiPathsList <- req: - res1 := <-req.Res + res := <-req.Res - res1.Data = &apiPathsListData{ - Items: make(map[string]apiPathsItem), + res.Data = &apiPathsListData{ + Items: make(map[string]apiPathsListItem), } - for _, pa := range res1.Paths { - pa.onAPIPathsList(apiPathsListReq2{Data: res1.Data}) + for _, pa := range res.Paths { + pa.onAPIPathsList(apiPathsListSubReq{Data: res.Data}) } - return res1 + return res case <-pm.ctx.Done(): - return apiPathsListRes1{Err: fmt.Errorf("terminated")} + return apiPathsListRes{Err: fmt.Errorf("terminated")} } }