From 4ac175d3cc35200c93d115404f91ece681e728b9 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Wed, 9 Nov 2022 18:31:31 +0100 Subject: [PATCH] api, metrics: add endpoints and metrics for RTSP connections (#1233) new API endpoints: * /v1/rtspconns/list * /v1/rtspsconns/list new metrics: * rtsp_conns * rtsps_conns --- README.md | 1 + apidocs/openapi.yaml | 61 ++++++++++++++--- internal/core/api.go | 55 +++++++++++----- internal/core/api_test.go | 28 +++++--- internal/core/core_test.go | 2 +- internal/core/hls_server.go | 7 +- internal/core/metrics.go | 121 ++++++++++++++++++++-------------- internal/core/metrics_test.go | 2 + internal/core/path_manager.go | 7 +- internal/core/rtmp_server.go | 15 +++-- internal/core/rtsp_conn.go | 9 +++ internal/core/rtsp_server.go | 84 ++++++++++++++++++++--- 12 files changed, 287 insertions(+), 105 deletions(-) diff --git a/README.md b/README.md index 46d0c37b..61b2e2f6 100644 --- a/README.md +++ b/README.md @@ -443,6 +443,7 @@ Obtaining: ``` paths{name="",state="ready"} 1 +rtsp_conns 1 rtsp_sessions{state="idle"} 0 rtsp_sessions{state="read"} 0 rtsp_sessions{state="publish"} 1 diff --git a/apidocs/openapi.yaml b/apidocs/openapi.yaml index 0be089b2..a7bfbfc4 100644 --- a/apidocs/openapi.yaml +++ b/apidocs/openapi.yaml @@ -375,20 +375,19 @@ components: type: string enum: [hlsMuxer] - RTSPSession: + RTSPConn: type: object properties: created: type: string remoteAddr: type: string - state: - type: string - enum: [idle, read, publish] - RTSPSSession: + RTSPSession: type: object properties: + created: + type: string remoteAddr: type: string state: @@ -433,21 +432,29 @@ components: additionalProperties: $ref: '#/components/schemas/Path' - RTSPSessionsList: + RTSPConnsList: type: object properties: items: type: object additionalProperties: - $ref: '#/components/schemas/RTSPSession' + $ref: '#/components/schemas/RTSPConn' + + RTSPSConnsList: + type: object + properties: + items: + type: object + additionalProperties: + $ref: '#/components/schemas/RTSPConn' - RTSPSSessionsList: + RTSPSessionsList: type: object properties: items: type: object additionalProperties: - $ref: '#/components/schemas/RTSPSSession' + $ref: '#/components/schemas/RTSPSession' RTMPConnsList: type: object @@ -599,6 +606,23 @@ paths: '500': description: internal server error. + /v1/rtspconns/list: + get: + operationId: rtspConnsList + summary: returns all active RTSP connections. + description: '' + responses: + '200': + description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/RTSPConnsList' + '400': + description: invalid request. + '500': + description: internal server error. + /v1/rtspsessions/list: get: operationId: rtspSessionsList @@ -616,6 +640,23 @@ paths: '500': description: internal server error. + /v1/rtspsconns/list: + get: + operationId: rtspsConnsList + summary: returns all active RTSPS connections. + description: '' + responses: + '200': + description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/RTSPConnsList' + '400': + description: invalid request. + '500': + description: internal server error. + /v1/rtspsessions/kick/{id}: post: operationId: rtspSessionsKick @@ -647,7 +688,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/RTSPSSessionsList' + $ref: '#/components/schemas/RTSPSessionsList' '400': description: invalid request. '500': diff --git a/internal/core/api.go b/internal/core/api.go index 07281f46..02e5fa0f 100644 --- a/internal/core/api.go +++ b/internal/core/api.go @@ -83,21 +83,22 @@ func loadConfPathData(ctx *gin.Context) (interface{}, error) { } type apiPathManager interface { - apiPathsList(req pathAPIPathsListReq) pathAPIPathsListRes + apiPathsList() pathAPIPathsListRes } type apiRTSPServer interface { - apiSessionsList(req rtspServerAPISessionsListReq) rtspServerAPISessionsListRes - apiSessionsKick(req rtspServerAPISessionsKickReq) rtspServerAPISessionsKickRes + apiConnsList() rtspServerAPIConnsListRes + apiSessionsList() rtspServerAPISessionsListRes + apiSessionsKick(string) rtspServerAPISessionsKickRes } type apiRTMPServer interface { - apiConnsList(req rtmpServerAPIConnsListReq) rtmpServerAPIConnsListRes - apiConnsKick(req rtmpServerAPIConnsKickReq) rtmpServerAPIConnsKickRes + apiConnsList() rtmpServerAPIConnsListRes + apiConnsKick(id string) rtmpServerAPIConnsKickRes } type apiHLSServer interface { - apiHLSMuxersList(req hlsServerAPIMuxersListReq) hlsServerAPIMuxersListRes + apiHLSMuxersList() hlsServerAPIMuxersListRes } type apiParent interface { @@ -162,11 +163,13 @@ func newAPI( group.GET("/v1/paths/list", a.onPathsList) if !interfaceIsEmpty(a.rtspServer) { + group.GET("/v1/rtspconns/list", a.onRTSPConnsList) group.GET("/v1/rtspsessions/list", a.onRTSPSessionsList) group.POST("/v1/rtspsessions/kick/:id", a.onRTSPSessionsKick) } if !interfaceIsEmpty(a.rtspsServer) { + group.GET("/v1/rtspsconns/list", a.onRTSPSConnsList) group.GET("/v1/rtspssessions/list", a.onRTSPSSessionsList) group.POST("/v1/rtspssessions/kick/:id", a.onRTSPSSessionsKick) } @@ -382,7 +385,17 @@ func (a *api) onConfigPathsDelete(ctx *gin.Context) { } func (a *api) onPathsList(ctx *gin.Context) { - res := a.pathManager.apiPathsList(pathAPIPathsListReq{}) + res := a.pathManager.apiPathsList() + if res.err != nil { + ctx.AbortWithStatus(http.StatusInternalServerError) + return + } + + ctx.JSON(http.StatusOK, res.data) +} + +func (a *api) onRTSPConnsList(ctx *gin.Context) { + res := a.rtspServer.apiConnsList() if res.err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return @@ -392,7 +405,7 @@ func (a *api) onPathsList(ctx *gin.Context) { } func (a *api) onRTSPSessionsList(ctx *gin.Context) { - res := a.rtspServer.apiSessionsList(rtspServerAPISessionsListReq{}) + res := a.rtspServer.apiSessionsList() if res.err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return @@ -404,7 +417,7 @@ func (a *api) onRTSPSessionsList(ctx *gin.Context) { func (a *api) onRTSPSessionsKick(ctx *gin.Context) { id := ctx.Param("id") - res := a.rtspServer.apiSessionsKick(rtspServerAPISessionsKickReq{id: id}) + res := a.rtspServer.apiSessionsKick(id) if res.err != nil { ctx.AbortWithStatus(http.StatusNotFound) return @@ -413,8 +426,18 @@ func (a *api) onRTSPSessionsKick(ctx *gin.Context) { ctx.Status(http.StatusOK) } +func (a *api) onRTSPSConnsList(ctx *gin.Context) { + res := a.rtspsServer.apiConnsList() + if res.err != nil { + ctx.AbortWithStatus(http.StatusInternalServerError) + return + } + + ctx.JSON(http.StatusOK, res.data) +} + func (a *api) onRTSPSSessionsList(ctx *gin.Context) { - res := a.rtspsServer.apiSessionsList(rtspServerAPISessionsListReq{}) + res := a.rtspsServer.apiSessionsList() if res.err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return @@ -426,7 +449,7 @@ func (a *api) onRTSPSSessionsList(ctx *gin.Context) { func (a *api) onRTSPSSessionsKick(ctx *gin.Context) { id := ctx.Param("id") - res := a.rtspsServer.apiSessionsKick(rtspServerAPISessionsKickReq{id: id}) + res := a.rtspsServer.apiSessionsKick(id) if res.err != nil { ctx.AbortWithStatus(http.StatusNotFound) return @@ -436,7 +459,7 @@ func (a *api) onRTSPSSessionsKick(ctx *gin.Context) { } func (a *api) onRTMPConnsList(ctx *gin.Context) { - res := a.rtmpServer.apiConnsList(rtmpServerAPIConnsListReq{}) + res := a.rtmpServer.apiConnsList() if res.err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return @@ -448,7 +471,7 @@ func (a *api) onRTMPConnsList(ctx *gin.Context) { func (a *api) onRTMPConnsKick(ctx *gin.Context) { id := ctx.Param("id") - res := a.rtmpServer.apiConnsKick(rtmpServerAPIConnsKickReq{id: id}) + res := a.rtmpServer.apiConnsKick(id) if res.err != nil { ctx.AbortWithStatus(http.StatusNotFound) return @@ -458,7 +481,7 @@ func (a *api) onRTMPConnsKick(ctx *gin.Context) { } func (a *api) onRTMPSConnsList(ctx *gin.Context) { - res := a.rtmpsServer.apiConnsList(rtmpServerAPIConnsListReq{}) + res := a.rtmpsServer.apiConnsList() if res.err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return @@ -470,7 +493,7 @@ func (a *api) onRTMPSConnsList(ctx *gin.Context) { func (a *api) onRTMPSConnsKick(ctx *gin.Context) { id := ctx.Param("id") - res := a.rtmpsServer.apiConnsKick(rtmpServerAPIConnsKickReq{id: id}) + res := a.rtmpsServer.apiConnsKick(id) if res.err != nil { ctx.AbortWithStatus(http.StatusNotFound) return @@ -480,7 +503,7 @@ func (a *api) onRTMPSConnsKick(ctx *gin.Context) { } func (a *api) onHLSMuxersList(ctx *gin.Context) { - res := a.hlsServer.apiHLSMuxersList(hlsServerAPIMuxersListReq{}) + res := a.hlsServer.apiHLSMuxersList() if res.err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return diff --git a/internal/core/api_test.go b/internal/core/api_test.go index 54b84c76..8f03a15c 100644 --- a/internal/core/api_test.go +++ b/internal/core/api_test.go @@ -370,8 +370,10 @@ func TestAPIProtocolSpecificList(t *testing.T) { defer os.Remove(serverKeyFpath) for _, ca := range []string{ - "rtsp", - "rtsps", + "rtsp conns", + "rtsp sessions", + "rtsps conns", + "rtsps sessions", "rtmp", "rtmps", "hls", @@ -380,7 +382,7 @@ func TestAPIProtocolSpecificList(t *testing.T) { conf := "api: yes\n" switch ca { - case "rtsps": + case "rtsps conns", "rtsps sessions": conf += "protocols: [tcp]\n" + "encryption: strict\n" + "serverCert: " + serverCertFpath + "\n" + @@ -406,7 +408,7 @@ func TestAPIProtocolSpecificList(t *testing.T) { } switch ca { - case "rtsp": + case "rtsp conns", "rtsp sessions": source := gortsplib.Client{} err := source.StartPublishing("rtsp://localhost:8554/mypath", @@ -414,7 +416,7 @@ func TestAPIProtocolSpecificList(t *testing.T) { require.NoError(t, err) defer source.Close() - case "rtsps": + case "rtsps conns", "rtsps sessions": source := gortsplib.Client{ TLSConfig: &tls.Config{InsecureSkipVerify: true}, } @@ -478,13 +480,19 @@ func TestAPIProtocolSpecificList(t *testing.T) { } switch ca { - case "rtsp", "rtsps", "rtmp", "rtmps": + case "rtsp conns", "rtsp sessions", "rtsps conns", "rtsps sessions", "rtmp", "rtmps": var pa string switch ca { - case "rtsp": + case "rtsp conns": + pa = "rtspconns" + + case "rtsp sessions": pa = "rtspsessions" - case "rtsps": + case "rtsps conns": + pa = "rtspsconns" + + case "rtsps sessions": pa = "rtspssessions" case "rtmp": @@ -507,7 +515,9 @@ func TestAPIProtocolSpecificList(t *testing.T) { firstID = k } - require.Equal(t, "publish", out.Items[firstID].State) + if ca != "rtsp conns" && ca != "rtsps conns" { + require.Equal(t, "publish", out.Items[firstID].State) + } case "hls": var out struct { diff --git a/internal/core/core_test.go b/internal/core/core_test.go index a954fd7b..6e488feb 100644 --- a/internal/core/core_test.go +++ b/internal/core/core_test.go @@ -163,7 +163,7 @@ func TestCorePathAutoDeletion(t *testing.T) { } }() - res := p.pathManager.apiPathsList(pathAPIPathsListReq{}) + res := p.pathManager.apiPathsList() require.NoError(t, res.err) require.Equal(t, 0, len(res.data.Items)) diff --git a/internal/core/hls_server.go b/internal/core/hls_server.go index e717ac1f..88965f3e 100644 --- a/internal/core/hls_server.go +++ b/internal/core/hls_server.go @@ -392,8 +392,11 @@ func (s *hlsServer) pathSourceNotReady(pa *path) { } // apiHLSMuxersList is called by api. -func (s *hlsServer) apiHLSMuxersList(req hlsServerAPIMuxersListReq) hlsServerAPIMuxersListRes { - req.res = make(chan hlsServerAPIMuxersListRes) +func (s *hlsServer) apiHLSMuxersList() hlsServerAPIMuxersListRes { + req := hlsServerAPIMuxersListReq{ + res: make(chan hlsServerAPIMuxersListRes), + } + select { case s.chAPIMuxerList <- req: res := <-req.res diff --git a/internal/core/metrics.go b/internal/core/metrics.go index fc1a771b..a7bda44e 100644 --- a/internal/core/metrics.go +++ b/internal/core/metrics.go @@ -18,19 +18,20 @@ func metric(key string, value int64) string { } type metricsPathManager interface { - apiPathsList(req pathAPIPathsListReq) pathAPIPathsListRes + apiPathsList() pathAPIPathsListRes } type metricsRTSPServer interface { - apiSessionsList(req rtspServerAPISessionsListReq) rtspServerAPISessionsListRes + apiConnsList() rtspServerAPIConnsListRes + apiSessionsList() rtspServerAPISessionsListRes } type metricsRTMPServer interface { - apiConnsList(req rtmpServerAPIConnsListReq) rtmpServerAPIConnsListRes + apiConnsList() rtmpServerAPIConnsListRes } type metricsHLSServer interface { - apiHLSMuxersList(req hlsServerAPIMuxersListReq) hlsServerAPIMuxersListRes + apiHLSMuxersList() hlsServerAPIMuxersListRes } type metricsParent interface { @@ -90,7 +91,7 @@ func (m *metrics) log(level logger.Level, format string, args ...interface{}) { func (m *metrics) onMetrics(ctx *gin.Context) { out := "" - res := m.pathManager.apiPathsList(pathAPIPathsListReq{}) + res := m.pathManager.apiPathsList() if res.err == nil { for name, p := range res.data.Items { if p.SourceReady { @@ -102,61 +103,79 @@ func (m *metrics) onMetrics(ctx *gin.Context) { } if !interfaceIsEmpty(m.rtspServer) { - res := m.rtspServer.apiSessionsList(rtspServerAPISessionsListReq{}) - if res.err == nil { - idleCount := int64(0) - readCount := int64(0) - publishCount := int64(0) - - for _, i := range res.data.Items { - switch i.State { - case "idle": - idleCount++ - case "read": - readCount++ - case "publish": - publishCount++ - } + func() { + res := m.rtspServer.apiConnsList() + if res.err == nil { + out += metric("rtsp_conns", int64(len(res.data.Items))) } + }() + + func() { + res := m.rtspServer.apiSessionsList() + if res.err == nil { + idleCount := int64(0) + readCount := int64(0) + publishCount := int64(0) + + for _, i := range res.data.Items { + switch i.State { + case "idle": + idleCount++ + case "read": + readCount++ + case "publish": + publishCount++ + } + } - out += metric("rtsp_sessions{state=\"idle\"}", - idleCount) - out += metric("rtsp_sessions{state=\"read\"}", - readCount) - out += metric("rtsp_sessions{state=\"publish\"}", - publishCount) - } + out += metric("rtsp_sessions{state=\"idle\"}", + idleCount) + out += metric("rtsp_sessions{state=\"read\"}", + readCount) + out += metric("rtsp_sessions{state=\"publish\"}", + publishCount) + } + }() } if !interfaceIsEmpty(m.rtspsServer) { - res := m.rtspsServer.apiSessionsList(rtspServerAPISessionsListReq{}) - if res.err == nil { - idleCount := int64(0) - readCount := int64(0) - publishCount := int64(0) - - for _, i := range res.data.Items { - switch i.State { - case "idle": - idleCount++ - case "read": - readCount++ - case "publish": - publishCount++ - } + func() { + res := m.rtspsServer.apiConnsList() + if res.err == nil { + out += metric("rtsps_conns", int64(len(res.data.Items))) } + }() + + func() { + res := m.rtspsServer.apiSessionsList() + if res.err == nil { + idleCount := int64(0) + readCount := int64(0) + publishCount := int64(0) + + for _, i := range res.data.Items { + switch i.State { + case "idle": + idleCount++ + case "read": + readCount++ + case "publish": + publishCount++ + } + } - out += metric("rtsps_sessions{state=\"idle\"}", - idleCount) - out += metric("rtsps_sessions{state=\"read\"}", - readCount) - out += metric("rtsps_sessions{state=\"publish\"}", - publishCount) - } + out += metric("rtsps_sessions{state=\"idle\"}", + idleCount) + out += metric("rtsps_sessions{state=\"read\"}", + readCount) + out += metric("rtsps_sessions{state=\"publish\"}", + publishCount) + } + }() } if !interfaceIsEmpty(m.rtmpServer) { - res := m.rtmpServer.apiConnsList(rtmpServerAPIConnsListReq{}) + res := m.rtmpServer.apiConnsList() if res.err == nil { idleCount := int64(0) readCount := int64(0) @@ -183,7 +202,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) { } if !interfaceIsEmpty(m.hlsServer) { - res := m.hlsServer.apiHLSMuxersList(hlsServerAPIMuxersListReq{}) + res := m.hlsServer.apiHLSMuxersList() if res.err == nil { for name := range res.data.Items { out += metric("hls_muxers{name=\""+name+"\"}", 1) diff --git a/internal/core/metrics_test.go b/internal/core/metrics_test.go index ad831859..d4a789f8 100644 --- a/internal/core/metrics_test.go +++ b/internal/core/metrics_test.go @@ -102,9 +102,11 @@ func TestMetrics(t *testing.T) { "rtmp_conns{state=\"idle\"}": "0", "rtmp_conns{state=\"publish\"}": "1", "rtmp_conns{state=\"read\"}": "0", + "rtsp_conns": "1", "rtsp_sessions{state=\"idle\"}": "0", "rtsp_sessions{state=\"publish\"}": "1", "rtsp_sessions{state=\"read\"}": "0", + "rtsps_conns": "0", "rtsps_sessions{state=\"idle\"}": "0", "rtsps_sessions{state=\"publish\"}": "0", "rtsps_sessions{state=\"read\"}": "0", diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index be09c737..7cff9b2f 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -407,8 +407,11 @@ func (pm *pathManager) hlsServerSet(s pathManagerHLSServer) { } // apiPathsList is called by api. -func (pm *pathManager) apiPathsList(req pathAPIPathsListReq) pathAPIPathsListRes { - req.res = make(chan pathAPIPathsListRes) +func (pm *pathManager) apiPathsList() pathAPIPathsListRes { + req := pathAPIPathsListReq{ + res: make(chan pathAPIPathsListRes), + } + select { case pm.chAPIPathsList <- req: res := <-req.res diff --git a/internal/core/rtmp_server.go b/internal/core/rtmp_server.go index c6e1ff85..9d518c0d 100644 --- a/internal/core/rtmp_server.go +++ b/internal/core/rtmp_server.go @@ -314,8 +314,11 @@ func (s *rtmpServer) connClose(c *rtmpConn) { } // apiConnsList is called by api. -func (s *rtmpServer) apiConnsList(req rtmpServerAPIConnsListReq) rtmpServerAPIConnsListRes { - req.res = make(chan rtmpServerAPIConnsListRes) +func (s *rtmpServer) apiConnsList() rtmpServerAPIConnsListRes { + req := rtmpServerAPIConnsListReq{ + res: make(chan rtmpServerAPIConnsListRes), + } + select { case s.chAPIConnsList <- req: return <-req.res @@ -326,8 +329,12 @@ func (s *rtmpServer) apiConnsList(req rtmpServerAPIConnsListReq) rtmpServerAPICo } // apiConnsKick is called by api. -func (s *rtmpServer) apiConnsKick(req rtmpServerAPIConnsKickReq) rtmpServerAPIConnsKickRes { - req.res = make(chan rtmpServerAPIConnsKickRes) +func (s *rtmpServer) apiConnsKick(id string) rtmpServerAPIConnsKickRes { + req := rtmpServerAPIConnsKickReq{ + id: id, + res: make(chan rtmpServerAPIConnsKickRes), + } + select { case s.chAPIConnsKick <- req: return <-req.res diff --git a/internal/core/rtsp_conn.go b/internal/core/rtsp_conn.go index af16b45a..91569079 100644 --- a/internal/core/rtsp_conn.go +++ b/internal/core/rtsp_conn.go @@ -25,6 +25,7 @@ type rtspConnParent interface { } type rtspConn struct { + id string externalAuthenticationURL string rtspAddress string authMethods []headers.AuthMethod @@ -36,6 +37,7 @@ type rtspConn struct { conn *gortsplib.ServerConn parent rtspConnParent + created time.Time onConnectCmd *externalcmd.Cmd authUser string authPass string @@ -44,6 +46,7 @@ type rtspConn struct { } func newRTSPConn( + id string, externalAuthenticationURL string, rtspAddress string, authMethods []headers.AuthMethod, @@ -56,6 +59,7 @@ func newRTSPConn( parent rtspConnParent, ) *rtspConn { c := &rtspConn{ + id: id, externalAuthenticationURL: externalAuthenticationURL, rtspAddress: rtspAddress, authMethods: authMethods, @@ -66,6 +70,7 @@ func newRTSPConn( pathManager: pathManager, conn: conn, parent: parent, + created: time.Now(), } c.log(logger.Info, "opened") @@ -98,6 +103,10 @@ func (c *rtspConn) Conn() *gortsplib.ServerConn { return c.conn } +func (c *rtspConn) remoteAddr() net.Addr { + return c.conn.NetConn().RemoteAddr() +} + func (c *rtspConn) ip() net.IP { return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).IP } diff --git a/internal/core/rtsp_server.go b/internal/core/rtsp_server.go index ba197412..c3ea69ff 100644 --- a/internal/core/rtsp_server.go +++ b/internal/core/rtsp_server.go @@ -20,6 +20,20 @@ import ( "github.com/aler9/rtsp-simple-server/internal/logger" ) +type rtspServerAPIConnsListItem struct { + Created time.Time `json:"created"` + RemoteAddr string `json:"remoteAddr"` +} + +type rtspServerAPIConnsListData struct { + Items map[string]rtspServerAPIConnsListItem `json:"items"` +} + +type rtspServerAPIConnsListRes struct { + data *rtspServerAPIConnsListData + err error +} + type rtspServerAPISessionsListItem struct { Created time.Time `json:"created"` RemoteAddr string `json:"remoteAddr"` @@ -35,16 +49,10 @@ type rtspServerAPISessionsListRes struct { err error } -type rtspServerAPISessionsListReq struct{} - type rtspServerAPISessionsKickRes struct { err error } -type rtspServerAPISessionsKickReq struct { - id string -} - type rtspServerParent interface { Log(logger.Level, string, ...interface{}) } @@ -259,9 +267,40 @@ func (s *rtspServer) newSessionID() (string, error) { } } +func (s *rtspServer) newConnID() (string, error) { + for { + b := make([]byte, 4) + _, err := rand.Read(b) + if err != nil { + return "", err + } + + u := uint32(b[3])<<24 | uint32(b[2])<<16 | uint32(b[1])<<8 | uint32(b[0]) + u %= 899999999 + u += 100000000 + + id := strconv.FormatUint(uint64(u), 10) + + alreadyPresent := func() bool { + for _, c := range s.conns { + if c.id == id { + return true + } + } + return false + }() + if !alreadyPresent { + return id, nil + } + } +} + // OnConnOpen implements gortsplib.ServerHandlerOnConnOpen. func (s *rtspServer) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) { + s.mutex.Lock() + id, _ := s.newConnID() c := newRTSPConn( + id, s.externalAuthenticationURL, s.rtspAddress, s.authMethods, @@ -272,9 +311,9 @@ func (s *rtspServer) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) { s.pathManager, ctx.Conn, s) - s.mutex.Lock() s.conns[ctx.Conn] = c s.mutex.Unlock() + ctx.Conn.SetUserData(c) } @@ -380,8 +419,33 @@ func (s *rtspServer) OnDecodeError(ctx *gortsplib.ServerHandlerOnDecodeErrorCtx) se.onDecodeError(ctx) } +// apiConnsList is called by api and metrics. +func (s *rtspServer) apiConnsList() rtspServerAPIConnsListRes { + select { + case <-s.ctx.Done(): + return rtspServerAPIConnsListRes{err: fmt.Errorf("terminated")} + default: + } + + s.mutex.RLock() + defer s.mutex.RUnlock() + + data := &rtspServerAPIConnsListData{ + Items: make(map[string]rtspServerAPIConnsListItem), + } + + for _, c := range s.conns { + data.Items[c.id] = rtspServerAPIConnsListItem{ + Created: c.created, + RemoteAddr: c.remoteAddr().String(), + } + } + + return rtspServerAPIConnsListRes{data: data} +} + // apiSessionsList is called by api and metrics. -func (s *rtspServer) apiSessionsList(req rtspServerAPISessionsListReq) rtspServerAPISessionsListRes { +func (s *rtspServer) apiSessionsList() rtspServerAPISessionsListRes { select { case <-s.ctx.Done(): return rtspServerAPISessionsListRes{err: fmt.Errorf("terminated")} @@ -418,7 +482,7 @@ func (s *rtspServer) apiSessionsList(req rtspServerAPISessionsListReq) rtspServe } // apiSessionsKick is called by api. -func (s *rtspServer) apiSessionsKick(req rtspServerAPISessionsKickReq) rtspServerAPISessionsKickRes { +func (s *rtspServer) apiSessionsKick(id string) rtspServerAPISessionsKickRes { select { case <-s.ctx.Done(): return rtspServerAPISessionsKickRes{err: fmt.Errorf("terminated")} @@ -429,7 +493,7 @@ func (s *rtspServer) apiSessionsKick(req rtspServerAPISessionsKickReq) rtspServe defer s.mutex.RUnlock() for key, se := range s.sessions { - if se.id == req.id { + if se.id == id { se.close() delete(s.sessions, key) se.onClose(liberrors.ErrServerTerminated{})