Browse Source

api: add RTSPS endpoints to list and kick RTSPS sessions

pull/509/head
aler9 5 years ago
parent
commit
da7f9c7ee1
  1. 69
      apidocs/openapi.yaml
  2. 80
      internal/core/api.go
  3. 279
      internal/core/api_test.go
  4. 2
      rtsp-simple-server.yml

69
apidocs/openapi.yaml

@ -182,6 +182,7 @@ components: @@ -182,6 +182,7 @@ components:
source:
oneOf:
- $ref: '#/components/schemas/PathSourceRTSPSession'
- $ref: '#/components/schemas/PathSourceRTSPSSession'
- $ref: '#/components/schemas/PathSourceRTMPConn'
sourceReady:
type: boolean
@ -190,6 +191,7 @@ components: @@ -190,6 +191,7 @@ components:
items:
oneOf:
- $ref: '#/components/schemas/PathReaderRTSPSession'
- $ref: '#/components/schemas/PathReaderRTSPSSession'
- $ref: '#/components/schemas/PathReaderRTMPConn'
- $ref: '#/components/schemas/PathReaderHLSMuxer'
@ -202,6 +204,15 @@ components: @@ -202,6 +204,15 @@ components:
id:
type: string
PathSourceRTSPSSession:
type: object
properties:
type:
type: string
enum: [rtspssession]
id:
type: string
PathSourceRTMPConn:
type: object
properties:
@ -220,6 +231,15 @@ components: @@ -220,6 +231,15 @@ components:
id:
type: string
PathReaderRTSPSSession:
type: object
properties:
type:
type: string
enum: [rtspssession]
id:
type: string
PathReaderRTMPConn:
type: object
properties:
@ -245,6 +265,15 @@ components: @@ -245,6 +265,15 @@ components:
type: string
enum: [idle, read, publish]
RTSPSSession:
type: object
properties:
remoteAddr:
type: string
state:
type: string
enum: [idle, read, publish]
RTMPConn:
type: object
properties:
@ -421,6 +450,46 @@ paths: @@ -421,6 +450,46 @@ paths:
'500':
description: internal server error.
/v1/rtspssessions/list:
get:
operationId: rtspsSessionsList
summary: returns all active RTSPS sessions.
description: ''
responses:
'200':
description: the request was successful.
content:
application/json:
schema:
items:
type: object
additionalProperties:
$ref: '#/components/schemas/RTSPSSession'
'400':
description: invalid request.
'500':
description: internal server error.
/v1/rtspssessions/kick/{id}:
post:
operationId: rtspsSessionsKick
summary: kicks out a RTSPS session from the server.
description: ''
parameters:
- name: id
in: path
required: true
description: the ID of the session.
schema:
type: string
responses:
'200':
description: the request was successful.
'400':
description: invalid request.
'500':
description: internal server error.
/v1/rtmpconns/list:
get:
operationId: rtmpConnsList

80
internal/core/api.go

@ -283,6 +283,8 @@ func newAPI( @@ -283,6 +283,8 @@ func newAPI(
group.GET("/v1/paths/list", a.onPathsList)
group.GET("/v1/rtspsessions/list", a.onRTSPSessionsList)
group.POST("/v1/rtspsessions/kick/:id", a.onRTSPSessionsKick)
group.GET("/v1/rtspssessions/list", a.onRTSPSSessionsList)
group.POST("/v1/rtspssessions/kick/:id", a.onRTSPSSessionsKick)
group.GET("/v1/rtmpconns/list", a.onRTMPConnsList)
group.POST("/v1/rtmpconns/kick/:id", a.onRTMPConnsKick)
@ -511,7 +513,7 @@ func (a *api) onPathsList(ctx *gin.Context) { @@ -511,7 +513,7 @@ func (a *api) onPathsList(ctx *gin.Context) {
}
func (a *api) onRTSPSessionsList(ctx *gin.Context) {
if a.rtspServer == nil && a.rtspsServer == nil {
if reflect.ValueOf(a.rtspServer).IsNil() {
ctx.AbortWithStatus(http.StatusNotFound)
return
}
@ -520,54 +522,70 @@ func (a *api) onRTSPSessionsList(ctx *gin.Context) { @@ -520,54 +522,70 @@ func (a *api) onRTSPSessionsList(ctx *gin.Context) {
Items: make(map[string]apiRTSPSessionsListItem),
}
if !reflect.ValueOf(a.rtspServer).IsNil() {
res := a.rtspServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{Data: &data})
if res.Err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
}
if !reflect.ValueOf(a.rtspsServer).IsNil() {
res := a.rtspsServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{Data: &data})
if res.Err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
res := a.rtspServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{Data: &data})
if res.Err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
ctx.JSON(http.StatusOK, data)
}
func (a *api) onRTSPSessionsKick(ctx *gin.Context) {
if a.rtspServer == nil && a.rtspsServer == nil {
if reflect.ValueOf(a.rtspServer).IsNil() {
ctx.AbortWithStatus(http.StatusNotFound)
return
}
id := ctx.Param("id")
if a.rtspServer != nil {
res := a.rtspServer.OnAPIRTSPSessionsKick(apiRTSPSessionsKickReq{ID: id})
if res.Err == nil {
ctx.Status(http.StatusOK)
return
}
res := a.rtspServer.OnAPIRTSPSessionsKick(apiRTSPSessionsKickReq{ID: id})
if res.Err != nil {
ctx.AbortWithStatus(http.StatusNotFound)
return
}
if a.rtspsServer != nil {
res := a.rtspsServer.OnAPIRTSPSessionsKick(apiRTSPSessionsKickReq{ID: id})
if res.Err != nil {
ctx.Status(http.StatusOK)
return
}
ctx.Status(http.StatusOK)
}
func (a *api) onRTSPSSessionsList(ctx *gin.Context) {
if reflect.ValueOf(a.rtspsServer).IsNil() {
ctx.AbortWithStatus(http.StatusNotFound)
return
}
data := apiRTSPSessionsListData{
Items: make(map[string]apiRTSPSessionsListItem),
}
ctx.AbortWithStatus(http.StatusNotFound)
res := a.rtspsServer.OnAPIRTSPSessionsList(apiRTSPSessionsListReq{Data: &data})
if res.Err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
ctx.JSON(http.StatusOK, data)
}
func (a *api) onRTSPSSessionsKick(ctx *gin.Context) {
if reflect.ValueOf(a.rtspsServer).IsNil() {
ctx.AbortWithStatus(http.StatusNotFound)
return
}
id := ctx.Param("id")
res := a.rtspsServer.OnAPIRTSPSessionsKick(apiRTSPSessionsKickReq{ID: id})
if res.Err != nil {
ctx.AbortWithStatus(http.StatusNotFound)
return
}
ctx.Status(http.StatusOK)
}
func (a *api) onRTMPConnsList(ctx *gin.Context) {
if a.rtmpServer == nil {
if reflect.ValueOf(a.rtmpServer).IsNil() {
ctx.AbortWithStatus(http.StatusNotFound)
return
}
@ -593,7 +611,7 @@ func (a *api) OnConfReload(conf *conf.Conf) { @@ -593,7 +611,7 @@ func (a *api) OnConfReload(conf *conf.Conf) {
}
func (a *api) onRTMPConnsKick(ctx *gin.Context) {
if a.rtmpServer == nil {
if reflect.ValueOf(a.rtmpServer).IsNil() {
ctx.AbortWithStatus(http.StatusNotFound)
return
}

279
internal/core/api_test.go

@ -6,6 +6,7 @@ import ( @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"os"
"testing"
"time"
@ -164,136 +165,170 @@ func TestAPIPathsList(t *testing.T) { @@ -164,136 +165,170 @@ func TestAPIPathsList(t *testing.T) {
require.Equal(t, true, ok)
}
func TestAPIRTSPSessionsList(t *testing.T) {
p, ok := newInstance("api: yes\n")
require.Equal(t, true, ok)
defer p.close()
track, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
source, err := gortsplib.DialPublish("rtsp://localhost:8554/mypath",
gortsplib.Tracks{track})
require.NoError(t, err)
defer source.Close()
var out struct {
Items map[string]struct {
State string `json:"state"`
} `json:"items"`
}
err = httpRequest(http.MethodGet, "http://localhost:9997/v1/rtspsessions/list", nil, &out)
require.NoError(t, err)
var firstID string
for k := range out.Items {
firstID = k
}
require.Equal(t, "publish", out.Items[firstID].State)
}
func TestAPIRTSPSessionsKick(t *testing.T) {
p, ok := newInstance("api: yes\n")
require.Equal(t, true, ok)
defer p.close()
track, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
source, err := gortsplib.DialPublish("rtsp://localhost:8554/mypath",
gortsplib.Tracks{track})
require.NoError(t, err)
defer source.Close()
var out1 struct {
Items map[string]struct{} `json:"items"`
}
err = httpRequest(http.MethodGet, "http://localhost:9997/v1/rtspsessions/list", nil, &out1)
require.NoError(t, err)
var firstID string
for k := range out1.Items {
firstID = k
}
err = httpRequest(http.MethodPost, "http://localhost:9997/v1/rtspsessions/kick/"+firstID, nil, nil)
func TestAPIList(t *testing.T) {
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
var out2 struct {
Items map[string]struct{} `json:"items"`
}
err = httpRequest(http.MethodGet, "http://localhost:9997/v1/rtspsessions/list", nil, &out2)
require.NoError(t, err)
require.Equal(t, 0, len(out2.Items))
}
func TestAPIRTMPConnsList(t *testing.T) {
p, ok := newInstance("api: yes\n")
require.Equal(t, true, ok)
defer p.close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.mkv",
"-c", "copy",
"-f", "flv",
"rtmp://localhost:1935/test1/test2",
})
serverKeyFpath, err := writeTempFile(serverKey)
require.NoError(t, err)
defer cnt1.close()
var out struct {
Items map[string]struct {
State string `json:"state"`
} `json:"items"`
defer os.Remove(serverKeyFpath)
for _, ca := range []string{
"rtsp",
"rtsps",
"rtmp",
} {
t.Run(ca, func(t *testing.T) {
p, ok := newInstance("api: yes\n" +
"encryption: optional\n" +
"serverCert: " + serverCertFpath + "\n" +
"serverKey: " + serverKeyFpath + "\n")
require.Equal(t, true, ok)
defer p.close()
track, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
switch ca {
case "rtsp":
source, err := gortsplib.DialPublish("rtsp://localhost:8554/mypath",
gortsplib.Tracks{track})
require.NoError(t, err)
defer source.Close()
case "rtsps":
source, err := gortsplib.DialPublish("rtsps://localhost:8555/mypath",
gortsplib.Tracks{track})
require.NoError(t, err)
defer source.Close()
case "rtmp":
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.mkv",
"-c", "copy",
"-f", "flv",
"rtmp://localhost:1935/test1/test2",
})
require.NoError(t, err)
defer cnt1.close()
}
var pa string
switch ca {
case "rtsp":
pa = "rtspsessions"
case "rtsps":
pa = "rtspssessions"
case "rtmp":
pa = "rtmpconns"
}
var out struct {
Items map[string]struct {
State string `json:"state"`
} `json:"items"`
}
err = httpRequest(http.MethodGet, "http://localhost:9997/v1/"+pa+"/list", nil, &out)
require.NoError(t, err)
var firstID string
for k := range out.Items {
firstID = k
}
require.Equal(t, "publish", out.Items[firstID].State)
})
}
err = httpRequest(http.MethodGet, "http://localhost:9997/v1/rtmpconns/list", nil, &out)
require.NoError(t, err)
var firstID string
for k := range out.Items {
firstID = k
}
require.Equal(t, "publish", out.Items[firstID].State)
}
func TestAPIRTSPConnsKick(t *testing.T) {
p, ok := newInstance("api: yes\n")
require.Equal(t, true, ok)
defer p.close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.mkv",
"-c", "copy",
"-f", "flv",
"rtmp://localhost:1935/test1/test2",
})
func TestAPIKick(t *testing.T) {
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
defer cnt1.close()
defer os.Remove(serverCertFpath)
var out1 struct {
Items map[string]struct{} `json:"items"`
}
err = httpRequest(http.MethodGet, "http://localhost:9997/v1/rtmpconns/list", nil, &out1)
serverKeyFpath, err := writeTempFile(serverKey)
require.NoError(t, err)
var firstID string
for k := range out1.Items {
firstID = k
}
err = httpRequest(http.MethodPost, "http://localhost:9997/v1/rtmpconns/kick/"+firstID, nil, nil)
require.NoError(t, err)
var out2 struct {
Items map[string]struct{} `json:"items"`
defer os.Remove(serverKeyFpath)
for _, ca := range []string{
"rtsp",
"rtsps",
"rtmp",
} {
t.Run(ca, func(t *testing.T) {
p, ok := newInstance("api: yes\n" +
"encryption: optional\n" +
"serverCert: " + serverCertFpath + "\n" +
"serverKey: " + serverKeyFpath + "\n")
require.Equal(t, true, ok)
defer p.close()
track, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
switch ca {
case "rtsp":
source, err := gortsplib.DialPublish("rtsp://localhost:8554/mypath",
gortsplib.Tracks{track})
require.NoError(t, err)
defer source.Close()
case "rtsps":
source, err := gortsplib.DialPublish("rtsps://localhost:8555/mypath",
gortsplib.Tracks{track})
require.NoError(t, err)
defer source.Close()
case "rtmp":
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.mkv",
"-c", "copy",
"-f", "flv",
"rtmp://localhost:1935/test1/test2",
})
require.NoError(t, err)
defer cnt1.close()
}
var pa string
switch ca {
case "rtsp":
pa = "rtspsessions"
case "rtsps":
pa = "rtspssessions"
case "rtmp":
pa = "rtmpconns"
}
var out1 struct {
Items map[string]struct{} `json:"items"`
}
err = httpRequest(http.MethodGet, "http://localhost:9997/v1/"+pa+"/list", nil, &out1)
require.NoError(t, err)
var firstID string
for k := range out1.Items {
firstID = k
}
err = httpRequest(http.MethodPost, "http://localhost:9997/v1/"+pa+"/kick/"+firstID, nil, nil)
require.NoError(t, err)
var out2 struct {
Items map[string]struct{} `json:"items"`
}
err = httpRequest(http.MethodGet, "http://localhost:9997/v1/"+pa+"/list", nil, &out2)
require.NoError(t, err)
require.Equal(t, 0, len(out2.Items))
})
}
err = httpRequest(http.MethodGet, "http://localhost:9997/v1/rtmpconns/list", nil, &out2)
require.NoError(t, err)
require.Equal(t, 0, len(out2.Items))
}

2
rtsp-simple-server.yml

@ -103,7 +103,7 @@ hlsAddress: :8888 @@ -103,7 +103,7 @@ hlsAddress: :8888
hlsAlwaysRemux: no
# number of HLS segments to generate.
# increasing segments allows more buffering,
# decreasing segments decrease latency.
# decreasing segments decreases latency.
hlsSegmentCount: 3
# minimum duration of each segment.
# the real segment duration is also influenced by the interval between IDR frames,

Loading…
Cancel
Save