Browse Source

move api, metrics and pprof into dedicated packages (#2843)

pull/2844/head
Alessandro Ros 1 year ago committed by GitHub
parent
commit
1341421412
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 290
      internal/api/api.go
  2. 39
      internal/api/api_test.go
  3. 34
      internal/core/api_test.go
  4. 103
      internal/core/core.go
  5. 4
      internal/core/path.go
  6. 16
      internal/core/path_manager.go
  7. 2
      internal/core/path_manager_test.go
  8. 72
      internal/metrics/metrics.go
  9. 36
      internal/pprof/pprof.go

290
internal/core/api.go → internal/api/api.go

@ -1,4 +1,5 @@
package core // Package api contains the API server.
package api
import ( import (
"encoding/json" "encoding/json"
@ -97,17 +98,20 @@ func paramName(ctx *gin.Context) (string, bool) {
return name[1:], true return name[1:], true
} }
type apiPathManager interface { // PathManager contains methods used by the API and Metrics server.
apiPathsList() (*defs.APIPathList, error) type PathManager interface {
apiPathsGet(string) (*defs.APIPath, error) APIPathsList() (*defs.APIPathList, error)
APIPathsGet(string) (*defs.APIPath, error)
} }
type apiHLSServer interface { // HLSServer contains methods used by the API and Metrics server.
type HLSServer interface {
APIMuxersList() (*defs.APIHLSMuxerList, error) APIMuxersList() (*defs.APIHLSMuxerList, error)
APIMuxersGet(string) (*defs.APIHLSMuxer, error) APIMuxersGet(string) (*defs.APIHLSMuxer, error)
} }
type apiRTSPServer interface { // RTSPServer contains methods used by the API and Metrics server.
type RTSPServer interface {
APIConnsList() (*defs.APIRTSPConnsList, error) APIConnsList() (*defs.APIRTSPConnsList, error)
APIConnsGet(uuid.UUID) (*defs.APIRTSPConn, error) APIConnsGet(uuid.UUID) (*defs.APIRTSPConn, error)
APISessionsList() (*defs.APIRTSPSessionList, error) APISessionsList() (*defs.APIRTSPSessionList, error)
@ -115,19 +119,22 @@ type apiRTSPServer interface {
APISessionsKick(uuid.UUID) error APISessionsKick(uuid.UUID) error
} }
type apiRTMPServer interface { // RTMPServer contains methods used by the API and Metrics server.
type RTMPServer interface {
APIConnsList() (*defs.APIRTMPConnList, error) APIConnsList() (*defs.APIRTMPConnList, error)
APIConnsGet(uuid.UUID) (*defs.APIRTMPConn, error) APIConnsGet(uuid.UUID) (*defs.APIRTMPConn, error)
APIConnsKick(uuid.UUID) error APIConnsKick(uuid.UUID) error
} }
type apiSRTServer interface { // SRTServer contains methods used by the API and Metrics server.
type SRTServer interface {
APIConnsList() (*defs.APISRTConnList, error) APIConnsList() (*defs.APISRTConnList, error)
APIConnsGet(uuid.UUID) (*defs.APISRTConn, error) APIConnsGet(uuid.UUID) (*defs.APISRTConn, error)
APIConnsKick(uuid.UUID) error APIConnsKick(uuid.UUID) error
} }
type apiWebRTCServer interface { // WebRTCServer contains methods used by the API and Metrics server.
type WebRTCServer interface {
APISessionsList() (*defs.APIWebRTCSessionList, error) APISessionsList() (*defs.APIWebRTCSessionList, error)
APISessionsGet(uuid.UUID) (*defs.APIWebRTCSession, error) APISessionsGet(uuid.UUID) (*defs.APIWebRTCSession, error)
APISessionsKick(uuid.UUID) error APISessionsKick(uuid.UUID) error
@ -135,52 +142,30 @@ type apiWebRTCServer interface {
type apiParent interface { type apiParent interface {
logger.Writer logger.Writer
apiConfigSet(conf *conf.Conf) APIConfigSet(conf *conf.Conf)
} }
type api struct { // API is an API server.
conf *conf.Conf type API struct {
pathManager apiPathManager Address string
rtspServer apiRTSPServer ReadTimeout conf.StringDuration
rtspsServer apiRTSPServer Conf *conf.Conf
rtmpServer apiRTMPServer PathManager PathManager
rtmpsServer apiRTMPServer RTSPServer RTSPServer
hlsManager apiHLSServer RTSPSServer RTSPServer
webRTCServer apiWebRTCServer RTMPServer RTMPServer
srtServer apiSRTServer RTMPSServer RTMPServer
parent apiParent HLSServer HLSServer
WebRTCServer WebRTCServer
SRTServer SRTServer
Parent apiParent
httpServer *httpserv.WrappedServer httpServer *httpserv.WrappedServer
mutex sync.Mutex mutex sync.Mutex
} }
func newAPI( // Initialize initializes API.
address string, func (a *API) Initialize() error {
readTimeout conf.StringDuration,
conf *conf.Conf,
pathManager apiPathManager,
rtspServer apiRTSPServer,
rtspsServer apiRTSPServer,
rtmpServer apiRTMPServer,
rtmpsServer apiRTMPServer,
hlsManager apiHLSServer,
webRTCServer apiWebRTCServer,
srtServer apiSRTServer,
parent apiParent,
) (*api, error) {
a := &api{
conf: conf,
pathManager: pathManager,
rtspServer: rtspServer,
rtspsServer: rtspsServer,
rtmpServer: rtmpServer,
rtmpsServer: rtmpsServer,
hlsManager: hlsManager,
webRTCServer: webRTCServer,
srtServer: srtServer,
parent: parent,
}
router := gin.New() router := gin.New()
router.SetTrustedProxies(nil) //nolint:errcheck router.SetTrustedProxies(nil) //nolint:errcheck
@ -202,12 +187,12 @@ func newAPI(
group.GET("/v3/paths/list", a.onPathsList) group.GET("/v3/paths/list", a.onPathsList)
group.GET("/v3/paths/get/*name", a.onPathsGet) group.GET("/v3/paths/get/*name", a.onPathsGet)
if !interfaceIsEmpty(a.hlsManager) { if !interfaceIsEmpty(a.HLSServer) {
group.GET("/v3/hlsmuxers/list", a.onHLSMuxersList) group.GET("/v3/hlsmuxers/list", a.onHLSMuxersList)
group.GET("/v3/hlsmuxers/get/*name", a.onHLSMuxersGet) group.GET("/v3/hlsmuxers/get/*name", a.onHLSMuxersGet)
} }
if !interfaceIsEmpty(a.rtspServer) { if !interfaceIsEmpty(a.RTSPServer) {
group.GET("/v3/rtspconns/list", a.onRTSPConnsList) group.GET("/v3/rtspconns/list", a.onRTSPConnsList)
group.GET("/v3/rtspconns/get/:id", a.onRTSPConnsGet) group.GET("/v3/rtspconns/get/:id", a.onRTSPConnsGet)
group.GET("/v3/rtspsessions/list", a.onRTSPSessionsList) group.GET("/v3/rtspsessions/list", a.onRTSPSessionsList)
@ -215,7 +200,7 @@ func newAPI(
group.POST("/v3/rtspsessions/kick/:id", a.onRTSPSessionsKick) group.POST("/v3/rtspsessions/kick/:id", a.onRTSPSessionsKick)
} }
if !interfaceIsEmpty(a.rtspsServer) { if !interfaceIsEmpty(a.RTSPSServer) {
group.GET("/v3/rtspsconns/list", a.onRTSPSConnsList) group.GET("/v3/rtspsconns/list", a.onRTSPSConnsList)
group.GET("/v3/rtspsconns/get/:id", a.onRTSPSConnsGet) group.GET("/v3/rtspsconns/get/:id", a.onRTSPSConnsGet)
group.GET("/v3/rtspssessions/list", a.onRTSPSSessionsList) group.GET("/v3/rtspssessions/list", a.onRTSPSSessionsList)
@ -223,63 +208,64 @@ func newAPI(
group.POST("/v3/rtspssessions/kick/:id", a.onRTSPSSessionsKick) group.POST("/v3/rtspssessions/kick/:id", a.onRTSPSSessionsKick)
} }
if !interfaceIsEmpty(a.rtmpServer) { if !interfaceIsEmpty(a.RTMPServer) {
group.GET("/v3/rtmpconns/list", a.onRTMPConnsList) group.GET("/v3/rtmpconns/list", a.onRTMPConnsList)
group.GET("/v3/rtmpconns/get/:id", a.onRTMPConnsGet) group.GET("/v3/rtmpconns/get/:id", a.onRTMPConnsGet)
group.POST("/v3/rtmpconns/kick/:id", a.onRTMPConnsKick) group.POST("/v3/rtmpconns/kick/:id", a.onRTMPConnsKick)
} }
if !interfaceIsEmpty(a.rtmpsServer) { if !interfaceIsEmpty(a.RTMPSServer) {
group.GET("/v3/rtmpsconns/list", a.onRTMPSConnsList) group.GET("/v3/rtmpsconns/list", a.onRTMPSConnsList)
group.GET("/v3/rtmpsconns/get/:id", a.onRTMPSConnsGet) group.GET("/v3/rtmpsconns/get/:id", a.onRTMPSConnsGet)
group.POST("/v3/rtmpsconns/kick/:id", a.onRTMPSConnsKick) group.POST("/v3/rtmpsconns/kick/:id", a.onRTMPSConnsKick)
} }
if !interfaceIsEmpty(a.webRTCServer) { if !interfaceIsEmpty(a.WebRTCServer) {
group.GET("/v3/webrtcsessions/list", a.onWebRTCSessionsList) group.GET("/v3/webrtcsessions/list", a.onWebRTCSessionsList)
group.GET("/v3/webrtcsessions/get/:id", a.onWebRTCSessionsGet) group.GET("/v3/webrtcsessions/get/:id", a.onWebRTCSessionsGet)
group.POST("/v3/webrtcsessions/kick/:id", a.onWebRTCSessionsKick) group.POST("/v3/webrtcsessions/kick/:id", a.onWebRTCSessionsKick)
} }
if !interfaceIsEmpty(a.srtServer) { if !interfaceIsEmpty(a.SRTServer) {
group.GET("/v3/srtconns/list", a.onSRTConnsList) group.GET("/v3/srtconns/list", a.onSRTConnsList)
group.GET("/v3/srtconns/get/:id", a.onSRTConnsGet) group.GET("/v3/srtconns/get/:id", a.onSRTConnsGet)
group.POST("/v3/srtconns/kick/:id", a.onSRTConnsKick) group.POST("/v3/srtconns/kick/:id", a.onSRTConnsKick)
} }
network, address := restrictnetwork.Restrict("tcp", address) network, address := restrictnetwork.Restrict("tcp", a.Address)
var err error var err error
a.httpServer, err = httpserv.NewWrappedServer( a.httpServer, err = httpserv.NewWrappedServer(
network, network,
address, address,
time.Duration(readTimeout), time.Duration(a.ReadTimeout),
"", "",
"", "",
router, router,
a, a,
) )
if err != nil { if err != nil {
return nil, err return err
} }
a.Log(logger.Info, "listener opened on "+address) a.Log(logger.Info, "listener opened on "+address)
return a, nil return nil
} }
func (a *api) close() { // Close closes the API.
func (a *API) Close() {
a.Log(logger.Info, "listener is closing") a.Log(logger.Info, "listener is closing")
a.httpServer.Close() a.httpServer.Close()
} }
// Log implements logger.Writer. // Log implements logger.Writer.
func (a *api) Log(level logger.Level, format string, args ...interface{}) { func (a *API) Log(level logger.Level, format string, args ...interface{}) {
a.parent.Log(level, "[API] "+format, args...) a.Parent.Log(level, "[API] "+format, args...)
} }
// error coming from something the user inserted into the request. // error coming from something the user inserted into the request.
func (a *api) writeError(ctx *gin.Context, status int, err error) { func (a *API) writeError(ctx *gin.Context, status int, err error) {
// show error in logs // show error in logs
a.Log(logger.Error, err.Error()) a.Log(logger.Error, err.Error())
@ -289,15 +275,15 @@ func (a *api) writeError(ctx *gin.Context, status int, err error) {
}) })
} }
func (a *api) onConfigGlobalGet(ctx *gin.Context) { func (a *API) onConfigGlobalGet(ctx *gin.Context) {
a.mutex.Lock() a.mutex.Lock()
c := a.conf c := a.Conf
a.mutex.Unlock() a.mutex.Unlock()
ctx.JSON(http.StatusOK, c.Global()) ctx.JSON(http.StatusOK, c.Global())
} }
func (a *api) onConfigGlobalPatch(ctx *gin.Context) { func (a *API) onConfigGlobalPatch(ctx *gin.Context) {
var c conf.OptionalGlobal var c conf.OptionalGlobal
err := json.NewDecoder(ctx.Request.Body).Decode(&c) err := json.NewDecoder(ctx.Request.Body).Decode(&c)
if err != nil { if err != nil {
@ -308,7 +294,7 @@ func (a *api) onConfigGlobalPatch(ctx *gin.Context) {
a.mutex.Lock() a.mutex.Lock()
defer a.mutex.Unlock() defer a.mutex.Unlock()
newConf := a.conf.Clone() newConf := a.Conf.Clone()
newConf.PatchGlobal(&c) newConf.PatchGlobal(&c)
@ -318,24 +304,24 @@ func (a *api) onConfigGlobalPatch(ctx *gin.Context) {
return return
} }
a.conf = newConf a.Conf = newConf
// since reloading the configuration can cause the shutdown of the API, // since reloading the configuration can cause the shutdown of the API,
// call it in a goroutine // call it in a goroutine
go a.parent.apiConfigSet(newConf) go a.Parent.APIConfigSet(newConf)
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }
func (a *api) onConfigPathDefaultsGet(ctx *gin.Context) { func (a *API) onConfigPathDefaultsGet(ctx *gin.Context) {
a.mutex.Lock() a.mutex.Lock()
c := a.conf c := a.Conf
a.mutex.Unlock() a.mutex.Unlock()
ctx.JSON(http.StatusOK, c.PathDefaults) ctx.JSON(http.StatusOK, c.PathDefaults)
} }
func (a *api) onConfigPathDefaultsPatch(ctx *gin.Context) { func (a *API) onConfigPathDefaultsPatch(ctx *gin.Context) {
var p conf.OptionalPath var p conf.OptionalPath
err := json.NewDecoder(ctx.Request.Body).Decode(&p) err := json.NewDecoder(ctx.Request.Body).Decode(&p)
if err != nil { if err != nil {
@ -346,7 +332,7 @@ func (a *api) onConfigPathDefaultsPatch(ctx *gin.Context) {
a.mutex.Lock() a.mutex.Lock()
defer a.mutex.Unlock() defer a.mutex.Unlock()
newConf := a.conf.Clone() newConf := a.Conf.Clone()
newConf.PatchPathDefaults(&p) newConf.PatchPathDefaults(&p)
@ -356,15 +342,15 @@ func (a *api) onConfigPathDefaultsPatch(ctx *gin.Context) {
return return
} }
a.conf = newConf a.Conf = newConf
a.parent.apiConfigSet(newConf) a.Parent.APIConfigSet(newConf)
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }
func (a *api) onConfigPathsList(ctx *gin.Context) { func (a *API) onConfigPathsList(ctx *gin.Context) {
a.mutex.Lock() a.mutex.Lock()
c := a.conf c := a.Conf
a.mutex.Unlock() a.mutex.Unlock()
data := &defs.APIPathConfList{ data := &defs.APIPathConfList{
@ -386,7 +372,7 @@ func (a *api) onConfigPathsList(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onConfigPathsGet(ctx *gin.Context) { func (a *API) onConfigPathsGet(ctx *gin.Context) {
name, ok := paramName(ctx) name, ok := paramName(ctx)
if !ok { if !ok {
a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name"))
@ -394,7 +380,7 @@ func (a *api) onConfigPathsGet(ctx *gin.Context) {
} }
a.mutex.Lock() a.mutex.Lock()
c := a.conf c := a.Conf
a.mutex.Unlock() a.mutex.Unlock()
p, ok := c.Paths[name] p, ok := c.Paths[name]
@ -406,7 +392,7 @@ func (a *api) onConfigPathsGet(ctx *gin.Context) {
ctx.JSON(http.StatusOK, p) ctx.JSON(http.StatusOK, p)
} }
func (a *api) onConfigPathsAdd(ctx *gin.Context) { //nolint:dupl func (a *API) onConfigPathsAdd(ctx *gin.Context) { //nolint:dupl
name, ok := paramName(ctx) name, ok := paramName(ctx)
if !ok { if !ok {
a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name"))
@ -423,7 +409,7 @@ func (a *api) onConfigPathsAdd(ctx *gin.Context) { //nolint:dupl
a.mutex.Lock() a.mutex.Lock()
defer a.mutex.Unlock() defer a.mutex.Unlock()
newConf := a.conf.Clone() newConf := a.Conf.Clone()
err = newConf.AddPath(name, &p) err = newConf.AddPath(name, &p)
if err != nil { if err != nil {
@ -437,13 +423,13 @@ func (a *api) onConfigPathsAdd(ctx *gin.Context) { //nolint:dupl
return return
} }
a.conf = newConf a.Conf = newConf
a.parent.apiConfigSet(newConf) a.Parent.APIConfigSet(newConf)
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }
func (a *api) onConfigPathsPatch(ctx *gin.Context) { //nolint:dupl func (a *API) onConfigPathsPatch(ctx *gin.Context) { //nolint:dupl
name, ok := paramName(ctx) name, ok := paramName(ctx)
if !ok { if !ok {
a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name"))
@ -460,7 +446,7 @@ func (a *api) onConfigPathsPatch(ctx *gin.Context) { //nolint:dupl
a.mutex.Lock() a.mutex.Lock()
defer a.mutex.Unlock() defer a.mutex.Unlock()
newConf := a.conf.Clone() newConf := a.Conf.Clone()
err = newConf.PatchPath(name, &p) err = newConf.PatchPath(name, &p)
if err != nil { if err != nil {
@ -474,13 +460,13 @@ func (a *api) onConfigPathsPatch(ctx *gin.Context) { //nolint:dupl
return return
} }
a.conf = newConf a.Conf = newConf
a.parent.apiConfigSet(newConf) a.Parent.APIConfigSet(newConf)
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }
func (a *api) onConfigPathsReplace(ctx *gin.Context) { //nolint:dupl func (a *API) onConfigPathsReplace(ctx *gin.Context) { //nolint:dupl
name, ok := paramName(ctx) name, ok := paramName(ctx)
if !ok { if !ok {
a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name"))
@ -497,7 +483,7 @@ func (a *api) onConfigPathsReplace(ctx *gin.Context) { //nolint:dupl
a.mutex.Lock() a.mutex.Lock()
defer a.mutex.Unlock() defer a.mutex.Unlock()
newConf := a.conf.Clone() newConf := a.Conf.Clone()
err = newConf.ReplacePath(name, &p) err = newConf.ReplacePath(name, &p)
if err != nil { if err != nil {
@ -511,13 +497,13 @@ func (a *api) onConfigPathsReplace(ctx *gin.Context) { //nolint:dupl
return return
} }
a.conf = newConf a.Conf = newConf
a.parent.apiConfigSet(newConf) a.Parent.APIConfigSet(newConf)
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }
func (a *api) onConfigPathsDelete(ctx *gin.Context) { func (a *API) onConfigPathsDelete(ctx *gin.Context) {
name, ok := paramName(ctx) name, ok := paramName(ctx)
if !ok { if !ok {
a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name"))
@ -527,7 +513,7 @@ func (a *api) onConfigPathsDelete(ctx *gin.Context) {
a.mutex.Lock() a.mutex.Lock()
defer a.mutex.Unlock() defer a.mutex.Unlock()
newConf := a.conf.Clone() newConf := a.Conf.Clone()
err := newConf.RemovePath(name) err := newConf.RemovePath(name)
if err != nil { if err != nil {
@ -541,14 +527,14 @@ func (a *api) onConfigPathsDelete(ctx *gin.Context) {
return return
} }
a.conf = newConf a.Conf = newConf
a.parent.apiConfigSet(newConf) a.Parent.APIConfigSet(newConf)
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }
func (a *api) onPathsList(ctx *gin.Context) { func (a *API) onPathsList(ctx *gin.Context) {
data, err := a.pathManager.apiPathsList() data, err := a.PathManager.APIPathsList()
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -565,14 +551,14 @@ func (a *api) onPathsList(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onPathsGet(ctx *gin.Context) { func (a *API) onPathsGet(ctx *gin.Context) {
name, ok := paramName(ctx) name, ok := paramName(ctx)
if !ok { if !ok {
a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name"))
return return
} }
data, err := a.pathManager.apiPathsGet(name) data, err := a.PathManager.APIPathsGet(name)
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -581,8 +567,8 @@ func (a *api) onPathsGet(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onRTSPConnsList(ctx *gin.Context) { func (a *API) onRTSPConnsList(ctx *gin.Context) {
data, err := a.rtspServer.APIConnsList() data, err := a.RTSPServer.APIConnsList()
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -599,14 +585,14 @@ func (a *api) onRTSPConnsList(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onRTSPConnsGet(ctx *gin.Context) { func (a *API) onRTSPConnsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id")) uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil { if err != nil {
a.writeError(ctx, http.StatusBadRequest, err) a.writeError(ctx, http.StatusBadRequest, err)
return return
} }
data, err := a.rtspServer.APIConnsGet(uuid) data, err := a.RTSPServer.APIConnsGet(uuid)
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -615,8 +601,8 @@ func (a *api) onRTSPConnsGet(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onRTSPSessionsList(ctx *gin.Context) { func (a *API) onRTSPSessionsList(ctx *gin.Context) {
data, err := a.rtspServer.APISessionsList() data, err := a.RTSPServer.APISessionsList()
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -633,14 +619,14 @@ func (a *api) onRTSPSessionsList(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onRTSPSessionsGet(ctx *gin.Context) { func (a *API) onRTSPSessionsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id")) uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil { if err != nil {
a.writeError(ctx, http.StatusBadRequest, err) a.writeError(ctx, http.StatusBadRequest, err)
return return
} }
data, err := a.rtspServer.APISessionsGet(uuid) data, err := a.RTSPServer.APISessionsGet(uuid)
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -649,14 +635,14 @@ func (a *api) onRTSPSessionsGet(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onRTSPSessionsKick(ctx *gin.Context) { func (a *API) onRTSPSessionsKick(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id")) uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil { if err != nil {
a.writeError(ctx, http.StatusBadRequest, err) a.writeError(ctx, http.StatusBadRequest, err)
return return
} }
err = a.rtspServer.APISessionsKick(uuid) err = a.RTSPServer.APISessionsKick(uuid)
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -665,8 +651,8 @@ func (a *api) onRTSPSessionsKick(ctx *gin.Context) {
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }
func (a *api) onRTSPSConnsList(ctx *gin.Context) { func (a *API) onRTSPSConnsList(ctx *gin.Context) {
data, err := a.rtspsServer.APIConnsList() data, err := a.RTSPSServer.APIConnsList()
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -683,14 +669,14 @@ func (a *api) onRTSPSConnsList(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onRTSPSConnsGet(ctx *gin.Context) { func (a *API) onRTSPSConnsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id")) uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil { if err != nil {
a.writeError(ctx, http.StatusBadRequest, err) a.writeError(ctx, http.StatusBadRequest, err)
return return
} }
data, err := a.rtspsServer.APIConnsGet(uuid) data, err := a.RTSPSServer.APIConnsGet(uuid)
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -699,8 +685,8 @@ func (a *api) onRTSPSConnsGet(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onRTSPSSessionsList(ctx *gin.Context) { func (a *API) onRTSPSSessionsList(ctx *gin.Context) {
data, err := a.rtspsServer.APISessionsList() data, err := a.RTSPSServer.APISessionsList()
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -717,14 +703,14 @@ func (a *api) onRTSPSSessionsList(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onRTSPSSessionsGet(ctx *gin.Context) { func (a *API) onRTSPSSessionsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id")) uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil { if err != nil {
a.writeError(ctx, http.StatusBadRequest, err) a.writeError(ctx, http.StatusBadRequest, err)
return return
} }
data, err := a.rtspsServer.APISessionsGet(uuid) data, err := a.RTSPSServer.APISessionsGet(uuid)
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -733,14 +719,14 @@ func (a *api) onRTSPSSessionsGet(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onRTSPSSessionsKick(ctx *gin.Context) { func (a *API) onRTSPSSessionsKick(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id")) uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil { if err != nil {
a.writeError(ctx, http.StatusBadRequest, err) a.writeError(ctx, http.StatusBadRequest, err)
return return
} }
err = a.rtspsServer.APISessionsKick(uuid) err = a.RTSPSServer.APISessionsKick(uuid)
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -749,8 +735,8 @@ func (a *api) onRTSPSSessionsKick(ctx *gin.Context) {
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }
func (a *api) onRTMPConnsList(ctx *gin.Context) { func (a *API) onRTMPConnsList(ctx *gin.Context) {
data, err := a.rtmpServer.APIConnsList() data, err := a.RTMPServer.APIConnsList()
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -767,14 +753,14 @@ func (a *api) onRTMPConnsList(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onRTMPConnsGet(ctx *gin.Context) { func (a *API) onRTMPConnsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id")) uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil { if err != nil {
a.writeError(ctx, http.StatusBadRequest, err) a.writeError(ctx, http.StatusBadRequest, err)
return return
} }
data, err := a.rtmpServer.APIConnsGet(uuid) data, err := a.RTMPServer.APIConnsGet(uuid)
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -783,14 +769,14 @@ func (a *api) onRTMPConnsGet(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onRTMPConnsKick(ctx *gin.Context) { func (a *API) onRTMPConnsKick(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id")) uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil { if err != nil {
a.writeError(ctx, http.StatusBadRequest, err) a.writeError(ctx, http.StatusBadRequest, err)
return return
} }
err = a.rtmpServer.APIConnsKick(uuid) err = a.RTMPServer.APIConnsKick(uuid)
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -799,8 +785,8 @@ func (a *api) onRTMPConnsKick(ctx *gin.Context) {
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }
func (a *api) onRTMPSConnsList(ctx *gin.Context) { func (a *API) onRTMPSConnsList(ctx *gin.Context) {
data, err := a.rtmpsServer.APIConnsList() data, err := a.RTMPSServer.APIConnsList()
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -817,14 +803,14 @@ func (a *api) onRTMPSConnsList(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onRTMPSConnsGet(ctx *gin.Context) { func (a *API) onRTMPSConnsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id")) uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil { if err != nil {
a.writeError(ctx, http.StatusBadRequest, err) a.writeError(ctx, http.StatusBadRequest, err)
return return
} }
data, err := a.rtmpsServer.APIConnsGet(uuid) data, err := a.RTMPSServer.APIConnsGet(uuid)
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -833,14 +819,14 @@ func (a *api) onRTMPSConnsGet(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onRTMPSConnsKick(ctx *gin.Context) { func (a *API) onRTMPSConnsKick(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id")) uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil { if err != nil {
a.writeError(ctx, http.StatusBadRequest, err) a.writeError(ctx, http.StatusBadRequest, err)
return return
} }
err = a.rtmpsServer.APIConnsKick(uuid) err = a.RTMPSServer.APIConnsKick(uuid)
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -849,8 +835,8 @@ func (a *api) onRTMPSConnsKick(ctx *gin.Context) {
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }
func (a *api) onHLSMuxersList(ctx *gin.Context) { func (a *API) onHLSMuxersList(ctx *gin.Context) {
data, err := a.hlsManager.APIMuxersList() data, err := a.HLSServer.APIMuxersList()
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -867,14 +853,14 @@ func (a *api) onHLSMuxersList(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onHLSMuxersGet(ctx *gin.Context) { func (a *API) onHLSMuxersGet(ctx *gin.Context) {
name, ok := paramName(ctx) name, ok := paramName(ctx)
if !ok { if !ok {
a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name"))
return return
} }
data, err := a.hlsManager.APIMuxersGet(name) data, err := a.HLSServer.APIMuxersGet(name)
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -883,8 +869,8 @@ func (a *api) onHLSMuxersGet(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onWebRTCSessionsList(ctx *gin.Context) { func (a *API) onWebRTCSessionsList(ctx *gin.Context) {
data, err := a.webRTCServer.APISessionsList() data, err := a.WebRTCServer.APISessionsList()
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -901,14 +887,14 @@ func (a *api) onWebRTCSessionsList(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onWebRTCSessionsGet(ctx *gin.Context) { func (a *API) onWebRTCSessionsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id")) uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil { if err != nil {
a.writeError(ctx, http.StatusBadRequest, err) a.writeError(ctx, http.StatusBadRequest, err)
return return
} }
data, err := a.webRTCServer.APISessionsGet(uuid) data, err := a.WebRTCServer.APISessionsGet(uuid)
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -917,14 +903,14 @@ func (a *api) onWebRTCSessionsGet(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onWebRTCSessionsKick(ctx *gin.Context) { func (a *API) onWebRTCSessionsKick(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id")) uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil { if err != nil {
a.writeError(ctx, http.StatusBadRequest, err) a.writeError(ctx, http.StatusBadRequest, err)
return return
} }
err = a.webRTCServer.APISessionsKick(uuid) err = a.WebRTCServer.APISessionsKick(uuid)
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -933,8 +919,8 @@ func (a *api) onWebRTCSessionsKick(ctx *gin.Context) {
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }
func (a *api) onSRTConnsList(ctx *gin.Context) { func (a *API) onSRTConnsList(ctx *gin.Context) {
data, err := a.srtServer.APIConnsList() data, err := a.SRTServer.APIConnsList()
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -951,14 +937,14 @@ func (a *api) onSRTConnsList(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onSRTConnsGet(ctx *gin.Context) { func (a *API) onSRTConnsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id")) uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil { if err != nil {
a.writeError(ctx, http.StatusBadRequest, err) a.writeError(ctx, http.StatusBadRequest, err)
return return
} }
data, err := a.srtServer.APIConnsGet(uuid) data, err := a.SRTServer.APIConnsGet(uuid)
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -967,14 +953,14 @@ func (a *api) onSRTConnsGet(ctx *gin.Context) {
ctx.JSON(http.StatusOK, data) ctx.JSON(http.StatusOK, data)
} }
func (a *api) onSRTConnsKick(ctx *gin.Context) { func (a *API) onSRTConnsKick(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id")) uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil { if err != nil {
a.writeError(ctx, http.StatusBadRequest, err) a.writeError(ctx, http.StatusBadRequest, err)
return return
} }
err = a.srtServer.APIConnsKick(uuid) err = a.SRTServer.APIConnsKick(uuid)
if err != nil { if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err) a.writeError(ctx, http.StatusInternalServerError, err)
return return
@ -983,9 +969,9 @@ func (a *api) onSRTConnsKick(ctx *gin.Context) {
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }
// confReload is called by core. // ReloadConf is called by core.
func (a *api) confReload(conf *conf.Conf) { func (a *API) ReloadConf(conf *conf.Conf) {
a.mutex.Lock() a.mutex.Lock()
defer a.mutex.Unlock() defer a.mutex.Unlock()
a.conf = conf a.Conf = conf
} }

39
internal/api/api_test.go

@ -0,0 +1,39 @@
package api
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestPaginate(t *testing.T) {
items := make([]int, 5)
for i := 0; i < 5; i++ {
items[i] = i
}
pageCount, err := paginate(&items, "1", "1")
require.NoError(t, err)
require.Equal(t, 5, pageCount)
require.Equal(t, []int{1}, items)
items = make([]int, 5)
for i := 0; i < 5; i++ {
items[i] = i
}
pageCount, err = paginate(&items, "3", "2")
require.NoError(t, err)
require.Equal(t, 2, pageCount)
require.Equal(t, []int{}, items)
items = make([]int, 6)
for i := 0; i < 6; i++ {
items[i] = i
}
pageCount, err = paginate(&items, "4", "1")
require.NoError(t, err)
require.Equal(t, 2, pageCount)
require.Equal(t, []int{4, 5}, items)
}

34
internal/core/api_test.go

@ -20,7 +20,7 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts" "github.com/bluenviron/mediacommon/pkg/formats/mpegts"
"github.com/datarhei/gosrt" srt "github.com/datarhei/gosrt"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/pion/rtp" "github.com/pion/rtp"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -102,38 +102,6 @@ func httpRequest(t *testing.T, hc *http.Client, method string, ur string, in int
require.NoError(t, err) require.NoError(t, err)
} }
func TestPagination(t *testing.T) {
items := make([]int, 5)
for i := 0; i < 5; i++ {
items[i] = i
}
pageCount, err := paginate(&items, "1", "1")
require.NoError(t, err)
require.Equal(t, 5, pageCount)
require.Equal(t, []int{1}, items)
items = make([]int, 5)
for i := 0; i < 5; i++ {
items[i] = i
}
pageCount, err = paginate(&items, "3", "2")
require.NoError(t, err)
require.Equal(t, 2, pageCount)
require.Equal(t, []int{}, items)
items = make([]int, 6)
for i := 0; i < 6; i++ {
items[i] = i
}
pageCount, err = paginate(&items, "4", "1")
require.NoError(t, err)
require.Equal(t, 2, pageCount)
require.Equal(t, []int{4, 5}, items)
}
func TestAPIConfigGlobalGet(t *testing.T) { func TestAPIConfigGlobalGet(t *testing.T) {
p, ok := newInstance("api: yes\n") p, ok := newInstance("api: yes\n")
require.Equal(t, true, ok) require.Equal(t, true, ok)

103
internal/core/core.go

@ -16,10 +16,13 @@ import (
"github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/bluenviron/mediamtx/internal/api"
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/confwatcher" "github.com/bluenviron/mediamtx/internal/confwatcher"
"github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/metrics"
"github.com/bluenviron/mediamtx/internal/pprof"
"github.com/bluenviron/mediamtx/internal/record" "github.com/bluenviron/mediamtx/internal/record"
"github.com/bluenviron/mediamtx/internal/rlimit" "github.com/bluenviron/mediamtx/internal/rlimit"
"github.com/bluenviron/mediamtx/internal/servers/hls" "github.com/bluenviron/mediamtx/internal/servers/hls"
@ -84,8 +87,8 @@ type Core struct {
conf *conf.Conf conf *conf.Conf
logger *logger.Logger logger *logger.Logger
externalCmdPool *externalcmd.Pool externalCmdPool *externalcmd.Pool
metrics *metrics metrics *metrics.Metrics
pprof *pprof pprof *pprof.PPROF
recordCleaner *record.Cleaner recordCleaner *record.Cleaner
pathManager *pathManager pathManager *pathManager
rtspServer *rtsp.Server rtspServer *rtsp.Server
@ -95,7 +98,7 @@ type Core struct {
hlsServer *hls.Server hlsServer *hls.Server
webRTCServer *webrtc.Server webRTCServer *webrtc.Server
srtServer *srt.Server srtServer *srt.Server
api *api api *api.API
confWatcher *confwatcher.ConfWatcher confWatcher *confwatcher.ConfWatcher
// in // in
@ -275,12 +278,12 @@ func (p *Core) createResources(initial bool) error {
if p.conf.Metrics && if p.conf.Metrics &&
p.metrics == nil { p.metrics == nil {
p.metrics = &metrics{ p.metrics = &metrics.Metrics{
Address: p.conf.MetricsAddress, Address: p.conf.MetricsAddress,
ReadTimeout: p.conf.ReadTimeout, ReadTimeout: p.conf.ReadTimeout,
Parent: p, Parent: p,
} }
err = p.metrics.initialize() err := p.metrics.Initialize()
if err != nil { if err != nil {
return err return err
} }
@ -288,11 +291,12 @@ func (p *Core) createResources(initial bool) error {
if p.conf.PPROF && if p.conf.PPROF &&
p.pprof == nil { p.pprof == nil {
p.pprof, err = newPPROF( p.pprof = &pprof.PPROF{
p.conf.PPROFAddress, Address: p.conf.PPROFAddress,
p.conf.ReadTimeout, ReadTimeout: p.conf.ReadTimeout,
p, Parent: p,
) }
err := p.pprof.Initialize()
if err != nil { if err != nil {
return err return err
} }
@ -324,7 +328,7 @@ func (p *Core) createResources(initial bool) error {
) )
if p.metrics != nil { if p.metrics != nil {
p.metrics.setPathManager(p.pathManager) p.metrics.SetPathManager(p.pathManager)
} }
} }
@ -366,7 +370,7 @@ func (p *Core) createResources(initial bool) error {
} }
if p.metrics != nil { if p.metrics != nil {
p.metrics.setRTSPServer(p.rtspServer) p.metrics.SetRTSPServer(p.rtspServer)
} }
} }
@ -405,7 +409,7 @@ func (p *Core) createResources(initial bool) error {
} }
if p.metrics != nil { if p.metrics != nil {
p.metrics.setRTSPSServer(p.rtspsServer) p.metrics.SetRTSPSServer(p.rtspsServer)
} }
} }
@ -435,7 +439,7 @@ func (p *Core) createResources(initial bool) error {
} }
if p.metrics != nil { if p.metrics != nil {
p.metrics.setRTMPServer(p.rtmpServer) p.metrics.SetRTMPServer(p.rtmpServer)
} }
} }
@ -465,7 +469,7 @@ func (p *Core) createResources(initial bool) error {
} }
if p.metrics != nil { if p.metrics != nil {
p.metrics.setRTMPSServer(p.rtmpsServer) p.metrics.SetRTMPSServer(p.rtmpsServer)
} }
} }
@ -499,7 +503,7 @@ func (p *Core) createResources(initial bool) error {
p.pathManager.setHLSServer(p.hlsServer) p.pathManager.setHLSServer(p.hlsServer)
if p.metrics != nil { if p.metrics != nil {
p.metrics.setHLSServer(p.hlsServer) p.metrics.SetHLSServer(p.hlsServer)
} }
} }
@ -524,14 +528,14 @@ func (p *Core) createResources(initial bool) error {
PathManager: p.pathManager, PathManager: p.pathManager,
Parent: p, Parent: p,
} }
err = p.webRTCServer.Initialize() err := p.webRTCServer.Initialize()
if err != nil { if err != nil {
p.webRTCServer = nil p.webRTCServer = nil
return err return err
} }
if p.metrics != nil { if p.metrics != nil {
p.metrics.setWebRTCServer(p.webRTCServer) p.metrics.SetWebRTCServer(p.webRTCServer)
} }
} }
@ -551,32 +555,33 @@ func (p *Core) createResources(initial bool) error {
PathManager: p.pathManager, PathManager: p.pathManager,
Parent: p, Parent: p,
} }
err = p.srtServer.Initialize() err := p.srtServer.Initialize()
if err != nil { if err != nil {
return err return err
} }
if p.metrics != nil { if p.metrics != nil {
p.metrics.setSRTServer(p.srtServer) p.metrics.SetSRTServer(p.srtServer)
} }
} }
if p.conf.API && if p.conf.API &&
p.api == nil { p.api == nil {
p.api, err = newAPI( p.api = &api.API{
p.conf.APIAddress, Address: p.conf.APIAddress,
p.conf.ReadTimeout, ReadTimeout: p.conf.ReadTimeout,
p.conf, Conf: p.conf,
p.pathManager, PathManager: p.pathManager,
p.rtspServer, RTSPServer: p.rtspServer,
p.rtspsServer, RTSPSServer: p.rtspsServer,
p.rtmpServer, RTMPServer: p.rtmpServer,
p.rtmpsServer, RTMPSServer: p.rtmpsServer,
p.hlsServer, HLSServer: p.hlsServer,
p.webRTCServer, WebRTCServer: p.webRTCServer,
p.srtServer, SRTServer: p.srtServer,
p, Parent: p,
) }
err := p.api.Initialize()
if err != nil { if err != nil {
return err return err
} }
@ -626,7 +631,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
closeMetrics || closeMetrics ||
closeLogger closeLogger
if !closePathManager && !reflect.DeepEqual(newConf.Paths, p.conf.Paths) { if !closePathManager && !reflect.DeepEqual(newConf.Paths, p.conf.Paths) {
p.pathManager.confReload(newConf.Paths) p.pathManager.ReloadConf(newConf.Paths)
} }
closeRTSPServer := newConf == nil || closeRTSPServer := newConf == nil ||
@ -779,16 +784,16 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
if p.api != nil { if p.api != nil {
if closeAPI { if closeAPI {
p.api.close() p.api.Close()
p.api = nil p.api = nil
} else if !calledByAPI { // avoid a loop } else if !calledByAPI { // avoid a loop
p.api.confReload(newConf) p.api.ReloadConf(newConf)
} }
} }
if closeSRTServer && p.srtServer != nil { if closeSRTServer && p.srtServer != nil {
if p.metrics != nil { if p.metrics != nil {
p.metrics.setSRTServer(nil) p.metrics.SetSRTServer(nil)
} }
p.srtServer.Close() p.srtServer.Close()
@ -797,7 +802,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
if closeWebRTCServer && p.webRTCServer != nil { if closeWebRTCServer && p.webRTCServer != nil {
if p.metrics != nil { if p.metrics != nil {
p.metrics.setWebRTCServer(nil) p.metrics.SetWebRTCServer(nil)
} }
p.webRTCServer.Close() p.webRTCServer.Close()
@ -806,7 +811,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
if closeHLSServer && p.hlsServer != nil { if closeHLSServer && p.hlsServer != nil {
if p.metrics != nil { if p.metrics != nil {
p.metrics.setHLSServer(nil) p.metrics.SetHLSServer(nil)
} }
p.pathManager.setHLSServer(nil) p.pathManager.setHLSServer(nil)
@ -817,7 +822,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
if closeRTMPSServer && p.rtmpsServer != nil { if closeRTMPSServer && p.rtmpsServer != nil {
if p.metrics != nil { if p.metrics != nil {
p.metrics.setRTMPSServer(nil) p.metrics.SetRTMPSServer(nil)
} }
p.rtmpsServer.Close() p.rtmpsServer.Close()
@ -826,7 +831,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
if closeRTMPServer && p.rtmpServer != nil { if closeRTMPServer && p.rtmpServer != nil {
if p.metrics != nil { if p.metrics != nil {
p.metrics.setRTMPServer(nil) p.metrics.SetRTMPServer(nil)
} }
p.rtmpServer.Close() p.rtmpServer.Close()
@ -835,7 +840,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
if closeRTSPSServer && p.rtspsServer != nil { if closeRTSPSServer && p.rtspsServer != nil {
if p.metrics != nil { if p.metrics != nil {
p.metrics.setRTSPSServer(nil) p.metrics.SetRTSPSServer(nil)
} }
p.rtspsServer.Close() p.rtspsServer.Close()
@ -844,7 +849,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
if closeRTSPServer && p.rtspServer != nil { if closeRTSPServer && p.rtspServer != nil {
if p.metrics != nil { if p.metrics != nil {
p.metrics.setRTSPServer(nil) p.metrics.SetRTSPServer(nil)
} }
p.rtspServer.Close() p.rtspServer.Close()
@ -853,7 +858,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
if closePathManager && p.pathManager != nil { if closePathManager && p.pathManager != nil {
if p.metrics != nil { if p.metrics != nil {
p.metrics.setPathManager(nil) p.metrics.SetPathManager(nil)
} }
p.pathManager.close() p.pathManager.close()
@ -866,12 +871,12 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
} }
if closePPROF && p.pprof != nil { if closePPROF && p.pprof != nil {
p.pprof.close() p.pprof.Close()
p.pprof = nil p.pprof = nil
} }
if closeMetrics && p.metrics != nil { if closeMetrics && p.metrics != nil {
p.metrics.close() p.metrics.Close()
p.metrics = nil p.metrics = nil
} }
@ -892,8 +897,8 @@ func (p *Core) reloadConf(newConf *conf.Conf, calledByAPI bool) error {
return p.createResources(false) return p.createResources(false)
} }
// apiConfigSet is called by api. // APIConfigSet is called by api.
func (p *Core) apiConfigSet(conf *conf.Conf) { func (p *Core) APIConfigSet(conf *conf.Conf) {
select { select {
case p.chAPIConfigSet <- conf: case p.chAPIConfigSet <- conf:
case <-p.ctx.Done(): case <-p.ctx.Done():

4
internal/core/path.go

@ -1019,8 +1019,8 @@ func (pa *path) RemoveReader(req defs.PathRemoveReaderReq) {
} }
} }
// apiPathsGet is called by api. // APIPathsGet is called by api.
func (pa *path) apiPathsGet(req pathAPIPathsGetReq) (*defs.APIPath, error) { func (pa *path) APIPathsGet(req pathAPIPathsGetReq) (*defs.APIPath, error) {
req.res = make(chan pathAPIPathsGetRes) req.res = make(chan pathAPIPathsGetRes)
select { select {
case pa.chAPIPathsGet <- req: case pa.chAPIPathsGet <- req:

16
internal/core/path_manager.go

@ -433,8 +433,8 @@ func (pm *pathManager) removePath(pa *path) {
delete(pm.paths, pa.name) delete(pm.paths, pa.name)
} }
// confReload is called by core. // ReloadConf is called by core.
func (pm *pathManager) confReload(pathConfs map[string]*conf.Path) { func (pm *pathManager) ReloadConf(pathConfs map[string]*conf.Path) {
select { select {
case pm.chReloadConf <- pathConfs: case pm.chReloadConf <- pathConfs:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
@ -545,8 +545,8 @@ func (pm *pathManager) setHLSServer(s pathManagerHLSServer) {
} }
} }
// apiPathsList is called by api. // APIPathsList is called by api.
func (pm *pathManager) apiPathsList() (*defs.APIPathList, error) { func (pm *pathManager) APIPathsList() (*defs.APIPathList, error) {
req := pathAPIPathsListReq{ req := pathAPIPathsListReq{
res: make(chan pathAPIPathsListRes), res: make(chan pathAPIPathsListRes),
} }
@ -560,7 +560,7 @@ func (pm *pathManager) apiPathsList() (*defs.APIPathList, error) {
} }
for _, pa := range res.paths { for _, pa := range res.paths {
item, err := pa.apiPathsGet(pathAPIPathsGetReq{}) item, err := pa.APIPathsGet(pathAPIPathsGetReq{})
if err == nil { if err == nil {
res.data.Items = append(res.data.Items, item) res.data.Items = append(res.data.Items, item)
} }
@ -577,8 +577,8 @@ func (pm *pathManager) apiPathsList() (*defs.APIPathList, error) {
} }
} }
// apiPathsGet is called by api. // APIPathsGet is called by api.
func (pm *pathManager) apiPathsGet(name string) (*defs.APIPath, error) { func (pm *pathManager) APIPathsGet(name string) (*defs.APIPath, error) {
req := pathAPIPathsGetReq{ req := pathAPIPathsGetReq{
name: name, name: name,
res: make(chan pathAPIPathsGetRes), res: make(chan pathAPIPathsGetRes),
@ -591,7 +591,7 @@ func (pm *pathManager) apiPathsGet(name string) (*defs.APIPath, error) {
return nil, res.err return nil, res.err
} }
data, err := res.path.apiPathsGet(req) data, err := res.path.APIPathsGet(req)
return data, err return data, err
case <-pm.ctx.Done(): case <-pm.ctx.Done():

2
internal/core/path_manager_test.go

@ -78,7 +78,7 @@ func TestPathAutoDeletion(t *testing.T) {
} }
}() }()
data, err := p.pathManager.apiPathsList() data, err := p.pathManager.APIPathsList()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 0, len(data.Items)) require.Equal(t, 0, len(data.Items))

72
internal/core/metrics.go → internal/metrics/metrics.go

@ -1,20 +1,27 @@
package core // Package metrics contains the metrics provider.
package metrics
import ( import (
"io" "io"
"net/http" "net/http"
"reflect"
"strconv" "strconv"
"sync" "sync"
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/bluenviron/mediamtx/internal/api"
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/httpserv" "github.com/bluenviron/mediamtx/internal/protocols/httpserv"
"github.com/bluenviron/mediamtx/internal/restrictnetwork" "github.com/bluenviron/mediamtx/internal/restrictnetwork"
) )
func interfaceIsEmpty(i interface{}) bool {
return reflect.ValueOf(i).Kind() != reflect.Ptr || reflect.ValueOf(i).IsNil()
}
func metric(key string, tags string, value int64) string { func metric(key string, tags string, value int64) string {
return key + tags + " " + strconv.FormatInt(value, 10) + "\n" return key + tags + " " + strconv.FormatInt(value, 10) + "\n"
} }
@ -23,24 +30,26 @@ type metricsParent interface {
logger.Writer logger.Writer
} }
type metrics struct { // Metrics is a metrics provider.
type Metrics struct {
Address string Address string
ReadTimeout conf.StringDuration ReadTimeout conf.StringDuration
Parent metricsParent Parent metricsParent
httpServer *httpserv.WrappedServer httpServer *httpserv.WrappedServer
mutex sync.Mutex mutex sync.Mutex
pathManager apiPathManager pathManager api.PathManager
rtspServer apiRTSPServer rtspServer api.RTSPServer
rtspsServer apiRTSPServer rtspsServer api.RTSPServer
rtmpServer apiRTMPServer rtmpServer api.RTMPServer
rtmpsServer apiRTMPServer rtmpsServer api.RTMPServer
srtServer apiSRTServer srtServer api.SRTServer
hlsManager apiHLSServer hlsManager api.HLSServer
webRTCServer apiWebRTCServer webRTCServer api.WebRTCServer
} }
func (m *metrics) initialize() error { // Initialize initializes metrics.
func (m *Metrics) Initialize() error {
router := gin.New() router := gin.New()
router.SetTrustedProxies(nil) //nolint:errcheck router.SetTrustedProxies(nil) //nolint:errcheck
@ -67,20 +76,21 @@ func (m *metrics) initialize() error {
return nil return nil
} }
func (m *metrics) close() { // Close closes Metrics.
func (m *Metrics) Close() {
m.Log(logger.Info, "listener is closing") m.Log(logger.Info, "listener is closing")
m.httpServer.Close() m.httpServer.Close()
} }
// Log implements logger.Writer. // Log implements logger.Writer.
func (m *metrics) Log(level logger.Level, format string, args ...interface{}) { func (m *Metrics) Log(level logger.Level, format string, args ...interface{}) {
m.Parent.Log(level, "[metrics] "+format, args...) m.Parent.Log(level, "[metrics] "+format, args...)
} }
func (m *metrics) onMetrics(ctx *gin.Context) { func (m *Metrics) onMetrics(ctx *gin.Context) {
out := "" out := ""
data, err := m.pathManager.apiPathsList() data, err := m.pathManager.APIPathsList()
if err == nil && len(data.Items) != 0 { if err == nil && len(data.Items) != 0 {
for _, i := range data.Items { for _, i := range data.Items {
var state string var state string
@ -249,57 +259,57 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
io.WriteString(ctx.Writer, out) //nolint:errcheck io.WriteString(ctx.Writer, out) //nolint:errcheck
} }
// setPathManager is called by core. // SetPathManager is called by core.
func (m *metrics) setPathManager(s apiPathManager) { func (m *Metrics) SetPathManager(s api.PathManager) {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.pathManager = s m.pathManager = s
} }
// setHLSServer is called by core. // SetHLSServer is called by core.
func (m *metrics) setHLSServer(s apiHLSServer) { func (m *Metrics) SetHLSServer(s api.HLSServer) {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.hlsManager = s m.hlsManager = s
} }
// setRTSPServer is called by core. // SetRTSPServer is called by core.
func (m *metrics) setRTSPServer(s apiRTSPServer) { func (m *Metrics) SetRTSPServer(s api.RTSPServer) {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.rtspServer = s m.rtspServer = s
} }
// setRTSPSServer is called by core. // SetRTSPSServer is called by core.
func (m *metrics) setRTSPSServer(s apiRTSPServer) { func (m *Metrics) SetRTSPSServer(s api.RTSPServer) {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.rtspsServer = s m.rtspsServer = s
} }
// setRTMPServer is called by core. // SetRTMPServer is called by core.
func (m *metrics) setRTMPServer(s apiRTMPServer) { func (m *Metrics) SetRTMPServer(s api.RTMPServer) {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.rtmpServer = s m.rtmpServer = s
} }
// setRTMPSServer is called by core. // SetRTMPSServer is called by core.
func (m *metrics) setRTMPSServer(s apiRTMPServer) { func (m *Metrics) SetRTMPSServer(s api.RTMPServer) {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.rtmpsServer = s m.rtmpsServer = s
} }
// setSRTServer is called by core. // SetSRTServer is called by core.
func (m *metrics) setSRTServer(s apiSRTServer) { func (m *Metrics) SetSRTServer(s api.SRTServer) {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.srtServer = s m.srtServer = s
} }
// setWebRTCServer is called by core. // SetWebRTCServer is called by core.
func (m *metrics) setWebRTCServer(s apiWebRTCServer) { func (m *Metrics) SetWebRTCServer(s api.WebRTCServer) {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.webRTCServer = s m.webRTCServer = s

36
internal/core/pprof.go → internal/pprof/pprof.go

@ -1,4 +1,5 @@
package core // Package pprof contains a pprof exporter.
package pprof
import ( import (
"net/http" "net/http"
@ -17,48 +18,45 @@ type pprofParent interface {
logger.Writer logger.Writer
} }
type pprof struct { // PPROF is a pprof exporter.
parent pprofParent type PPROF struct {
Address string
ReadTimeout conf.StringDuration
Parent pprofParent
httpServer *httpserv.WrappedServer httpServer *httpserv.WrappedServer
} }
func newPPROF( // Initialize initializes PPROF.
address string, func (pp *PPROF) Initialize() error {
readTimeout conf.StringDuration, network, address := restrictnetwork.Restrict("tcp", pp.Address)
parent pprofParent,
) (*pprof, error) {
pp := &pprof{
parent: parent,
}
network, address := restrictnetwork.Restrict("tcp", address)
var err error var err error
pp.httpServer, err = httpserv.NewWrappedServer( pp.httpServer, err = httpserv.NewWrappedServer(
network, network,
address, address,
time.Duration(readTimeout), time.Duration(pp.ReadTimeout),
"", "",
"", "",
http.DefaultServeMux, http.DefaultServeMux,
pp, pp,
) )
if err != nil { if err != nil {
return nil, err return err
} }
pp.Log(logger.Info, "listener opened on "+address) pp.Log(logger.Info, "listener opened on "+address)
return pp, nil return nil
} }
func (pp *pprof) close() { // Close closes PPROF.
func (pp *PPROF) Close() {
pp.Log(logger.Info, "listener is closing") pp.Log(logger.Info, "listener is closing")
pp.httpServer.Close() pp.httpServer.Close()
} }
// Log implements logger.Writer. // Log implements logger.Writer.
func (pp *pprof) Log(level logger.Level, format string, args ...interface{}) { func (pp *PPROF) Log(level logger.Level, format string, args ...interface{}) {
pp.parent.Log(level, "[pprof] "+format, args...) pp.Parent.Log(level, "[pprof] "+format, args...)
} }
Loading…
Cancel
Save