diff --git a/apidocs/openapi.yaml b/apidocs/openapi.yaml index e0447517..c0d752c9 100644 --- a/apidocs/openapi.yaml +++ b/apidocs/openapi.yaml @@ -241,12 +241,18 @@ components: properties: remoteAddr: type: string + state: + type: string + enum: [idle, read, publish] RTMPConn: type: object properties: remoteAddr: type: string + state: + type: string + enum: [idle, read, publish] paths: /v1/config/get: diff --git a/go.mod b/go.mod index 62baedf9..1a354d75 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.16 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20210810153440-c45a1b399530 + github.com/aler9/gortsplib v0.0.0-20210811100517-d05a92be5f04 github.com/asticode/go-astits v1.9.0 github.com/fsnotify/fsnotify v1.4.9 github.com/gin-gonic/gin v1.7.2 diff --git a/go.sum b/go.sum index 3277ebd6..96fd6a78 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20210810153440-c45a1b399530 h1:/Lzuu854GPVUzVHW35QyViBQ4EE2dgP30E6VMULcqF4= -github.com/aler9/gortsplib v0.0.0-20210810153440-c45a1b399530/go.mod h1:s5FsbPRxJhU/YedvUKAKHVY+lQEdYsiJpuN2CHb89cI= +github.com/aler9/gortsplib v0.0.0-20210811100517-d05a92be5f04 h1:YrzGYp2+DE+AYaFLwKkKPDCDv8BMTPcRD4kzMZaLM8Y= +github.com/aler9/gortsplib v0.0.0-20210811100517-d05a92be5f04/go.mod h1:s5FsbPRxJhU/YedvUKAKHVY+lQEdYsiJpuN2CHb89cI= github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ= github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc= github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8= diff --git a/internal/core/api.go b/internal/core/api.go index 725246e5..1b8d31c6 100644 --- a/internal/core/api.go +++ b/internal/core/api.go @@ -168,6 +168,7 @@ type apiPathsListReq2 struct { type apiRTSPSessionsListItem struct { RemoteAddr string `json:"remoteAddr"` + State string `json:"state"` } type apiRTSPSessionsListData struct { @@ -192,6 +193,7 @@ type apiRTSPSessionsKickReq struct { type apiRTMPConnsListItem struct { RemoteAddr string `json:"remoteAddr"` + State string `json:"state"` } type apiRTMPConnsListData struct { diff --git a/internal/core/api_test.go b/internal/core/api_test.go index 3294f912..15f10a01 100644 --- a/internal/core/api_test.go +++ b/internal/core/api_test.go @@ -178,11 +178,19 @@ func TestAPIRTSPSessionsList(t *testing.T) { defer source.Close() var out struct { - Items map[string]struct{} `json:"items"` + Items map[string]struct { + State string `json:"state"` + } `json:"items"` } err = httpRequest(http.MethodGet, "http://localhost:9997/v1/rtspsessions/list", nil, &out) require.NoError(t, err) - require.Equal(t, 1, len(out.Items)) + + var firstID string + for k := range out.Items { + firstID = k + } + + require.Equal(t, "publish", out.Items[firstID].State) } func TestAPIRTSPSessionsKick(t *testing.T) { @@ -237,11 +245,19 @@ func TestAPIRTMPConnsList(t *testing.T) { defer cnt1.close() var out struct { - Items map[string]struct{} `json:"items"` + Items map[string]struct { + State string `json:"state"` + } `json:"items"` } err = httpRequest(http.MethodGet, "http://localhost:9997/v1/rtmpconns/list", nil, &out) require.NoError(t, err) - require.Equal(t, 1, len(out.Items)) + + var firstID string + for k := range out.Items { + firstID = k + } + + require.Equal(t, "publish", out.Items[firstID].State) } func TestAPIRTSPConnsKick(t *testing.T) { diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index d83b2847..f13e4c48 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -70,6 +70,8 @@ type rtmpConn struct { ctxCancel func() path *path ringBuffer *ringbuffer.RingBuffer // read + state gortsplib.ServerSessionState + stateMutex sync.Mutex } func newRTMPConn( @@ -141,6 +143,12 @@ func (c *rtmpConn) ip() net.IP { return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).IP } +func (c *rtmpConn) safeState() gortsplib.ServerSessionState { + c.stateMutex.Lock() + defer c.stateMutex.Unlock() + return c.state +} + func (c *rtmpConn) run() { defer c.wg.Done() @@ -223,6 +231,10 @@ func (c *rtmpConn) runRead(ctx context.Context) error { c.path.OnReaderRemove(pathReaderRemoveReq{Author: c}) }() + c.stateMutex.Lock() + c.state = gortsplib.ServerSessionStateRead + c.stateMutex.Unlock() + var videoTrack *gortsplib.Track videoTrackID := -1 var h264Decoder *rtph264.Decoder @@ -419,6 +431,10 @@ func (c *rtmpConn) runPublish(ctx context.Context) error { c.path.OnPublisherRemove(pathPublisherRemoveReq{Author: c}) }() + c.stateMutex.Lock() + c.state = gortsplib.ServerSessionStatePublish + c.stateMutex.Unlock() + // disable write deadline c.conn.NetConn().SetWriteDeadline(time.Time{}) diff --git a/internal/core/rtmp_server.go b/internal/core/rtmp_server.go index 8d230d96..0905ed25 100644 --- a/internal/core/rtmp_server.go +++ b/internal/core/rtmp_server.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "github.com/aler9/gortsplib" + "github.com/aler9/rtsp-simple-server/internal/logger" ) @@ -161,6 +163,16 @@ outer: for c := range s.conns { req.Data.Items[c.ID()] = apiRTMPConnsListItem{ RemoteAddr: c.RemoteAddr().String(), + State: func() string { + switch c.safeState() { + case gortsplib.ServerSessionStateRead: + return "read" + + case gortsplib.ServerSessionStatePublish: + return "publish" + } + return "idle" + }(), } } req.Res <- apiRTMPConnsListRes{} diff --git a/internal/core/rtsp_server.go b/internal/core/rtsp_server.go index cb20a7bf..18cdeb59 100644 --- a/internal/core/rtsp_server.go +++ b/internal/core/rtsp_server.go @@ -354,6 +354,18 @@ func (s *rtspServer) OnAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSe for _, s := range s.sessions { req.Data.Items[s.ID()] = apiRTSPSessionsListItem{ RemoteAddr: s.RemoteAddr().String(), + State: func() string { + switch s.safeState() { + case gortsplib.ServerSessionStatePreRead, + gortsplib.ServerSessionStateRead: + return "read" + + case gortsplib.ServerSessionStatePrePublish, + gortsplib.ServerSessionStatePublish: + return "publish" + } + return "idle" + }(), } } s.mutex.RUnlock() diff --git a/internal/core/rtsp_session.go b/internal/core/rtsp_session.go index f194f86b..3c50d64a 100644 --- a/internal/core/rtsp_session.go +++ b/internal/core/rtsp_session.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "net" + "sync" "time" "github.com/aler9/gortsplib" @@ -32,6 +33,8 @@ type rtspSession struct { parent rtspSessionParent path *path + state gortsplib.ServerSessionState + stateMutex sync.Mutex setuppedTracks map[int]*gortsplib.Track // read onReadCmd *externalcmd.Cmd // read announcedTracks gortsplib.Tracks // publish @@ -63,18 +66,18 @@ func newRTSPSession( // ParentClose closes a Session. func (s *rtspSession) ParentClose() { - if s.ss.State() == gortsplib.ServerSessionStatePlay { + if s.ss.State() == gortsplib.ServerSessionStateRead { if s.onReadCmd != nil { s.onReadCmd.Close() } } switch s.ss.State() { - case gortsplib.ServerSessionStatePrePlay, gortsplib.ServerSessionStatePlay: + case gortsplib.ServerSessionStatePreRead, gortsplib.ServerSessionStateRead: s.path.OnReaderRemove(pathReaderRemoveReq{Author: s}) s.path = nil - case gortsplib.ServerSessionStatePreRecord, gortsplib.ServerSessionStateRecord: + case gortsplib.ServerSessionStatePrePublish, gortsplib.ServerSessionStatePublish: s.path.OnPublisherRemove(pathPublisherRemoveReq{Author: s}) s.path = nil } @@ -95,6 +98,12 @@ func (s *rtspSession) ID() string { return s.id } +func (s *rtspSession) safeState() gortsplib.ServerSessionState { + s.stateMutex.Lock() + defer s.stateMutex.Unlock() + return s.state +} + // RemoteAddr returns the remote address of the author of the session. func (s *rtspSession) RemoteAddr() net.Addr { return s.author.NetConn().RemoteAddr() @@ -143,6 +152,10 @@ func (s *rtspSession) OnAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno s.path = res.Path s.announcedTracks = ctx.Tracks + s.stateMutex.Lock() + s.state = gortsplib.ServerSessionStatePrePublish + s.stateMutex.Unlock() + return &base.Response{ StatusCode: base.StatusOK, }, nil @@ -171,7 +184,7 @@ func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt } switch s.ss.State() { - case gortsplib.ServerSessionStateInitial, gortsplib.ServerSessionStatePrePlay: // play + case gortsplib.ServerSessionStateInitial, gortsplib.ServerSessionStatePreRead: // play res := s.pathManager.OnReaderSetupPlay(pathReaderSetupPlayReq{ Author: s, PathName: ctx.Path, @@ -217,6 +230,10 @@ func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt } s.setuppedTracks[ctx.TrackID] = res.Stream.tracks()[ctx.TrackID] + s.stateMutex.Lock() + s.state = gortsplib.ServerSessionStatePreRead + s.stateMutex.Unlock() + return &base.Response{ StatusCode: base.StatusOK, }, res.Stream.rtspStream, nil @@ -232,7 +249,7 @@ func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt func (s *rtspSession) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { h := make(base.Header) - if s.ss.State() == gortsplib.ServerSessionStatePrePlay { + if s.ss.State() == gortsplib.ServerSessionStatePreRead { s.path.OnReaderPlay(pathReaderPlayReq{Author: s}) if s.path.Conf().RunOnRead != "" { @@ -242,6 +259,10 @@ func (s *rtspSession) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo Port: port, }) } + + s.stateMutex.Lock() + s.state = gortsplib.ServerSessionStateRead + s.stateMutex.Unlock() } return &base.Response{ @@ -264,6 +285,10 @@ func (s *rtspSession) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R s.stream = res.Stream + s.stateMutex.Lock() + s.state = gortsplib.ServerSessionStatePublish + s.stateMutex.Unlock() + return &base.Response{ StatusCode: base.StatusOK, }, nil @@ -272,15 +297,23 @@ func (s *rtspSession) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R // OnPause is called by rtspServer. func (s *rtspSession) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Response, error) { switch s.ss.State() { - case gortsplib.ServerSessionStatePlay: + case gortsplib.ServerSessionStateRead: if s.onReadCmd != nil { s.onReadCmd.Close() } s.path.OnReaderPause(pathReaderPauseReq{Author: s}) - case gortsplib.ServerSessionStateRecord: + s.stateMutex.Lock() + s.state = gortsplib.ServerSessionStatePreRead + s.stateMutex.Unlock() + + case gortsplib.ServerSessionStatePublish: s.path.OnPublisherPause(pathPublisherPauseReq{Author: s}) + + s.stateMutex.Lock() + s.state = gortsplib.ServerSessionStatePrePublish + s.stateMutex.Unlock() } return &base.Response{ @@ -341,7 +374,7 @@ func (s *rtspSession) OnPublisherAccepted(tracksLen int) { // OnFrame is called by rtspServer. func (s *rtspSession) OnFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) { - if s.ss.State() != gortsplib.ServerSessionStateRecord { + if s.ss.State() != gortsplib.ServerSessionStatePublish { return }