Browse Source

api: add /get endpoints (#1577) (#1823)

this allows to get entities by ID or name after /list endpoints were
changed in v0.23.0.
pull/1824/head
Alessandro Ros 3 years ago committed by GitHub
parent
commit
b93eed64bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 236
      apidocs/openapi.yaml
  2. 278
      internal/core/api.go
  3. 95
      internal/core/api_defs.go
  4. 320
      internal/core/api_test.go
  5. 6
      internal/core/core_test.go
  6. 76
      internal/core/hls_manager.go
  7. 46
      internal/core/hls_muxer.go
  8. 48
      internal/core/metrics.go
  9. 108
      internal/core/path.go
  10. 52
      internal/core/path_manager.go
  11. 20
      internal/core/rtmp_conn.go
  12. 101
      internal/core/rtmp_server.go
  13. 10
      internal/core/rtsp_conn.go
  14. 144
      internal/core/rtsp_server.go
  15. 22
      internal/core/rtsp_session.go
  16. 107
      internal/core/webrtc_manager.go
  17. 34
      internal/core/webrtc_session.go

236
apidocs/openapi.yaml

@ -471,7 +471,7 @@ components: @@ -471,7 +471,7 @@ components:
items:
$ref: '#/components/schemas/RTSPSession'
WebRTCConn:
WebRTCSession:
type: object
properties:
id:
@ -496,7 +496,7 @@ components: @@ -496,7 +496,7 @@ components:
type: integer
format: int64
WebRTCConnsList:
WebRTCSessionsList:
type: object
properties:
pageCount:
@ -504,7 +504,7 @@ components: @@ -504,7 +504,7 @@ components:
items:
type: array
items:
$ref: '#/components/schemas/WebRTCConn'
$ref: '#/components/schemas/WebRTCSession'
paths:
/v2/config/get:
@ -645,6 +645,30 @@ paths: @@ -645,6 +645,30 @@ paths:
'500':
description: internal server error.
/v2/hlsmuxers/get/{name}:
post:
operationId: hlsMuxersGet
summary: returns a HLS muxer.
description: ''
parameters:
- name: name
in: path
required: true
description: name of the muxer.
schema:
type: string
responses:
'200':
description: the request was successful.
content:
application/json:
schema:
$ref: '#/components/schemas/HLSMuxer'
'400':
description: invalid request.
'500':
description: internal server error.
/v2/paths/list:
get:
operationId: pathsList
@ -675,6 +699,30 @@ paths: @@ -675,6 +699,30 @@ paths:
'500':
description: internal server error.
/v2/paths/get/{name}:
post:
operationId: pathsGet
summary: returns a path.
description: ''
parameters:
- name: name
in: path
required: true
description: name of the path.
schema:
type: string
responses:
'200':
description: the request was successful.
content:
application/json:
schema:
$ref: '#/components/schemas/Path'
'400':
description: invalid request.
'500':
description: internal server error.
/v2/rtspconns/list:
get:
operationId: rtspConnsList
@ -705,6 +753,30 @@ paths: @@ -705,6 +753,30 @@ paths:
'500':
description: internal server error.
/v2/rtspconns/get/{id}:
post:
operationId: rtspConnsGet
summary: returns a RTSP connection.
description: ''
parameters:
- name: id
in: path
required: true
description: ID of the connection.
schema:
type: string
responses:
'200':
description: the request was successful.
content:
application/json:
schema:
$ref: '#/components/schemas/RTSPConn'
'400':
description: invalid request.
'500':
description: internal server error.
/v2/rtspsessions/list:
get:
operationId: rtspSessionsList
@ -735,6 +807,30 @@ paths: @@ -735,6 +807,30 @@ paths:
'500':
description: internal server error.
/v2/rtspsessions/get/{id}:
post:
operationId: rtspSessionsGet
summary: returns a RTSP session.
description: ''
parameters:
- name: id
in: path
required: true
description: ID of the connection.
schema:
type: string
responses:
'200':
description: the request was successful.
content:
application/json:
schema:
$ref: '#/components/schemas/RTSPSession'
'400':
description: invalid request.
'500':
description: internal server error.
/v2/rtspsessions/kick/{id}:
post:
operationId: rtspSessionsKick
@ -744,7 +840,7 @@ paths: @@ -744,7 +840,7 @@ paths:
- name: id
in: path
required: true
description: the ID of the session.
description: ID of the session.
schema:
type: string
responses:
@ -785,6 +881,30 @@ paths: @@ -785,6 +881,30 @@ paths:
'500':
description: internal server error.
/v2/rtspsconns/get/{id}:
post:
operationId: rtspsConnsGet
summary: returns a RTSPS connection.
description: ''
parameters:
- name: id
in: path
required: true
description: ID of the connection.
schema:
type: string
responses:
'200':
description: the request was successful.
content:
application/json:
schema:
$ref: '#/components/schemas/RTSPConn'
'400':
description: invalid request.
'500':
description: internal server error.
/v2/rtspssessions/list:
get:
operationId: rtspsSessionsList
@ -815,6 +935,30 @@ paths: @@ -815,6 +935,30 @@ paths:
'500':
description: internal server error.
/v2/rtspssessions/get/{id}:
post:
operationId: rtspsSessionsGet
summary: returns a RTSPS session.
description: ''
parameters:
- name: id
in: path
required: true
description: ID of the connection.
schema:
type: string
responses:
'200':
description: the request was successful.
content:
application/json:
schema:
$ref: '#/components/schemas/RTSPSession'
'400':
description: invalid request.
'500':
description: internal server error.
/v2/rtspssessions/kick/{id}:
post:
operationId: rtspsSessionsKick
@ -824,7 +968,7 @@ paths: @@ -824,7 +968,7 @@ paths:
- name: id
in: path
required: true
description: the ID of the session.
description: ID of the session.
schema:
type: string
responses:
@ -865,6 +1009,30 @@ paths: @@ -865,6 +1009,30 @@ paths:
'500':
description: internal server error.
/v2/rtmpconns/get/{id}:
post:
operationId: rtmpConnectionsGet
summary: returns a RTMP connection.
description: ''
parameters:
- name: id
in: path
required: true
description: ID of the connection.
schema:
type: string
responses:
'200':
description: the request was successful.
content:
application/json:
schema:
$ref: '#/components/schemas/RTMPConn'
'400':
description: invalid request.
'500':
description: internal server error.
/v2/rtmpconns/kick/{id}:
post:
operationId: rtmpConnsKick
@ -874,7 +1042,7 @@ paths: @@ -874,7 +1042,7 @@ paths:
- name: id
in: path
required: true
description: the ID of the connection.
description: ID of the connection.
schema:
type: string
responses:
@ -915,6 +1083,30 @@ paths: @@ -915,6 +1083,30 @@ paths:
'500':
description: internal server error.
/v2/rtmpsconns/get/{id}:
post:
operationId: rtmpsConnectionsGet
summary: returns a RTMPS connection.
description: ''
parameters:
- name: id
in: path
required: true
description: ID of the connection.
schema:
type: string
responses:
'200':
description: the request was successful.
content:
application/json:
schema:
$ref: '#/components/schemas/RTMPConn'
'400':
description: invalid request.
'500':
description: internal server error.
/v2/rtmpsconns/kick/{id}:
post:
operationId: rtmpsConnsKick
@ -924,7 +1116,7 @@ paths: @@ -924,7 +1116,7 @@ paths:
- name: id
in: path
required: true
description: the ID of the connection.
description: ID of the connection.
schema:
type: string
responses:
@ -959,7 +1151,31 @@ paths: @@ -959,7 +1151,31 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/WebRTCConnsList'
$ref: '#/components/schemas/WebRTCSessionsList'
'400':
description: invalid request.
'500':
description: internal server error.
/v2/webrtcsessions/get/{id}:
post:
operationId: webrtcSessionsGet
summary: returns a WebRTC session.
description: ''
parameters:
- name: id
in: path
required: true
description: ID of the session.
schema:
type: string
responses:
'200':
description: the request was successful.
content:
application/json:
schema:
$ref: '#/components/schemas/WebRTCSession'
'400':
description: invalid request.
'500':
@ -968,13 +1184,13 @@ paths: @@ -968,13 +1184,13 @@ paths:
/v2/webrtcsessions/kick/{id}:
post:
operationId: webrtcSessionsKick
summary: kicks out a WebRTC connection from the server.
summary: kicks out a WebRTC session from the server.
description: ''
parameters:
- name: id
in: path
required: true
description: the ID of the session.
description: ID of the session.
schema:
type: string
responses:

278
internal/core/api.go

@ -126,22 +126,33 @@ func paginate(itemsPtr interface{}, itemsPerPageStr string, pageStr string) (int @@ -126,22 +126,33 @@ func paginate(itemsPtr interface{}, itemsPerPageStr string, pageStr string) (int
}
type apiPathManager interface {
apiPathsList() pathAPIPathsListRes
apiPathsList() (*apiPathsList, error)
apiPathsGet(string) (*apiPath, error)
}
type apiHLSManager interface {
apiMuxersList() hlsManagerAPIMuxersListRes
apiMuxersList() (*apiHLSMuxersList, error)
apiMuxersGet(string) (*apiHLSMuxer, error)
}
type apiRTSPServer interface {
apiConnsList() rtspServerAPIConnsListRes
apiSessionsList() rtspServerAPISessionsListRes
apiSessionsKick(uuid.UUID) rtspServerAPISessionsKickRes
apiConnsList() (*apiRTSPConnsList, error)
apiConnsGet(uuid.UUID) (*apiRTSPConn, error)
apiSessionsList() (*apiRTSPSessionsList, error)
apiSessionsGet(uuid.UUID) (*apiRTSPSession, error)
apiSessionsKick(uuid.UUID) error
}
type apiRTMPServer interface {
apiConnsList() rtmpServerAPIConnsListRes
apiConnsKick(uuid.UUID) rtmpServerAPIConnsKickRes
apiConnsList() (*apiRTMPConnsList, error)
apiConnsGet(uuid.UUID) (*apiRTMPConn, error)
apiConnsKick(uuid.UUID) error
}
type apiWebRTCManager interface {
apiSessionsList() (*apiWebRTCSessionsList, error)
apiSessionsGet(uuid.UUID) (*apiWebRTCSession, error)
apiSessionsKick(uuid.UUID) error
}
type apiParent interface {
@ -149,11 +160,6 @@ type apiParent interface { @@ -149,11 +160,6 @@ type apiParent interface {
apiConfigSet(conf *conf.Conf)
}
type apiWebRTCManager interface {
apiSessionsList() webRTCManagerAPISessionsListRes
apiSessionsKick(uuid.UUID) webRTCManagerAPISessionsKickRes
}
type api struct {
conf *conf.Conf
pathManager apiPathManager
@ -209,34 +215,43 @@ func newAPI( @@ -209,34 +215,43 @@ func newAPI(
if !interfaceIsEmpty(a.hlsManager) {
group.GET("/v2/hlsmuxers/list", a.onHLSMuxersList)
group.GET("/v2/hlsmuxers/get/:name", a.onHLSMuxersGet)
}
group.GET("/v2/paths/list", a.onPathsList)
group.GET("/v2/paths/get/:name", a.onPathsGet)
if !interfaceIsEmpty(a.rtspServer) {
group.GET("/v2/rtspconns/list", a.onRTSPConnsList)
group.GET("/v2/rtspconns/get/:id", a.onRTSPConnsGet)
group.GET("/v2/rtspsessions/list", a.onRTSPSessionsList)
group.GET("/v2/rtspsessions/get/:id", a.onRTSPSessionsGet)
group.POST("/v2/rtspsessions/kick/:id", a.onRTSPSessionsKick)
}
if !interfaceIsEmpty(a.rtspsServer) {
group.GET("/v2/rtspsconns/list", a.onRTSPSConnsList)
group.GET("/v2/rtspsconns/get/:id", a.onRTSPSConnsGet)
group.GET("/v2/rtspssessions/list", a.onRTSPSSessionsList)
group.GET("/v2/rtspssessions/get/:id", a.onRTSPSSessionsGet)
group.POST("/v2/rtspssessions/kick/:id", a.onRTSPSSessionsKick)
}
if !interfaceIsEmpty(a.rtmpServer) {
group.GET("/v2/rtmpconns/list", a.onRTMPConnsList)
group.GET("/v2/rtmpconns/get/:id", a.onRTMPConnsGet)
group.POST("/v2/rtmpconns/kick/:id", a.onRTMPConnsKick)
}
if !interfaceIsEmpty(a.rtmpsServer) {
group.GET("/v2/rtmpsconns/list", a.onRTMPSConnsList)
group.GET("/v2/rtmpsconns/get/:id", a.onRTMPSConnsGet)
group.POST("/v2/rtmpsconns/kick/:id", a.onRTMPSConnsKick)
}
if !interfaceIsEmpty(a.webRTCManager) {
group.GET("/v2/webrtcsessions/list", a.onWebRTCSessionsList)
group.GET("/v2/webrtcsessions/get/:id", a.onWebRTCSessionsGet)
group.POST("/v2/webrtcsessions/kick/:id", a.onWebRTCSessionsKick)
}
@ -425,54 +440,98 @@ func (a *api) onConfigPathsDelete(ctx *gin.Context) { @@ -425,54 +440,98 @@ func (a *api) onConfigPathsDelete(ctx *gin.Context) {
}
func (a *api) onPathsList(ctx *gin.Context) {
res := a.pathManager.apiPathsList()
if res.err != nil {
data, err := a.pathManager.apiPathsList()
if err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
pageCount, err := paginate(&res.data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
ctx.AbortWithStatus(http.StatusBadRequest)
return
}
res.data.PageCount = pageCount
data.PageCount = pageCount
ctx.JSON(http.StatusOK, data)
}
func (a *api) onPathsGet(ctx *gin.Context) {
data, err := a.pathManager.apiPathsGet(ctx.Param("name"))
if err != nil {
if err.Error() == "not found" {
ctx.AbortWithStatus(http.StatusNotFound)
return
}
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
ctx.JSON(http.StatusOK, res.data)
ctx.JSON(http.StatusOK, data)
}
func (a *api) onRTSPConnsList(ctx *gin.Context) {
res := a.rtspServer.apiConnsList()
if res.err != nil {
data, err := a.rtspServer.apiConnsList()
if err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
pageCount, err := paginate(&res.data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
ctx.AbortWithStatus(http.StatusBadRequest)
return
}
res.data.PageCount = pageCount
data.PageCount = pageCount
ctx.JSON(http.StatusOK, res.data)
ctx.JSON(http.StatusOK, data)
}
func (a *api) onRTSPConnsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
ctx.AbortWithStatus(http.StatusBadRequest)
return
}
data, err := a.rtspServer.apiConnsGet(uuid)
if err != nil {
return
}
ctx.JSON(http.StatusOK, data)
}
func (a *api) onRTSPSessionsList(ctx *gin.Context) {
res := a.rtspServer.apiSessionsList()
if res.err != nil {
data, err := a.rtspServer.apiSessionsList()
if err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
pageCount, err := paginate(&res.data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
ctx.AbortWithStatus(http.StatusBadRequest)
return
}
data.PageCount = pageCount
ctx.JSON(http.StatusOK, data)
}
func (a *api) onRTSPSessionsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
ctx.AbortWithStatus(http.StatusBadRequest)
return
}
res.data.PageCount = pageCount
ctx.JSON(http.StatusOK, res.data)
data, err := a.rtspServer.apiSessionsGet(uuid)
if err != nil {
return
}
ctx.JSON(http.StatusOK, data)
}
func (a *api) onRTSPSessionsKick(ctx *gin.Context) {
@ -482,8 +541,8 @@ func (a *api) onRTSPSessionsKick(ctx *gin.Context) { @@ -482,8 +541,8 @@ func (a *api) onRTSPSessionsKick(ctx *gin.Context) {
return
}
res := a.rtspServer.apiSessionsKick(uuid)
if res.err != nil {
err = a.rtspServer.apiSessionsKick(uuid)
if err != nil {
return
}
@ -491,37 +550,67 @@ func (a *api) onRTSPSessionsKick(ctx *gin.Context) { @@ -491,37 +550,67 @@ func (a *api) onRTSPSessionsKick(ctx *gin.Context) {
}
func (a *api) onRTSPSConnsList(ctx *gin.Context) {
res := a.rtspsServer.apiConnsList()
if res.err != nil {
data, err := a.rtspsServer.apiConnsList()
if err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
pageCount, err := paginate(&res.data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
ctx.AbortWithStatus(http.StatusBadRequest)
return
}
data.PageCount = pageCount
ctx.JSON(http.StatusOK, data)
}
func (a *api) onRTSPSConnsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
ctx.AbortWithStatus(http.StatusBadRequest)
return
}
res.data.PageCount = pageCount
ctx.JSON(http.StatusOK, res.data)
data, err := a.rtspsServer.apiConnsGet(uuid)
if err != nil {
return
}
ctx.JSON(http.StatusOK, data)
}
func (a *api) onRTSPSSessionsList(ctx *gin.Context) {
res := a.rtspsServer.apiSessionsList()
if res.err != nil {
data, err := a.rtspsServer.apiSessionsList()
if err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
pageCount, err := paginate(&res.data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
ctx.AbortWithStatus(http.StatusBadRequest)
return
}
res.data.PageCount = pageCount
data.PageCount = pageCount
ctx.JSON(http.StatusOK, res.data)
ctx.JSON(http.StatusOK, data)
}
func (a *api) onRTSPSSessionsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
ctx.AbortWithStatus(http.StatusBadRequest)
return
}
data, err := a.rtspsServer.apiSessionsGet(uuid)
if err != nil {
return
}
ctx.JSON(http.StatusOK, data)
}
func (a *api) onRTSPSSessionsKick(ctx *gin.Context) {
@ -531,8 +620,8 @@ func (a *api) onRTSPSSessionsKick(ctx *gin.Context) { @@ -531,8 +620,8 @@ func (a *api) onRTSPSSessionsKick(ctx *gin.Context) {
return
}
res := a.rtspsServer.apiSessionsKick(uuid)
if res.err != nil {
err = a.rtspsServer.apiSessionsKick(uuid)
if err != nil {
return
}
@ -540,20 +629,35 @@ func (a *api) onRTSPSSessionsKick(ctx *gin.Context) { @@ -540,20 +629,35 @@ func (a *api) onRTSPSSessionsKick(ctx *gin.Context) {
}
func (a *api) onRTMPConnsList(ctx *gin.Context) {
res := a.rtmpServer.apiConnsList()
if res.err != nil {
data, err := a.rtmpServer.apiConnsList()
if err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
pageCount, err := paginate(&res.data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
ctx.AbortWithStatus(http.StatusBadRequest)
return
}
data.PageCount = pageCount
ctx.JSON(http.StatusOK, data)
}
func (a *api) onRTMPConnsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
ctx.AbortWithStatus(http.StatusBadRequest)
return
}
res.data.PageCount = pageCount
ctx.JSON(http.StatusOK, res.data)
data, err := a.rtmpServer.apiConnsGet(uuid)
if err != nil {
return
}
ctx.JSON(http.StatusOK, data)
}
func (a *api) onRTMPConnsKick(ctx *gin.Context) {
@ -563,8 +667,8 @@ func (a *api) onRTMPConnsKick(ctx *gin.Context) { @@ -563,8 +667,8 @@ func (a *api) onRTMPConnsKick(ctx *gin.Context) {
return
}
res := a.rtmpServer.apiConnsKick(uuid)
if res.err != nil {
err = a.rtmpServer.apiConnsKick(uuid)
if err != nil {
return
}
@ -572,20 +676,35 @@ func (a *api) onRTMPConnsKick(ctx *gin.Context) { @@ -572,20 +676,35 @@ func (a *api) onRTMPConnsKick(ctx *gin.Context) {
}
func (a *api) onRTMPSConnsList(ctx *gin.Context) {
res := a.rtmpsServer.apiConnsList()
if res.err != nil {
data, err := a.rtmpsServer.apiConnsList()
if err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
pageCount, err := paginate(&res.data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
ctx.AbortWithStatus(http.StatusBadRequest)
return
}
res.data.PageCount = pageCount
data.PageCount = pageCount
ctx.JSON(http.StatusOK, res.data)
ctx.JSON(http.StatusOK, data)
}
func (a *api) onRTMPSConnsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
ctx.AbortWithStatus(http.StatusBadRequest)
return
}
data, err := a.rtmpsServer.apiConnsGet(uuid)
if err != nil {
return
}
ctx.JSON(http.StatusOK, data)
}
func (a *api) onRTMPSConnsKick(ctx *gin.Context) {
@ -595,8 +714,8 @@ func (a *api) onRTMPSConnsKick(ctx *gin.Context) { @@ -595,8 +714,8 @@ func (a *api) onRTMPSConnsKick(ctx *gin.Context) {
return
}
res := a.rtmpsServer.apiConnsKick(uuid)
if res.err != nil {
err = a.rtmpsServer.apiConnsKick(uuid)
if err != nil {
return
}
@ -604,37 +723,62 @@ func (a *api) onRTMPSConnsKick(ctx *gin.Context) { @@ -604,37 +723,62 @@ func (a *api) onRTMPSConnsKick(ctx *gin.Context) {
}
func (a *api) onHLSMuxersList(ctx *gin.Context) {
res := a.hlsManager.apiMuxersList()
if res.err != nil {
data, err := a.hlsManager.apiMuxersList()
if err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
pageCount, err := paginate(&res.data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
ctx.AbortWithStatus(http.StatusBadRequest)
return
}
res.data.PageCount = pageCount
data.PageCount = pageCount
ctx.JSON(http.StatusOK, data)
}
func (a *api) onHLSMuxersGet(ctx *gin.Context) {
data, err := a.hlsManager.apiMuxersGet(ctx.Param("name"))
if err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
ctx.JSON(http.StatusOK, res.data)
ctx.JSON(http.StatusOK, data)
}
func (a *api) onWebRTCSessionsList(ctx *gin.Context) {
res := a.webRTCManager.apiSessionsList()
if res.err != nil {
data, err := a.webRTCManager.apiSessionsList()
if err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
pageCount, err := paginate(&res.data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
ctx.AbortWithStatus(http.StatusBadRequest)
return
}
data.PageCount = pageCount
ctx.JSON(http.StatusOK, data)
}
func (a *api) onWebRTCSessionsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
ctx.AbortWithStatus(http.StatusBadRequest)
return
}
res.data.PageCount = pageCount
ctx.JSON(http.StatusOK, res.data)
data, err := a.webRTCManager.apiSessionsGet(uuid)
if err != nil {
return
}
ctx.JSON(http.StatusOK, data)
}
func (a *api) onWebRTCSessionsKick(ctx *gin.Context) {
@ -644,8 +788,8 @@ func (a *api) onWebRTCSessionsKick(ctx *gin.Context) { @@ -644,8 +788,8 @@ func (a *api) onWebRTCSessionsKick(ctx *gin.Context) {
return
}
res := a.webRTCManager.apiSessionsKick(uuid)
if res.err != nil {
err = a.webRTCManager.apiSessionsKick(uuid)
if err != nil {
return
}

95
internal/core/api_defs.go

@ -0,0 +1,95 @@ @@ -0,0 +1,95 @@
package core
import (
"time"
"github.com/google/uuid"
"github.com/bluenviron/mediamtx/internal/conf"
)
type apiPath struct {
Name string `json:"name"`
ConfName string `json:"confName"`
Conf *conf.PathConf `json:"conf"`
Source interface{} `json:"source"`
SourceReady bool `json:"sourceReady"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
Readers []interface{} `json:"readers"`
}
type apiPathsList struct {
PageCount int `json:"pageCount"`
Items []*apiPath `json:"items"`
}
type apiHLSMuxer struct {
Path string `json:"path"`
Created time.Time `json:"created"`
LastRequest time.Time `json:"lastRequest"`
BytesSent uint64 `json:"bytesSent"`
}
type apiHLSMuxersList struct {
PageCount int `json:"pageCount"`
Items []*apiHLSMuxer `json:"items"`
}
type apiRTSPConn struct {
ID uuid.UUID `json:"id"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
type apiRTSPConnsList struct {
PageCount int `json:"pageCount"`
Items []*apiRTSPConn `json:"items"`
}
type apiRTMPConn struct {
ID uuid.UUID `json:"id"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
State string `json:"state"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
type apiRTMPConnsList struct {
PageCount int `json:"pageCount"`
Items []*apiRTMPConn `json:"items"`
}
type apiRTSPSession struct {
ID uuid.UUID `json:"id"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
State string `json:"state"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
type apiRTSPSessionsList struct {
PageCount int `json:"pageCount"`
Items []*apiRTSPSession `json:"items"`
}
type apiWebRTCSession struct {
ID uuid.UUID `json:"id"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
PeerConnectionEstablished bool `json:"peerConnectionEstablished"`
LocalCandidate string `json:"localCandidate"`
RemoteCandidate string `json:"remoteCandidate"`
State string `json:"state"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
type apiWebRTCSessionsList struct {
PageCount int `json:"pageCount"`
Items []*apiWebRTCSession `json:"items"`
}

320
internal/core/api_test.go

@ -394,7 +394,60 @@ func TestAPIPathsList(t *testing.T) { @@ -394,7 +394,60 @@ func TestAPIPathsList(t *testing.T) {
})
}
func TestAPIProtocolSpecificList(t *testing.T) {
func TestAPIPathsGet(t *testing.T) {
p, ok := newInstance("api: yes\n" +
"paths:\n" +
" all:\n")
require.Equal(t, true, ok)
defer p.Close()
source := gortsplib.Client{}
err := source.StartRecording("rtsp://localhost:8554/mypath", media.Medias{testMediaH264})
require.NoError(t, err)
defer source.Close()
for _, ca := range []string{"ok", "not found"} {
t.Run(ca, func(t *testing.T) {
type pathSource struct {
Type string `json:"type"`
}
type path struct {
Name string `json:"name"`
Source pathSource `json:"source"`
SourceReady bool `json:"sourceReady"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
}
var pathName string
if ca == "ok" {
pathName = "mypath"
} else {
pathName = "nonexisting"
}
var out path
err := httpRequest(http.MethodGet, "http://localhost:9997/v2/paths/get/"+pathName, nil, &out)
if ca == "ok" {
require.NoError(t, err)
require.Equal(t, path{
Name: "mypath",
Source: pathSource{
Type: "rtspSession",
},
SourceReady: true,
Tracks: []string{"H264"},
}, out)
} else {
require.EqualError(t, err, "bad status code: 404")
}
})
}
}
func TestAPIProtocolList(t *testing.T) {
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
@ -438,7 +491,7 @@ func TestAPIProtocolSpecificList(t *testing.T) { @@ -438,7 +491,7 @@ func TestAPIProtocolSpecificList(t *testing.T) {
medi := testMediaH264
switch ca {
switch ca { //nolint:dupl
case "rtsp conns", "rtsp sessions":
source := gortsplib.Client{}
@ -640,7 +693,268 @@ func TestAPIProtocolSpecificList(t *testing.T) { @@ -640,7 +693,268 @@ func TestAPIProtocolSpecificList(t *testing.T) {
}
}
func TestAPIKick(t *testing.T) {
func TestAPIProtocolGet(t *testing.T) {
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(serverKey)
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
for _, ca := range []string{
"rtsp conns",
"rtsp sessions",
"rtsps conns",
"rtsps sessions",
"rtmp",
"rtmps",
"hls",
"webrtc",
} {
t.Run(ca, func(t *testing.T) {
conf := "api: yes\n"
switch ca {
case "rtsps conns", "rtsps sessions":
conf += "protocols: [tcp]\n" +
"encryption: strict\n" +
"serverCert: " + serverCertFpath + "\n" +
"serverKey: " + serverKeyFpath + "\n"
case "rtmps":
conf += "rtmpEncryption: strict\n" +
"rtmpServerCert: " + serverCertFpath + "\n" +
"rtmpServerKey: " + serverKeyFpath + "\n"
}
conf += "paths:\n" +
" all:\n"
p, ok := newInstance(conf)
require.Equal(t, true, ok)
defer p.Close()
medi := testMediaH264
switch ca { //nolint:dupl
case "rtsp conns", "rtsp sessions":
source := gortsplib.Client{}
err := source.StartRecording("rtsp://localhost:8554/mypath", media.Medias{medi})
require.NoError(t, err)
defer source.Close()
case "rtsps conns", "rtsps sessions":
source := gortsplib.Client{
TLSConfig: &tls.Config{InsecureSkipVerify: true},
}
err := source.StartRecording("rtsps://localhost:8322/mypath", media.Medias{medi})
require.NoError(t, err)
defer source.Close()
case "rtmp", "rtmps":
var port string
if ca == "rtmp" {
port = "1935"
} else {
port = "1936"
}
u, err := url.Parse("rtmp://127.0.0.1:" + port + "/mypath")
require.NoError(t, err)
nconn, err := func() (net.Conn, error) {
if ca == "rtmp" {
return net.Dial("tcp", u.Host)
}
return tls.Dial("tcp", u.Host, &tls.Config{InsecureSkipVerify: true})
}()
require.NoError(t, err)
defer nconn.Close()
conn := rtmp.NewConn(nconn)
err = conn.InitializeClient(u, true)
require.NoError(t, err)
err = conn.WriteTracks(testFormatH264, nil)
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)
case "hls":
source := gortsplib.Client{}
err := source.StartRecording("rtsp://localhost:8554/mypath",
media.Medias{medi})
require.NoError(t, err)
defer source.Close()
go func() {
time.Sleep(500 * time.Millisecond)
for i := 0; i < 3; i++ {
/*source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123 + uint16(i),
Timestamp: 45343 + uint32(i)*90000,
SSRC: 563423,
},
Payload: []byte{
testSPS,
0x05,
},
})
[]byte{ // 1920x1080 baseline
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20,
},*/
source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123 + uint16(i),
Timestamp: 45343 + uint32(i)*90000,
SSRC: 563423,
},
Payload: []byte{
// testSPS,
0x05,
},
})
}
}()
func() {
res, err := http.Get("http://localhost:8888/mypath/index.m3u8")
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, 200, res.StatusCode)
}()
case "webrtc":
source := gortsplib.Client{}
err := source.StartRecording("rtsp://localhost:8554/mypath",
media.Medias{medi})
require.NoError(t, err)
defer source.Close()
c := newWebRTCTestClient(t, "http://localhost:8889/mypath/whep", false)
defer c.close()
time.Sleep(500 * time.Millisecond)
source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
})
<-c.incomingTrack
}
switch ca {
case "rtsp conns", "rtsp sessions", "rtsps conns", "rtsps sessions", "rtmp", "rtmps":
var pa string
switch ca {
case "rtsp conns":
pa = "rtspconns"
case "rtsp sessions":
pa = "rtspsessions"
case "rtsps conns":
pa = "rtspsconns"
case "rtsps sessions":
pa = "rtspssessions"
case "rtmp":
pa = "rtmpconns"
case "rtmps":
pa = "rtmpsconns"
}
type item struct {
ID string `json:"id"`
State string `json:"state"`
}
var out1 struct {
Items []item `json:"items"`
}
err = httpRequest(http.MethodGet, "http://localhost:9997/v2/"+pa+"/list", nil, &out1)
require.NoError(t, err)
if ca != "rtsp conns" && ca != "rtsps conns" {
require.Equal(t, "publish", out1.Items[0].State)
}
var out2 item
err = httpRequest(http.MethodGet, "http://localhost:9997/v2/"+pa+"/get/"+out1.Items[0].ID, nil, &out2)
require.NoError(t, err)
if ca != "rtsp conns" && ca != "rtsps conns" {
require.Equal(t, "publish", out2.State)
}
case "hls":
type item struct {
Created string `json:"created"`
LastRequest string `json:"lastRequest"`
}
var out item
err = httpRequest(http.MethodGet, "http://localhost:9997/v2/hlsmuxers/get/mypath", nil, &out)
require.NoError(t, err)
s := fmt.Sprintf("^%d-", time.Now().Year())
require.Regexp(t, s, out.Created)
require.Regexp(t, s, out.LastRequest)
case "webrtc":
type item struct {
ID string `json:"id"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
PeerConnectionEstablished bool `json:"peerConnectionEstablished"`
LocalCandidate string `json:"localCandidate"`
RemoteCandidate string `json:"remoteCandidate"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
var out1 struct {
Items []item `json:"items"`
}
err = httpRequest(http.MethodGet, "http://localhost:9997/v2/webrtcsessions/list", nil, &out1)
require.NoError(t, err)
var out2 item
err = httpRequest(http.MethodGet, "http://localhost:9997/v2/webrtcsessions/get/"+out1.Items[0].ID, nil, &out2)
require.NoError(t, err)
require.Equal(t, true, out2.PeerConnectionEstablished)
}
})
}
}
func TestAPIProtocolKick(t *testing.T) {
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
defer os.Remove(serverCertFpath)

6
internal/core/core_test.go

@ -165,10 +165,10 @@ func TestCorePathAutoDeletion(t *testing.T) { @@ -165,10 +165,10 @@ func TestCorePathAutoDeletion(t *testing.T) {
}
}()
res := p.pathManager.apiPathsList()
require.NoError(t, res.err)
data, err := p.pathManager.apiPathsList()
require.NoError(t, err)
require.Equal(t, 0, len(res.data.Items))
require.Equal(t, 0, len(data.Items))
})
}
}

76
internal/core/hls_manager.go

@ -4,37 +4,28 @@ import ( @@ -4,37 +4,28 @@ import (
"context"
"fmt"
"sync"
"time"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
)
type hlsManagerAPIMuxersListItem struct {
Path string `json:"path"`
Created time.Time `json:"created"`
LastRequest time.Time `json:"lastRequest"`
BytesSent uint64 `json:"bytesSent"`
}
type hlsManagerAPIMuxersListData struct {
PageCount int `json:"pageCount"`
Items []hlsManagerAPIMuxersListItem `json:"items"`
}
type hlsManagerAPIMuxersListRes struct {
data *hlsManagerAPIMuxersListData
muxers map[string]*hlsMuxer
err error
data *apiHLSMuxersList
err error
}
type hlsManagerAPIMuxersListReq struct {
res chan hlsManagerAPIMuxersListRes
}
type hlsManagerAPIMuxersListSubReq struct {
data *hlsManagerAPIMuxersListData
res chan struct{}
type hlsManagerAPIMuxersGetRes struct {
data *apiHLSMuxer
err error
}
type hlsManagerAPIMuxersGetReq struct {
name string
res chan hlsManagerAPIMuxersGetRes
}
type hlsManagerParent interface {
@ -67,6 +58,7 @@ type hlsManager struct { @@ -67,6 +58,7 @@ type hlsManager struct {
chHandleRequest chan hlsMuxerHandleRequestReq
chMuxerClose chan *hlsMuxer
chAPIMuxerList chan hlsManagerAPIMuxersListReq
chAPIMuxerGet chan hlsManagerAPIMuxersGetReq
}
func newHLSManager(
@ -114,6 +106,7 @@ func newHLSManager( @@ -114,6 +106,7 @@ func newHLSManager(
chHandleRequest: make(chan hlsMuxerHandleRequestReq),
chMuxerClose: make(chan *hlsMuxer),
chAPIMuxerList: make(chan hlsManagerAPIMuxersListReq),
chAPIMuxerGet: make(chan hlsManagerAPIMuxersGetReq),
}
var err error
@ -199,16 +192,27 @@ outer: @@ -199,16 +192,27 @@ outer:
delete(m.muxers, c.PathName())
case req := <-m.chAPIMuxerList:
muxers := make(map[string]*hlsMuxer)
data := &apiHLSMuxersList{
Items: []*apiHLSMuxer{},
}
for name, m := range m.muxers {
muxers[name] = m
for _, muxer := range m.muxers {
data.Items = append(data.Items, muxer.apiItem())
}
req.res <- hlsManagerAPIMuxersListRes{
muxers: muxers,
data: data,
}
case req := <-m.chAPIMuxerGet:
muxer, ok := m.muxers[req.name]
if !ok {
req.res <- hlsManagerAPIMuxersGetRes{err: fmt.Errorf("not found")}
continue
}
req.res <- hlsManagerAPIMuxersGetRes{data: muxer.apiItem()}
case <-m.ctx.Done():
break outer
}
@ -271,7 +275,7 @@ func (m *hlsManager) pathSourceNotReady(pa *path) { @@ -271,7 +275,7 @@ func (m *hlsManager) pathSourceNotReady(pa *path) {
}
// apiMuxersList is called by api.
func (m *hlsManager) apiMuxersList() hlsManagerAPIMuxersListRes {
func (m *hlsManager) apiMuxersList() (*apiHLSMuxersList, error) {
req := hlsManagerAPIMuxersListReq{
res: make(chan hlsManagerAPIMuxersListRes),
}
@ -279,19 +283,27 @@ func (m *hlsManager) apiMuxersList() hlsManagerAPIMuxersListRes { @@ -279,19 +283,27 @@ func (m *hlsManager) apiMuxersList() hlsManagerAPIMuxersListRes {
select {
case m.chAPIMuxerList <- req:
res := <-req.res
return res.data, res.err
res.data = &hlsManagerAPIMuxersListData{
Items: []hlsManagerAPIMuxersListItem{},
}
case <-m.ctx.Done():
return nil, fmt.Errorf("terminated")
}
}
for _, pa := range res.muxers {
pa.apiMuxersList(hlsManagerAPIMuxersListSubReq{data: res.data})
}
// apiMuxersGet is called by api.
func (m *hlsManager) apiMuxersGet(name string) (*apiHLSMuxer, error) {
req := hlsManagerAPIMuxersGetReq{
name: name,
res: make(chan hlsManagerAPIMuxersGetRes),
}
return res
select {
case m.chAPIMuxerGet <- req:
res := <-req.res
return res.data, res.err
case <-m.ctx.Done():
return hlsManagerAPIMuxersListRes{err: fmt.Errorf("terminated")}
return nil, fmt.Errorf("terminated")
}
}

46
internal/core/hls_muxer.go

@ -30,6 +30,10 @@ const ( @@ -30,6 +30,10 @@ const (
hlsMuxerRecreatePause = 10 * time.Second
)
func int64Ptr(v int64) *int64 {
return &v
}
type responseWriterWithCounter struct {
http.ResponseWriter
bytesSent *uint64
@ -80,8 +84,7 @@ type hlsMuxer struct { @@ -80,8 +84,7 @@ type hlsMuxer struct {
bytesSent *uint64
// in
chRequest chan *hlsMuxerHandleRequestReq
chAPIHLSMuxersList chan hlsManagerAPIMuxersListSubReq
chRequest chan *hlsMuxerHandleRequestReq
}
func newHLSMuxer(
@ -121,13 +124,9 @@ func newHLSMuxer( @@ -121,13 +124,9 @@ func newHLSMuxer(
ctx: ctx,
ctxCancel: ctxCancel,
created: time.Now(),
lastRequestTime: func() *int64 {
v := time.Now().UnixNano()
return &v
}(),
bytesSent: new(uint64),
chRequest: make(chan *hlsMuxerHandleRequestReq),
chAPIHLSMuxersList: make(chan hlsManagerAPIMuxersListSubReq),
lastRequestTime: int64Ptr(time.Now().UnixNano()),
bytesSent: new(uint64),
chRequest: make(chan *hlsMuxerHandleRequestReq),
}
m.Log(logger.Info, "created %s", func() string {
@ -201,15 +200,6 @@ func (m *hlsMuxer) run() { @@ -201,15 +200,6 @@ func (m *hlsMuxer) run() {
m.requests = append(m.requests, req)
}
case req := <-m.chAPIHLSMuxersList:
req.data.Items = append(req.data.Items, hlsManagerAPIMuxersListItem{
Path: m.pathName,
Created: m.created,
LastRequest: time.Unix(0, atomic.LoadInt64(m.lastRequestTime)),
BytesSent: atomic.LoadUint64(m.bytesSent),
})
close(req.res)
case <-innerReady:
isReady = true
for _, req := range m.requests {
@ -555,17 +545,6 @@ func (m *hlsMuxer) processRequest(req *hlsMuxerHandleRequestReq) { @@ -555,17 +545,6 @@ func (m *hlsMuxer) processRequest(req *hlsMuxerHandleRequestReq) {
}
}
// apiMuxersList is called by api.
func (m *hlsMuxer) apiMuxersList(req hlsManagerAPIMuxersListSubReq) {
req.res = make(chan struct{})
select {
case m.chAPIHLSMuxersList <- req:
<-req.res
case <-m.ctx.Done():
}
}
// apiReaderDescribe implements reader.
func (m *hlsMuxer) apiReaderDescribe() pathAPISourceOrReader {
return pathAPISourceOrReader{
@ -573,3 +552,12 @@ func (m *hlsMuxer) apiReaderDescribe() pathAPISourceOrReader { @@ -573,3 +552,12 @@ func (m *hlsMuxer) apiReaderDescribe() pathAPISourceOrReader {
ID: "",
}
}
func (m *hlsMuxer) apiItem() *apiHLSMuxer {
return &apiHLSMuxer{
Path: m.pathName,
Created: m.created,
LastRequest: time.Unix(0, atomic.LoadInt64(m.lastRequestTime)),
BytesSent: atomic.LoadUint64(m.bytesSent),
}
}

48
internal/core/metrics.go

@ -78,9 +78,9 @@ func (m *metrics) Log(level logger.Level, format string, args ...interface{}) { @@ -78,9 +78,9 @@ func (m *metrics) Log(level logger.Level, format string, args ...interface{}) {
func (m *metrics) onMetrics(ctx *gin.Context) {
out := ""
res := m.pathManager.apiPathsList()
if res.err == nil && len(res.data.Items) != 0 {
for _, i := range res.data.Items {
data, err := m.pathManager.apiPathsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
var state string
if i.SourceReady {
state = "ready"
@ -97,9 +97,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) { @@ -97,9 +97,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
}
if !interfaceIsEmpty(m.hlsManager) {
res := m.hlsManager.apiMuxersList()
if res.err == nil && len(res.data.Items) != 0 {
for _, i := range res.data.Items {
data, err := m.hlsManager.apiMuxersList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
tags := "{name=\"" + i.Path + "\"}"
out += metric("hls_muxers", tags, 1)
out += metric("hls_muxers_bytes_sent", tags, int64(i.BytesSent))
@ -112,9 +112,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) { @@ -112,9 +112,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
if !interfaceIsEmpty(m.rtspServer) { //nolint:dupl
func() {
res := m.rtspServer.apiConnsList()
if res.err == nil && len(res.data.Items) != 0 {
for _, i := range res.data.Items {
data, err := m.rtspServer.apiConnsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
tags := "{id=\"" + i.ID.String() + "\"}"
out += metric("rtsp_conns", tags, 1)
out += metric("rtsp_conns_bytes_received", tags, int64(i.BytesReceived))
@ -128,9 +128,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) { @@ -128,9 +128,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
}()
func() {
res := m.rtspServer.apiSessionsList()
if res.err == nil && len(res.data.Items) != 0 {
for _, i := range res.data.Items {
data, err := m.rtspServer.apiSessionsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
tags := "{id=\"" + i.ID.String() + "\",state=\"" + i.State + "\"}"
out += metric("rtsp_sessions", tags, 1)
out += metric("rtsp_sessions_bytes_received", tags, int64(i.BytesReceived))
@ -146,9 +146,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) { @@ -146,9 +146,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
if !interfaceIsEmpty(m.rtspsServer) { //nolint:dupl
func() {
res := m.rtspsServer.apiConnsList()
if res.err == nil && len(res.data.Items) != 0 {
for _, i := range res.data.Items {
data, err := m.rtspsServer.apiConnsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
tags := "{id=\"" + i.ID.String() + "\"}"
out += metric("rtsps_conns", tags, 1)
out += metric("rtsps_conns_bytes_received", tags, int64(i.BytesReceived))
@ -162,9 +162,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) { @@ -162,9 +162,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
}()
func() {
res := m.rtspsServer.apiSessionsList()
if res.err == nil && len(res.data.Items) != 0 {
for _, i := range res.data.Items {
data, err := m.rtspsServer.apiSessionsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
tags := "{id=\"" + i.ID.String() + "\",state=\"" + i.State + "\"}"
out += metric("rtsps_sessions", tags, 1)
out += metric("rtsps_sessions_bytes_received", tags, int64(i.BytesReceived))
@ -179,9 +179,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) { @@ -179,9 +179,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
}
if !interfaceIsEmpty(m.rtmpServer) {
res := m.rtmpServer.apiConnsList()
if res.err == nil && len(res.data.Items) != 0 {
for _, i := range res.data.Items {
data, err := m.rtmpServer.apiConnsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
tags := "{id=\"" + i.ID.String() + "\",state=\"" + i.State + "\"}"
out += metric("rtmp_conns", tags, 1)
out += metric("rtmp_conns_bytes_received", tags, int64(i.BytesReceived))
@ -195,9 +195,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) { @@ -195,9 +195,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
}
if !interfaceIsEmpty(m.webRTCManager) {
res := m.webRTCManager.apiSessionsList()
if res.err == nil && len(res.data.Items) != 0 {
for _, i := range res.data.Items {
data, err := m.webRTCManager.apiSessionsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
tags := "{id=\"" + i.ID.String() + "\"}"
out += metric("webrtc_sessions", tags, 1)
out += metric("webrtc_sessions_bytes_received", tags, int64(i.BytesReceived))

108
internal/core/path.go

@ -158,35 +158,24 @@ type pathAPISourceOrReader struct { @@ -158,35 +158,24 @@ type pathAPISourceOrReader struct {
ID string `json:"id"`
}
type pathAPIPathsListItem struct {
Name string `json:"name"`
ConfName string `json:"confName"`
Conf *conf.PathConf `json:"conf"`
Source interface{} `json:"source"`
SourceReady bool `json:"sourceReady"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
Readers []interface{} `json:"readers"`
}
type pathAPIPathsListData struct {
PageCount int `json:"pageCount"`
Items []pathAPIPathsListItem `json:"items"`
}
type pathAPIPathsListRes struct {
data *pathAPIPathsListData
data *apiPathsList
paths map[string]*path
err error
}
type pathAPIPathsListReq struct {
res chan pathAPIPathsListRes
}
type pathAPIPathsListSubReq struct {
data *pathAPIPathsListData
res chan struct{}
type pathAPIPathsGetRes struct {
path *path
data *apiPath
err error
}
type pathAPIPathsGetReq struct {
name string
res chan pathAPIPathsGetRes
}
type path struct {
@ -232,7 +221,7 @@ type path struct { @@ -232,7 +221,7 @@ type path struct {
chPublisherStop chan pathPublisherStopReq
chReaderAdd chan pathReaderAddReq
chReaderRemove chan pathReaderRemoveReq
chAPIPathsList chan pathAPIPathsListSubReq
chAPIPathsGet chan pathAPIPathsGetReq
// out
done chan struct{}
@ -286,7 +275,7 @@ func newPath( @@ -286,7 +275,7 @@ func newPath(
chPublisherStop: make(chan pathPublisherStopReq),
chReaderAdd: make(chan pathReaderAddReq),
chReaderRemove: make(chan pathReaderRemoveReq),
chAPIPathsList: make(chan pathAPIPathsListSubReq),
chAPIPathsGet: make(chan pathAPIPathsGetReq),
done: make(chan struct{}),
}
@ -489,8 +478,8 @@ func (pa *path) run() { @@ -489,8 +478,8 @@ func (pa *path) run() {
case req := <-pa.chReaderRemove:
pa.handleReaderRemove(req)
case req := <-pa.chAPIPathsList:
pa.handleAPIPathsList(req)
case req := <-pa.chAPIPathsGet:
pa.handleAPIPathsGet(req)
case <-pa.ctx.Done():
return fmt.Errorf("terminated")
@ -898,34 +887,35 @@ func (pa *path) handleReaderAddPost(req pathReaderAddReq) { @@ -898,34 +887,35 @@ func (pa *path) handleReaderAddPost(req pathReaderAddReq) {
}
}
func (pa *path) handleAPIPathsList(req pathAPIPathsListSubReq) {
req.data.Items = append(req.data.Items, pathAPIPathsListItem{
Name: pa.name,
ConfName: pa.confName,
Conf: pa.conf,
Source: func() interface{} {
if pa.source == nil {
return nil
}
return pa.source.apiSourceDescribe()
}(),
SourceReady: pa.stream != nil,
Tracks: func() []string {
if pa.stream == nil {
return []string{}
}
return mediasDescription(pa.stream.medias())
}(),
BytesReceived: atomic.LoadUint64(pa.bytesReceived),
Readers: func() []interface{} {
ret := []interface{}{}
for r := range pa.readers {
ret = append(ret, r.apiReaderDescribe())
}
return ret
}(),
})
close(req.res)
func (pa *path) handleAPIPathsGet(req pathAPIPathsGetReq) {
req.res <- pathAPIPathsGetRes{
data: &apiPath{
Name: pa.name,
ConfName: pa.confName,
Conf: pa.conf,
Source: func() interface{} {
if pa.source == nil {
return nil
}
return pa.source.apiSourceDescribe()
}(),
SourceReady: pa.stream != nil,
Tracks: func() []string {
if pa.stream == nil {
return []string{}
}
return mediasDescription(pa.stream.medias())
}(),
BytesReceived: atomic.LoadUint64(pa.bytesReceived),
Readers: func() []interface{} {
ret := []interface{}{}
for r := range pa.readers {
ret = append(ret, r.apiReaderDescribe())
}
return ret
}(),
},
}
}
// reloadConf is called by pathManager.
@ -1039,13 +1029,15 @@ func (pa *path) readerRemove(req pathReaderRemoveReq) { @@ -1039,13 +1029,15 @@ func (pa *path) readerRemove(req pathReaderRemoveReq) {
}
}
// apiPathsList is called by api.
func (pa *path) apiPathsList(req pathAPIPathsListSubReq) {
req.res = make(chan struct{})
// apiPathsGet is called by api.
func (pa *path) apiPathsGet(req pathAPIPathsGetReq) (*apiPath, error) {
req.res = make(chan pathAPIPathsGetRes)
select {
case pa.chAPIPathsList <- req:
<-req.res
case pa.chAPIPathsGet <- req:
res := <-req.res
return res.data, res.err
case <-pa.ctx.Done():
return nil, fmt.Errorf("terminated")
}
}

52
internal/core/path_manager.go

@ -69,6 +69,7 @@ type pathManager struct { @@ -69,6 +69,7 @@ type pathManager struct {
chPublisherAdd chan pathPublisherAddReq
chHLSManagerSet chan pathManagerHLSManager
chAPIPathsList chan pathAPIPathsListReq
chAPIPathsGet chan pathAPIPathsGetReq
}
func newPathManager(
@ -113,6 +114,7 @@ func newPathManager( @@ -113,6 +114,7 @@ func newPathManager(
chPublisherAdd: make(chan pathPublisherAddReq),
chHLSManagerSet: make(chan pathManagerHLSManager),
chAPIPathsList: make(chan pathAPIPathsListReq),
chAPIPathsGet: make(chan pathAPIPathsGetReq),
}
for pathConfName, pathConf := range pm.pathConfs {
@ -292,10 +294,17 @@ outer: @@ -292,10 +294,17 @@ outer:
paths[name] = pa
}
req.res <- pathAPIPathsListRes{
paths: paths,
req.res <- pathAPIPathsListRes{paths: paths}
case req := <-pm.chAPIPathsGet:
path, ok := pm.paths[req.name]
if !ok {
req.res <- pathAPIPathsGetRes{err: fmt.Errorf("not found")}
continue
}
req.res <- pathAPIPathsGetRes{path: path}
case <-pm.ctx.Done():
break outer
}
@ -482,7 +491,7 @@ func (pm *pathManager) hlsManagerSet(s pathManagerHLSManager) { @@ -482,7 +491,7 @@ func (pm *pathManager) hlsManagerSet(s pathManagerHLSManager) {
}
// apiPathsList is called by api.
func (pm *pathManager) apiPathsList() pathAPIPathsListRes {
func (pm *pathManager) apiPathsList() (*apiPathsList, error) {
req := pathAPIPathsListReq{
res: make(chan pathAPIPathsListRes),
}
@ -491,17 +500,44 @@ func (pm *pathManager) apiPathsList() pathAPIPathsListRes { @@ -491,17 +500,44 @@ func (pm *pathManager) apiPathsList() pathAPIPathsListRes {
case pm.chAPIPathsList <- req:
res := <-req.res
res.data = &pathAPIPathsListData{
Items: []pathAPIPathsListItem{},
res.data = &apiPathsList{
Items: []*apiPath{},
}
for _, pa := range res.paths {
pa.apiPathsList(pathAPIPathsListSubReq{data: res.data})
item, err := pa.apiPathsGet(pathAPIPathsGetReq{})
if err != nil {
return nil, err
}
res.data.Items = append(res.data.Items, item)
}
return res.data, nil
case <-pm.ctx.Done():
return nil, fmt.Errorf("terminated")
}
}
// apiPathsGet is called by api.
func (pm *pathManager) apiPathsGet(name string) (*apiPath, error) {
req := pathAPIPathsGetReq{
name: name,
res: make(chan pathAPIPathsGetRes),
}
select {
case pm.chAPIPathsGet <- req:
res := <-req.res
if res.err != nil {
return nil, res.err
}
return res
data, err := res.path.apiPathsGet(req)
return data, err
case <-pm.ctx.Done():
return pathAPIPathsListRes{err: fmt.Errorf("terminated")}
return nil, fmt.Errorf("terminated")
}
}

20
internal/core/rtmp_conn.go

@ -827,3 +827,23 @@ func (c *rtmpConn) apiReaderDescribe() pathAPISourceOrReader { @@ -827,3 +827,23 @@ func (c *rtmpConn) apiReaderDescribe() pathAPISourceOrReader {
func (c *rtmpConn) apiSourceDescribe() pathAPISourceOrReader {
return c.apiReaderDescribe()
}
func (c *rtmpConn) apiItem() *apiRTMPConn {
return &apiRTMPConn{
ID: c.uuid,
Created: c.created,
RemoteAddr: c.remoteAddr().String(),
State: func() string {
switch c.safeState() {
case rtmpConnStateRead:
return "read"
case rtmpConnStatePublish:
return "publish"
}
return "idle"
}(),
BytesReceived: c.conn.BytesReceived(),
BytesSent: c.conn.BytesSent(),
}
}

101
internal/core/rtmp_server.go

@ -6,7 +6,6 @@ import ( @@ -6,7 +6,6 @@ import (
"fmt"
"net"
"sync"
"time"
"github.com/google/uuid"
@ -15,27 +14,23 @@ import ( @@ -15,27 +14,23 @@ import (
"github.com/bluenviron/mediamtx/internal/logger"
)
type rtmpServerAPIConnsListItem struct {
ID uuid.UUID `json:"id"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
State string `json:"state"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
type rtmpServerAPIConnsListRes struct {
data *apiRTMPConnsList
err error
}
type rtmpServerAPIConnsListData struct {
PageCount int `json:"pageCount"`
Items []rtmpServerAPIConnsListItem `json:"items"`
type rtmpServerAPIConnsListReq struct {
res chan rtmpServerAPIConnsListRes
}
type rtmpServerAPIConnsListRes struct {
data *rtmpServerAPIConnsListData
type rtmpServerAPIConnsGetRes struct {
data *apiRTMPConn
err error
}
type rtmpServerAPIConnsListReq struct {
res chan rtmpServerAPIConnsListRes
type rtmpServerAPIConnsGetReq struct {
uuid uuid.UUID
res chan rtmpServerAPIConnsGetRes
}
type rtmpServerAPIConnsKickRes struct {
@ -71,9 +66,10 @@ type rtmpServer struct { @@ -71,9 +66,10 @@ type rtmpServer struct {
conns map[*rtmpConn]struct{}
// in
chConnClose chan *rtmpConn
chAPISessionsList chan rtmpServerAPIConnsListReq
chAPIConnsKick chan rtmpServerAPIConnsKickReq
chConnClose chan *rtmpConn
chAPIConnsList chan rtmpServerAPIConnsListReq
chAPIConnsGet chan rtmpServerAPIConnsGetReq
chAPIConnsKick chan rtmpServerAPIConnsKickReq
}
func newRTMPServer(
@ -129,7 +125,8 @@ func newRTMPServer( @@ -129,7 +125,8 @@ func newRTMPServer(
ln: ln,
conns: make(map[*rtmpConn]struct{}),
chConnClose: make(chan *rtmpConn),
chAPISessionsList: make(chan rtmpServerAPIConnsListReq),
chAPIConnsList: make(chan rtmpServerAPIConnsListReq),
chAPIConnsGet: make(chan rtmpServerAPIConnsGetReq),
chAPIConnsKick: make(chan rtmpServerAPIConnsKickReq),
}
@ -217,33 +214,26 @@ outer: @@ -217,33 +214,26 @@ outer:
case c := <-s.chConnClose:
delete(s.conns, c)
case req := <-s.chAPISessionsList:
data := &rtmpServerAPIConnsListData{
Items: []rtmpServerAPIConnsListItem{},
case req := <-s.chAPIConnsList:
data := &apiRTMPConnsList{
Items: []*apiRTMPConn{},
}
for c := range s.conns {
data.Items = append(data.Items, rtmpServerAPIConnsListItem{
ID: c.uuid,
Created: c.created,
RemoteAddr: c.remoteAddr().String(),
State: func() string {
switch c.safeState() {
case rtmpConnStateRead:
return "read"
case rtmpConnStatePublish:
return "publish"
}
return "idle"
}(),
BytesReceived: c.conn.BytesReceived(),
BytesSent: c.conn.BytesSent(),
})
data.Items = append(data.Items, c.apiItem())
}
req.res <- rtmpServerAPIConnsListRes{data: data}
case req := <-s.chAPIConnsGet:
c := s.findConnByUUID(req.uuid)
if c == nil {
req.res <- rtmpServerAPIConnsGetRes{err: fmt.Errorf("not found")}
continue
}
req.res <- rtmpServerAPIConnsGetRes{data: c.apiItem()}
case req := <-s.chAPIConnsKick:
c := s.findConnByUUID(req.uuid)
if c == nil {
@ -287,22 +277,40 @@ func (s *rtmpServer) connClose(c *rtmpConn) { @@ -287,22 +277,40 @@ func (s *rtmpServer) connClose(c *rtmpConn) {
}
// apiConnsList is called by api.
func (s *rtmpServer) apiConnsList() rtmpServerAPIConnsListRes {
func (s *rtmpServer) apiConnsList() (*apiRTMPConnsList, error) {
req := rtmpServerAPIConnsListReq{
res: make(chan rtmpServerAPIConnsListRes),
}
select {
case s.chAPISessionsList <- req:
return <-req.res
case s.chAPIConnsList <- req:
res := <-req.res
return res.data, res.err
case <-s.ctx.Done():
return nil, fmt.Errorf("terminated")
}
}
// apiConnsGet is called by api.
func (s *rtmpServer) apiConnsGet(uuid uuid.UUID) (*apiRTMPConn, error) {
req := rtmpServerAPIConnsGetReq{
uuid: uuid,
res: make(chan rtmpServerAPIConnsGetRes),
}
select {
case s.chAPIConnsGet <- req:
res := <-req.res
return res.data, res.err
case <-s.ctx.Done():
return rtmpServerAPIConnsListRes{err: fmt.Errorf("terminated")}
return nil, fmt.Errorf("terminated")
}
}
// apiConnsKick is called by api.
func (s *rtmpServer) apiConnsKick(uuid uuid.UUID) rtmpServerAPIConnsKickRes {
func (s *rtmpServer) apiConnsKick(uuid uuid.UUID) error {
req := rtmpServerAPIConnsKickReq{
uuid: uuid,
res: make(chan rtmpServerAPIConnsKickRes),
@ -310,9 +318,10 @@ func (s *rtmpServer) apiConnsKick(uuid uuid.UUID) rtmpServerAPIConnsKickRes { @@ -310,9 +318,10 @@ func (s *rtmpServer) apiConnsKick(uuid uuid.UUID) rtmpServerAPIConnsKickRes {
select {
case s.chAPIConnsKick <- req:
return <-req.res
res := <-req.res
return res.err
case <-s.ctx.Done():
return rtmpServerAPIConnsKickRes{err: fmt.Errorf("terminated")}
return fmt.Errorf("terminated")
}
}

10
internal/core/rtsp_conn.go

@ -209,3 +209,13 @@ func (c *rtspConn) handleAuthError(authErr error) (*base.Response, error) { @@ -209,3 +209,13 @@ func (c *rtspConn) handleAuthError(authErr error) (*base.Response, error) {
StatusCode: base.StatusUnauthorized,
}, authErr
}
func (c *rtspConn) apiItem() *apiRTSPConn {
return &apiRTSPConn{
ID: c.uuid,
Created: c.created,
RemoteAddr: c.remoteAddr().String(),
BytesReceived: c.conn.BytesReceived(),
BytesSent: c.conn.BytesSent(),
}
}

144
internal/core/rtsp_server.go

@ -19,47 +19,6 @@ import ( @@ -19,47 +19,6 @@ import (
"github.com/bluenviron/mediamtx/internal/logger"
)
type rtspServerAPIConnsListItem struct {
ID uuid.UUID `json:"id"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
type rtspServerAPIConnsListData struct {
PageCount int `json:"pageCount"`
Items []rtspServerAPIConnsListItem `json:"items"`
}
type rtspServerAPIConnsListRes struct {
data *rtspServerAPIConnsListData
err error
}
type rtspServerAPISessionsListItem struct {
ID uuid.UUID `json:"id"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
State string `json:"state"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
type rtspServerAPISessionsListData struct {
PageCount int `json:"pageCount"`
Items []rtspServerAPISessionsListItem `json:"items"`
}
type rtspServerAPISessionsListRes struct {
data *rtspServerAPISessionsListData
err error
}
type rtspServerAPISessionsKickRes struct {
err error
}
type rtspServerParent interface {
logger.Writer
}
@ -364,6 +323,15 @@ func (s *rtspServer) OnDecodeError(ctx *gortsplib.ServerHandlerOnDecodeErrorCtx) @@ -364,6 +323,15 @@ func (s *rtspServer) OnDecodeError(ctx *gortsplib.ServerHandlerOnDecodeErrorCtx)
se.onDecodeError(ctx)
}
func (s *rtspServer) findConnByUUID(uuid uuid.UUID) *rtspConn {
for _, c := range s.conns {
if c.uuid == uuid {
return c
}
}
return nil
}
func (s *rtspServer) findSessionByUUID(uuid uuid.UUID) (*gortsplib.ServerSession, *rtspSession) {
for key, sx := range s.sessions {
if sx.uuid == uuid {
@ -374,78 +342,92 @@ func (s *rtspServer) findSessionByUUID(uuid uuid.UUID) (*gortsplib.ServerSession @@ -374,78 +342,92 @@ func (s *rtspServer) findSessionByUUID(uuid uuid.UUID) (*gortsplib.ServerSession
}
// apiConnsList is called by api and metrics.
func (s *rtspServer) apiConnsList() rtspServerAPIConnsListRes {
func (s *rtspServer) apiConnsList() (*apiRTSPConnsList, error) {
select {
case <-s.ctx.Done():
return rtspServerAPIConnsListRes{err: fmt.Errorf("terminated")}
return nil, fmt.Errorf("terminated")
default:
}
s.mutex.RLock()
defer s.mutex.RUnlock()
data := &rtspServerAPIConnsListData{
Items: []rtspServerAPIConnsListItem{},
data := &apiRTSPConnsList{
Items: []*apiRTSPConn{},
}
for _, c := range s.conns {
data.Items = append(data.Items, rtspServerAPIConnsListItem{
ID: c.uuid,
Created: c.created,
RemoteAddr: c.remoteAddr().String(),
BytesReceived: c.conn.BytesReceived(),
BytesSent: c.conn.BytesSent(),
})
data.Items = append(data.Items, c.apiItem())
}
return data, nil
}
// apiConnsGet is called by api.
func (s *rtspServer) apiConnsGet(uuid uuid.UUID) (*apiRTSPConn, error) {
select {
case <-s.ctx.Done():
return nil, fmt.Errorf("terminated")
default:
}
return rtspServerAPIConnsListRes{data: data}
s.mutex.RLock()
defer s.mutex.RUnlock()
conn := s.findConnByUUID(uuid)
if conn == nil {
return nil, fmt.Errorf("not found")
}
return conn.apiItem(), nil
}
// apiSessionsList is called by api and metrics.
func (s *rtspServer) apiSessionsList() rtspServerAPISessionsListRes {
func (s *rtspServer) apiSessionsList() (*apiRTSPSessionsList, error) {
select {
case <-s.ctx.Done():
return rtspServerAPISessionsListRes{err: fmt.Errorf("terminated")}
return nil, fmt.Errorf("terminated")
default:
}
s.mutex.RLock()
defer s.mutex.RUnlock()
data := &rtspServerAPISessionsListData{
Items: []rtspServerAPISessionsListItem{},
data := &apiRTSPSessionsList{
Items: []*apiRTSPSession{},
}
for _, s := range s.sessions {
data.Items = append(data.Items, rtspServerAPISessionsListItem{
ID: s.uuid,
Created: s.created,
RemoteAddr: s.remoteAddr().String(),
State: func() string {
switch s.safeState() {
case gortsplib.ServerSessionStatePrePlay,
gortsplib.ServerSessionStatePlay:
return "read"
case gortsplib.ServerSessionStatePreRecord,
gortsplib.ServerSessionStateRecord:
return "publish"
}
return "idle"
}(),
BytesReceived: s.session.BytesReceived(),
BytesSent: s.session.BytesSent(),
})
data.Items = append(data.Items, s.apiItem())
}
return data, nil
}
// apiSessionsGet is called by api.
func (s *rtspServer) apiSessionsGet(uuid uuid.UUID) (*apiRTSPSession, error) {
select {
case <-s.ctx.Done():
return nil, fmt.Errorf("terminated")
default:
}
s.mutex.RLock()
defer s.mutex.RUnlock()
_, sx := s.findSessionByUUID(uuid)
if sx == nil {
return nil, fmt.Errorf("not found")
}
return rtspServerAPISessionsListRes{data: data}
return sx.apiItem(), nil
}
// apiSessionsKick is called by api.
func (s *rtspServer) apiSessionsKick(uuid uuid.UUID) rtspServerAPISessionsKickRes {
func (s *rtspServer) apiSessionsKick(uuid uuid.UUID) error {
select {
case <-s.ctx.Done():
return rtspServerAPISessionsKickRes{err: fmt.Errorf("terminated")}
return fmt.Errorf("terminated")
default:
}
@ -454,11 +436,11 @@ func (s *rtspServer) apiSessionsKick(uuid uuid.UUID) rtspServerAPISessionsKickRe @@ -454,11 +436,11 @@ func (s *rtspServer) apiSessionsKick(uuid uuid.UUID) rtspServerAPISessionsKickRe
key, sx := s.findSessionByUUID(uuid)
if sx == nil {
return rtspServerAPISessionsKickRes{err: fmt.Errorf("not found")}
return fmt.Errorf("not found")
}
sx.close()
delete(s.sessions, key)
sx.onClose(liberrors.ErrServerTerminated{})
return rtspServerAPISessionsKickRes{}
return nil
}

22
internal/core/rtsp_session.go

@ -385,3 +385,25 @@ func (s *rtspSession) onPacketLost(ctx *gortsplib.ServerHandlerOnPacketLostCtx) @@ -385,3 +385,25 @@ func (s *rtspSession) onPacketLost(ctx *gortsplib.ServerHandlerOnPacketLostCtx)
func (s *rtspSession) onDecodeError(ctx *gortsplib.ServerHandlerOnDecodeErrorCtx) {
s.Log(logger.Warn, ctx.Error.Error())
}
func (s *rtspSession) apiItem() *apiRTSPSession {
return &apiRTSPSession{
ID: s.uuid,
Created: s.created,
RemoteAddr: s.remoteAddr().String(),
State: func() string {
switch s.safeState() {
case gortsplib.ServerSessionStatePrePlay,
gortsplib.ServerSessionStatePlay:
return "read"
case gortsplib.ServerSessionStatePreRecord,
gortsplib.ServerSessionStateRecord:
return "publish"
}
return "idle"
}(),
BytesReceived: s.session.BytesReceived(),
BytesSent: s.session.BytesSent(),
}
}

107
internal/core/webrtc_manager.go

@ -64,25 +64,8 @@ func linkHeaderToIceServers(link []string) []webrtc.ICEServer { @@ -64,25 +64,8 @@ func linkHeaderToIceServers(link []string) []webrtc.ICEServer {
return ret
}
type webRTCManagerAPISessionsListItem struct {
ID uuid.UUID `json:"id"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
PeerConnectionEstablished bool `json:"peerConnectionEstablished"`
LocalCandidate string `json:"localCandidate"`
RemoteCandidate string `json:"remoteCandidate"`
State string `json:"state"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
type webRTCManagerAPISessionsListData struct {
PageCount int `json:"pageCount"`
Items []webRTCManagerAPISessionsListItem `json:"items"`
}
type webRTCManagerAPISessionsListRes struct {
data *webRTCManagerAPISessionsListData
data *apiWebRTCSessionsList
err error
}
@ -99,6 +82,16 @@ type webRTCManagerAPISessionsKickReq struct { @@ -99,6 +82,16 @@ type webRTCManagerAPISessionsKickReq struct {
res chan webRTCManagerAPISessionsKickRes
}
type webRTCManagerAPISessionsGetRes struct {
data *apiWebRTCSession
err error
}
type webRTCManagerAPISessionsGetReq struct {
uuid uuid.UUID
res chan webRTCManagerAPISessionsGetRes
}
type webRTCSessionNewRes struct {
sx *webRTCSession
answer []byte
@ -157,6 +150,7 @@ type webRTCManager struct { @@ -157,6 +150,7 @@ type webRTCManager struct {
chSessionClose chan *webRTCSession
chSessionAddCandidates chan webRTCSessionAddCandidatesReq
chAPISessionsList chan webRTCManagerAPISessionsListReq
chAPIConnsGet chan webRTCManagerAPISessionsGetReq
chAPIConnsKick chan webRTCManagerAPISessionsKickReq
// out
@ -200,6 +194,7 @@ func newWebRTCManager( @@ -200,6 +194,7 @@ func newWebRTCManager(
chSessionClose: make(chan *webRTCSession),
chSessionAddCandidates: make(chan webRTCSessionAddCandidatesReq),
chAPISessionsList: make(chan webRTCManagerAPISessionsListReq),
chAPIConnsGet: make(chan webRTCManagerAPISessionsGetReq),
chAPIConnsKick: make(chan webRTCManagerAPISessionsKickReq),
done: make(chan struct{}),
}
@ -309,46 +304,25 @@ outer: @@ -309,46 +304,25 @@ outer:
req.res <- webRTCSessionAddCandidatesRes{sx: sx}
case req := <-m.chAPISessionsList:
data := &webRTCManagerAPISessionsListData{
Items: []webRTCManagerAPISessionsListItem{},
data := &apiWebRTCSessionsList{
Items: []*apiWebRTCSession{},
}
for sx := range m.sessions {
peerConnectionEstablished := false
localCandidate := ""
remoteCandidate := ""
bytesReceived := uint64(0)
bytesSent := uint64(0)
pc := sx.safePC()
if pc != nil {
peerConnectionEstablished = true
localCandidate = pc.localCandidate()
remoteCandidate = pc.remoteCandidate()
bytesReceived = pc.bytesReceived()
bytesSent = pc.bytesSent()
}
data.Items = append(data.Items, webRTCManagerAPISessionsListItem{
ID: sx.uuid,
Created: sx.created,
RemoteAddr: sx.req.remoteAddr,
PeerConnectionEstablished: peerConnectionEstablished,
LocalCandidate: localCandidate,
RemoteCandidate: remoteCandidate,
State: func() string {
if sx.req.publish {
return "publish"
}
return "read"
}(),
BytesReceived: bytesReceived,
BytesSent: bytesSent,
})
data.Items = append(data.Items, sx.apiItem())
}
req.res <- webRTCManagerAPISessionsListRes{data: data}
case req := <-m.chAPIConnsGet:
sx := m.findSessionByUUID(req.uuid)
if sx == nil {
req.res <- webRTCManagerAPISessionsGetRes{err: fmt.Errorf("not found")}
continue
}
req.res <- webRTCManagerAPISessionsGetRes{data: sx.apiItem()}
case req := <-m.chAPIConnsKick:
sx := m.findSessionByUUID(req.uuid)
if sx == nil {
@ -482,22 +456,40 @@ func (m *webRTCManager) sessionAddCandidates( @@ -482,22 +456,40 @@ func (m *webRTCManager) sessionAddCandidates(
}
// apiSessionsList is called by api.
func (m *webRTCManager) apiSessionsList() webRTCManagerAPISessionsListRes {
func (m *webRTCManager) apiSessionsList() (*apiWebRTCSessionsList, error) {
req := webRTCManagerAPISessionsListReq{
res: make(chan webRTCManagerAPISessionsListRes),
}
select {
case m.chAPISessionsList <- req:
return <-req.res
res := <-req.res
return res.data, res.err
case <-m.ctx.Done():
return nil, fmt.Errorf("terminated")
}
}
// apiSessionsGet is called by api.
func (m *webRTCManager) apiSessionsGet(uuid uuid.UUID) (*apiWebRTCSession, error) {
req := webRTCManagerAPISessionsGetReq{
uuid: uuid,
res: make(chan webRTCManagerAPISessionsGetRes),
}
select {
case m.chAPIConnsGet <- req:
res := <-req.res
return res.data, res.err
case <-m.ctx.Done():
return webRTCManagerAPISessionsListRes{err: fmt.Errorf("terminated")}
return nil, fmt.Errorf("terminated")
}
}
// apiSessionsKick is called by api.
func (m *webRTCManager) apiSessionsKick(uuid uuid.UUID) webRTCManagerAPISessionsKickRes {
func (m *webRTCManager) apiSessionsKick(uuid uuid.UUID) error {
req := webRTCManagerAPISessionsKickReq{
uuid: uuid,
res: make(chan webRTCManagerAPISessionsKickRes),
@ -505,9 +497,10 @@ func (m *webRTCManager) apiSessionsKick(uuid uuid.UUID) webRTCManagerAPISessions @@ -505,9 +497,10 @@ func (m *webRTCManager) apiSessionsKick(uuid uuid.UUID) webRTCManagerAPISessions
select {
case m.chAPIConnsKick <- req:
return <-req.res
res := <-req.res
return res.err
case <-m.ctx.Done():
return webRTCManagerAPISessionsKickRes{err: fmt.Errorf("terminated")}
return fmt.Errorf("terminated")
}
}

34
internal/core/webrtc_session.go

@ -596,3 +596,37 @@ func (s *webRTCSession) apiSourceDescribe() pathAPISourceOrReader { @@ -596,3 +596,37 @@ func (s *webRTCSession) apiSourceDescribe() pathAPISourceOrReader {
func (s *webRTCSession) apiReaderDescribe() pathAPISourceOrReader {
return s.apiSourceDescribe()
}
func (s *webRTCSession) apiItem() *apiWebRTCSession {
peerConnectionEstablished := false
localCandidate := ""
remoteCandidate := ""
bytesReceived := uint64(0)
bytesSent := uint64(0)
pc := s.safePC()
if pc != nil {
peerConnectionEstablished = true
localCandidate = pc.localCandidate()
remoteCandidate = pc.remoteCandidate()
bytesReceived = pc.bytesReceived()
bytesSent = pc.bytesSent()
}
return &apiWebRTCSession{
ID: s.uuid,
Created: s.created,
RemoteAddr: s.req.remoteAddr,
PeerConnectionEstablished: peerConnectionEstablished,
LocalCandidate: localCandidate,
RemoteCandidate: remoteCandidate,
State: func() string {
if s.req.publish {
return "publish"
}
return "read"
}(),
BytesReceived: bytesReceived,
BytesSent: bytesSent,
}
}

Loading…
Cancel
Save