Ready-to-use SRT / WebRTC / RTSP / RTMP / LL-HLS media server and media proxy that lets you read, publish, proxy, record and play back video and audio streams.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

519 lines
11 KiB

package main
import (
"fmt"
"io"
"log"
"net"
"os"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/sdp/v3"
"gopkg.in/alecthomas/kingpin.v2"
)
// Version is the program version. It is printed by the --version flag, shown
// in the command-line help text and logged at startup.
// NOTE(review): presumably overridden at build time (e.g. via -ldflags) — confirm.
var Version = "v0.0.0"
const (
// checkPathPeriod is the interval at which the event loop calls onCheck
// on every known path.
checkPathPeriod = 5 * time.Second
)
// logDestination identifies a sink that log lines can be written to.
type logDestination int
const (
// logDestinationStdout selects output through the standard log package
// (log.Println).
logDestinationStdout logDestination = iota
// logDestinationFile selects output to the log file opened by newProgram.
logDestinationFile
)
// programEvent is the marker interface implemented by every message that can
// be sent to the program's event loop through the events channel.
type programEvent interface {
// isProgramEvent has no behavior; it only restricts which types
// satisfy the interface.
isProgramEvent()
}
// programEventMetrics requests a snapshot of the program counters.
// The event loop replies on res with the client, publisher and reader counts;
// once the program is terminating it replies with nil instead.
type programEventMetrics struct {
// res receives the reply from the event loop.
res chan *metricsData
}
// isProgramEvent marks programEventMetrics as a programEvent.
func (programEventMetrics) isProgramEvent() {}
// programEventClientNew reports a newly accepted connection. The event loop
// wraps it in a client and registers that client.
type programEventClientNew struct {
// nconn is the connection the new client is built on.
nconn net.Conn
}
// isProgramEvent marks programEventClientNew as a programEvent.
func (programEventClientNew) isProgramEvent() {}
// programEventClientClose asks the event loop to close a client. The request
// is ignored when the client is no longer registered.
type programEventClientClose struct {
// client is the client to close.
client *client
}
// isProgramEvent marks programEventClientClose as a programEvent.
func (programEventClientClose) isProgramEvent() {}
// programEventClientDescribe is sent when a client asks for the description
// of a path. The event loop creates the path if it does not exist yet and
// hands the client to the path's onDescribe.
type programEventClientDescribe struct {
// client is the requesting client.
client *client
// pathName is the name of the requested path.
pathName string
}
// isProgramEvent marks programEventClientDescribe as a programEvent.
func (programEventClientDescribe) isProgramEvent() {}
// programEventClientAnnounce is sent when a client wants to publish on a
// path. The event loop creates the path if needed, rejects the request when
// the path already has a publisher, and otherwise records the client and its
// SDP as the path's publisher.
type programEventClientAnnounce struct {
// res receives nil on success or an error describing the rejection.
res chan error
// client is the client that wants to publish.
client *client
// pathName is the name of the path to publish on.
pathName string
// sdpText is stored on the path as publisherSdpText.
sdpText []byte
// sdpParsed is stored on the path as publisherSdpParsed.
sdpParsed *sdp.SessionDescription
}
// isProgramEvent marks programEventClientAnnounce as a programEvent.
func (programEventClientAnnounce) isProgramEvent() {}
// programEventClientSetupPlay is sent when a client sets up a track for
// reading. The event loop rejects the request when the path does not exist,
// its publisher is not ready, or the track id is not below the number of
// media descriptions of the publisher's SDP.
type programEventClientSetupPlay struct {
// res receives nil on success or an error describing the rejection.
res chan error
// client is the client that wants to read.
client *client
// pathName is the name of the path to read from.
pathName string
// trackId is the id of the track being set up.
trackId int
}
// isProgramEvent marks programEventClientSetupPlay as a programEvent.
func (programEventClientSetupPlay) isProgramEvent() {}
// programEventClientPlay is sent when a client starts reading. The event
// loop increments the reader count and moves the client to clientStatePlay.
type programEventClientPlay struct {
// client is the client that starts reading.
client *client
}
// isProgramEvent marks programEventClientPlay as a programEvent.
func (programEventClientPlay) isProgramEvent() {}
// programEventClientRecord is sent when a client starts publishing. The
// event loop increments the publisher count, moves the client to
// clientStateRecord, registers its UDP addresses when it publishes over UDP,
// and marks the publisher of its path as ready.
type programEventClientRecord struct {
// client is the client that starts publishing.
client *client
}
// isProgramEvent marks programEventClientRecord as a programEvent.
func (programEventClientRecord) isProgramEvent() {}
// programEventClientFrameUdp carries a frame received over UDP. The event
// loop looks up the publisher by the sender's IP and port and drops the
// frame when the sender is unknown or the stream type does not match the
// one registered for that address.
type programEventClientFrameUdp struct {
// addr is the address the frame was received from.
addr *net.UDPAddr
// streamType is the stream type (RTP or RTCP) the frame was received as.
streamType gortsplib.StreamType
// buf is the frame content.
buf []byte
}
// isProgramEvent marks programEventClientFrameUdp as a programEvent.
func (programEventClientFrameUdp) isProgramEvent() {}
// programEventClientFrameTcp carries a frame received from a TCP publisher.
// The event loop forwards it to the readers of path.
type programEventClientFrameTcp struct {
// path is the path the frame belongs to.
path *path
// trackId is the id of the track the frame belongs to.
trackId int
// streamType is the stream type of the frame.
streamType gortsplib.StreamType
// buf is the frame content.
buf []byte
}
// isProgramEvent marks programEventClientFrameTcp as a programEvent.
func (programEventClientFrameTcp) isProgramEvent() {}
// programEventSourceReady is sent when a source becomes ready. The event
// loop logs it and calls onPublisherSetReady on the source's path.
type programEventSourceReady struct {
// source is the source that became ready.
source *source
}
// isProgramEvent marks programEventSourceReady as a programEvent.
func (programEventSourceReady) isProgramEvent() {}
// programEventSourceNotReady is sent when a source stops being ready. The
// event loop logs it and calls onPublisherSetNotReady on the source's path.
type programEventSourceNotReady struct {
// source is the source that is no longer ready.
source *source
}
// isProgramEvent marks programEventSourceNotReady as a programEvent.
func (programEventSourceNotReady) isProgramEvent() {}
// programEventSourceFrame carries a frame produced by a source. The event
// loop forwards it to the readers of the source's path.
type programEventSourceFrame struct {
// source is the source that produced the frame.
source *source
// trackId is the id of the track the frame belongs to.
trackId int
// streamType is the stream type of the frame.
streamType gortsplib.StreamType
// buf is the frame content.
buf []byte
}
// isProgramEvent marks programEventSourceFrame as a programEvent.
func (programEventSourceFrame) isProgramEvent() {}
// programEventTerminate asks the event loop to stop and release all
// resources. It is sent by program.close.
type programEventTerminate struct{}
// isProgramEvent marks programEventTerminate as a programEvent.
func (programEventTerminate) isProgramEvent() {}
// program is the top-level server object. It owns the configuration, the
// network servers, the paths and the connected clients. The fields below are
// read and modified by the event loop in run, which receives its work
// through the events channel.
type program struct {
// conf is the configuration returned by loadConf.
conf *conf
// logFile is the open log file; nil unless the file log destination is enabled.
logFile *os.File
// metrics is nil unless conf.Metrics is set.
metrics *metrics
// pprof is nil unless conf.Pprof is set.
pprof *pprof
// paths holds the known paths, keyed by path name.
paths map[string]*path
// serverRtp and serverRtcp are nil unless the UDP stream protocol is enabled.
serverRtp *serverUdp
serverRtcp *serverUdp
// serverRtsp is always created by newProgram.
serverRtsp *serverTcp
// clients is the set of registered clients.
clients map[*client]struct{}
// udpClientsByAddr maps the address of a UDP publisher to the client,
// track and stream type registered for that address.
udpClientsByAddr map[udpClientAddr]*udpClient
// publisherCount is incremented for every programEventClientRecord.
publisherCount int
// readerCount is incremented for every programEventClientPlay.
readerCount int
// events is the unbuffered channel that feeds the event loop.
events chan programEvent
// done is closed by run once shutdown has completed.
done chan struct{}
}
// newProgram parses the command line, loads the configuration and allocates
// every component of the server: the optional log file, metrics and pprof
// endpoints, the statically configured paths, the optional RTP/RTCP UDP
// servers and the RTSP TCP server. On success the event loop is started in
// its own goroutine and the running program is returned.
//
// args are the command-line arguments without the program name. stdin is
// passed to loadConf, which reads the configuration from it when the
// configuration path is 'stdin'.
//
// When the version flag is set, the version is printed and the process
// exits with status 0. An error is returned when the configuration cannot
// be loaded or a component cannot be created; in that case the log file, if
// it was opened, is closed before returning.
func newProgram(args []string, stdin io.Reader) (*program, error) {
	k := kingpin.New("rtsp-simple-server",
		"rtsp-simple-server "+Version+"\n\nRTSP server.")

	argVersion := k.Flag("version", "print version").Bool()
	argConfPath := k.Arg("confpath", "path to a config file. The default is rtsp-simple-server.yml. Use 'stdin' to read config from stdin").Default("rtsp-simple-server.yml").String()

	kingpin.MustParse(k.Parse(args))

	if *argVersion {
		fmt.Println(Version)
		os.Exit(0)
	}

	cfg, err := loadConf(*argConfPath, stdin)
	if err != nil {
		return nil, err
	}

	p := &program{
		conf:             cfg,
		paths:            make(map[string]*path),
		clients:          make(map[*client]struct{}),
		udpClientsByAddr: make(map[udpClientAddr]*udpClient),
		events:           make(chan programEvent),
		done:             make(chan struct{}),
	}

	// Do not leak the log file descriptor when a later initialization step
	// fails: after a successful start the file is owned (and closed) by run.
	started := false
	defer func() {
		if !started && p.logFile != nil {
			// best-effort: the initialization error is the one worth reporting
			p.logFile.Close()
		}
	}()

	if _, ok := p.conf.logDestinationsParsed[logDestinationFile]; ok {
		p.logFile, err = os.OpenFile(p.conf.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
		if err != nil {
			return nil, err
		}
	}

	p.log("rtsp-simple-server %s", Version)

	if cfg.Metrics {
		p.metrics, err = newMetrics(p)
		if err != nil {
			return nil, err
		}
	}

	if cfg.Pprof {
		p.pprof, err = newPprof(p)
		if err != nil {
			return nil, err
		}
	}

	for name, confp := range cfg.Paths {
		// "all" is the fallback configuration, not a real path
		if name == "all" {
			continue
		}
		p.paths[name] = newPath(p, name, confp, true)
	}

	if _, ok := cfg.protocolsParsed[gortsplib.StreamProtocolUdp]; ok {
		p.serverRtp, err = newServerUdp(p, cfg.RtpPort, gortsplib.StreamTypeRtp)
		if err != nil {
			return nil, err
		}

		p.serverRtcp, err = newServerUdp(p, cfg.RtcpPort, gortsplib.StreamTypeRtcp)
		if err != nil {
			return nil, err
		}
	}

	p.serverRtsp, err = newServerTcp(p)
	if err != nil {
		return nil, err
	}

	started = true
	go p.run()
	return p, nil
}
// log formats a message and writes it to every enabled log destination.
// Each line is prefixed with "[clients/publishers/readers]", the current
// number of registered clients and the publisher and reader counters.
//
// format and args follow the fmt.Sprintf conventions.
func (p *program) log(format string, args ...interface{}) {
	values := make([]interface{}, 0, 3+len(args))
	values = append(values, len(p.clients), p.publisherCount, p.readerCount)
	values = append(values, args...)

	line := fmt.Sprintf("[%d/%d/%d] "+format, values...)

	destinations := p.conf.logDestinationsParsed

	if _, enabled := destinations[logDestinationStdout]; enabled {
		log.Println(line)
	}

	if _, enabled := destinations[logDestinationFile]; enabled {
		p.logFile.WriteString(line + "\n")
	}
}
// run is the program's event loop. It starts the goroutines of the optional
// metrics and pprof components and of the network servers, initializes the
// paths, and then handles events one at a time until a
// programEventTerminate is received. Every checkPathPeriod it also calls
// onCheck on each path.
//
// After termination it keeps draining the events channel in the background
// (answering pending requests with nil or a "terminated" error) while it
// closes paths, servers, clients, metrics, pprof and the log file. Finally it
// closes the events channel and the done channel.
func (p *program) run() {
	if p.metrics != nil {
		go p.metrics.run()
	}

	if p.pprof != nil {
		go p.pprof.run()
	}

	if p.serverRtp != nil {
		go p.serverRtp.run()
	}

	if p.serverRtcp != nil {
		go p.serverRtcp.run()
	}

	go p.serverRtsp.run()

	for _, pa := range p.paths {
		pa.onInit()
	}

	ticker := time.NewTicker(checkPathPeriod)
	defer ticker.Stop()

loop:
	for {
		select {
		case <-ticker.C:
			for _, pa := range p.paths {
				pa.onCheck()
			}

		case ev := <-p.events:
			switch e := ev.(type) {
			case programEventMetrics:
				e.res <- &metricsData{
					clientCount:    len(p.clients),
					publisherCount: p.publisherCount,
					readerCount:    p.readerCount,
				}

			case programEventClientNew:
				c := newClient(p, e.nconn)
				p.clients[c] = struct{}{}
				c.log("connected")

			case programEventClientClose:
				// the client may have been closed already
				if _, registered := p.clients[e.client]; !registered {
					continue
				}
				e.client.close()

			case programEventClientDescribe:
				// create the path if it does not exist
				pa, found := p.paths[e.pathName]
				if !found {
					pa = newPath(p, e.pathName, p.findConfForPathName(e.pathName), false)
					p.paths[e.pathName] = pa
				}

				pa.onDescribe(e.client)

			case programEventClientAnnounce:
				// create the path if it does not exist; an existing path
				// accepts a single publisher only
				pa, found := p.paths[e.pathName]
				if !found {
					pa = newPath(p, e.pathName, p.findConfForPathName(e.pathName), false)
					p.paths[e.pathName] = pa
				} else if pa.publisher != nil {
					e.res <- fmt.Errorf("someone is already publishing on path '%s'", e.pathName)
					continue
				}

				pa.publisher = e.client
				pa.publisherSdpText = e.sdpText
				pa.publisherSdpParsed = e.sdpParsed

				e.client.path = pa
				e.client.state = clientStatePreRecord
				e.res <- nil

			case programEventClientSetupPlay:
				pa, found := p.paths[e.pathName]
				if !found || !pa.publisherReady {
					e.res <- fmt.Errorf("no one is publishing on path '%s'", e.pathName)
					continue
				}

				if e.trackId >= len(pa.publisherSdpParsed.MediaDescriptions) {
					e.res <- fmt.Errorf("track %d does not exist", e.trackId)
					continue
				}

				e.client.path = pa
				e.client.state = clientStatePrePlay
				e.res <- nil

			case programEventClientPlay:
				p.readerCount++
				e.client.state = clientStatePlay

			case programEventClientRecord:
				p.publisherCount++
				e.client.state = clientStateRecord

				// UDP frames are matched to their publisher by source
				// address, so register both ports of every track
				if e.client.streamProtocol == gortsplib.StreamProtocolUdp {
					for trackId, track := range e.client.streamTracks {
						rtpKey := makeUdpClientAddr(e.client.ip(), track.rtpPort)
						p.udpClientsByAddr[rtpKey] = &udpClient{
							client:     e.client,
							trackId:    trackId,
							streamType: gortsplib.StreamTypeRtp,
						}

						rtcpKey := makeUdpClientAddr(e.client.ip(), track.rtcpPort)
						p.udpClientsByAddr[rtcpKey] = &udpClient{
							client:     e.client,
							trackId:    trackId,
							streamType: gortsplib.StreamTypeRtcp,
						}
					}
				}

				e.client.path.onPublisherSetReady()

			case programEventClientFrameUdp:
				pub, found := p.udpClientsByAddr[makeUdpClientAddr(e.addr.IP, e.addr.Port)]

				// drop frames from unknown senders, and frames where the
				// client sent RTP on the RTCP port or vice-versa
				if !found || pub.streamType != e.streamType {
					continue
				}

				pub.client.rtcpReceivers[pub.trackId].OnFrame(e.streamType, e.buf)
				p.forwardFrame(pub.client.path, pub.trackId, e.streamType, e.buf)

			case programEventClientFrameTcp:
				p.forwardFrame(e.path, e.trackId, e.streamType, e.buf)

			case programEventSourceReady:
				e.source.log("ready")
				e.source.path.onPublisherSetReady()

			case programEventSourceNotReady:
				e.source.log("not ready")
				e.source.path.onPublisherSetNotReady()

			case programEventSourceFrame:
				p.forwardFrame(e.source.path, e.trackId, e.streamType, e.buf)

			case programEventTerminate:
				break loop
			}
		}
	}

	// keep consuming events during shutdown so that senders are not blocked
	// forever; requests that expect a reply get one
	go func() {
		for ev := range p.events {
			switch e := ev.(type) {
			case programEventMetrics:
				e.res <- nil

			case programEventClientAnnounce:
				e.res <- fmt.Errorf("terminated")

			case programEventClientSetupPlay:
				e.res <- fmt.Errorf("terminated")
			}
		}
	}()

	for _, pa := range p.paths {
		pa.onClose()
	}

	p.serverRtsp.close()

	if p.serverRtcp != nil {
		p.serverRtcp.close()
	}

	if p.serverRtp != nil {
		p.serverRtp.close()
	}

	for c := range p.clients {
		c.close()
		<-c.done
	}

	if p.metrics != nil {
		p.metrics.close()
	}

	if p.pprof != nil {
		p.pprof.close()
	}

	if p.logFile != nil {
		p.logFile.Close()
	}

	close(p.events)
	close(p.done)
}
// close asks the event loop to terminate and blocks until run has closed
// the done channel, i.e. until shutdown has completed.
func (p *program) close() {
	var terminate programEvent = programEventTerminate{}
	p.events <- terminate

	<-p.done
}
// findConfForPathName returns the configuration that applies to the path
// called name: the entry configured under that exact name if there is one,
// otherwise the entry configured under "all". It returns nil when neither
// exists.
func (p *program) findConfForPathName(name string) *confPath {
	for _, key := range []string{name, "all"} {
		if found, ok := p.conf.Paths[key]; ok {
			return found
		}
	}

	return nil
}
// forwardFrame delivers a frame to every client that is in
// clientStatePlay on the given path and has set up the given track.
//
// UDP clients receive the frame through the RTP or RTCP server, depending
// on streamType, at the port they registered for the track. All other
// clients receive it as an interleaved frame on their tcpFrame channel.
func (p *program) forwardFrame(path *path, trackId int, streamType gortsplib.StreamType, frame []byte) {
	for c := range p.clients {
		isReader := c.path == path && c.state == clientStatePlay
		if !isReader {
			continue
		}

		track, ok := c.streamTracks[trackId]
		if !ok {
			continue
		}

		if c.streamProtocol != gortsplib.StreamProtocolUdp {
			c.tcpFrame <- &gortsplib.InterleavedFrame{
				TrackId:    trackId,
				StreamType: streamType,
				Content:    frame,
			}
			continue
		}

		// pick the server and destination port matching the stream type
		server, port := p.serverRtcp, track.rtcpPort
		if streamType == gortsplib.StreamTypeRtp {
			server, port = p.serverRtp, track.rtpPort
		}

		server.write(&udpAddrBufPair{
			addr: &net.UDPAddr{
				IP:   c.ip(),
				Zone: c.zone(),
				Port: port,
			},
			buf: frame,
		})
	}
}
// main starts the program with the process arguments and standard input,
// aborts with a fatal log message when startup fails, and otherwise blocks
// forever while the program runs in its own goroutines.
func main() {
	if _, err := newProgram(os.Args[1:], os.Stdin); err != nil {
		log.Fatal("ERR: ", err)
	}

	// block forever
	select {}
}