Ready-to-use SRT / WebRTC / RTSP / RTMP / LL-HLS media server and media proxy that allows you to read, publish, proxy, record and play back video and audio streams.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

235 lines
5.5 KiB

package main
import (
"fmt"
"os"
"os/exec"
"time"
"github.com/aler9/sdp/v3"
)
// a publisher is either a client or a source
//
// The interface carries a single method with no parameters and no results;
// presumably it only marks which types may be stored in path.publisher
// (TODO confirm against the client and source implementations).
type publisher interface {
isPublisher()
}
// path holds the state of a single stream path: its configuration, its
// current publisher (if any), and the external commands started on its behalf.
type path struct {
// owning program; used for logging and for iterating over its clients
p *program
// path name; matched against client.pathName and exported to commands
// through the RTSP_SERVER_PATH environment variable
name string
// configuration of this path (RunOnInit, RunOnDemand, SourceOnDemand are read)
confp *confPath
// stored as passed to newPath; not read by the methods in this section
// NOTE(review): presumably distinguishes configured paths from dynamic ones — confirm
permanent bool
// current publisher, nil when nobody is publishing
publisher publisher
// whether the publisher is ready; toggled by publisherSetReady and
// publisherSetNotReady
publisherReady bool
// SDP sent back to clients in successful DESCRIBE replies
publisherSdpText []byte
// not read by the methods in this section
// NOTE(review): presumably the parsed form of publisherSdpText — confirm
publisherSdpParsed *sdp.SessionDescription
// time of the last describe() call; drives the 10-second idle checks in onCheck
lastRequested time.Time
// time at which an on-demand command or source was last started; drives the
// 5-second DESCRIBE timeout in onCheck
lastActivation time.Time
// command started by onInit, nil if none
onInitCmd *exec.Cmd
// command started by describe() on demand, nil when not running
onDemandCmd *exec.Cmd
}
// newPath allocates a path owned by program p, identified by name and
// configured by confp. All remaining fields are left at their zero value:
// no publisher, no running commands.
func newPath(p *program, name string, confp *confPath, permanent bool) *path {
	return &path{
		p:         p,
		name:      name,
		confp:     confp,
		permanent: permanent,
	}
}
// onInit starts the command configured in RunOnInit, if any, through
// "/bin/sh -c". The child inherits the server's environment plus
// RTSP_SERVER_PATH (set to the path name) and shares the server's stdout and
// stderr.
//
// pa.onInitCmd is assigned only after the command has actually started. A
// command whose Start failed has a nil Process, and keeping it around would
// make a later Process.Signal call (see onClose) dereference a nil pointer.
func (pa *path) onInit() {
	if pa.confp.RunOnInit == "" {
		return
	}

	pa.p.log("starting on init command")

	cmd := exec.Command("/bin/sh", "-c", pa.confp.RunOnInit)
	cmd.Env = append(os.Environ(),
		"RTSP_SERVER_PATH="+pa.name,
	)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr

	if err := cmd.Start(); err != nil {
		pa.p.log("ERR: %s", err)
		return
	}

	pa.onInitCmd = cmd
}
// onClose stops the on-init and on-demand commands, if present, by sending
// them an interrupt signal and waiting for them to exit.
func (pa *path) onClose() {
	// stop interrupts cmd and blocks until it has exited.
	stop := func(cmd *exec.Cmd) {
		// Process is nil when Start failed: there is nothing to signal or
		// wait for, and dereferencing it would panic.
		if cmd.Process == nil {
			return
		}

		// Signal fails when the process has already exited on its own,
		// which is not a problem here.
		_ = cmd.Process.Signal(os.Interrupt)

		// Wait returns an error whenever the command did not exit
		// successfully, which is the expected outcome after an interrupt.
		_ = cmd.Wait()
	}

	if pa.onInitCmd != nil {
		pa.p.log("stopping on init command (exited)")
		stop(pa.onInitCmd)
	}

	if pa.onDemandCmd != nil {
		pa.p.log("stopping on demand command (exited)")
		stop(pa.onDemandCmd)
	}
}
// onCheck performs the time-based housekeeping of the path. It is written to
// be invoked repeatedly: each call does at most one of the following.
//
//  1. If clients are waiting for a description and at least 5 seconds have
//     elapsed since the last activation, every waiting client is reset to
//     its initial state and receives a timeout error. The remaining checks
//     are left to the next run.
//  2. If the publisher is a source with SourceOnDemand enabled, it is
//     running, it has not been requested for 10 seconds and no client is
//     attached to the path, the source is asked to stop.
//  3. Otherwise, if an on-demand command exists, it has not been requested
//     for 10 seconds and no client other than the publisher is attached to
//     the path, the command is interrupted and forgotten.
func (pa *path) onCheck() {
	hasClientsWaitingDescribe := func() bool {
		for c := range pa.p.clients {
			if c.state == clientStateWaitingDescription && c.pathName == pa.name {
				return true
			}
		}
		return false
	}()

	// reply to DESCRIBE requests if they are in timeout
	if hasClientsWaitingDescribe &&
		time.Since(pa.lastActivation) >= 5*time.Second {
		for c := range pa.p.clients {
			if c.state == clientStateWaitingDescription &&
				c.pathName == pa.name {
				c.pathName = ""
				c.state = clientStateInitial
				c.describeRes <- describeRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)}
			}
		}

		// perform actions below in next run
		return
	}

	if src, ok := pa.publisher.(*source); ok {
		// stop on demand source if needed
		if pa.confp.SourceOnDemand &&
			src.state == sourceStateRunning &&
			time.Since(pa.lastRequested) >= 10*time.Second {
			hasClients := func() bool {
				for c := range pa.p.clients {
					if c.pathName == pa.name {
						return true
					}
				}
				return false
			}()

			if !hasClients {
				src.log("stopping since we're not requested anymore")
				src.state = sourceStateStopped
				src.events <- sourceEventApplyState{src.state}
			}
		}
		return
	}

	// stop on demand command if needed
	if pa.onDemandCmd != nil &&
		time.Since(pa.lastRequested) >= 10*time.Second {
		hasClientReaders := func() bool {
			for c := range pa.p.clients {
				if c.pathName == pa.name && c != pa.publisher {
					return true
				}
			}
			return false
		}()

		if !hasClientReaders {
			pa.p.log("stopping on demand command (not requested anymore)")

			// Process is nil when the command failed to start; signalling
			// it would dereference a nil pointer.
			if pa.onDemandCmd.Process != nil {
				// Signal fails when the process has already exited, and
				// Wait reports the interrupted exit as an error; neither
				// requires any action.
				_ = pa.onDemandCmd.Process.Signal(os.Interrupt)
				_ = pa.onDemandCmd.Wait()
			}

			pa.onDemandCmd = nil
		}
	}
}
// describe handles a description request coming from client and records the
// time of the request in pa.lastRequested.
//
// Outcomes:
//   - no publisher, RunOnDemand configured: the on-demand command is started
//     if it is not already present and the client is put on hold
//     (clientStateWaitingDescription);
//   - no publisher, no RunOnDemand: an error is sent on client.describeRes;
//   - publisher present but not ready: a stopped source is asked to start
//     and the client is put on hold;
//   - publisher present and ready: the publisher SDP is sent on
//     client.describeRes.
func (pa *path) describe(client *client) {
	pa.lastRequested = time.Now()

	// publisher not found
	if pa.publisher == nil {
		// on demand command is available: put the client on hold
		if pa.confp.RunOnDemand != "" {
			// start on demand command if needed
			if pa.onDemandCmd == nil {
				pa.p.log("starting on demand command")

				pa.lastActivation = time.Now()

				cmd := exec.Command("/bin/sh", "-c", pa.confp.RunOnDemand)
				cmd.Env = append(os.Environ(),
					"RTSP_SERVER_PATH="+pa.name,
				)
				cmd.Stdout = os.Stdout
				cmd.Stderr = os.Stderr

				if err := cmd.Start(); err != nil {
					// Leave pa.onDemandCmd nil: a command that failed to
					// start has a nil Process, so storing it would make
					// later Process.Signal calls panic and would prevent
					// the next request from retrying.
					pa.p.log("ERR: %s", err)
				} else {
					pa.onDemandCmd = cmd
				}
			}

			client.pathName = pa.name
			client.state = clientStateWaitingDescription
			return
		}

		// no on-demand: reply with 404
		client.describeRes <- describeRes{nil, fmt.Errorf("no one is publishing on path '%s'", pa.name)}
		return
	}

	// publisher was found but is not ready: put the client on hold
	if !pa.publisherReady {
		// start source if needed
		if src, ok := pa.publisher.(*source); ok && src.state == sourceStateStopped {
			src.log("starting on demand")
			pa.lastActivation = time.Now()
			src.state = sourceStateRunning
			src.events <- sourceEventApplyState{src.state}
		}

		client.pathName = pa.name
		client.state = clientStateWaitingDescription
		return
	}

	// publisher was found and is ready
	client.describeRes <- describeRes{pa.publisherSdpText, nil}
}
// publisherRemove detaches the current publisher from the path. Before doing
// so, every client that is on hold waiting for a description of this path is
// reset to its initial state and receives an error on describeRes.
func (pa *path) publisherRemove() {
	for c := range pa.p.clients {
		// skip clients that are not waiting on this path
		if c.state != clientStateWaitingDescription || c.pathName != pa.name {
			continue
		}

		c.pathName = ""
		c.state = clientStateInitial

		err := fmt.Errorf("publisher of path '%s' is not available anymore", pa.name)
		c.describeRes <- describeRes{nil, err}
	}

	pa.publisher = nil
}
// publisherSetReady marks the publisher as ready and releases every client
// that was on hold waiting for a description of this path: each one is reset
// to its initial state and receives the publisher SDP on describeRes.
func (pa *path) publisherSetReady() {
	pa.publisherReady = true

	for c := range pa.p.clients {
		// skip clients that are not waiting on this path
		if c.state != clientStateWaitingDescription || c.pathName != pa.name {
			continue
		}

		c.pathName = ""
		c.state = clientStateInitial
		c.describeRes <- describeRes{pa.publisherSdpText, nil}
	}
}
// publisherSetNotReady marks the publisher as not ready and closes the
// network connection of every client attached to this path that is neither
// the publisher itself nor on hold waiting for a description.
func (pa *path) publisherSetNotReady() {
	pa.publisherReady = false

	for c := range pa.p.clients {
		// keep waiting clients, the publisher, and clients of other paths
		if c.state == clientStateWaitingDescription ||
			c == pa.publisher ||
			c.pathName != pa.name {
			continue
		}

		c.conn.NetConn().Close()
	}
}