Browse Source

api: add "state" field to RTSP sessions and RTMP connections

pull/509/head
aler9 5 years ago
parent
commit
20ac25571c
  1. 6
      apidocs/openapi.yaml
  2. 2
      go.mod
  3. 4
      go.sum
  4. 2
      internal/core/api.go
  5. 24
      internal/core/api_test.go
  6. 16
      internal/core/rtmp_conn.go
  7. 12
      internal/core/rtmp_server.go
  8. 12
      internal/core/rtsp_server.go
  9. 49
      internal/core/rtsp_session.go

6
apidocs/openapi.yaml

@ -241,12 +241,18 @@ components: @@ -241,12 +241,18 @@ components:
properties:
remoteAddr:
type: string
state:
type: string
enum: [idle, read, publish]
RTMPConn:
type: object
properties:
remoteAddr:
type: string
state:
type: string
enum: [idle, read, publish]
paths:
/v1/config/get:

2
go.mod

@ -5,7 +5,7 @@ go 1.16 @@ -5,7 +5,7 @@ go 1.16
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20210810153440-c45a1b399530
github.com/aler9/gortsplib v0.0.0-20210811100517-d05a92be5f04
github.com/asticode/go-astits v1.9.0
github.com/fsnotify/fsnotify v1.4.9
github.com/gin-gonic/gin v1.7.2

4
go.sum

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20210810153440-c45a1b399530 h1:/Lzuu854GPVUzVHW35QyViBQ4EE2dgP30E6VMULcqF4=
github.com/aler9/gortsplib v0.0.0-20210810153440-c45a1b399530/go.mod h1:s5FsbPRxJhU/YedvUKAKHVY+lQEdYsiJpuN2CHb89cI=
github.com/aler9/gortsplib v0.0.0-20210811100517-d05a92be5f04 h1:YrzGYp2+DE+AYaFLwKkKPDCDv8BMTPcRD4kzMZaLM8Y=
github.com/aler9/gortsplib v0.0.0-20210811100517-d05a92be5f04/go.mod h1:s5FsbPRxJhU/YedvUKAKHVY+lQEdYsiJpuN2CHb89cI=
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ=
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc=
github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8=

2
internal/core/api.go

@ -168,6 +168,7 @@ type apiPathsListReq2 struct { @@ -168,6 +168,7 @@ type apiPathsListReq2 struct {
type apiRTSPSessionsListItem struct {
RemoteAddr string `json:"remoteAddr"`
State string `json:"state"`
}
type apiRTSPSessionsListData struct {
@ -192,6 +193,7 @@ type apiRTSPSessionsKickReq struct { @@ -192,6 +193,7 @@ type apiRTSPSessionsKickReq struct {
type apiRTMPConnsListItem struct {
RemoteAddr string `json:"remoteAddr"`
State string `json:"state"`
}
type apiRTMPConnsListData struct {

24
internal/core/api_test.go

@ -178,11 +178,19 @@ func TestAPIRTSPSessionsList(t *testing.T) { @@ -178,11 +178,19 @@ func TestAPIRTSPSessionsList(t *testing.T) {
defer source.Close()
var out struct {
Items map[string]struct{} `json:"items"`
Items map[string]struct {
State string `json:"state"`
} `json:"items"`
}
err = httpRequest(http.MethodGet, "http://localhost:9997/v1/rtspsessions/list", nil, &out)
require.NoError(t, err)
require.Equal(t, 1, len(out.Items))
var firstID string
for k := range out.Items {
firstID = k
}
require.Equal(t, "publish", out.Items[firstID].State)
}
func TestAPIRTSPSessionsKick(t *testing.T) {
@ -237,11 +245,19 @@ func TestAPIRTMPConnsList(t *testing.T) { @@ -237,11 +245,19 @@ func TestAPIRTMPConnsList(t *testing.T) {
defer cnt1.close()
var out struct {
Items map[string]struct{} `json:"items"`
Items map[string]struct {
State string `json:"state"`
} `json:"items"`
}
err = httpRequest(http.MethodGet, "http://localhost:9997/v1/rtmpconns/list", nil, &out)
require.NoError(t, err)
require.Equal(t, 1, len(out.Items))
var firstID string
for k := range out.Items {
firstID = k
}
require.Equal(t, "publish", out.Items[firstID].State)
}
func TestAPIRTSPConnsKick(t *testing.T) {

16
internal/core/rtmp_conn.go

@ -70,6 +70,8 @@ type rtmpConn struct { @@ -70,6 +70,8 @@ type rtmpConn struct {
ctxCancel func()
path *path
ringBuffer *ringbuffer.RingBuffer // read
state gortsplib.ServerSessionState
stateMutex sync.Mutex
}
func newRTMPConn(
@ -141,6 +143,12 @@ func (c *rtmpConn) ip() net.IP { @@ -141,6 +143,12 @@ func (c *rtmpConn) ip() net.IP {
return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).IP
}
func (c *rtmpConn) safeState() gortsplib.ServerSessionState {
c.stateMutex.Lock()
defer c.stateMutex.Unlock()
return c.state
}
func (c *rtmpConn) run() {
defer c.wg.Done()
@ -223,6 +231,10 @@ func (c *rtmpConn) runRead(ctx context.Context) error { @@ -223,6 +231,10 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
c.path.OnReaderRemove(pathReaderRemoveReq{Author: c})
}()
c.stateMutex.Lock()
c.state = gortsplib.ServerSessionStateRead
c.stateMutex.Unlock()
var videoTrack *gortsplib.Track
videoTrackID := -1
var h264Decoder *rtph264.Decoder
@ -419,6 +431,10 @@ func (c *rtmpConn) runPublish(ctx context.Context) error { @@ -419,6 +431,10 @@ func (c *rtmpConn) runPublish(ctx context.Context) error {
c.path.OnPublisherRemove(pathPublisherRemoveReq{Author: c})
}()
c.stateMutex.Lock()
c.state = gortsplib.ServerSessionStatePublish
c.stateMutex.Unlock()
// disable write deadline
c.conn.NetConn().SetWriteDeadline(time.Time{})

12
internal/core/rtmp_server.go

@ -10,6 +10,8 @@ import ( @@ -10,6 +10,8 @@ import (
"sync"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/rtsp-simple-server/internal/logger"
)
@ -161,6 +163,16 @@ outer: @@ -161,6 +163,16 @@ outer:
for c := range s.conns {
req.Data.Items[c.ID()] = apiRTMPConnsListItem{
RemoteAddr: c.RemoteAddr().String(),
State: func() string {
switch c.safeState() {
case gortsplib.ServerSessionStateRead:
return "read"
case gortsplib.ServerSessionStatePublish:
return "publish"
}
return "idle"
}(),
}
}
req.Res <- apiRTMPConnsListRes{}

12
internal/core/rtsp_server.go

@ -354,6 +354,18 @@ func (s *rtspServer) OnAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSe @@ -354,6 +354,18 @@ func (s *rtspServer) OnAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSe
for _, s := range s.sessions {
req.Data.Items[s.ID()] = apiRTSPSessionsListItem{
RemoteAddr: s.RemoteAddr().String(),
State: func() string {
switch s.safeState() {
case gortsplib.ServerSessionStatePreRead,
gortsplib.ServerSessionStateRead:
return "read"
case gortsplib.ServerSessionStatePrePublish,
gortsplib.ServerSessionStatePublish:
return "publish"
}
return "idle"
}(),
}
}
s.mutex.RUnlock()

49
internal/core/rtsp_session.go

@ -4,6 +4,7 @@ import ( @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/aler9/gortsplib"
@ -32,6 +33,8 @@ type rtspSession struct { @@ -32,6 +33,8 @@ type rtspSession struct {
parent rtspSessionParent
path *path
state gortsplib.ServerSessionState
stateMutex sync.Mutex
setuppedTracks map[int]*gortsplib.Track // read
onReadCmd *externalcmd.Cmd // read
announcedTracks gortsplib.Tracks // publish
@ -63,18 +66,18 @@ func newRTSPSession( @@ -63,18 +66,18 @@ func newRTSPSession(
// ParentClose closes a Session.
func (s *rtspSession) ParentClose() {
if s.ss.State() == gortsplib.ServerSessionStatePlay {
if s.ss.State() == gortsplib.ServerSessionStateRead {
if s.onReadCmd != nil {
s.onReadCmd.Close()
}
}
switch s.ss.State() {
case gortsplib.ServerSessionStatePrePlay, gortsplib.ServerSessionStatePlay:
case gortsplib.ServerSessionStatePreRead, gortsplib.ServerSessionStateRead:
s.path.OnReaderRemove(pathReaderRemoveReq{Author: s})
s.path = nil
case gortsplib.ServerSessionStatePreRecord, gortsplib.ServerSessionStateRecord:
case gortsplib.ServerSessionStatePrePublish, gortsplib.ServerSessionStatePublish:
s.path.OnPublisherRemove(pathPublisherRemoveReq{Author: s})
s.path = nil
}
@ -95,6 +98,12 @@ func (s *rtspSession) ID() string { @@ -95,6 +98,12 @@ func (s *rtspSession) ID() string {
return s.id
}
func (s *rtspSession) safeState() gortsplib.ServerSessionState {
s.stateMutex.Lock()
defer s.stateMutex.Unlock()
return s.state
}
// RemoteAddr returns the remote address of the author of the session.
func (s *rtspSession) RemoteAddr() net.Addr {
return s.author.NetConn().RemoteAddr()
@ -143,6 +152,10 @@ func (s *rtspSession) OnAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno @@ -143,6 +152,10 @@ func (s *rtspSession) OnAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno
s.path = res.Path
s.announcedTracks = ctx.Tracks
s.stateMutex.Lock()
s.state = gortsplib.ServerSessionStatePrePublish
s.stateMutex.Unlock()
return &base.Response{
StatusCode: base.StatusOK,
}, nil
@ -171,7 +184,7 @@ func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt @@ -171,7 +184,7 @@ func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
}
switch s.ss.State() {
case gortsplib.ServerSessionStateInitial, gortsplib.ServerSessionStatePrePlay: // play
case gortsplib.ServerSessionStateInitial, gortsplib.ServerSessionStatePreRead: // play
res := s.pathManager.OnReaderSetupPlay(pathReaderSetupPlayReq{
Author: s,
PathName: ctx.Path,
@ -217,6 +230,10 @@ func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt @@ -217,6 +230,10 @@ func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
}
s.setuppedTracks[ctx.TrackID] = res.Stream.tracks()[ctx.TrackID]
s.stateMutex.Lock()
s.state = gortsplib.ServerSessionStatePreRead
s.stateMutex.Unlock()
return &base.Response{
StatusCode: base.StatusOK,
}, res.Stream.rtspStream, nil
@ -232,7 +249,7 @@ func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt @@ -232,7 +249,7 @@ func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
func (s *rtspSession) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
h := make(base.Header)
if s.ss.State() == gortsplib.ServerSessionStatePrePlay {
if s.ss.State() == gortsplib.ServerSessionStatePreRead {
s.path.OnReaderPlay(pathReaderPlayReq{Author: s})
if s.path.Conf().RunOnRead != "" {
@ -242,6 +259,10 @@ func (s *rtspSession) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo @@ -242,6 +259,10 @@ func (s *rtspSession) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo
Port: port,
})
}
s.stateMutex.Lock()
s.state = gortsplib.ServerSessionStateRead
s.stateMutex.Unlock()
}
return &base.Response{
@ -264,6 +285,10 @@ func (s *rtspSession) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R @@ -264,6 +285,10 @@ func (s *rtspSession) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R
s.stream = res.Stream
s.stateMutex.Lock()
s.state = gortsplib.ServerSessionStatePublish
s.stateMutex.Unlock()
return &base.Response{
StatusCode: base.StatusOK,
}, nil
@ -272,15 +297,23 @@ func (s *rtspSession) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R @@ -272,15 +297,23 @@ func (s *rtspSession) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R
// OnPause is called by rtspServer.
func (s *rtspSession) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Response, error) {
switch s.ss.State() {
case gortsplib.ServerSessionStatePlay:
case gortsplib.ServerSessionStateRead:
if s.onReadCmd != nil {
s.onReadCmd.Close()
}
s.path.OnReaderPause(pathReaderPauseReq{Author: s})
case gortsplib.ServerSessionStateRecord:
s.stateMutex.Lock()
s.state = gortsplib.ServerSessionStatePreRead
s.stateMutex.Unlock()
case gortsplib.ServerSessionStatePublish:
s.path.OnPublisherPause(pathPublisherPauseReq{Author: s})
s.stateMutex.Lock()
s.state = gortsplib.ServerSessionStatePrePublish
s.stateMutex.Unlock()
}
return &base.Response{
@ -341,7 +374,7 @@ func (s *rtspSession) OnPublisherAccepted(tracksLen int) { @@ -341,7 +374,7 @@ func (s *rtspSession) OnPublisherAccepted(tracksLen int) {
// OnFrame is called by rtspServer.
func (s *rtspSession) OnFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) {
if s.ss.State() != gortsplib.ServerSessionStateRecord {
if s.ss.State() != gortsplib.ServerSessionStatePublish {
return
}

Loading…
Cancel
Save