diff --git a/Makefile b/Makefile index 5d9781e4..4d7b251f 100644 --- a/Makefile +++ b/Makefile @@ -76,6 +76,7 @@ paths: # runOnPublish: ffmpeg -i rtsp://localhost:8554/$$RTSP_SERVER_PATH -c copy -f mpegts myfile_$$RTSP_SERVER_PATH.ts # readUser: test # readPass: tast + runOnDemand: ffmpeg -re -stream_loop -1 -i test-images/ffmpeg/emptyvideo.ts -c copy -f rtsp rtsp://localhost:8554/$$RTSP_SERVER_PATH # proxied: # source: rtsp://192.168.2.198:8554/stream diff --git a/main.go b/main.go index c730863a..eb7a246d 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,10 @@ import ( var Version = "v0.0.0" +const ( + checkPathPeriod = 5 * time.Second +) + type logDestination int const ( @@ -260,6 +264,7 @@ func (p *program) log(format string, args ...interface{}) { if _, ok := p.conf.logDestinationsParsed[logDestinationStdout]; ok { log.Println(line) } + if _, ok := p.conf.logDestinationsParsed[logDestinationFile]; ok { p.logFile.WriteString(line + "\n") } @@ -288,7 +293,7 @@ func (p *program) run() { p.onInit() } - checkPathsTicker := time.NewTicker(5 * time.Second) + checkPathsTicker := time.NewTicker(checkPathPeriod) defer checkPathsTicker.Stop() outer: @@ -319,12 +324,7 @@ outer: if evt.client.pathName != "" { if path, ok := p.paths[evt.client.pathName]; ok { if path.publisher == evt.client { - path.publisherRemove() - - if !path.permanent { - path.onClose() - delete(p.paths, evt.client.pathName) - } + path.onPublisherRemove() } } } @@ -333,31 +333,26 @@ outer: close(evt.done) case programEventClientDescribe: - path, ok := p.paths[evt.path] - if !ok { - evt.client.describeRes <- describeRes{nil, fmt.Errorf("no one is publishing on path '%s'", evt.path)} - continue + // create path if not exist + if _, ok := p.paths[evt.path]; !ok { + p.paths[evt.path] = newPath(p, evt.path, p.findConfForPathName(evt.path), false) } - path.describe(evt.client) + p.paths[evt.path].onDescribe(evt.client) case programEventClientAnnounce: - if path, ok := p.paths[evt.path]; ok { + // create path if not exist + if path, ok := p.paths[evt.path]; !ok { + p.paths[evt.path] = newPath(p, evt.path, p.findConfForPathName(evt.path), false) + + } else { if path.publisher != nil { evt.res <- fmt.Errorf("someone is already publishing on path '%s'", evt.path) continue } - - } else { - p.paths[evt.path] = newPath(p, evt.path, p.findConfForPathName(evt.path), false) } - p.paths[evt.path].publisher = evt.client - p.paths[evt.path].publisherSdpText = evt.sdpText - p.paths[evt.path].publisherSdpParsed = evt.sdpParsed - - evt.client.pathName = evt.path - evt.client.state = clientStateAnnounce + p.paths[evt.path].onPublisherNew(evt.client, evt.sdpText, evt.sdpParsed) evt.res <- nil case programEventClientSetupPlay: @@ -426,7 +421,7 @@ outer: } } - p.paths[evt.client.pathName].publisherSetReady() + p.paths[evt.client.pathName].onPublisherSetReady() close(evt.done) case programEventClientRecordStop: @@ -441,7 +436,7 @@ outer: delete(p.udpClientsByAddr, key) } } - p.paths[evt.client.pathName].publisherSetNotReady() + p.paths[evt.client.pathName].onPublisherSetNotReady() close(evt.done) case programEventClientFrameUdp: @@ -463,11 +458,11 @@ outer: case programEventSourceReady: evt.source.log("ready") - p.paths[evt.source.pathName].publisherSetReady() + p.paths[evt.source.pathName].onPublisherSetReady() case programEventSourceNotReady: evt.source.log("not ready") - p.paths[evt.source.pathName].publisherSetNotReady() + p.paths[evt.source.pathName].onPublisherSetNotReady() case programEventSourceFrame: p.forwardFrame(evt.source.pathName, evt.trackId, evt.streamType, evt.buf) diff --git a/path.go b/path.go index e007c6d3..69f8693a 100644 --- a/path.go +++ b/path.go @@ -9,25 +9,31 @@ import ( "github.com/aler9/sdp/v3" ) +const ( + describeTimeout = 5 * time.Second + sourceStopAfterDescribeSecs = 10 * time.Second + onDemandCmdStopAfterDescribeSecs = 10 * time.Second +) + // a publisher is either a client or a source type publisher interface { isPublisher() } type path struct { - p *program - name string - confp *confPath - permanent bool - source *source - publisher publisher - publisherReady bool - publisherSdpText []byte - publisherSdpParsed *sdp.SessionDescription - lastRequested time.Time - lastActivation time.Time - onInitCmd *exec.Cmd - onDemandCmd *exec.Cmd + p *program + name string + confp *confPath + permanent bool + source *source + publisher publisher + publisherReady bool + publisherSdpText []byte + publisherSdpParsed *sdp.SessionDescription + lastDescribeReq time.Time + lastDescribeActivation time.Time + onInitCmd *exec.Cmd + onDemandCmd *exec.Cmd } func newPath(p *program, name string, confp *confPath, permanent bool) *path { @@ -87,19 +93,37 @@ func (pa *path) onClose() { } } -func (pa *path) onCheck() { - hasClientsWaitingDescribe := func() bool { - for c := range pa.p.clients { - if c.state == clientStateWaitingDescription && c.pathName == pa.name { - return true - } +func (pa *path) hasClients() bool { + for c := range pa.p.clients { + if c.pathName == pa.name { + return true + } + } + return false +} + +func (pa *path) hasClientsWaitingDescribe() bool { + for c := range pa.p.clients { + if c.state == clientStateWaitingDescription && c.pathName == pa.name { + return true + } + } + return false +} + +func (pa *path) hasClientReaders() bool { + for c := range pa.p.clients { + if c.pathName == pa.name && c != pa.publisher { + return true } - return false - }() + } + return false +} +func (pa *path) onCheck() { // reply to DESCRIBE requests if they are in timeout - if hasClientsWaitingDescribe && - time.Since(pa.lastActivation) >= 5*time.Second { + if pa.hasClientsWaitingDescribe() && + time.Since(pa.lastDescribeActivation) >= describeTimeout { for c := range pa.p.clients { if c.state == clientStateWaitingDescription && c.pathName == pa.name { @@ -108,67 +132,89 @@ func (pa *path) onCheck() { c.describeRes <- describeRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)} } } + } - // perform actions below in next run - return + // stop on demand source if needed + if pa.source != nil && + pa.confp.SourceOnDemand && + pa.source.state == sourceStateRunning && + !pa.hasClients() && + time.Since(pa.lastDescribeReq) >= sourceStopAfterDescribeSecs { + pa.source.log("stopping since we're not requested anymore") + pa.source.state = sourceStateStopped + pa.source.events <- sourceEventApplyState{pa.source.state} } - if source, ok := pa.publisher.(*source); ok { - // stop on demand source if needed - if pa.confp.SourceOnDemand && - source.state == sourceStateRunning && - time.Since(pa.lastRequested) >= 10*time.Second { + // stop on demand command if needed + if pa.onDemandCmd != nil && + !pa.hasClientReaders() && + time.Since(pa.lastDescribeReq) >= onDemandCmdStopAfterDescribeSecs { + pa.p.log("stopping on demand command (not requested anymore)") + pa.onDemandCmd.Process.Signal(os.Interrupt) + pa.onDemandCmd.Wait() + pa.onDemandCmd = nil + } - hasClients := func() bool { - for c := range pa.p.clients { - if c.pathName == pa.name { - return true - } - } - return false - }() - if !hasClients { - source.log("stopping since we're not requested anymore") - source.state = sourceStateStopped - source.events <- sourceEventApplyState{source.state} - } + // remove non-permanent paths + if !pa.permanent && + pa.publisher == nil && + !pa.hasClients() { + pa.onClose() + delete(pa.p.paths, pa.name) + } +} + +func (pa *path) onPublisherNew(client *client, sdpText []byte, sdpParsed *sdp.SessionDescription) { + pa.publisher = client + pa.publisherSdpText = sdpText + pa.publisherSdpParsed = sdpParsed + + client.pathName = pa.name + client.state = clientStateAnnounce +} + +func (pa *path) onPublisherRemove() { + pa.publisher = nil +} + +func (pa *path) onPublisherSetReady() { + pa.publisherReady = true + + // reply to all clients that are waiting for a description + for c := range pa.p.clients { + if c.state == clientStateWaitingDescription && + c.pathName == pa.name { + c.pathName = "" + c.state = clientStateInitial + c.describeRes <- describeRes{pa.publisherSdpText, nil} } + } +} - } else { - // stop on demand command if needed - if pa.onDemandCmd != nil && - time.Since(pa.lastRequested) >= 10*time.Second { - - hasClientReaders := func() bool { - for c := range pa.p.clients { - if c.pathName == pa.name && c != pa.publisher { - return true - } - } - return false - }() - if !hasClientReaders { - pa.p.log("stopping on demand command (not requested anymore)") - pa.onDemandCmd.Process.Signal(os.Interrupt) - pa.onDemandCmd.Wait() - pa.onDemandCmd = nil - } +func (pa *path) onPublisherSetNotReady() { + pa.publisherReady = false + + // close all clients that are reading + for c := range pa.p.clients { + if c.state != clientStateWaitingDescription && + c != pa.publisher && + c.pathName == pa.name { + c.conn.NetConn().Close() } } } -func (pa *path) describe(client *client) { - pa.lastRequested = time.Now() +func (pa *path) onDescribe(client *client) { + pa.lastDescribeReq = time.Now() // publisher not found if pa.publisher == nil { // on demand command is available: put the client on hold if pa.confp.RunOnDemand != "" { - // start on demand command if needed - if pa.onDemandCmd == nil { + if pa.onDemandCmd == nil { // start if needed pa.p.log("starting on demand command") - pa.lastActivation = time.Now() + pa.lastDescribeActivation = time.Now() pa.onDemandCmd = exec.Command("/bin/sh", "-c", pa.confp.RunOnDemand) pa.onDemandCmd.Env = append(os.Environ(), "RTSP_SERVER_PATH="+pa.name, @@ -183,69 +229,26 @@ func (pa *path) describe(client *client) { client.pathName = pa.name client.state = clientStateWaitingDescription - return - } - // no on-demand: reply with 404 - client.describeRes <- describeRes{nil, fmt.Errorf("no one is publishing on path '%s'", pa.name)} - return - } + // no on-demand: reply with 404 + } else { + client.describeRes <- describeRes{nil, fmt.Errorf("no one is publishing on path '%s'", pa.name)} + } - // publisher was found but is not ready: put the client on hold - if !pa.publisherReady { - // start source if needed - if source, ok := pa.publisher.(*source); ok && source.state == sourceStateStopped { - source.log("starting on demand") - pa.lastActivation = time.Now() - source.state = sourceStateRunning - source.events <- sourceEventApplyState{source.state} + // publisher was found but is not ready: put the client on hold + } else if !pa.publisherReady { + if pa.source != nil && pa.source.state == sourceStateStopped { // start if needed + pa.source.log("starting on demand") + pa.lastDescribeActivation = time.Now() + pa.source.state = sourceStateRunning + pa.source.events <- sourceEventApplyState{pa.source.state} } client.pathName = pa.name client.state = clientStateWaitingDescription - return - } - - // publisher was found and is ready - client.describeRes <- describeRes{pa.publisherSdpText, nil} -} - -func (pa *path) publisherRemove() { - for c := range pa.p.clients { - if c.state == clientStateWaitingDescription && - c.pathName == pa.name { - c.pathName = "" - c.state = clientStateInitial - c.describeRes <- describeRes{nil, fmt.Errorf("publisher of path '%s' is not available anymore", pa.name)} - } - } - - pa.publisher = nil -} - -func (pa *path) publisherSetReady() { - pa.publisherReady = true - - // reply to all clients that are waiting for a description - for c := range pa.p.clients { - if c.state == clientStateWaitingDescription && - c.pathName == pa.name { - c.pathName = "" - c.state = clientStateInitial - c.describeRes <- describeRes{pa.publisherSdpText, nil} - } - } -} - -func (pa *path) publisherSetNotReady() { - pa.publisherReady = false - // close all clients that are reading - for c := range pa.p.clients { - if c.state != clientStateWaitingDescription && - c != pa.publisher && - c.pathName == pa.name { - c.conn.NetConn().Close() - } + // publisher was found and is ready + } else { + client.describeRes <- describeRes{pa.publisherSdpText, nil} } }