diff --git a/apidocs/openapi.yaml b/apidocs/openapi.yaml index c0d752c9..eedea4cc 100644 --- a/apidocs/openapi.yaml +++ b/apidocs/openapi.yaml @@ -182,6 +182,7 @@ components: source: oneOf: - $ref: '#/components/schemas/PathSourceRTSPSession' + - $ref: '#/components/schemas/PathSourceRTSPSSession' - $ref: '#/components/schemas/PathSourceRTMPConn' sourceReady: type: boolean @@ -190,6 +191,7 @@ components: items: oneOf: - $ref: '#/components/schemas/PathReaderRTSPSession' + - $ref: '#/components/schemas/PathReaderRTSPSSession' - $ref: '#/components/schemas/PathReaderRTMPConn' - $ref: '#/components/schemas/PathReaderHLSMuxer' @@ -202,6 +204,15 @@ components: id: type: string + PathSourceRTSPSSession: + type: object + properties: + type: + type: string + enum: [rtspssession] + id: + type: string + PathSourceRTMPConn: type: object properties: @@ -220,6 +231,15 @@ components: id: type: string + PathReaderRTSPSSession: + type: object + properties: + type: + type: string + enum: [rtspssession] + id: + type: string + PathReaderRTMPConn: type: object properties: @@ -245,6 +265,15 @@ components: type: string enum: [idle, read, publish] + RTSPSSession: + type: object + properties: + remoteAddr: + type: string + state: + type: string + enum: [idle, read, publish] + RTMPConn: type: object properties: @@ -421,6 +450,46 @@ paths: '500': description: internal server error. + /v1/rtspssessions/list: + get: + operationId: rtspsSessionsList + summary: returns all active RTSPS sessions. + description: '' + responses: + '200': + description: the request was successful. + content: + application/json: + schema: + items: + type: object + additionalProperties: + $ref: '#/components/schemas/RTSPSSession' + '400': + description: invalid request. + '500': + description: internal server error. + + /v1/rtspssessions/kick/{id}: + post: + operationId: rtspsSessionsKick + summary: kicks out a RTSPS session from the server. + description: '' + parameters: + - name: id + in: path + required: true + description: the ID of the session. + schema: + type: string + responses: + '200': + description: the request was successful. + '400': + description: invalid request. + '500': + description: internal server error. + /v1/rtmpconns/list: get: operationId: rtmpConnsList diff --git a/internal/core/api.go b/internal/core/api.go index 8e7ab344..27be605d 100644 --- a/internal/core/api.go +++ b/internal/core/api.go @@ -283,6 +283,8 @@ func newAPI( group.GET("/v1/paths/list", a.onPathsList) group.GET("/v1/rtspsessions/list", a.onRTSPSessionsList) group.POST("/v1/rtspsessions/kick/:id", a.onRTSPSessionsKick) + group.GET("/v1/rtspssessions/list", a.onRTSPSSessionsList) + group.POST("/v1/rtspssessions/kick/:id", a.onRTSPSSessionsKick) group.GET("/v1/rtmpconns/list", a.onRTMPConnsList) group.POST("/v1/rtmpconns/kick/:id", a.onRTMPConnsKick) @@ -511,7 +513,7 @@ func (a *api) onPathsList(ctx *gin.Context) { } func (a *api) onRTSPSessionsList(ctx *gin.Context) { - if a.rtspServer == nil && a.rtspsServer == nil { + if reflect.ValueOf(a.rtspServer).IsNil() { ctx.AbortWithStatus(http.StatusNotFound) return } @@ -520,54 +522,70 @@ func (a *api) onRTSPSessionsList(ctx *gin.Context) { Items: make(map[string]apiRTSPSessionsListItem), } - if !reflect.ValueOf(a.rtspServer).IsNil() { - res := a.rtspServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{Data: &data}) - if res.Err != nil { - ctx.AbortWithStatus(http.StatusInternalServerError) - return - } - } - - if !reflect.ValueOf(a.rtspsServer).IsNil() { - res := a.rtspsServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{Data: &data}) - if res.Err != nil { - ctx.AbortWithStatus(http.StatusInternalServerError) - return - } + res := a.rtspServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{Data: &data}) + if res.Err != nil { + ctx.AbortWithStatus(http.StatusInternalServerError) + return } ctx.JSON(http.StatusOK, data) } func (a *api) onRTSPSessionsKick(ctx *gin.Context) { - if a.rtspServer == nil && a.rtspsServer == nil { + if reflect.ValueOf(a.rtspServer).IsNil() { ctx.AbortWithStatus(http.StatusNotFound) return } id := ctx.Param("id") - if a.rtspServer != nil { - res := a.rtspServer.OnAPIRTSPSessionsKick(apiRTSPSessionsKickReq{ID: id}) - if res.Err == nil { - ctx.Status(http.StatusOK) - return - } + res := a.rtspServer.OnAPIRTSPSessionsKick(apiRTSPSessionsKickReq{ID: id}) + if res.Err != nil { + ctx.AbortWithStatus(http.StatusNotFound) + return } - if a.rtspsServer != nil { - res := a.rtspsServer.OnAPIRTSPSessionsKick(apiRTSPSessionsKickReq{ID: id}) - if res.Err != nil { - ctx.Status(http.StatusOK) - return - } + ctx.Status(http.StatusOK) +} + +func (a *api) onRTSPSSessionsList(ctx *gin.Context) { + if reflect.ValueOf(a.rtspsServer).IsNil() { + ctx.AbortWithStatus(http.StatusNotFound) + return + } + + data := apiRTSPSessionsListData{ + Items: make(map[string]apiRTSPSessionsListItem), } - ctx.AbortWithStatus(http.StatusNotFound) + res := a.rtspsServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{Data: &data}) + if res.Err != nil { + ctx.AbortWithStatus(http.StatusInternalServerError) + return + } + + ctx.JSON(http.StatusOK, data) +} + +func (a *api) onRTSPSSessionsKick(ctx *gin.Context) { + if reflect.ValueOf(a.rtspsServer).IsNil() { + ctx.AbortWithStatus(http.StatusNotFound) + return + } + + id := ctx.Param("id") + + res := a.rtspsServer.OnAPIRTSPSessionsKick(apiRTSPSessionsKickReq{ID: id}) + if res.Err != nil { + ctx.AbortWithStatus(http.StatusNotFound) + return + } + + ctx.Status(http.StatusOK) } func (a *api) onRTMPConnsList(ctx *gin.Context) { - if a.rtmpServer == nil { + if reflect.ValueOf(a.rtmpServer).IsNil() { ctx.AbortWithStatus(http.StatusNotFound) return } @@ -593,7 +611,7 @@ func (a *api) OnConfReload(conf *conf.Conf) { } func (a *api) onRTMPConnsKick(ctx *gin.Context) { - if a.rtmpServer == nil { + if reflect.ValueOf(a.rtmpServer).IsNil() { ctx.AbortWithStatus(http.StatusNotFound) return } diff --git a/internal/core/api_test.go b/internal/core/api_test.go index 15f10a01..b89a0aff 100644 --- a/internal/core/api_test.go +++ b/internal/core/api_test.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "os" "testing" "time" @@ -164,136 +165,170 @@ func TestAPIPathsList(t *testing.T) { require.Equal(t, true, ok) } -func TestAPIRTSPSessionsList(t *testing.T) { - p, ok := newInstance("api: yes\n") - require.Equal(t, true, ok) - defer p.close() - - track, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456")) - require.NoError(t, err) - - source, err := gortsplib.DialPublish("rtsp://localhost:8554/mypath", - gortsplib.Tracks{track}) - require.NoError(t, err) - defer source.Close() - - var out struct { - Items map[string]struct { - State string `json:"state"` - } `json:"items"` - } - err = httpRequest(http.MethodGet, "http://localhost:9997/v1/rtspsessions/list", nil, &out) - require.NoError(t, err) - - var firstID string - for k := range out.Items { - firstID = k - } - - require.Equal(t, "publish", out.Items[firstID].State) -} - -func TestAPIRTSPSessionsKick(t *testing.T) { - p, ok := newInstance("api: yes\n") - require.Equal(t, true, ok) - defer p.close() - - track, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456")) - require.NoError(t, err) - - source, err := gortsplib.DialPublish("rtsp://localhost:8554/mypath", - gortsplib.Tracks{track}) - require.NoError(t, err) - defer source.Close() - - var out1 struct { - Items map[string]struct{} `json:"items"` - } - err = httpRequest(http.MethodGet, "http://localhost:9997/v1/rtspsessions/list", nil, &out1) - require.NoError(t, err) - - var firstID string - for k := range out1.Items { - firstID = k - } - - err = httpRequest(http.MethodPost, "http://localhost:9997/v1/rtspsessions/kick/"+firstID, nil, nil) +func TestAPIList(t *testing.T) { + serverCertFpath, err := writeTempFile(serverCert) require.NoError(t, err) + defer os.Remove(serverCertFpath) - var out2 struct { - Items map[string]struct{} `json:"items"` - } - err = httpRequest(http.MethodGet, "http://localhost:9997/v1/rtspsessions/list", nil, &out2) - require.NoError(t, err) - require.Equal(t, 0, len(out2.Items)) -} - -func TestAPIRTMPConnsList(t *testing.T) { - p, ok := newInstance("api: yes\n") - require.Equal(t, true, ok) - defer p.close() - - cnt1, err := newContainer("ffmpeg", "source", []string{ - "-re", - "-stream_loop", "-1", - "-i", "emptyvideo.mkv", - "-c", "copy", - "-f", "flv", - "rtmp://localhost:1935/test1/test2", - }) + serverKeyFpath, err := writeTempFile(serverKey) require.NoError(t, err) - defer cnt1.close() - - var out struct { - Items map[string]struct { - State string `json:"state"` - } `json:"items"` + defer os.Remove(serverKeyFpath) + + for _, ca := range []string{ + "rtsp", + "rtsps", + "rtmp", + } { + t.Run(ca, func(t *testing.T) { + p, ok := newInstance("api: yes\n" + + "encryption: optional\n" + + "serverCert: " + serverCertFpath + "\n" + + "serverKey: " + serverKeyFpath + "\n") + require.Equal(t, true, ok) + defer p.close() + + track, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456")) + require.NoError(t, err) + + switch ca { + case "rtsp": + source, err := gortsplib.DialPublish("rtsp://localhost:8554/mypath", + gortsplib.Tracks{track}) + require.NoError(t, err) + defer source.Close() + + case "rtsps": + source, err := gortsplib.DialPublish("rtsps://localhost:8555/mypath", + gortsplib.Tracks{track}) + require.NoError(t, err) + defer source.Close() + + case "rtmp": + cnt1, err := newContainer("ffmpeg", "source", []string{ + "-re", + "-stream_loop", "-1", + "-i", "emptyvideo.mkv", + "-c", "copy", + "-f", "flv", + "rtmp://localhost:1935/test1/test2", + }) + require.NoError(t, err) + defer cnt1.close() + } + + var pa string + switch ca { + case "rtsp": + pa = "rtspsessions" + + case "rtsps": + pa = "rtspssessions" + + case "rtmp": + pa = "rtmpconns" + } + + var out struct { + Items map[string]struct { + State string `json:"state"` + } `json:"items"` + } + err = httpRequest(http.MethodGet, "http://localhost:9997/v1/"+pa+"/list", nil, &out) + require.NoError(t, err) + + var firstID string + for k := range out.Items { + firstID = k + } + + require.Equal(t, "publish", out.Items[firstID].State) + }) } - err = httpRequest(http.MethodGet, "http://localhost:9997/v1/rtmpconns/list", nil, &out) - require.NoError(t, err) - - var firstID string - for k := range out.Items { - firstID = k - } - - require.Equal(t, "publish", out.Items[firstID].State) } -func TestAPIRTSPConnsKick(t *testing.T) { - p, ok := newInstance("api: yes\n") - require.Equal(t, true, ok) - defer p.close() - - cnt1, err := newContainer("ffmpeg", "source", []string{ - "-re", - "-stream_loop", "-1", - "-i", "emptyvideo.mkv", - "-c", "copy", - "-f", "flv", - "rtmp://localhost:1935/test1/test2", - }) +func TestAPIKick(t *testing.T) { + serverCertFpath, err := writeTempFile(serverCert) require.NoError(t, err) - defer cnt1.close() + defer os.Remove(serverCertFpath) - var out1 struct { - Items map[string]struct{} `json:"items"` - } - err = httpRequest(http.MethodGet, "http://localhost:9997/v1/rtmpconns/list", nil, &out1) + serverKeyFpath, err := writeTempFile(serverKey) require.NoError(t, err) - - var firstID string - for k := range out1.Items { - firstID = k - } - - err = httpRequest(http.MethodPost, "http://localhost:9997/v1/rtmpconns/kick/"+firstID, nil, nil) - require.NoError(t, err) - - var out2 struct { - Items map[string]struct{} `json:"items"` + defer os.Remove(serverKeyFpath) + + for _, ca := range []string{ + "rtsp", + "rtsps", + "rtmp", + } { + t.Run(ca, func(t *testing.T) { + p, ok := newInstance("api: yes\n" + + "encryption: optional\n" + + "serverCert: " + serverCertFpath + "\n" + + "serverKey: " + serverKeyFpath + "\n") + require.Equal(t, true, ok) + defer p.close() + + track, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456")) + require.NoError(t, err) + + switch ca { + case "rtsp": + source, err := gortsplib.DialPublish("rtsp://localhost:8554/mypath", + gortsplib.Tracks{track}) + require.NoError(t, err) + defer source.Close() + + case "rtsps": + source, err := gortsplib.DialPublish("rtsps://localhost:8555/mypath", + gortsplib.Tracks{track}) + require.NoError(t, err) + defer source.Close() + + case "rtmp": + cnt1, err := newContainer("ffmpeg", "source", []string{ + "-re", + "-stream_loop", "-1", + "-i", "emptyvideo.mkv", + "-c", "copy", + "-f", "flv", + "rtmp://localhost:1935/test1/test2", + }) + require.NoError(t, err) + defer cnt1.close() + } + + var pa string + switch ca { + case "rtsp": + pa = "rtspsessions" + + case "rtsps": + pa = "rtspssessions" + + case "rtmp": + pa = "rtmpconns" + } + + var out1 struct { + Items map[string]struct{} `json:"items"` + } + err = httpRequest(http.MethodGet, "http://localhost:9997/v1/"+pa+"/list", nil, &out1) + require.NoError(t, err) + + var firstID string + for k := range out1.Items { + firstID = k + } + + err = httpRequest(http.MethodPost, "http://localhost:9997/v1/"+pa+"/kick/"+firstID, nil, nil) + require.NoError(t, err) + + var out2 struct { + Items map[string]struct{} `json:"items"` + } + err = httpRequest(http.MethodGet, "http://localhost:9997/v1/"+pa+"/list", nil, &out2) + require.NoError(t, err) + require.Equal(t, 0, len(out2.Items)) + }) } - err = httpRequest(http.MethodGet, "http://localhost:9997/v1/rtmpconns/list", nil, &out2) - require.NoError(t, err) - require.Equal(t, 0, len(out2.Items)) } diff --git a/rtsp-simple-server.yml b/rtsp-simple-server.yml index 60c80ed0..387dde5e 100644 --- a/rtsp-simple-server.yml +++ b/rtsp-simple-server.yml @@ -103,7 +103,7 @@ hlsAddress: :8888 hlsAlwaysRemux: no # number of HLS segments to generate. # increasing segments allows more buffering, -# decreasing segments decrease latency. +# decreasing segments decreases latency. hlsSegmentCount: 3 # minimum duration of each segment. # the real segment duration is also influenced by the interval between IDR frames,