Browse Source

api: add path to RTMP connections, RTSP sessions, WebRTC sessions (#1962) (#2022)

* api: add path to rtmp response

* add 'path' to RTSP and WebRTC sessions too

* add tests

---------

Co-authored-by: aler9 <46489434+aler9@users.noreply.github.com>
pull/2031/head
Volodymyr Borodin 3 years ago committed by GitHub
parent
commit
47317ea8e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      apidocs/openapi.yaml
  2. 3
      internal/core/api_defs.go
  3. 44
      internal/core/api_test.go
  4. 35
      internal/core/rtmp_conn.go
  5. 53
      internal/core/rtsp_session.go
  6. 29
      internal/core/webrtc_session.go

6
apidocs/openapi.yaml

@ -389,6 +389,8 @@ components:
state: state:
type: string type: string
enum: [idle, read, publish] enum: [idle, read, publish]
path:
type: string
bytesReceived: bytesReceived:
type: integer type: integer
format: int64 format: int64
@ -408,6 +410,8 @@ components:
state: state:
type: string type: string
enum: [idle, read, publish] enum: [idle, read, publish]
path:
type: string
bytesReceived: bytesReceived:
type: integer type: integer
format: int64 format: int64
@ -496,6 +500,8 @@ components:
state: state:
type: string type: string
enum: [read, publish] enum: [read, publish]
path:
type: string
bytesReceived: bytesReceived:
type: integer type: integer
format: int64 format: int64

3
internal/core/api_defs.go

@ -57,6 +57,7 @@ type apiRTMPConn struct {
Created time.Time `json:"created"` Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"` RemoteAddr string `json:"remoteAddr"`
State string `json:"state"` State string `json:"state"`
Path string `json:"path"`
BytesReceived uint64 `json:"bytesReceived"` BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"` BytesSent uint64 `json:"bytesSent"`
} }
@ -72,6 +73,7 @@ type apiRTSPSession struct {
Created time.Time `json:"created"` Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"` RemoteAddr string `json:"remoteAddr"`
State string `json:"state"` State string `json:"state"`
Path string `json:"path"`
BytesReceived uint64 `json:"bytesReceived"` BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"` BytesSent uint64 `json:"bytesSent"`
} }
@ -90,6 +92,7 @@ type apiWebRTCSession struct {
LocalCandidate string `json:"localCandidate"` LocalCandidate string `json:"localCandidate"`
RemoteCandidate string `json:"remoteCandidate"` RemoteCandidate string `json:"remoteCandidate"`
State string `json:"state"` State string `json:"state"`
Path string `json:"path"`
BytesReceived uint64 `json:"bytesReceived"` BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"` BytesSent uint64 `json:"bytesSent"`
} }

44
internal/core/api_test.go

@ -685,25 +685,33 @@ func TestAPIProtocolList(t *testing.T) {
pa = "rtmpsconns" pa = "rtmpsconns"
} }
type item struct {
State string `json:"state"`
Path string `json:"path"`
}
var out struct { var out struct {
ItemCount int `json:"itemCount"` ItemCount int `json:"itemCount"`
Items []struct { Items []item `json:"items"`
State string `json:"state"`
} `json:"items"`
} }
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v2/"+pa+"/list", nil, &out) httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v2/"+pa+"/list", nil, &out)
if ca != "rtsp conns" && ca != "rtsps conns" { if ca != "rtsp conns" && ca != "rtsps conns" {
require.Equal(t, "publish", out.Items[0].State) require.Equal(t, item{
State: "publish",
Path: "mypath",
}, out.Items[0])
} }
case "hls": case "hls":
type item struct {
Created string `json:"created"`
LastRequest string `json:"lastRequest"`
}
var out struct { var out struct {
ItemCount int `json:"itemCount"` ItemCount int `json:"itemCount"`
Items []struct { Items []item `json:"items"`
Created string `json:"created"`
LastRequest string `json:"lastRequest"`
} `json:"items"`
} }
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v2/hlsmuxers/list", nil, &out) httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v2/hlsmuxers/list", nil, &out)
@ -713,13 +721,9 @@ func TestAPIProtocolList(t *testing.T) {
case "webrtc": case "webrtc":
type item struct { type item struct {
Created time.Time `json:"created"` PeerConnectionEstablished bool `json:"peerConnectionEstablished"`
RemoteAddr string `json:"remoteAddr"` State string `json:"state"`
PeerConnectionEstablished bool `json:"peerConnectionEstablished"` Path string `json:"path"`
LocalCandidate string `json:"localCandidate"`
RemoteCandidate string `json:"remoteCandidate"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
} }
var out struct { var out struct {
@ -728,7 +732,11 @@ func TestAPIProtocolList(t *testing.T) {
} }
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v2/webrtcsessions/list", nil, &out) httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v2/webrtcsessions/list", nil, &out)
require.Equal(t, true, out.Items[0].PeerConnectionEstablished) require.Equal(t, item{
PeerConnectionEstablished: true,
State: "read",
Path: "mypath",
}, out.Items[0])
} }
}) })
} }

35
internal/core/rtmp_conn.go

@ -210,12 +210,13 @@ type rtmpConn struct {
pathManager rtmpConnPathManager pathManager rtmpConnPathManager
parent rtmpConnParent parent rtmpConnParent
ctx context.Context ctx context.Context
ctxCancel func() ctxCancel func()
uuid uuid.UUID uuid uuid.UUID
created time.Time created time.Time
state rtmpConnState mutex sync.Mutex
stateMutex sync.Mutex state rtmpConnState
pathName string
} }
func newRTMPConn( func newRTMPConn(
@ -279,12 +280,6 @@ func (c *rtmpConn) ip() net.IP {
return c.nconn.RemoteAddr().(*net.TCPAddr).IP return c.nconn.RemoteAddr().(*net.TCPAddr).IP
} }
func (c *rtmpConn) safeState() rtmpConnState {
c.stateMutex.Lock()
defer c.stateMutex.Unlock()
return c.state
}
func (c *rtmpConn) run() { func (c *rtmpConn) run() {
defer c.wg.Done() defer c.wg.Done()
@ -380,9 +375,10 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
defer res.path.readerRemove(pathReaderRemoveReq{author: c}) defer res.path.readerRemove(pathReaderRemoveReq{author: c})
c.stateMutex.Lock() c.mutex.Lock()
c.state = rtmpConnStateRead c.state = rtmpConnStateRead
c.stateMutex.Unlock() c.pathName = pathName
c.mutex.Unlock()
ringBuffer, _ := ringbuffer.New(uint64(c.readBufferCount)) ringBuffer, _ := ringbuffer.New(uint64(c.readBufferCount))
go func() { go func() {
@ -794,9 +790,10 @@ func (c *rtmpConn) runPublish(u *url.URL) error {
defer res.path.publisherRemove(pathPublisherRemoveReq{author: c}) defer res.path.publisherRemove(pathPublisherRemoveReq{author: c})
c.stateMutex.Lock() c.mutex.Lock()
c.state = rtmpConnStatePublish c.state = rtmpConnStatePublish
c.stateMutex.Unlock() c.pathName = pathName
c.mutex.Unlock()
videoFormat, audioFormat, err := c.conn.ReadTracks() videoFormat, audioFormat, err := c.conn.ReadTracks()
if err != nil { if err != nil {
@ -892,12 +889,15 @@ func (c *rtmpConn) apiSourceDescribe() pathAPISourceOrReader {
} }
func (c *rtmpConn) apiItem() *apiRTMPConn { func (c *rtmpConn) apiItem() *apiRTMPConn {
c.mutex.Lock()
defer c.mutex.Unlock()
return &apiRTMPConn{ return &apiRTMPConn{
ID: c.uuid, ID: c.uuid,
Created: c.created, Created: c.created,
RemoteAddr: c.remoteAddr().String(), RemoteAddr: c.remoteAddr().String(),
State: func() string { State: func() string {
switch c.safeState() { switch c.state {
case rtmpConnStateRead: case rtmpConnStateRead:
return "read" return "read"
@ -906,6 +906,7 @@ func (c *rtmpConn) apiItem() *apiRTMPConn {
} }
return "idle" return "idle"
}(), }(),
Path: c.pathName,
BytesReceived: c.conn.BytesReceived(), BytesReceived: c.conn.BytesReceived(),
BytesSent: c.conn.BytesSent(), BytesSent: c.conn.BytesSent(),
} }

53
internal/core/rtsp_session.go

@ -37,13 +37,14 @@ type rtspSession struct {
pathManager rtspSessionPathManager pathManager rtspSessionPathManager
parent rtspSessionParent parent rtspSessionParent
uuid uuid.UUID uuid uuid.UUID
created time.Time created time.Time
path *path path *path
stream *stream stream *stream
state gortsplib.ServerSessionState onReadCmd *externalcmd.Cmd // read
stateMutex sync.Mutex mutex sync.Mutex
onReadCmd *externalcmd.Cmd // read state gortsplib.ServerSessionState
pathName string
} }
func newRTSPSession( func newRTSPSession(
@ -77,12 +78,6 @@ func (s *rtspSession) close() {
s.session.Close() s.session.Close()
} }
func (s *rtspSession) safeState() gortsplib.ServerSessionState {
s.stateMutex.Lock()
defer s.stateMutex.Unlock()
return s.state
}
func (s *rtspSession) remoteAddr() net.Addr { func (s *rtspSession) remoteAddr() net.Addr {
return s.author.NetConn().RemoteAddr() return s.author.NetConn().RemoteAddr()
} }
@ -157,9 +152,10 @@ func (s *rtspSession) onAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno
s.path = res.path s.path = res.path
s.stateMutex.Lock() s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePreRecord s.state = gortsplib.ServerSessionStatePreRecord
s.stateMutex.Unlock() s.pathName = ctx.Path
s.mutex.Unlock()
return &base.Response{ return &base.Response{
StatusCode: base.StatusOK, StatusCode: base.StatusOK,
@ -242,9 +238,10 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
s.path = res.path s.path = res.path
s.stream = res.stream s.stream = res.stream
s.stateMutex.Lock() s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePrePlay s.state = gortsplib.ServerSessionStatePrePlay
s.stateMutex.Unlock() s.pathName = ctx.Path
s.mutex.Unlock()
return &base.Response{ return &base.Response{
StatusCode: base.StatusOK, StatusCode: base.StatusOK,
@ -281,9 +278,9 @@ func (s *rtspSession) onPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Respons
}) })
} }
s.stateMutex.Lock() s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePlay s.state = gortsplib.ServerSessionStatePlay
s.stateMutex.Unlock() s.mutex.Unlock()
} }
return &base.Response{ return &base.Response{
@ -323,9 +320,9 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R
} }
} }
s.stateMutex.Lock() s.mutex.Lock()
s.state = gortsplib.ServerSessionStateRecord s.state = gortsplib.ServerSessionStateRecord
s.stateMutex.Unlock() s.mutex.Unlock()
return &base.Response{ return &base.Response{
StatusCode: base.StatusOK, StatusCode: base.StatusOK,
@ -341,16 +338,16 @@ func (s *rtspSession) onPause(_ *gortsplib.ServerHandlerOnPauseCtx) (*base.Respo
s.onReadCmd.Close() s.onReadCmd.Close()
} }
s.stateMutex.Lock() s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePrePlay s.state = gortsplib.ServerSessionStatePrePlay
s.stateMutex.Unlock() s.mutex.Unlock()
case gortsplib.ServerSessionStateRecord: case gortsplib.ServerSessionStateRecord:
s.path.publisherStop(pathPublisherStopReq{author: s}) s.path.publisherStop(pathPublisherStopReq{author: s})
s.stateMutex.Lock() s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePreRecord s.state = gortsplib.ServerSessionStatePreRecord
s.stateMutex.Unlock() s.mutex.Unlock()
} }
return &base.Response{ return &base.Response{
@ -387,12 +384,15 @@ func (s *rtspSession) onDecodeError(ctx *gortsplib.ServerHandlerOnDecodeErrorCtx
} }
func (s *rtspSession) apiItem() *apiRTSPSession { func (s *rtspSession) apiItem() *apiRTSPSession {
s.mutex.Lock()
defer s.mutex.Unlock()
return &apiRTSPSession{ return &apiRTSPSession{
ID: s.uuid, ID: s.uuid,
Created: s.created, Created: s.created,
RemoteAddr: s.remoteAddr().String(), RemoteAddr: s.remoteAddr().String(),
State: func() string { State: func() string {
switch s.safeState() { switch s.state {
case gortsplib.ServerSessionStatePrePlay, case gortsplib.ServerSessionStatePrePlay,
gortsplib.ServerSessionStatePlay: gortsplib.ServerSessionStatePlay:
return "read" return "read"
@ -403,6 +403,7 @@ func (s *rtspSession) apiItem() *apiRTSPSession {
} }
return "idle" return "idle"
}(), }(),
Path: s.pathName,
BytesReceived: s.session.BytesReceived(), BytesReceived: s.session.BytesReceived(),
BytesSent: s.session.BytesSent(), BytesSent: s.session.BytesSent(),
} }

29
internal/core/webrtc_session.go

@ -126,9 +126,9 @@ type webRTCSession struct {
created time.Time created time.Time
uuid uuid.UUID uuid uuid.UUID
secret uuid.UUID secret uuid.UUID
pcMutex sync.RWMutex
pc *peerConnection
answerSent bool answerSent bool
mutex sync.RWMutex
pc *peerConnection
chAddRemoteCandidates chan webRTCSessionAddCandidatesReq chAddRemoteCandidates chan webRTCSessionAddCandidatesReq
} }
@ -180,12 +180,6 @@ func (s *webRTCSession) close() {
s.ctxCancel() s.ctxCancel()
} }
func (s *webRTCSession) safePC() *peerConnection {
s.pcMutex.RLock()
defer s.pcMutex.RUnlock()
return s.pc
}
func (s *webRTCSession) run() { func (s *webRTCSession) run() {
defer s.wg.Done() defer s.wg.Done()
@ -491,9 +485,9 @@ outer:
} }
} }
s.pcMutex.Lock() s.mutex.Lock()
s.pc = pc s.pc = pc
s.pcMutex.Unlock() s.mutex.Unlock()
return nil return nil
} }
@ -542,19 +536,21 @@ func (s *webRTCSession) apiReaderDescribe() pathAPISourceOrReader {
} }
func (s *webRTCSession) apiItem() *apiWebRTCSession { func (s *webRTCSession) apiItem() *apiWebRTCSession {
s.mutex.RLock()
defer s.mutex.RUnlock()
peerConnectionEstablished := false peerConnectionEstablished := false
localCandidate := "" localCandidate := ""
remoteCandidate := "" remoteCandidate := ""
bytesReceived := uint64(0) bytesReceived := uint64(0)
bytesSent := uint64(0) bytesSent := uint64(0)
pc := s.safePC() if s.pc != nil {
if pc != nil {
peerConnectionEstablished = true peerConnectionEstablished = true
localCandidate = pc.localCandidate() localCandidate = s.pc.localCandidate()
remoteCandidate = pc.remoteCandidate() remoteCandidate = s.pc.remoteCandidate()
bytesReceived = pc.bytesReceived() bytesReceived = s.pc.bytesReceived()
bytesSent = pc.bytesSent() bytesSent = s.pc.bytesSent()
} }
return &apiWebRTCSession{ return &apiWebRTCSession{
@ -570,6 +566,7 @@ func (s *webRTCSession) apiItem() *apiWebRTCSession {
} }
return "read" return "read"
}(), }(),
Path: s.req.pathName,
BytesReceived: bytesReceived, BytesReceived: bytesReceived,
BytesSent: bytesSent, BytesSent: bytesSent,
} }

Loading…
Cancel
Save