Ready-to-use SRT / WebRTC / RTSP / RTMP / LL-HLS media server and media proxy that allows to read, publish, proxy, record and playback video and audio streams.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

1069 lines
26 KiB

package main
import (
"errors"
"fmt"
"io"
"net"
"os"
"os/exec"
"strings"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/sdp/v3"
)
const (
clientCheckStreamInterval = 5 * time.Second
clientReceiverReportInterval = 10 * time.Second
clientTcpReadBufferSize = 128 * 1024
clientTcpWriteBufferSize = 128 * 1024
clientUdpReadBufferSize = 2048
clientUdpWriteBufferSize = 128 * 1024
)
type serverClientTrack struct {
rtpPort int
rtcpPort int
}
type serverClientEvent interface {
isServerClientEvent()
}
type serverClientEventFrameTcp struct {
frame *gortsplib.InterleavedFrame
}
func (serverClientEventFrameTcp) isServerClientEvent() {}
type serverClientState int
const (
clientStateStarting serverClientState = iota
clientStateAnnounce
clientStatePrePlay
clientStatePlay
clientStatePreRecord
clientStateRecord
)
func (cs serverClientState) String() string {
switch cs {
case clientStateStarting:
return "STARTING"
case clientStateAnnounce:
return "ANNOUNCE"
case clientStatePrePlay:
return "PRE_PLAY"
case clientStatePlay:
return "PLAY"
case clientStatePreRecord:
return "PRE_RECORD"
case clientStateRecord:
return "RECORD"
}
return "UNKNOWN"
}
type serverClient struct {
p *program
conn *gortsplib.ConnServer
state serverClientState
path string
authUser string
authPass string
authHelper *gortsplib.AuthServer
authFailures int
streamSdpText []byte // only if publisher
streamSdpParsed *sdp.SessionDescription // only if publisher
streamProtocol gortsplib.StreamProtocol
streamTracks []*serverClientTrack
rtcpReceivers []*gortsplib.RtcpReceiver
readBuf *doubleBuffer
writeBuf *doubleBuffer
events chan serverClientEvent // only if state = Play and gortsplib.StreamProtocol = TCP
done chan struct{}
}
func newServerClient(p *program, nconn net.Conn) *serverClient {
c := &serverClient{
p: p,
conn: gortsplib.NewConnServer(gortsplib.ConnServerConf{
Conn: nconn,
ReadTimeout: p.conf.ReadTimeout,
WriteTimeout: p.conf.WriteTimeout,
}),
state: clientStateStarting,
readBuf: newDoubleBuffer(clientTcpReadBufferSize),
done: make(chan struct{}),
}
go c.run()
return c
}
func (c *serverClient) log(format string, args ...interface{}) {
c.p.log("[client %s] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr().String()}, args...)...)
}
func (c *serverClient) ip() net.IP {
return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).IP
}
func (c *serverClient) zone() string {
return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).Zone
}
func (c *serverClient) publisherIsReady() bool {
return c.state == clientStateRecord
}
func (c *serverClient) publisherSdpText() []byte {
return c.streamSdpText
}
func (c *serverClient) publisherSdpParsed() *sdp.SessionDescription {
return c.streamSdpParsed
}
func (c *serverClient) run() {
var runOnConnectCmd *exec.Cmd
if c.p.conf.RunOnConnect != "" {
runOnConnectCmd = exec.Command("/bin/sh", "-c", c.p.conf.RunOnConnect)
runOnConnectCmd.Stdout = os.Stdout
runOnConnectCmd.Stderr = os.Stderr
err := runOnConnectCmd.Start()
if err != nil {
c.log("ERR: %s", err)
}
}
outer:
for {
req, err := c.conn.ReadRequest()
if err != nil {
if err != io.EOF {
c.log("ERR: %s", err)
}
break outer
}
ok := c.handleRequest(req)
if !ok {
break outer
}
}
done := make(chan struct{})
c.p.events <- programEventClientClose{done, c}
<-done
c.conn.NetConn().Close() // close socket in case it has not been closed yet
if runOnConnectCmd != nil {
runOnConnectCmd.Process.Signal(os.Interrupt)
runOnConnectCmd.Wait()
}
close(c.done) // close() never blocks
}
func (c *serverClient) close() {
c.conn.NetConn().Close()
<-c.done
}
func (c *serverClient) writeResError(req *gortsplib.Request, code gortsplib.StatusCode, err error) {
c.log("ERR: %s", err)
header := gortsplib.Header{}
if cseq, ok := req.Header["CSeq"]; ok && len(cseq) == 1 {
header["CSeq"] = cseq
}
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: code,
Header: header,
})
}
func (c *serverClient) findConfForPath(path string) *ConfPath {
if pconf, ok := c.p.conf.Paths[path]; ok {
return pconf
}
if pconf, ok := c.p.conf.Paths["all"]; ok {
return pconf
}
return nil
}
var errAuthCritical = errors.New("auth critical")
var errAuthNotCritical = errors.New("auth not critical")
func (c *serverClient) authenticate(ips []interface{}, user string, pass string, req *gortsplib.Request) error {
// validate ip
err := func() error {
if ips == nil {
return nil
}
ip := c.ip()
if !ipEqualOrInRange(ip, ips) {
c.log("ERR: ip '%s' not allowed", ip)
return errAuthCritical
}
return nil
}()
if err != nil {
return err
}
// validate credentials
err = func() error {
if user == "" {
return nil
}
// reset authHelper every time the credentials change
if c.authHelper == nil || c.authUser != user || c.authPass != pass {
c.authUser = user
c.authPass = pass
c.authHelper = gortsplib.NewAuthServer(user, pass, c.p.conf.authMethodsParsed)
}
err := c.authHelper.ValidateHeader(req.Header["Authorization"], req.Method, req.Url)
if err != nil {
c.authFailures += 1
// vlc with login prompt sends 4 requests:
// 1) without credentials
// 2) with password but without the username
// 3) without credentials
// 4) with password and username
// hence we must allow up to 3 failures
var retErr error
if c.authFailures > 3 {
c.log("ERR: unauthorized: %s", err)
retErr = errAuthCritical
} else if c.authFailures > 1 {
c.log("WARN: unauthorized: %s", err)
retErr = errAuthNotCritical
} else {
retErr = errAuthNotCritical
}
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusUnauthorized,
Header: gortsplib.Header{
"CSeq": req.Header["CSeq"],
"WWW-Authenticate": c.authHelper.GenerateHeader(),
},
})
return retErr
}
// reset authFailures after a successful login
c.authFailures = 0
return nil
}()
if err != nil {
return err
}
return nil
}
func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
c.log(string(req.Method))
cseq, ok := req.Header["CSeq"]
if !ok || len(cseq) != 1 {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("cseq missing"))
return false
}
path := func() string {
ret := req.Url.Path
// remove leading slash
if len(ret) > 1 {
ret = ret[1:]
}
// strip any subpath
if n := strings.Index(ret, "/"); n >= 0 {
ret = ret[:n]
}
return ret
}()
switch req.Method {
case gortsplib.OPTIONS:
// do not check state, since OPTIONS can be requested
// in any state
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
"CSeq": cseq,
"Public": gortsplib.HeaderValue{strings.Join([]string{
string(gortsplib.DESCRIBE),
string(gortsplib.ANNOUNCE),
string(gortsplib.SETUP),
string(gortsplib.PLAY),
string(gortsplib.RECORD),
string(gortsplib.TEARDOWN),
}, ", ")},
},
})
return true
case gortsplib.DESCRIBE:
if c.state != clientStateStarting {
c.writeResError(req, gortsplib.StatusBadRequest,
fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateStarting))
return false
}
pconf := c.findConfForPath(path)
if pconf == nil {
c.writeResError(req, gortsplib.StatusBadRequest,
fmt.Errorf("unable to find a valid configuration for path '%s'", path))
return false
}
err := c.authenticate(pconf.readIpsParsed, pconf.ReadUser, pconf.ReadPass, req)
if err != nil {
if err == errAuthCritical {
return false
}
return true
}
res := make(chan []byte)
c.p.events <- programEventClientDescribe{path, res}
sdp := <-res
if sdp == nil {
c.writeResError(req, gortsplib.StatusNotFound, fmt.Errorf("no one is publishing on path '%s'", path))
return false
}
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
"CSeq": cseq,
"Content-Base": gortsplib.HeaderValue{req.Url.String() + "/"},
"Content-Type": gortsplib.HeaderValue{"application/sdp"},
},
Content: sdp,
})
return true
case gortsplib.ANNOUNCE:
if c.state != clientStateStarting {
c.writeResError(req, gortsplib.StatusBadRequest,
fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateStarting))
return false
}
if len(path) == 0 {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path can't be empty"))
return false
}
pconf := c.findConfForPath(path)
if pconf == nil {
c.writeResError(req, gortsplib.StatusBadRequest,
fmt.Errorf("unable to find a valid configuration for path '%s'", path))
return false
}
err := c.authenticate(pconf.publishIpsParsed, pconf.PublishUser, pconf.PublishPass, req)
if err != nil {
if err == errAuthCritical {
return false
}
return true
}
ct, ok := req.Header["Content-Type"]
if !ok || len(ct) != 1 {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("Content-Type header missing"))
return false
}
if ct[0] != "application/sdp" {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("unsupported Content-Type '%s'", ct))
return false
}
sdpParsed := &sdp.SessionDescription{}
err = sdpParsed.Unmarshal(req.Content)
if err != nil {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("invalid SDP: %s", err))
return false
}
if len(sdpParsed.MediaDescriptions) == 0 {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("no tracks defined"))
return false
}
var tracks []*gortsplib.Track
for i, media := range sdpParsed.MediaDescriptions {
tracks = append(tracks, &gortsplib.Track{
Id: i,
Media: media,
})
}
sdpParsed, req.Content = sdpForServer(tracks)
res := make(chan error)
c.p.events <- programEventClientAnnounce{res, c, path}
err = <-res
if err != nil {
c.writeResError(req, gortsplib.StatusBadRequest, err)
return false
}
c.streamSdpText = req.Content
c.streamSdpParsed = sdpParsed
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
"CSeq": cseq,
},
})
return true
case gortsplib.SETUP:
th, err := gortsplib.ReadHeaderTransport(req.Header["Transport"])
if err != nil {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header: %s", err))
return false
}
if _, ok := th["multicast"]; ok {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("multicast is not supported"))
return false
}
switch c.state {
// play
case clientStateStarting, clientStatePrePlay:
pconf := c.findConfForPath(path)
if pconf == nil {
c.writeResError(req, gortsplib.StatusBadRequest,
fmt.Errorf("unable to find a valid configuration for path '%s'", path))
return false
}
err := c.authenticate(pconf.readIpsParsed, pconf.ReadUser, pconf.ReadPass, req)
if err != nil {
if err == errAuthCritical {
return false
}
return true
}
// play via UDP
if func() bool {
_, ok := th["RTP/AVP"]
if ok {
return true
}
_, ok = th["RTP/AVP/UDP"]
if ok {
return true
}
return false
}() {
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolUdp]; !ok {
c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled"))
return false
}
rtpPort, rtcpPort := th.Ports("client_port")
if rtpPort == 0 || rtcpPort == 0 {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not have valid client ports (%v)", req.Header["Transport"]))
return false
}
if c.path != "" && path != c.path {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed"))
return false
}
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUdp {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols"))
return false
}
res := make(chan error)
c.p.events <- programEventClientSetupPlay{res, c, path, gortsplib.StreamProtocolUdp, rtpPort, rtcpPort}
err = <-res
if err != nil {
c.writeResError(req, gortsplib.StatusBadRequest, err)
return false
}
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
"CSeq": cseq,
"Transport": gortsplib.HeaderValue{strings.Join([]string{
"RTP/AVP/UDP",
"unicast",
fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort),
fmt.Sprintf("server_port=%d-%d", c.p.conf.RtpPort, c.p.conf.RtcpPort),
}, ";")},
"Session": gortsplib.HeaderValue{"12345678"},
},
})
return true
// play via TCP
} else if _, ok := th["RTP/AVP/TCP"]; ok {
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolTcp]; !ok {
c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled"))
return false
}
if c.path != "" && path != c.path {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed"))
return false
}
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolTcp {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols"))
return false
}
res := make(chan error)
c.p.events <- programEventClientSetupPlay{res, c, path, gortsplib.StreamProtocolTcp, 0, 0}
err = <-res
if err != nil {
c.writeResError(req, gortsplib.StatusBadRequest, err)
return false
}
interleaved := fmt.Sprintf("%d-%d", ((len(c.streamTracks) - 1) * 2), ((len(c.streamTracks)-1)*2)+1)
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
"CSeq": cseq,
"Transport": gortsplib.HeaderValue{strings.Join([]string{
"RTP/AVP/TCP",
"unicast",
fmt.Sprintf("interleaved=%s", interleaved),
}, ";")},
"Session": gortsplib.HeaderValue{"12345678"},
},
})
return true
} else {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP, RTP/AVP/UDP or RTP/AVP/TCP) (%s)", req.Header["Transport"]))
return false
}
// record
case clientStateAnnounce, clientStatePreRecord:
if strings.ToLower(th.Value("mode")) != "record" {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain mode=record"))
return false
}
// after ANNOUNCE, c.path is already set
if path != c.path {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed"))
return false
}
// record via UDP
if func() bool {
_, ok := th["RTP/AVP"]
if ok {
return true
}
_, ok = th["RTP/AVP/UDP"]
if ok {
return true
}
return false
}() {
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolUdp]; !ok {
c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled"))
return false
}
rtpPort, rtcpPort := th.Ports("client_port")
if rtpPort == 0 || rtcpPort == 0 {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not have valid client ports (%s)", req.Header["Transport"]))
return false
}
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUdp {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols"))
return false
}
if len(c.streamTracks) >= len(c.streamSdpParsed.MediaDescriptions) {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("all the tracks have already been setup"))
return false
}
res := make(chan error)
c.p.events <- programEventClientSetupRecord{res, c, gortsplib.StreamProtocolUdp, rtpPort, rtcpPort}
err := <-res
if err != nil {
c.writeResError(req, gortsplib.StatusBadRequest, err)
return false
}
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
"CSeq": cseq,
"Transport": gortsplib.HeaderValue{strings.Join([]string{
"RTP/AVP/UDP",
"unicast",
fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort),
fmt.Sprintf("server_port=%d-%d", c.p.conf.RtpPort, c.p.conf.RtcpPort),
}, ";")},
"Session": gortsplib.HeaderValue{"12345678"},
},
})
return true
// record via TCP
} else if _, ok := th["RTP/AVP/TCP"]; ok {
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolTcp]; !ok {
c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled"))
return false
}
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolTcp {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols"))
return false
}
interleaved := th.Value("interleaved")
if interleaved == "" {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain the interleaved field"))
return false
}
expInterleaved := fmt.Sprintf("%d-%d", 0+len(c.streamTracks)*2, 1+len(c.streamTracks)*2)
if interleaved != expInterleaved {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("wrong interleaved value, expected '%s', got '%s'", expInterleaved, interleaved))
return false
}
if len(c.streamTracks) >= len(c.streamSdpParsed.MediaDescriptions) {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("all the tracks have already been setup"))
return false
}
res := make(chan error)
c.p.events <- programEventClientSetupRecord{res, c, gortsplib.StreamProtocolTcp, 0, 0}
err := <-res
if err != nil {
c.writeResError(req, gortsplib.StatusBadRequest, err)
return false
}
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
"CSeq": cseq,
"Transport": gortsplib.HeaderValue{strings.Join([]string{
"RTP/AVP/TCP",
"unicast",
fmt.Sprintf("interleaved=%s", interleaved),
}, ";")},
"Session": gortsplib.HeaderValue{"12345678"},
},
})
return true
} else {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP, RTP/AVP/UDP or RTP/AVP/TCP) (%s)", req.Header["Transport"]))
return false
}
default:
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("client is in state '%s'", c.state))
return false
}
case gortsplib.PLAY:
if c.state != clientStatePrePlay {
c.writeResError(req, gortsplib.StatusBadRequest,
fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStatePrePlay))
return false
}
if path != c.path {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed"))
return false
}
// check publisher existence
res := make(chan error)
c.p.events <- programEventClientPlay1{res, c}
err := <-res
if err != nil {
c.writeResError(req, gortsplib.StatusBadRequest, err)
return false
}
// write response before setting state
// otherwise, in case of TCP connections, RTP packets could be sent
// before the response
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
"CSeq": cseq,
"Session": gortsplib.HeaderValue{"12345678"},
},
})
c.runPlay(path)
return false
case gortsplib.RECORD:
if c.state != clientStatePreRecord {
c.writeResError(req, gortsplib.StatusBadRequest,
fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStatePreRecord))
return false
}
if path != c.path {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed"))
return false
}
if len(c.streamTracks) != len(c.streamSdpParsed.MediaDescriptions) {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("not all tracks have been setup"))
return false
}
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
"CSeq": cseq,
"Session": gortsplib.HeaderValue{"12345678"},
},
})
c.runRecord(path)
return false
case gortsplib.TEARDOWN:
// close connection silently
return false
default:
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("unhandled method '%s'", req.Method))
return false
}
}
func (c *serverClient) runPlay(path string) {
pconf := c.findConfForPath(path)
if c.streamProtocol == gortsplib.StreamProtocolTcp {
c.writeBuf = newDoubleBuffer(clientTcpWriteBufferSize)
c.events = make(chan serverClientEvent)
}
done := make(chan struct{})
c.p.events <- programEventClientPlay2{done, c}
<-done
c.log("is receiving on path '%s', %d %s via %s", c.path, len(c.streamTracks), func() string {
if len(c.streamTracks) == 1 {
return "track"
}
return "tracks"
}(), c.streamProtocol)
var runOnReadCmd *exec.Cmd
if pconf.RunOnRead != "" {
runOnReadCmd = exec.Command("/bin/sh", "-c", pconf.RunOnRead)
runOnReadCmd.Stdout = os.Stdout
runOnReadCmd.Stderr = os.Stderr
err := runOnReadCmd.Start()
if err != nil {
c.log("ERR: %s", err)
}
}
if c.streamProtocol == gortsplib.StreamProtocolTcp {
readDone := make(chan error)
go func() {
buf := make([]byte, 2048)
for {
_, err := c.conn.NetConn().Read(buf)
if err != nil {
readDone <- err
break
}
}
}()
outer:
for {
select {
case err := <-readDone:
if err != io.EOF {
c.log("ERR: %s", err)
}
break outer
case rawEvt := <-c.events:
switch evt := rawEvt.(type) {
case serverClientEventFrameTcp:
c.conn.WriteFrame(evt.frame)
}
}
}
go func() {
for range c.events {
}
}()
done := make(chan struct{})
c.p.events <- programEventClientPlayStop{done, c}
<-done
close(c.events)
} else {
for {
req, err := c.conn.ReadRequest()
if err != nil {
if err != io.EOF {
c.log("ERR: %s", err)
}
break
}
ok := c.handleRequest(req)
if !ok {
break
}
}
done := make(chan struct{})
c.p.events <- programEventClientPlayStop{done, c}
<-done
}
if runOnReadCmd != nil {
runOnReadCmd.Process.Signal(os.Interrupt)
runOnReadCmd.Wait()
}
}
func (c *serverClient) runRecord(path string) {
pconf := c.findConfForPath(path)
c.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(c.streamTracks))
for trackId := range c.streamTracks {
c.rtcpReceivers[trackId] = gortsplib.NewRtcpReceiver()
}
done := make(chan struct{})
c.p.events <- programEventClientRecord{done, c}
<-done
c.log("is publishing on path '%s', %d %s via %s", c.path, len(c.streamTracks), func() string {
if len(c.streamTracks) == 1 {
return "track"
}
return "tracks"
}(), c.streamProtocol)
var runOnPublishCmd *exec.Cmd
if pconf.RunOnPublish != "" {
runOnPublishCmd = exec.Command("/bin/sh", "-c", pconf.RunOnPublish)
runOnPublishCmd.Stdout = os.Stdout
runOnPublishCmd.Stderr = os.Stderr
err := runOnPublishCmd.Start()
if err != nil {
c.log("ERR: %s", err)
}
}
if c.streamProtocol == gortsplib.StreamProtocolTcp {
frame := &gortsplib.InterleavedFrame{}
readDone := make(chan error)
go func() {
for {
frame.Content = c.readBuf.swap()
frame.Content = frame.Content[:cap(frame.Content)]
recv, err := c.conn.ReadFrameOrRequest(frame)
if err != nil {
readDone <- err
break
}
switch recvt := recv.(type) {
case *gortsplib.InterleavedFrame:
if frame.TrackId >= len(c.streamTracks) {
c.log("ERR: invalid track id '%d'", frame.TrackId)
readDone <- nil
break
}
c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content)
c.p.events <- programEventClientFrameTcp{
c.path,
frame.TrackId,
frame.StreamType,
frame.Content,
}
case *gortsplib.Request:
ok := c.handleRequest(recvt)
if !ok {
readDone <- nil
break
}
}
}
}()
checkStreamTicker := time.NewTicker(clientCheckStreamInterval)
receiverReportTicker := time.NewTicker(clientReceiverReportInterval)
outer1:
for {
select {
case err := <-readDone:
if err != nil && err != io.EOF {
c.log("ERR: %s", err)
}
break outer1
case <-checkStreamTicker.C:
for trackId := range c.streamTracks {
if time.Since(c.rtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.ReadTimeout {
c.log("ERR: stream is dead")
c.conn.NetConn().Close()
<-readDone
break outer1
}
}
case <-receiverReportTicker.C:
for trackId := range c.streamTracks {
frame := c.rtcpReceivers[trackId].Report()
c.conn.WriteFrame(&gortsplib.InterleavedFrame{
TrackId: trackId,
StreamType: gortsplib.StreamTypeRtcp,
Content: frame,
})
}
}
}
checkStreamTicker.Stop()
receiverReportTicker.Stop()
} else {
readDone := make(chan error)
go func() {
for {
req, err := c.conn.ReadRequest()
if err != nil {
readDone <- err
break
}
ok := c.handleRequest(req)
if !ok {
readDone <- nil
break
}
}
}()
checkStreamTicker := time.NewTicker(clientCheckStreamInterval)
receiverReportTicker := time.NewTicker(clientReceiverReportInterval)
outer2:
for {
select {
case err := <-readDone:
if err != nil && err != io.EOF {
c.log("ERR: %s", err)
}
break outer2
case <-checkStreamTicker.C:
for trackId := range c.streamTracks {
if time.Since(c.rtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.ReadTimeout {
c.log("ERR: stream is dead")
c.conn.NetConn().Close()
<-readDone
break outer2
}
}
case <-receiverReportTicker.C:
for trackId := range c.streamTracks {
frame := c.rtcpReceivers[trackId].Report()
c.p.rtcpl.writeChan <- &udpAddrBufPair{
addr: &net.UDPAddr{
IP: c.ip(),
Zone: c.zone(),
Port: c.streamTracks[trackId].rtcpPort,
},
buf: frame,
}
}
}
}
checkStreamTicker.Stop()
receiverReportTicker.Stop()
}
done = make(chan struct{})
c.p.events <- programEventClientRecordStop{done, c}
<-done
for trackId := range c.streamTracks {
c.rtcpReceivers[trackId].Close()
}
if runOnPublishCmd != nil {
runOnPublishCmd.Process.Signal(os.Interrupt)
runOnPublishCmd.Wait()
}
}