Browse Source

api: add transport to RTSP sessions (#2151)

pull/2154/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
dd91abae9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      apidocs/openapi.yaml
  2. 108
      internal/core/api_defs.go
  3. 6
      internal/core/metrics.go
  4. 81
      internal/core/path.go
  5. 12
      internal/core/rtmp_conn.go
  6. 29
      internal/core/rtsp_session.go
  7. 12
      internal/core/srt_conn.go
  8. 10
      internal/core/webrtc_session.go

3
apidocs/openapi.yaml

@ -482,6 +482,9 @@ components:
enum: [idle, read, publish] enum: [idle, read, publish]
path: path:
type: string type: string
transport:
type: string
nullable: true
bytesReceived: bytesReceived:
type: integer type: integer
format: int64 format: int64

108
internal/core/api_defs.go

@ -54,14 +54,22 @@ type apiRTSPConnsList struct {
Items []*apiRTSPConn `json:"items"` Items []*apiRTSPConn `json:"items"`
} }
type apiRTMPConnState string
const (
apiRTMPConnStateIdle apiRTMPConnState = "idle"
apiRTMPConnStateRead apiRTMPConnState = "read"
apiRTMPConnStatePublish apiRTMPConnState = "publish"
)
type apiRTMPConn struct { type apiRTMPConn struct {
ID uuid.UUID `json:"id"` ID uuid.UUID `json:"id"`
Created time.Time `json:"created"` Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"` RemoteAddr string `json:"remoteAddr"`
State string `json:"state"` State apiRTMPConnState `json:"state"`
Path string `json:"path"` Path string `json:"path"`
BytesReceived uint64 `json:"bytesReceived"` BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"` BytesSent uint64 `json:"bytesSent"`
} }
type apiRTMPConnsList struct { type apiRTMPConnsList struct {
@ -70,14 +78,23 @@ type apiRTMPConnsList struct {
Items []*apiRTMPConn `json:"items"` Items []*apiRTMPConn `json:"items"`
} }
type apiRTSPSessionState string
const (
apiRTSPSessionStateIdle apiRTSPSessionState = "idle"
apiRTSPSessionStateRead apiRTSPSessionState = "read"
apiRTSPSessionStatePublish apiRTSPSessionState = "publish"
)
type apiRTSPSession struct { type apiRTSPSession struct {
ID uuid.UUID `json:"id"` ID uuid.UUID `json:"id"`
Created time.Time `json:"created"` Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"` RemoteAddr string `json:"remoteAddr"`
State string `json:"state"` State apiRTSPSessionState `json:"state"`
Path string `json:"path"` Path string `json:"path"`
BytesReceived uint64 `json:"bytesReceived"` Transport *string `json:"transport"`
BytesSent uint64 `json:"bytesSent"` BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
} }
type apiRTSPSessionsList struct { type apiRTSPSessionsList struct {
@ -86,33 +103,22 @@ type apiRTSPSessionsList struct {
Items []*apiRTSPSession `json:"items"` Items []*apiRTSPSession `json:"items"`
} }
type apiWebRTCSession struct { type apiSRTConnState string
ID uuid.UUID `json:"id"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
PeerConnectionEstablished bool `json:"peerConnectionEstablished"`
LocalCandidate string `json:"localCandidate"`
RemoteCandidate string `json:"remoteCandidate"`
State string `json:"state"`
Path string `json:"path"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
type apiWebRTCSessionsList struct { const (
ItemCount int `json:"itemCount"` apiSRTConnStateIdle apiSRTConnState = "idle"
PageCount int `json:"pageCount"` apiSRTConnStateRead apiSRTConnState = "read"
Items []*apiWebRTCSession `json:"items"` apiSRTConnStatePublish apiSRTConnState = "publish"
} )
type apiSRTConn struct { type apiSRTConn struct {
ID uuid.UUID `json:"id"` ID uuid.UUID `json:"id"`
Created time.Time `json:"created"` Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"` RemoteAddr string `json:"remoteAddr"`
State string `json:"state"` State apiSRTConnState `json:"state"`
Path string `json:"path"` Path string `json:"path"`
BytesReceived uint64 `json:"bytesReceived"` BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"` BytesSent uint64 `json:"bytesSent"`
} }
type apiSRTConnsList struct { type apiSRTConnsList struct {
@ -120,3 +126,29 @@ type apiSRTConnsList struct {
PageCount int `json:"pageCount"` PageCount int `json:"pageCount"`
Items []*apiSRTConn `json:"items"` Items []*apiSRTConn `json:"items"`
} }
type apiWebRTCSessionState string
const (
apiWebRTCSessionStateRead apiWebRTCSessionState = "read"
apiWebRTCSessionStatePublish apiWebRTCSessionState = "publish"
)
type apiWebRTCSession struct {
ID uuid.UUID `json:"id"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
PeerConnectionEstablished bool `json:"peerConnectionEstablished"`
LocalCandidate string `json:"localCandidate"`
RemoteCandidate string `json:"remoteCandidate"`
State apiWebRTCSessionState `json:"state"`
Path string `json:"path"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}
type apiWebRTCSessionsList struct {
ItemCount int `json:"itemCount"`
PageCount int `json:"pageCount"`
Items []*apiWebRTCSession `json:"items"`
}

6
internal/core/metrics.go

@ -135,7 +135,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
data, err := m.rtspServer.apiSessionsList() data, err := m.rtspServer.apiSessionsList()
if err == nil && len(data.Items) != 0 { if err == nil && len(data.Items) != 0 {
for _, i := range data.Items { for _, i := range data.Items {
tags := "{id=\"" + i.ID.String() + "\",state=\"" + i.State + "\"}" tags := "{id=\"" + i.ID.String() + "\",state=\"" + string(i.State) + "\"}"
out += metric("rtsp_sessions", tags, 1) out += metric("rtsp_sessions", tags, 1)
out += metric("rtsp_sessions_bytes_received", tags, int64(i.BytesReceived)) out += metric("rtsp_sessions_bytes_received", tags, int64(i.BytesReceived))
out += metric("rtsp_sessions_bytes_sent", tags, int64(i.BytesSent)) out += metric("rtsp_sessions_bytes_sent", tags, int64(i.BytesSent))
@ -169,7 +169,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
data, err := m.rtspsServer.apiSessionsList() data, err := m.rtspsServer.apiSessionsList()
if err == nil && len(data.Items) != 0 { if err == nil && len(data.Items) != 0 {
for _, i := range data.Items { for _, i := range data.Items {
tags := "{id=\"" + i.ID.String() + "\",state=\"" + i.State + "\"}" tags := "{id=\"" + i.ID.String() + "\",state=\"" + string(i.State) + "\"}"
out += metric("rtsps_sessions", tags, 1) out += metric("rtsps_sessions", tags, 1)
out += metric("rtsps_sessions_bytes_received", tags, int64(i.BytesReceived)) out += metric("rtsps_sessions_bytes_received", tags, int64(i.BytesReceived))
out += metric("rtsps_sessions_bytes_sent", tags, int64(i.BytesSent)) out += metric("rtsps_sessions_bytes_sent", tags, int64(i.BytesSent))
@ -186,7 +186,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
data, err := m.rtmpServer.apiConnsList() data, err := m.rtmpServer.apiConnsList()
if err == nil && len(data.Items) != 0 { if err == nil && len(data.Items) != 0 {
for _, i := range data.Items { for _, i := range data.Items {
tags := "{id=\"" + i.ID.String() + "\",state=\"" + i.State + "\"}" tags := "{id=\"" + i.ID.String() + "\",state=\"" + string(i.State) + "\"}"
out += metric("rtmp_conns", tags, 1) out += metric("rtmp_conns", tags, 1)
out += metric("rtmp_conns_bytes_received", tags, int64(i.BytesReceived)) out += metric("rtmp_conns_bytes_received", tags, int64(i.BytesReceived))
out += metric("rtmp_conns_bytes_sent", tags, int64(i.BytesSent)) out += metric("rtmp_conns_bytes_sent", tags, int64(i.BytesSent))

81
internal/core/path.go

@ -438,42 +438,10 @@ func (pa *path) runInner() error {
pa.confMutex.Unlock() pa.confMutex.Unlock()
case req := <-pa.chSourceStaticSetReady: case req := <-pa.chSourceStaticSetReady:
err := pa.setReady(req.medias, req.generateRTPPackets) pa.handleSourceStaticSetReady(req)
if err != nil {
req.res <- pathSourceStaticSetReadyRes{err: err}
} else {
if pa.conf.HasOnDemandStaticSource() {
pa.onDemandStaticSourceReadyTimer.Stop()
pa.onDemandStaticSourceReadyTimer = newEmptyTimer()
pa.onDemandStaticSourceScheduleClose()
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{
stream: pa.stream,
}
}
pa.describeRequestsOnHold = nil
for _, req := range pa.readerAddRequestsOnHold {
pa.handleAddReaderPost(req)
}
pa.readerAddRequestsOnHold = nil
}
req.res <- pathSourceStaticSetReadyRes{stream: pa.stream}
}
case req := <-pa.chSourceStaticSetNotReady: case req := <-pa.chSourceStaticSetNotReady:
pa.setNotReady() pa.handleSourceStaticSetNotReady(req)
// send response before calling onDemandStaticSourceStop()
// in order to avoid a deadlock due to sourceStatic.stop()
close(req.res)
if pa.conf.HasOnDemandStaticSource() && pa.onDemandStaticSourceState != pathOnDemandStateInitial {
pa.onDemandStaticSourceStop()
}
if pa.shouldClose() { if pa.shouldClose() {
return fmt.Errorf("not in use") return fmt.Errorf("not in use")
@ -685,6 +653,47 @@ func (pa *path) doPublisherRemove() {
pa.source = nil pa.source = nil
} }
func (pa *path) handleSourceStaticSetReady(req pathSourceStaticSetReadyReq) {
err := pa.setReady(req.medias, req.generateRTPPackets)
if err != nil {
req.res <- pathSourceStaticSetReadyRes{err: err}
return
}
if pa.conf.HasOnDemandStaticSource() {
pa.onDemandStaticSourceReadyTimer.Stop()
pa.onDemandStaticSourceReadyTimer = newEmptyTimer()
pa.onDemandStaticSourceScheduleClose()
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{
stream: pa.stream,
}
}
pa.describeRequestsOnHold = nil
for _, req := range pa.readerAddRequestsOnHold {
pa.handleAddReaderPost(req)
}
pa.readerAddRequestsOnHold = nil
}
req.res <- pathSourceStaticSetReadyRes{stream: pa.stream}
}
func (pa *path) handleSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) {
pa.setNotReady()
// send response before calling onDemandStaticSourceStop()
// in order to avoid a deadlock due to sourceStatic.stop()
close(req.res)
if pa.conf.HasOnDemandStaticSource() && pa.onDemandStaticSourceState != pathOnDemandStateInitial {
pa.onDemandStaticSourceStop()
}
}
func (pa *path) handleDescribe(req pathDescribeReq) { func (pa *path) handleDescribe(req pathDescribeReq) {
if _, ok := pa.source.(*sourceRedirect); ok { if _, ok := pa.source.(*sourceRedirect); ok {
req.res <- pathDescribeRes{ req.res <- pathDescribeRes{
@ -779,6 +788,10 @@ func (pa *path) handleStartPublisher(req pathStartPublisherReq) {
return return
} }
req.author.Log(logger.Info, "is publishing to path '%s', %s",
pa.name,
sourceMediaInfo(req.medias))
if pa.conf.HasOnDemandPublisher() { if pa.conf.HasOnDemandPublisher() {
pa.onDemandPublisherReadyTimer.Stop() pa.onDemandPublisherReadyTimer.Stop()
pa.onDemandPublisherReadyTimer = newEmptyTimer() pa.onDemandPublisherReadyTimer = newEmptyTimer()

12
internal/core/rtmp_conn.go

@ -696,10 +696,6 @@ func (c *rtmpConn) runPublish(conn *rtmp.Conn, u *url.URL) error {
return rres.err return rres.err
} }
c.Log(logger.Info, "is publishing to path '%s', %s",
res.path.name,
sourceMediaInfo(medias))
stream = rres.stream stream = rres.stream
// disable write deadline to allow outgoing acknowledges // disable write deadline to allow outgoing acknowledges
@ -748,16 +744,16 @@ func (c *rtmpConn) apiItem() *apiRTMPConn {
ID: c.uuid, ID: c.uuid,
Created: c.created, Created: c.created,
RemoteAddr: c.remoteAddr().String(), RemoteAddr: c.remoteAddr().String(),
State: func() string { State: func() apiRTMPConnState {
switch c.state { switch c.state {
case rtmpConnStateRead: case rtmpConnStateRead:
return "read" return apiRTMPConnStateRead
case rtmpConnStatePublish: case rtmpConnStatePublish:
return "publish" return apiRTMPConnStatePublish
default: default:
return "idle" return apiRTMPConnStateIdle
} }
}(), }(),
Path: c.pathName, Path: c.pathName,

29
internal/core/rtsp_session.go

@ -45,6 +45,7 @@ type rtspSession struct {
onReadCmd *externalcmd.Cmd // read onReadCmd *externalcmd.Cmd // read
mutex sync.Mutex mutex sync.Mutex
state gortsplib.ServerSessionState state gortsplib.ServerSessionState
transport *gortsplib.Transport
pathName string pathName string
} }
@ -293,6 +294,7 @@ func (s *rtspSession) onPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Respons
s.mutex.Lock() s.mutex.Lock()
s.state = gortsplib.ServerSessionStatePlay s.state = gortsplib.ServerSessionStatePlay
s.transport = s.session.SetuppedTransport()
s.mutex.Unlock() s.mutex.Unlock()
} }
@ -303,7 +305,7 @@ func (s *rtspSession) onPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Respons
} }
// onRecord is called by rtspServer. // onRecord is called by rtspServer.
func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) { func (s *rtspSession) onRecord(_ *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) {
res := s.path.startPublisher(pathStartPublisherReq{ res := s.path.startPublisher(pathStartPublisherReq{
author: s, author: s,
medias: s.session.AnnouncedMedias(), medias: s.session.AnnouncedMedias(),
@ -315,11 +317,6 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R
}, res.err }, res.err
} }
s.Log(logger.Info, "is publishing to path '%s', with %s, %s",
s.path.name,
s.session.SetuppedTransport(),
sourceMediaInfo(s.session.AnnouncedMedias()))
s.stream = res.stream s.stream = res.stream
for _, medi := range s.session.AnnouncedMedias() { for _, medi := range s.session.AnnouncedMedias() {
@ -327,7 +324,7 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R
cmedi := medi cmedi := medi
cforma := forma cforma := forma
ctx.Session.OnPacketRTP(cmedi, cforma, func(pkt *rtp.Packet) { s.session.OnPacketRTP(cmedi, cforma, func(pkt *rtp.Packet) {
res.stream.WriteRTPPacket(cmedi, cforma, pkt, time.Now()) res.stream.WriteRTPPacket(cmedi, cforma, pkt, time.Now())
}) })
} }
@ -335,6 +332,7 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R
s.mutex.Lock() s.mutex.Lock()
s.state = gortsplib.ServerSessionStateRecord s.state = gortsplib.ServerSessionStateRecord
s.transport = s.session.SetuppedTransport()
s.mutex.Unlock() s.mutex.Unlock()
return &base.Response{ return &base.Response{
@ -404,19 +402,26 @@ func (s *rtspSession) apiItem() *apiRTSPSession {
ID: s.uuid, ID: s.uuid,
Created: s.created, Created: s.created,
RemoteAddr: s.remoteAddr().String(), RemoteAddr: s.remoteAddr().String(),
State: func() string { State: func() apiRTSPSessionState {
switch s.state { switch s.state {
case gortsplib.ServerSessionStatePrePlay, case gortsplib.ServerSessionStatePrePlay,
gortsplib.ServerSessionStatePlay: gortsplib.ServerSessionStatePlay:
return "read" return apiRTSPSessionStateRead
case gortsplib.ServerSessionStatePreRecord, case gortsplib.ServerSessionStatePreRecord,
gortsplib.ServerSessionStateRecord: gortsplib.ServerSessionStateRecord:
return "publish" return apiRTSPSessionStatePublish
}
return apiRTSPSessionStateIdle
}(),
Path: s.pathName,
Transport: func() *string {
if s.transport == nil {
return nil
} }
return "idle" v := s.transport.String()
return &v
}(), }(),
Path: s.pathName,
BytesReceived: s.session.BytesReceived(), BytesReceived: s.session.BytesReceived(),
BytesSent: s.session.BytesSent(), BytesSent: s.session.BytesSent(),
} }

12
internal/core/srt_conn.go

@ -376,10 +376,6 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error {
return rres.err return rres.err
} }
c.Log(logger.Info, "is publishing to path '%s', %s",
path.name,
sourceMediaInfo(medias))
stream = rres.stream stream = rres.stream
for { for {
@ -830,16 +826,16 @@ func (c *srtConn) apiItem() *apiSRTConn {
ID: c.uuid, ID: c.uuid,
Created: c.created, Created: c.created,
RemoteAddr: c.connReq.RemoteAddr().String(), RemoteAddr: c.connReq.RemoteAddr().String(),
State: func() string { State: func() apiSRTConnState {
switch c.state { switch c.state {
case srtConnStateRead: case srtConnStateRead:
return "read" return apiSRTConnStateRead
case srtConnStatePublish: case srtConnStatePublish:
return "publish" return apiSRTConnStatePublish
default: default:
return "idle" return apiSRTConnStateIdle
} }
}(), }(),
Path: c.pathName, Path: c.pathName,

10
internal/core/webrtc_session.go

@ -402,10 +402,6 @@ func (s *webRTCSession) runPublish() (int, error) {
return 0, rres.err return 0, rres.err
} }
s.Log(logger.Info, "is publishing to path '%s', %s",
res.path.name,
sourceMediaInfo(medias))
for _, track := range tracks { for _, track := range tracks {
track.start(rres.stream) track.start(rres.stream)
} }
@ -636,11 +632,11 @@ func (s *webRTCSession) apiItem() *apiWebRTCSession {
PeerConnectionEstablished: peerConnectionEstablished, PeerConnectionEstablished: peerConnectionEstablished,
LocalCandidate: localCandidate, LocalCandidate: localCandidate,
RemoteCandidate: remoteCandidate, RemoteCandidate: remoteCandidate,
State: func() string { State: func() apiWebRTCSessionState {
if s.req.publish { if s.req.publish {
return "publish" return apiWebRTCSessionStatePublish
} }
return "read" return apiWebRTCSessionStateRead
}(), }(),
Path: s.req.pathName, Path: s.req.pathName,
BytesReceived: bytesReceived, BytesReceived: bytesReceived,

Loading…
Cancel
Save