Browse Source

fix onDemandCmd when path is 'all'

pull/80/head
aler9 5 years ago
parent
commit
2fe7004bc9
  1. 1
      Makefile
  2. 47
      main.go
  3. 251
      path.go

1
Makefile

@ -76,6 +76,7 @@ paths: @@ -76,6 +76,7 @@ paths:
# runOnPublish: ffmpeg -i rtsp://localhost:8554/$$RTSP_SERVER_PATH -c copy -f mpegts myfile_$$RTSP_SERVER_PATH.ts
# readUser: test
# readPass: tast
runOnDemand: ffmpeg -re -stream_loop -1 -i test-images/ffmpeg/emptyvideo.ts -c copy -f rtsp rtsp://localhost:8554/$$RTSP_SERVER_PATH
# proxied:
# source: rtsp://192.168.2.198:8554/stream

47
main.go

@ -15,6 +15,10 @@ import ( @@ -15,6 +15,10 @@ import (
var Version = "v0.0.0"
const (
checkPathPeriod = 5 * time.Second
)
type logDestination int
const (
@ -260,6 +264,7 @@ func (p *program) log(format string, args ...interface{}) { @@ -260,6 +264,7 @@ func (p *program) log(format string, args ...interface{}) {
if _, ok := p.conf.logDestinationsParsed[logDestinationStdout]; ok {
log.Println(line)
}
if _, ok := p.conf.logDestinationsParsed[logDestinationFile]; ok {
p.logFile.WriteString(line + "\n")
}
@ -288,7 +293,7 @@ func (p *program) run() { @@ -288,7 +293,7 @@ func (p *program) run() {
p.onInit()
}
checkPathsTicker := time.NewTicker(5 * time.Second)
checkPathsTicker := time.NewTicker(checkPathPeriod)
defer checkPathsTicker.Stop()
outer:
@ -319,12 +324,7 @@ outer: @@ -319,12 +324,7 @@ outer:
if evt.client.pathName != "" {
if path, ok := p.paths[evt.client.pathName]; ok {
if path.publisher == evt.client {
path.publisherRemove()
if !path.permanent {
path.onClose()
delete(p.paths, evt.client.pathName)
}
path.onPublisherRemove()
}
}
}
@ -333,31 +333,26 @@ outer: @@ -333,31 +333,26 @@ outer:
close(evt.done)
case programEventClientDescribe:
path, ok := p.paths[evt.path]
if !ok {
evt.client.describeRes <- describeRes{nil, fmt.Errorf("no one is publishing on path '%s'", evt.path)}
continue
// create path if not exist
if _, ok := p.paths[evt.path]; !ok {
p.paths[evt.path] = newPath(p, evt.path, p.findConfForPathName(evt.path), false)
}
path.describe(evt.client)
p.paths[evt.path].onDescribe(evt.client)
case programEventClientAnnounce:
if path, ok := p.paths[evt.path]; ok {
// create path if not exist
if path, ok := p.paths[evt.path]; !ok {
p.paths[evt.path] = newPath(p, evt.path, p.findConfForPathName(evt.path), false)
} else {
if path.publisher != nil {
evt.res <- fmt.Errorf("someone is already publishing on path '%s'", evt.path)
continue
}
} else {
p.paths[evt.path] = newPath(p, evt.path, p.findConfForPathName(evt.path), false)
}
p.paths[evt.path].publisher = evt.client
p.paths[evt.path].publisherSdpText = evt.sdpText
p.paths[evt.path].publisherSdpParsed = evt.sdpParsed
evt.client.pathName = evt.path
evt.client.state = clientStateAnnounce
p.paths[evt.path].onPublisherNew(evt.client, evt.sdpText, evt.sdpParsed)
evt.res <- nil
case programEventClientSetupPlay:
@ -426,7 +421,7 @@ outer: @@ -426,7 +421,7 @@ outer:
}
}
p.paths[evt.client.pathName].publisherSetReady()
p.paths[evt.client.pathName].onPublisherSetReady()
close(evt.done)
case programEventClientRecordStop:
@ -441,7 +436,7 @@ outer: @@ -441,7 +436,7 @@ outer:
delete(p.udpClientsByAddr, key)
}
}
p.paths[evt.client.pathName].publisherSetNotReady()
p.paths[evt.client.pathName].onPublisherSetNotReady()
close(evt.done)
case programEventClientFrameUdp:
@ -463,11 +458,11 @@ outer: @@ -463,11 +458,11 @@ outer:
case programEventSourceReady:
evt.source.log("ready")
p.paths[evt.source.pathName].publisherSetReady()
p.paths[evt.source.pathName].onPublisherSetReady()
case programEventSourceNotReady:
evt.source.log("not ready")
p.paths[evt.source.pathName].publisherSetNotReady()
p.paths[evt.source.pathName].onPublisherSetNotReady()
case programEventSourceFrame:
p.forwardFrame(evt.source.pathName, evt.trackId, evt.streamType, evt.buf)

251
path.go

@ -9,25 +9,31 @@ import ( @@ -9,25 +9,31 @@ import (
"github.com/aler9/sdp/v3"
)
const (
describeTimeout = 5 * time.Second
sourceStopAfterDescribeSecs = 10 * time.Second
onDemandCmdStopAfterDescribeSecs = 10 * time.Second
)
// a publisher is either a client or a source
type publisher interface {
isPublisher()
}
type path struct {
p *program
name string
confp *confPath
permanent bool
source *source
publisher publisher
publisherReady bool
publisherSdpText []byte
publisherSdpParsed *sdp.SessionDescription
lastRequested time.Time
lastActivation time.Time
onInitCmd *exec.Cmd
onDemandCmd *exec.Cmd
p *program
name string
confp *confPath
permanent bool
source *source
publisher publisher
publisherReady bool
publisherSdpText []byte
publisherSdpParsed *sdp.SessionDescription
lastDescribeReq time.Time
lastDescribeActivation time.Time
onInitCmd *exec.Cmd
onDemandCmd *exec.Cmd
}
func newPath(p *program, name string, confp *confPath, permanent bool) *path {
@ -87,19 +93,37 @@ func (pa *path) onClose() { @@ -87,19 +93,37 @@ func (pa *path) onClose() {
}
}
func (pa *path) onCheck() {
hasClientsWaitingDescribe := func() bool {
for c := range pa.p.clients {
if c.state == clientStateWaitingDescription && c.pathName == pa.name {
return true
}
func (pa *path) hasClients() bool {
for c := range pa.p.clients {
if c.pathName == pa.name {
return true
}
}
return false
}
func (pa *path) hasClientsWaitingDescribe() bool {
for c := range pa.p.clients {
if c.state == clientStateWaitingDescription && c.pathName == pa.name {
return true
}
}
return false
}
func (pa *path) hasClientReaders() bool {
for c := range pa.p.clients {
if c.pathName == pa.name && c != pa.publisher {
return true
}
return false
}()
}
return false
}
func (pa *path) onCheck() {
// reply to DESCRIBE requests if they are in timeout
if hasClientsWaitingDescribe &&
time.Since(pa.lastActivation) >= 5*time.Second {
if pa.hasClientsWaitingDescribe() &&
time.Since(pa.lastDescribeActivation) >= describeTimeout {
for c := range pa.p.clients {
if c.state == clientStateWaitingDescription &&
c.pathName == pa.name {
@ -108,67 +132,89 @@ func (pa *path) onCheck() { @@ -108,67 +132,89 @@ func (pa *path) onCheck() {
c.describeRes <- describeRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)}
}
}
}
// perform actions below in next run
return
// stop on demand source if needed
if pa.source != nil &&
pa.confp.SourceOnDemand &&
pa.source.state == sourceStateRunning &&
!pa.hasClients() &&
time.Since(pa.lastDescribeReq) >= sourceStopAfterDescribeSecs {
pa.source.log("stopping since we're not requested anymore")
pa.source.state = sourceStateStopped
pa.source.events <- sourceEventApplyState{pa.source.state}
}
if source, ok := pa.publisher.(*source); ok {
// stop on demand source if needed
if pa.confp.SourceOnDemand &&
source.state == sourceStateRunning &&
time.Since(pa.lastRequested) >= 10*time.Second {
// stop on demand command if needed
if pa.onDemandCmd != nil &&
!pa.hasClientReaders() &&
time.Since(pa.lastDescribeReq) >= onDemandCmdStopAfterDescribeSecs {
pa.p.log("stopping on demand command (not requested anymore)")
pa.onDemandCmd.Process.Signal(os.Interrupt)
pa.onDemandCmd.Wait()
pa.onDemandCmd = nil
}
hasClients := func() bool {
for c := range pa.p.clients {
if c.pathName == pa.name {
return true
}
}
return false
}()
if !hasClients {
source.log("stopping since we're not requested anymore")
source.state = sourceStateStopped
source.events <- sourceEventApplyState{source.state}
}
// remove non-permanent paths
if !pa.permanent &&
pa.publisher == nil &&
!pa.hasClients() {
pa.onClose()
delete(pa.p.paths, pa.name)
}
}
func (pa *path) onPublisherNew(client *client, sdpText []byte, sdpParsed *sdp.SessionDescription) {
pa.publisher = client
pa.publisherSdpText = sdpText
pa.publisherSdpParsed = sdpParsed
client.pathName = pa.name
client.state = clientStateAnnounce
}
func (pa *path) onPublisherRemove() {
pa.publisher = nil
}
func (pa *path) onPublisherSetReady() {
pa.publisherReady = true
// reply to all clients that are waiting for a description
for c := range pa.p.clients {
if c.state == clientStateWaitingDescription &&
c.pathName == pa.name {
c.pathName = ""
c.state = clientStateInitial
c.describeRes <- describeRes{pa.publisherSdpText, nil}
}
}
}
} else {
// stop on demand command if needed
if pa.onDemandCmd != nil &&
time.Since(pa.lastRequested) >= 10*time.Second {
hasClientReaders := func() bool {
for c := range pa.p.clients {
if c.pathName == pa.name && c != pa.publisher {
return true
}
}
return false
}()
if !hasClientReaders {
pa.p.log("stopping on demand command (not requested anymore)")
pa.onDemandCmd.Process.Signal(os.Interrupt)
pa.onDemandCmd.Wait()
pa.onDemandCmd = nil
}
func (pa *path) onPublisherSetNotReady() {
pa.publisherReady = false
// close all clients that are reading
for c := range pa.p.clients {
if c.state != clientStateWaitingDescription &&
c != pa.publisher &&
c.pathName == pa.name {
c.conn.NetConn().Close()
}
}
}
func (pa *path) describe(client *client) {
pa.lastRequested = time.Now()
func (pa *path) onDescribe(client *client) {
pa.lastDescribeReq = time.Now()
// publisher not found
if pa.publisher == nil {
// on demand command is available: put the client on hold
if pa.confp.RunOnDemand != "" {
// start on demand command if needed
if pa.onDemandCmd == nil {
if pa.onDemandCmd == nil { // start if needed
pa.p.log("starting on demand command")
pa.lastActivation = time.Now()
pa.lastDescribeActivation = time.Now()
pa.onDemandCmd = exec.Command("/bin/sh", "-c", pa.confp.RunOnDemand)
pa.onDemandCmd.Env = append(os.Environ(),
"RTSP_SERVER_PATH="+pa.name,
@ -183,69 +229,26 @@ func (pa *path) describe(client *client) { @@ -183,69 +229,26 @@ func (pa *path) describe(client *client) {
client.pathName = pa.name
client.state = clientStateWaitingDescription
return
}
// no on-demand: reply with 404
client.describeRes <- describeRes{nil, fmt.Errorf("no one is publishing on path '%s'", pa.name)}
return
}
// no on-demand: reply with 404
} else {
client.describeRes <- describeRes{nil, fmt.Errorf("no one is publishing on path '%s'", pa.name)}
}
// publisher was found but is not ready: put the client on hold
if !pa.publisherReady {
// start source if needed
if source, ok := pa.publisher.(*source); ok && source.state == sourceStateStopped {
source.log("starting on demand")
pa.lastActivation = time.Now()
source.state = sourceStateRunning
source.events <- sourceEventApplyState{source.state}
// publisher was found but is not ready: put the client on hold
} else if !pa.publisherReady {
if pa.source != nil && pa.source.state == sourceStateStopped { // start if needed
pa.source.log("starting on demand")
pa.lastDescribeActivation = time.Now()
pa.source.state = sourceStateRunning
pa.source.events <- sourceEventApplyState{pa.source.state}
}
client.pathName = pa.name
client.state = clientStateWaitingDescription
return
}
// publisher was found and is ready
client.describeRes <- describeRes{pa.publisherSdpText, nil}
}
func (pa *path) publisherRemove() {
for c := range pa.p.clients {
if c.state == clientStateWaitingDescription &&
c.pathName == pa.name {
c.pathName = ""
c.state = clientStateInitial
c.describeRes <- describeRes{nil, fmt.Errorf("publisher of path '%s' is not available anymore", pa.name)}
}
}
pa.publisher = nil
}
func (pa *path) publisherSetReady() {
pa.publisherReady = true
// reply to all clients that are waiting for a description
for c := range pa.p.clients {
if c.state == clientStateWaitingDescription &&
c.pathName == pa.name {
c.pathName = ""
c.state = clientStateInitial
c.describeRes <- describeRes{pa.publisherSdpText, nil}
}
}
}
func (pa *path) publisherSetNotReady() {
pa.publisherReady = false
// close all clients that are reading
for c := range pa.p.clients {
if c.state != clientStateWaitingDescription &&
c != pa.publisher &&
c.pathName == pa.name {
c.conn.NetConn().Close()
}
// publisher was found and is ready
} else {
client.describeRes <- describeRes{pa.publisherSdpText, nil}
}
}

Loading…
Cancel
Save