Ready-to-use SRT / WebRTC / RTSP / RTMP / LL-HLS media server and media proxy that allows to read, publish, proxy, record and playback video and audio streams.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

255 lines
5.7 KiB

package main
import (
"fmt"
"time"
)
const (
describeTimeout = 5 * time.Second
sourceStopAfterDescribeSecs = 10 * time.Second
onDemandCmdStopAfterDescribeSecs = 10 * time.Second
)
// a publisher is either a client or a source
type publisher interface {
isPublisher()
}
type path struct {
p *program
name string
conf *pathConf
source *source
publisher publisher
publisherReady bool
publisherTrackCount int
publisherSdp []byte
lastDescribeReq time.Time
lastDescribeActivation time.Time
onInitCmd *externalCmd
onDemandCmd *externalCmd
}
func newPath(p *program, name string, conf *pathConf) *path {
pa := &path{
p: p,
name: name,
conf: conf,
}
if conf.Source != "record" {
s := newSource(p, pa, conf)
pa.source = s
pa.publisher = s
}
return pa
}
func (pa *path) log(format string, args ...interface{}) {
pa.p.log("[path "+pa.name+"] "+format, args...)
}
func (pa *path) onInit() {
if pa.source != nil {
go pa.source.run(pa.source.state)
}
if pa.conf.RunOnInit != "" {
pa.log("starting on init command")
var err error
pa.onInitCmd, err = startExternalCommand(pa.conf.RunOnInit, pa.name)
if err != nil {
pa.log("ERR: %s", err)
}
}
}
func (pa *path) onClose(wait bool) {
if pa.source != nil {
close(pa.source.terminate)
<-pa.source.done
}
if pa.onInitCmd != nil {
pa.log("stopping on init command (closing)")
pa.onInitCmd.close()
}
if pa.onDemandCmd != nil {
pa.log("stopping on demand command (closing)")
pa.onDemandCmd.close()
}
for c := range pa.p.clients {
if c.path == pa {
if c.state == clientStateWaitDescription {
c.path = nil
c.state = clientStateInitial
c.describe <- describeRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)}
} else {
c.close()
if wait {
<-c.done
}
}
}
}
}
func (pa *path) hasClients() bool {
for c := range pa.p.clients {
if c.path == pa {
return true
}
}
return false
}
func (pa *path) hasClientsWaitingDescribe() bool {
for c := range pa.p.clients {
if c.state == clientStateWaitDescription && c.path == pa {
return true
}
}
return false
}
func (pa *path) hasClientReaders() bool {
for c := range pa.p.clients {
if c.path == pa && c != pa.publisher {
return true
}
}
return false
}
func (pa *path) onCheck() {
// reply to DESCRIBE requests if they are in timeout
if pa.hasClientsWaitingDescribe() &&
time.Since(pa.lastDescribeActivation) >= describeTimeout {
for c := range pa.p.clients {
if c.state == clientStateWaitDescription &&
c.path == pa {
c.path = nil
c.state = clientStateInitial
c.describe <- describeRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)}
}
}
}
// stop on demand source if needed
if pa.source != nil &&
pa.conf.SourceOnDemand &&
pa.source.state == sourceStateRunning &&
!pa.hasClients() &&
time.Since(pa.lastDescribeReq) >= sourceStopAfterDescribeSecs {
pa.log("stopping on demand source since (not requested anymore)")
pa.source.state = sourceStateStopped
pa.source.setState <- pa.source.state
}
// stop on demand command if needed
if pa.onDemandCmd != nil &&
!pa.hasClientReaders() &&
time.Since(pa.lastDescribeReq) >= onDemandCmdStopAfterDescribeSecs {
pa.log("stopping on demand command (not requested anymore)")
pa.onDemandCmd.close()
pa.onDemandCmd = nil
}
// remove regular expression paths
if pa.conf.regexp != nil &&
pa.publisher == nil &&
!pa.hasClients() {
pa.onClose(false)
delete(pa.p.paths, pa.name)
}
}
func (pa *path) onPublisherRemove() {
pa.publisher = nil
// close all clients that are reading or waiting for reading
for c := range pa.p.clients {
if c.path == pa &&
c.state != clientStateWaitDescription &&
c != pa.publisher {
c.close()
}
}
}
func (pa *path) onPublisherSetReady() {
pa.publisherReady = true
// reply to all clients that are waiting for a description
for c := range pa.p.clients {
if c.state == clientStateWaitDescription &&
c.path == pa {
c.path = nil
c.state = clientStateInitial
c.describe <- describeRes{pa.publisherSdp, nil}
}
}
}
func (pa *path) onPublisherSetNotReady() {
pa.publisherReady = false
// close all clients that are reading or waiting for reading
for c := range pa.p.clients {
if c.path == pa &&
c.state != clientStateWaitDescription &&
c != pa.publisher {
c.close()
}
}
}
func (pa *path) onDescribe(client *client) {
pa.lastDescribeReq = time.Now()
// publisher not found
if pa.publisher == nil {
// on demand command is available: put the client on hold
if pa.conf.RunOnDemand != "" {
if pa.onDemandCmd == nil { // start if needed
pa.log("starting on demand command")
pa.lastDescribeActivation = time.Now()
var err error
pa.onDemandCmd, err = startExternalCommand(pa.conf.RunOnDemand, pa.name)
if err != nil {
pa.log("ERR: %s", err)
}
}
client.path = pa
client.state = clientStateWaitDescription
// no on-demand: reply with 404
} else {
client.describe <- describeRes{nil, fmt.Errorf("no one is publishing on path '%s'", pa.name)}
}
// publisher was found but is not ready: put the client on hold
} else if !pa.publisherReady {
if pa.source != nil && pa.source.state == sourceStateStopped { // start if needed
pa.log("starting on demand source")
pa.lastDescribeActivation = time.Now()
pa.source.state = sourceStateRunning
pa.source.setState <- pa.source.state
}
client.path = pa
client.state = clientStateWaitDescription
// publisher was found and is ready
} else {
client.describe <- describeRes{pa.publisherSdp, nil}
}
}