Browse Source

api: add 'readyTime' to paths (#2049) (#2082)

pull/2083/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
0d18076201
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      apidocs/openapi.yaml
  2. 4
      internal/core/api_defs.go
  3. 26
      internal/core/api_test.go
  4. 32
      internal/core/hls_manager.go
  5. 38
      internal/core/path.go
  6. 50
      internal/core/path_manager.go

6
apidocs/openapi.yaml

@ -328,6 +328,12 @@ components: @@ -328,6 +328,12 @@ components:
$ref: '#/components/schemas/PathSourceOrReader'
sourceReady:
type: boolean
description: deprecated, replaced by 'ready'
ready:
type: boolean
readyTime:
type: string
nullable: true
tracks:
type: array
items:

4
internal/core/api_defs.go

@ -13,7 +13,9 @@ type apiPath struct { @@ -13,7 +13,9 @@ type apiPath struct {
ConfName string `json:"confName"`
Conf *conf.PathConf `json:"conf"`
Source interface{} `json:"source"`
SourceReady bool `json:"sourceReady"`
SourceReady bool `json:"sourceReady"` // Deprecated: renamed to Ready
Ready bool `json:"ready"`
ReadyTime *time.Time `json:"readyTime"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
Readers []interface{} `json:"readers"`

26
internal/core/api_test.go

@ -217,7 +217,7 @@ func TestAPIPathsList(t *testing.T) { @@ -217,7 +217,7 @@ func TestAPIPathsList(t *testing.T) {
type path struct {
Name string `json:"name"`
Source pathSource `json:"source"`
SourceReady bool `json:"sourceReady"`
Ready bool `json:"ready"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
}
@ -280,7 +280,7 @@ func TestAPIPathsList(t *testing.T) { @@ -280,7 +280,7 @@ func TestAPIPathsList(t *testing.T) {
Source: pathSource{
Type: "rtspSession",
},
SourceReady: true,
Ready: true,
Tracks: []string{"H264", "MPEG-4 Audio"},
BytesReceived: 16,
}},
@ -343,8 +343,8 @@ func TestAPIPathsList(t *testing.T) { @@ -343,8 +343,8 @@ func TestAPIPathsList(t *testing.T) {
Source: pathSource{
Type: "rtspsSession",
},
SourceReady: true,
Tracks: []string{"H264", "MPEG-4 Audio"},
Ready: true,
Tracks: []string{"H264", "MPEG-4 Audio"},
}},
}, out)
})
@ -370,8 +370,8 @@ func TestAPIPathsList(t *testing.T) { @@ -370,8 +370,8 @@ func TestAPIPathsList(t *testing.T) {
Source: pathSource{
Type: "rtspSource",
},
SourceReady: false,
Tracks: []string{},
Ready: false,
Tracks: []string{},
}},
}, out)
})
@ -397,8 +397,8 @@ func TestAPIPathsList(t *testing.T) { @@ -397,8 +397,8 @@ func TestAPIPathsList(t *testing.T) {
Source: pathSource{
Type: "rtmpSource",
},
SourceReady: false,
Tracks: []string{},
Ready: false,
Tracks: []string{},
}},
}, out)
})
@ -424,8 +424,8 @@ func TestAPIPathsList(t *testing.T) { @@ -424,8 +424,8 @@ func TestAPIPathsList(t *testing.T) {
Source: pathSource{
Type: "hlsSource",
},
SourceReady: false,
Tracks: []string{},
Ready: false,
Tracks: []string{},
}},
}, out)
})
@ -449,7 +449,7 @@ func TestAPIPathsGet(t *testing.T) { @@ -449,7 +449,7 @@ func TestAPIPathsGet(t *testing.T) {
type path struct {
Name string `json:"name"`
Source pathSource `json:"source"`
SourceReady bool `json:"sourceReady"`
Ready bool `json:"Ready"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
}
@ -478,8 +478,8 @@ func TestAPIPathsGet(t *testing.T) { @@ -478,8 +478,8 @@ func TestAPIPathsGet(t *testing.T) {
Source: pathSource{
Type: "rtspSession",
},
SourceReady: true,
Tracks: []string{"H264"},
Ready: true,
Tracks: []string{"H264"},
}, out)
} else {
res, err := hc.Get("http://localhost:9997/v2/paths/get/" + pathName)

32
internal/core/hls_manager.go

@ -54,12 +54,12 @@ type hlsManager struct { @@ -54,12 +54,12 @@ type hlsManager struct {
muxers map[string]*hlsMuxer
// in
chPathSourceReady chan *path
chPathSourceNotReady chan *path
chHandleRequest chan hlsMuxerHandleRequestReq
chMuxerClose chan *hlsMuxer
chAPIMuxerList chan hlsManagerAPIMuxersListReq
chAPIMuxerGet chan hlsManagerAPIMuxersGetReq
chPathReady chan *path
chPathNotReady chan *path
chHandleRequest chan hlsMuxerHandleRequestReq
chMuxerClose chan *hlsMuxer
chAPIMuxerList chan hlsManagerAPIMuxersListReq
chAPIMuxerGet chan hlsManagerAPIMuxersGetReq
}
func newHLSManager(
@ -101,8 +101,8 @@ func newHLSManager( @@ -101,8 +101,8 @@ func newHLSManager(
ctx: ctx,
ctxCancel: ctxCancel,
muxers: make(map[string]*hlsMuxer),
chPathSourceReady: make(chan *path),
chPathSourceNotReady: make(chan *path),
chPathReady: make(chan *path),
chPathNotReady: make(chan *path),
chHandleRequest: make(chan hlsMuxerHandleRequestReq),
chMuxerClose: make(chan *hlsMuxer),
chAPIMuxerList: make(chan hlsManagerAPIMuxersListReq),
@ -157,14 +157,14 @@ func (m *hlsManager) run() { @@ -157,14 +157,14 @@ func (m *hlsManager) run() {
outer:
for {
select {
case pa := <-m.chPathSourceReady:
case pa := <-m.chPathReady:
if m.alwaysRemux && !pa.conf.SourceOnDemand {
if _, ok := m.muxers[pa.name]; !ok {
m.createMuxer(pa.name, "")
}
}
case pa := <-m.chPathSourceNotReady:
case pa := <-m.chPathNotReady:
c, ok := m.muxers[pa.name]
if ok && c.remoteAddr == "" { // created with "always remux"
c.close()
@ -258,18 +258,18 @@ func (m *hlsManager) muxerClose(c *hlsMuxer) { @@ -258,18 +258,18 @@ func (m *hlsManager) muxerClose(c *hlsMuxer) {
}
}
// pathSourceReady is called by pathManager.
func (m *hlsManager) pathSourceReady(pa *path) {
// pathReady is called by pathManager.
func (m *hlsManager) pathReady(pa *path) {
select {
case m.chPathSourceReady <- pa:
case m.chPathReady <- pa:
case <-m.ctx.Done():
}
}
// pathSourceNotReady is called by pathManager.
func (m *hlsManager) pathSourceNotReady(pa *path) {
// pathNotReady is called by pathManager.
func (m *hlsManager) pathNotReady(pa *path) {
select {
case m.chPathSourceNotReady <- pa:
case m.chPathNotReady <- pa:
case <-m.ctx.Done():
}
}

38
internal/core/path.go

@ -44,8 +44,8 @@ func (e pathErrNoOnePublishing) Error() string { @@ -44,8 +44,8 @@ func (e pathErrNoOnePublishing) Error() string {
type pathParent interface {
logger.Writer
pathSourceReady(*path)
pathSourceNotReady(*path)
pathReady(*path)
pathNotReady(*path)
onPathClose(*path)
}
@ -196,8 +196,9 @@ type path struct { @@ -196,8 +196,9 @@ type path struct {
ctxCancel func()
confMutex sync.RWMutex
source source
bytesReceived *uint64
stream *stream
readyTime time.Time
bytesReceived *uint64
readers map[reader]struct{}
describeRequestsOnHold []pathDescribeReq
readerAddRequestsOnHold []pathReaderAddReq
@ -359,7 +360,7 @@ func (pa *path) run() { @@ -359,7 +360,7 @@ func (pa *path) run() {
}
case <-pa.onDemandStaticSourceCloseTimer.C:
pa.sourceSetNotReady()
pa.setNotReady()
pa.onDemandStaticSourceStop()
if pa.shouldClose() {
@ -400,7 +401,7 @@ func (pa *path) run() { @@ -400,7 +401,7 @@ func (pa *path) run() {
pa.confMutex.Unlock()
case req := <-pa.chSourceStaticSetReady:
err := pa.sourceSetReady(req.medias, req.generateRTPPackets)
err := pa.setReady(req.medias, req.generateRTPPackets)
if err != nil {
req.res <- pathSourceStaticSetReadyRes{err: err}
} else {
@ -427,7 +428,7 @@ func (pa *path) run() { @@ -427,7 +428,7 @@ func (pa *path) run() {
}
case req := <-pa.chSourceStaticSetNotReady:
pa.sourceSetNotReady()
pa.setNotReady()
// send response before calling onDemandStaticSourceStop()
// in order to avoid a deadlock due to sourceStatic.stop()
@ -511,7 +512,7 @@ func (pa *path) run() { @@ -511,7 +512,7 @@ func (pa *path) run() {
}
if pa.stream != nil {
pa.sourceSetNotReady()
pa.setNotReady()
}
if pa.source != nil {
@ -626,7 +627,7 @@ func (pa *path) onDemandPublisherStop() { @@ -626,7 +627,7 @@ func (pa *path) onDemandPublisherStop() {
}
}
func (pa *path) sourceSetReady(medias media.Medias, allocateEncoder bool) error {
func (pa *path) setReady(medias media.Medias, allocateEncoder bool) error {
stream, err := newStream(
pa.udpMaxPayloadSize,
medias,
@ -639,6 +640,7 @@ func (pa *path) sourceSetReady(medias media.Medias, allocateEncoder bool) error @@ -639,6 +640,7 @@ func (pa *path) sourceSetReady(medias media.Medias, allocateEncoder bool) error
}
pa.stream = stream
pa.readyTime = time.Now()
if pa.conf.RunOnReady != "" {
pa.Log(logger.Info, "runOnReady command started")
@ -652,13 +654,13 @@ func (pa *path) sourceSetReady(medias media.Medias, allocateEncoder bool) error @@ -652,13 +654,13 @@ func (pa *path) sourceSetReady(medias media.Medias, allocateEncoder bool) error
})
}
pa.parent.pathSourceReady(pa)
pa.parent.pathReady(pa)
return nil
}
func (pa *path) sourceSetNotReady() {
pa.parent.pathSourceNotReady(pa)
func (pa *path) setNotReady() {
pa.parent.pathNotReady(pa)
for r := range pa.readers {
pa.doReaderRemove(r)
@ -683,7 +685,7 @@ func (pa *path) doReaderRemove(r reader) { @@ -683,7 +685,7 @@ func (pa *path) doReaderRemove(r reader) {
func (pa *path) doPublisherRemove() {
if pa.stream != nil {
pa.sourceSetNotReady()
pa.setNotReady()
}
pa.source = nil
@ -777,7 +779,7 @@ func (pa *path) handlePublisherStart(req pathPublisherStartReq) { @@ -777,7 +779,7 @@ func (pa *path) handlePublisherStart(req pathPublisherStartReq) {
return
}
err := pa.sourceSetReady(req.medias, req.generateRTPPackets)
err := pa.setReady(req.medias, req.generateRTPPackets)
if err != nil {
req.res <- pathPublisherRecordRes{err: err}
return
@ -807,7 +809,7 @@ func (pa *path) handlePublisherStart(req pathPublisherStartReq) { @@ -807,7 +809,7 @@ func (pa *path) handlePublisherStart(req pathPublisherStartReq) {
func (pa *path) handlePublisherStop(req pathPublisherStopReq) {
if req.author == pa.source && pa.stream != nil {
pa.sourceSetNotReady()
pa.setNotReady()
}
close(req.res)
}
@ -892,6 +894,14 @@ func (pa *path) handleAPIPathsGet(req pathAPIPathsGetReq) { @@ -892,6 +894,14 @@ func (pa *path) handleAPIPathsGet(req pathAPIPathsGetReq) {
return pa.source.apiSourceDescribe()
}(),
SourceReady: pa.stream != nil,
Ready: pa.stream != nil,
ReadyTime: func() *time.Time {
if pa.stream == nil {
return nil
}
v := pa.readyTime
return &v
}(),
Tracks: func() []string {
if pa.stream == nil {
return []string{}

50
internal/core/path_manager.go

@ -55,8 +55,8 @@ func getConfForPath(pathConfs map[string]*conf.PathConf, name string) (string, * @@ -55,8 +55,8 @@ func getConfForPath(pathConfs map[string]*conf.PathConf, name string) (string, *
}
type pathManagerHLSManager interface {
pathSourceReady(*path)
pathSourceNotReady(*path)
pathReady(*path)
pathNotReady(*path)
}
type pathManagerParent interface {
@ -84,17 +84,17 @@ type pathManager struct { @@ -84,17 +84,17 @@ type pathManager struct {
pathsByConf map[string]map[*path]struct{}
// in
chConfReload chan map[string]*conf.PathConf
chPathClose chan *path
chPathSourceReady chan *path
chPathSourceNotReady chan *path
chGetConfForPath chan pathGetConfForPathReq
chDescribe chan pathDescribeReq
chReaderAdd chan pathReaderAddReq
chPublisherAdd chan pathPublisherAddReq
chHLSManagerSet chan pathManagerHLSManager
chAPIPathsList chan pathAPIPathsListReq
chAPIPathsGet chan pathAPIPathsGetReq
chConfReload chan map[string]*conf.PathConf
chPathClose chan *path
chPathReady chan *path
chPathNotReady chan *path
chGetConfForPath chan pathGetConfForPathReq
chDescribe chan pathDescribeReq
chReaderAdd chan pathReaderAddReq
chPublisherAdd chan pathPublisherAddReq
chHLSManagerSet chan pathManagerHLSManager
chAPIPathsList chan pathAPIPathsListReq
chAPIPathsGet chan pathAPIPathsGetReq
}
func newPathManager(
@ -130,8 +130,8 @@ func newPathManager( @@ -130,8 +130,8 @@ func newPathManager(
pathsByConf: make(map[string]map[*path]struct{}),
chConfReload: make(chan map[string]*conf.PathConf),
chPathClose: make(chan *path),
chPathSourceReady: make(chan *path),
chPathSourceNotReady: make(chan *path),
chPathReady: make(chan *path),
chPathNotReady: make(chan *path),
chGetConfForPath: make(chan pathGetConfForPathReq),
chDescribe: make(chan pathDescribeReq),
chReaderAdd: make(chan pathReaderAddReq),
@ -218,14 +218,14 @@ outer: @@ -218,14 +218,14 @@ outer:
}
pm.removePath(pa)
case pa := <-pm.chPathSourceReady:
case pa := <-pm.chPathReady:
if pm.hlsManager != nil {
pm.hlsManager.pathSourceReady(pa)
pm.hlsManager.pathReady(pa)
}
case pa := <-pm.chPathSourceNotReady:
case pa := <-pm.chPathNotReady:
if pm.hlsManager != nil {
pm.hlsManager.pathSourceNotReady(pa)
pm.hlsManager.pathNotReady(pa)
}
case req := <-pm.chGetConfForPath:
@ -386,19 +386,19 @@ func (pm *pathManager) confReload(pathConfs map[string]*conf.PathConf) { @@ -386,19 +386,19 @@ func (pm *pathManager) confReload(pathConfs map[string]*conf.PathConf) {
}
}
// pathSourceReady is called by path.
func (pm *pathManager) pathSourceReady(pa *path) {
// pathReady is called by path.
func (pm *pathManager) pathReady(pa *path) {
select {
case pm.chPathSourceReady <- pa:
case pm.chPathReady <- pa:
case <-pm.ctx.Done():
case <-pa.ctx.Done(): // in case pathManager is blocked by path.wait()
}
}
// pathSourceNotReady is called by path.
func (pm *pathManager) pathSourceNotReady(pa *path) {
// pathNotReady is called by path.
func (pm *pathManager) pathNotReady(pa *path) {
select {
case pm.chPathSourceNotReady <- pa:
case pm.chPathNotReady <- pa:
case <-pm.ctx.Done():
case <-pa.ctx.Done(): // in case pathManager is blocked by path.wait()
}

Loading…
Cancel
Save