Ready-to-use SRT / WebRTC / RTSP / RTMP / LL-HLS media server and media proxy that allows to read, publish, proxy, record and playback video and audio streams.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

1115 lines
28 KiB

package main
import (
"errors"
"fmt"
"io"
"net"
"os"
"os/exec"
"strconv"
"strings"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/sdp/v3"
)
const (
clientCheckStreamInterval = 5 * time.Second
clientReceiverReportInterval = 10 * time.Second
clientTcpReadBufferSize = 128 * 1024
clientTcpWriteBufferSize = 128 * 1024
clientUdpReadBufferSize = 2048
clientUdpWriteBufferSize = 128 * 1024
)
type udpClient struct {
client *client
trackId int
streamType gortsplib.StreamType
}
type udpClientAddr struct {
ip [net.IPv6len]byte // use a fixed-size array to enable the equality operator
port int
}
func makeUdpClientAddr(ip net.IP, port int) udpClientAddr {
ret := udpClientAddr{
port: port,
}
if len(ip) == net.IPv4len {
copy(ret.ip[0:], []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}) // v4InV6Prefix
copy(ret.ip[12:], ip)
} else {
copy(ret.ip[:], ip)
}
return ret
}
type describeRes struct {
sdp []byte
err error
}
type clientTrack struct {
rtpPort int
rtcpPort int
}
type clientEvent interface {
isServerClientEvent()
}
type clientEventFrameTcp struct {
frame *gortsplib.InterleavedFrame
}
func (clientEventFrameTcp) isServerClientEvent() {}
type clientState int
const (
clientStateInitial clientState = iota
clientStateWaitingDescription
clientStateAnnounce
clientStatePrePlay
clientStatePlay
clientStatePreRecord
clientStateRecord
)
func (cs clientState) String() string {
switch cs {
case clientStateInitial:
return "INITIAL"
case clientStateAnnounce:
return "ANNOUNCE"
case clientStatePrePlay:
return "PRE_PLAY"
case clientStatePlay:
return "PLAY"
case clientStatePreRecord:
return "PRE_RECORD"
case clientStateRecord:
return "RECORD"
}
return "UNKNOWN"
}
type client struct {
p *program
conn *gortsplib.ConnServer
state clientState
path *path
authUser string
authPass string
authHelper *gortsplib.AuthServer
authFailures int
streamProtocol gortsplib.StreamProtocol
streamTracks map[int]*clientTrack
rtcpReceivers []*gortsplib.RtcpReceiver
describeRes chan describeRes
events chan clientEvent // only if state = Play and gortsplib.StreamProtocol = TCP
done chan struct{}
}
func newClient(p *program, nconn net.Conn) *client {
c := &client{
p: p,
conn: gortsplib.NewConnServer(gortsplib.ConnServerConf{
Conn: nconn,
ReadTimeout: p.conf.ReadTimeout,
WriteTimeout: p.conf.WriteTimeout,
}),
state: clientStateInitial,
streamTracks: make(map[int]*clientTrack),
done: make(chan struct{}),
}
go c.run()
return c
}
func (c *client) log(format string, args ...interface{}) {
c.p.log("[client %s] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr().String()}, args...)...)
}
func (c *client) isPublisher() {}
func (c *client) ip() net.IP {
return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).IP
}
func (c *client) zone() string {
return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).Zone
}
func (c *client) run() {
var onConnectCmd *exec.Cmd
if c.p.conf.RunOnConnect != "" {
onConnectCmd = exec.Command("/bin/sh", "-c", c.p.conf.RunOnConnect)
onConnectCmd.Stdout = os.Stdout
onConnectCmd.Stderr = os.Stderr
err := onConnectCmd.Start()
if err != nil {
c.log("ERR: %s", err)
}
}
outer:
for {
req, err := c.conn.ReadRequest()
if err != nil {
if err != io.EOF {
c.log("ERR: %s", err)
}
break outer
}
ok := c.handleRequest(req)
if !ok {
break outer
}
}
done := make(chan struct{})
c.p.events <- programEventClientClose{done, c}
<-done
c.conn.NetConn().Close() // close socket in case it has not been closed yet
if onConnectCmd != nil {
onConnectCmd.Process.Signal(os.Interrupt)
onConnectCmd.Wait()
}
close(c.done) // close() never blocks
}
func (c *client) writeResError(cseq gortsplib.HeaderValue, code gortsplib.StatusCode, err error) {
c.log("ERR: %s", err)
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: code,
Header: gortsplib.Header{
"CSeq": cseq,
},
})
}
var errAuthCritical = errors.New("auth critical")
var errAuthNotCritical = errors.New("auth not critical")
func (c *client) authenticate(ips []interface{}, user string, pass string, req *gortsplib.Request) error {
// validate ip
err := func() error {
if ips == nil {
return nil
}
ip := c.ip()
if !ipEqualOrInRange(ip, ips) {
c.log("ERR: ip '%s' not allowed", ip)
return errAuthCritical
}
return nil
}()
if err != nil {
return err
}
// validate credentials
err = func() error {
if user == "" {
return nil
}
// reset authHelper every time the credentials change
if c.authHelper == nil || c.authUser != user || c.authPass != pass {
c.authUser = user
c.authPass = pass
c.authHelper = gortsplib.NewAuthServer(user, pass, c.p.conf.authMethodsParsed)
}
err := c.authHelper.ValidateHeader(req.Header["Authorization"], req.Method, req.Url)
if err != nil {
c.authFailures += 1
// vlc with login prompt sends 4 requests:
// 1) without credentials
// 2) with password but without the username
// 3) without credentials
// 4) with password and username
// hence we must allow up to 3 failures
var retErr error
if c.authFailures > 3 {
c.log("ERR: unauthorized: %s", err)
retErr = errAuthCritical
} else if c.authFailures > 1 {
c.log("WARN: unauthorized: %s", err)
retErr = errAuthNotCritical
} else {
retErr = errAuthNotCritical
}
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusUnauthorized,
Header: gortsplib.Header{
"CSeq": req.Header["CSeq"],
"WWW-Authenticate": c.authHelper.GenerateHeader(),
},
})
return retErr
}
// reset authFailures after a successful login
c.authFailures = 0
return nil
}()
if err != nil {
return err
}
return nil
}
func (c *client) handleRequest(req *gortsplib.Request) bool {
c.log(string(req.Method))
cseq, ok := req.Header["CSeq"]
if !ok || len(cseq) != 1 {
c.writeResError(nil, gortsplib.StatusBadRequest, fmt.Errorf("cseq missing"))
return false
}
path := req.Url.Path
if len(path) < 1 || path[0] != '/' {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path must begin with a slash"))
return false
}
path = path[1:] // strip leading slash
switch req.Method {
case gortsplib.OPTIONS:
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
"CSeq": cseq,
"Public": gortsplib.HeaderValue{strings.Join([]string{
string(gortsplib.DESCRIBE),
string(gortsplib.ANNOUNCE),
string(gortsplib.SETUP),
string(gortsplib.PLAY),
string(gortsplib.RECORD),
string(gortsplib.TEARDOWN),
}, ", ")},
},
})
return true
case gortsplib.DESCRIBE:
if c.state != clientStateInitial {
c.writeResError(cseq, gortsplib.StatusBadRequest,
fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateInitial))
return false
}
confp := c.p.findConfForPathName(path)
if confp == nil {
c.writeResError(cseq, gortsplib.StatusBadRequest,
fmt.Errorf("unable to find a valid configuration for path '%s'", path))
return false
}
err := c.authenticate(confp.readIpsParsed, confp.ReadUser, confp.ReadPass, req)
if err != nil {
if err == errAuthCritical {
return false
}
return true
}
c.describeRes = make(chan describeRes)
c.p.events <- programEventClientDescribe{c, path}
describeRes := <-c.describeRes
if describeRes.err != nil {
c.writeResError(cseq, gortsplib.StatusNotFound, describeRes.err)
return false
}
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
"CSeq": cseq,
"Content-Base": gortsplib.HeaderValue{req.Url.String() + "/"},
"Content-Type": gortsplib.HeaderValue{"application/sdp"},
},
Content: describeRes.sdp,
})
return true
case gortsplib.ANNOUNCE:
if c.state != clientStateInitial {
c.writeResError(cseq, gortsplib.StatusBadRequest,
fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateInitial))
return false
}
if len(path) == 0 {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("empty base path"))
return false
}
err := checkPathName(path)
if err != nil {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("invalid path name: %s (%s)", err, path))
return false
}
confp := c.p.findConfForPathName(path)
if confp == nil {
c.writeResError(cseq, gortsplib.StatusBadRequest,
fmt.Errorf("unable to find a valid configuration for path '%s'", path))
return false
}
err = c.authenticate(confp.publishIpsParsed, confp.PublishUser, confp.PublishPass, req)
if err != nil {
if err == errAuthCritical {
return false
}
return true
}
ct, ok := req.Header["Content-Type"]
if !ok || len(ct) != 1 {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("Content-Type header missing"))
return false
}
if ct[0] != "application/sdp" {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("unsupported Content-Type '%s'", ct))
return false
}
sdpParsed := &sdp.SessionDescription{}
err = sdpParsed.Unmarshal(req.Content)
if err != nil {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("invalid SDP: %s", err))
return false
}
if len(sdpParsed.MediaDescriptions) == 0 {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("no tracks defined"))
return false
}
var tracks []*gortsplib.Track
for i, media := range sdpParsed.MediaDescriptions {
tracks = append(tracks, &gortsplib.Track{
Id: i,
Media: media,
})
}
sdpParsed, req.Content = sdpForServer(tracks)
res := make(chan error)
c.p.events <- programEventClientAnnounce{res, c, path, req.Content, sdpParsed}
err = <-res
if err != nil {
c.writeResError(cseq, gortsplib.StatusBadRequest, err)
return false
}
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
"CSeq": cseq,
},
})
return true
case gortsplib.SETUP:
th, err := gortsplib.ReadHeaderTransport(req.Header["Transport"])
if err != nil {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("transport header: %s", err))
return false
}
if _, ok := th["multicast"]; ok {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("multicast is not supported"))
return false
}
basePath, controlPath, err := splitPath(path)
if err != nil {
c.writeResError(cseq, gortsplib.StatusBadRequest, err)
return false
}
switch c.state {
// play
case clientStateInitial, clientStatePrePlay:
confp := c.p.findConfForPathName(basePath)
if confp == nil {
c.writeResError(cseq, gortsplib.StatusBadRequest,
fmt.Errorf("unable to find a valid configuration for path '%s'", basePath))
return false
}
err := c.authenticate(confp.readIpsParsed, confp.ReadUser, confp.ReadPass, req)
if err != nil {
if err == errAuthCritical {
return false
}
return true
}
if c.path != nil && basePath != c.path.name {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, basePath))
return false
}
if !strings.HasPrefix(controlPath, "trackID=") {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("invalid control path (%s)", controlPath))
return false
}
tmp, err := strconv.ParseInt(controlPath[len("trackID="):], 10, 64)
if err != nil || tmp < 0 {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("invalid track id (%s)", controlPath))
return false
}
trackId := int(tmp)
if _, ok := c.streamTracks[trackId]; ok {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("track %d has already been setup", trackId))
return false
}
// play via UDP
if func() bool {
_, ok := th["RTP/AVP"]
if ok {
return true
}
_, ok = th["RTP/AVP/UDP"]
if ok {
return true
}
return false
}() {
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolUdp]; !ok {
c.writeResError(cseq, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled"))
return false
}
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUdp {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols"))
return false
}
rtpPort, rtcpPort := th.Ports("client_port")
if rtpPort == 0 || rtcpPort == 0 {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not have valid client ports (%v)", req.Header["Transport"]))
return false
}
res := make(chan error)
c.p.events <- programEventClientSetupPlay{res, c, basePath, trackId}
err = <-res
if err != nil {
c.writeResError(cseq, gortsplib.StatusBadRequest, err)
return false
}
c.streamProtocol = gortsplib.StreamProtocolUdp
c.streamTracks[trackId] = &clientTrack{
rtpPort: rtpPort,
rtcpPort: rtcpPort,
}
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
"CSeq": cseq,
"Transport": gortsplib.HeaderValue{strings.Join([]string{
"RTP/AVP/UDP",
"unicast",
fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort),
fmt.Sprintf("server_port=%d-%d", c.p.conf.RtpPort, c.p.conf.RtcpPort),
}, ";")},
"Session": gortsplib.HeaderValue{"12345678"},
},
})
return true
// play via TCP
} else if _, ok := th["RTP/AVP/TCP"]; ok {
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolTcp]; !ok {
c.writeResError(cseq, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled"))
return false
}
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolTcp {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols"))
return false
}
res := make(chan error)
c.p.events <- programEventClientSetupPlay{res, c, basePath, trackId}
err = <-res
if err != nil {
c.writeResError(cseq, gortsplib.StatusBadRequest, err)
return false
}
c.streamProtocol = gortsplib.StreamProtocolTcp
c.streamTracks[trackId] = &clientTrack{
rtpPort: 0,
rtcpPort: 0,
}
interleaved := fmt.Sprintf("%d-%d", ((trackId) * 2), ((trackId)*2)+1)
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
"CSeq": cseq,
"Transport": gortsplib.HeaderValue{strings.Join([]string{
"RTP/AVP/TCP",
"unicast",
fmt.Sprintf("interleaved=%s", interleaved),
}, ";")},
"Session": gortsplib.HeaderValue{"12345678"},
},
})
return true
} else {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP, RTP/AVP/UDP or RTP/AVP/TCP) (%s)", req.Header["Transport"]))
return false
}
// record
case clientStateAnnounce, clientStatePreRecord:
if strings.ToLower(th.Value("mode")) != "record" {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain mode=record"))
return false
}
// after ANNOUNCE, c.path is already set
if basePath != c.path.name {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, basePath))
return false
}
// record via UDP
if func() bool {
_, ok := th["RTP/AVP"]
if ok {
return true
}
_, ok = th["RTP/AVP/UDP"]
if ok {
return true
}
return false
}() {
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolUdp]; !ok {
c.writeResError(cseq, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled"))
return false
}
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUdp {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols"))
return false
}
rtpPort, rtcpPort := th.Ports("client_port")
if rtpPort == 0 || rtcpPort == 0 {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not have valid client ports (%s)", req.Header["Transport"]))
return false
}
if len(c.streamTracks) >= len(c.path.publisherSdpParsed.MediaDescriptions) {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("all the tracks have already been setup"))
return false
}
res := make(chan error)
c.p.events <- programEventClientSetupRecord{res, c}
err := <-res
if err != nil {
c.writeResError(cseq, gortsplib.StatusBadRequest, err)
return false
}
c.streamProtocol = gortsplib.StreamProtocolUdp
c.streamTracks[len(c.streamTracks)] = &clientTrack{
rtpPort: rtpPort,
rtcpPort: rtcpPort,
}
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
"CSeq": cseq,
"Transport": gortsplib.HeaderValue{strings.Join([]string{
"RTP/AVP/UDP",
"unicast",
fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort),
fmt.Sprintf("server_port=%d-%d", c.p.conf.RtpPort, c.p.conf.RtcpPort),
}, ";")},
"Session": gortsplib.HeaderValue{"12345678"},
},
})
return true
// record via TCP
} else if _, ok := th["RTP/AVP/TCP"]; ok {
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolTcp]; !ok {
c.writeResError(cseq, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled"))
return false
}
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolTcp {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols"))
return false
}
interleaved := th.Value("interleaved")
if interleaved == "" {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain the interleaved field"))
return false
}
expInterleaved := fmt.Sprintf("%d-%d", 0+len(c.streamTracks)*2, 1+len(c.streamTracks)*2)
if interleaved != expInterleaved {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("wrong interleaved value, expected '%s', got '%s'", expInterleaved, interleaved))
return false
}
if len(c.streamTracks) >= len(c.path.publisherSdpParsed.MediaDescriptions) {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("all the tracks have already been setup"))
return false
}
res := make(chan error)
c.p.events <- programEventClientSetupRecord{res, c}
err := <-res
if err != nil {
c.writeResError(cseq, gortsplib.StatusBadRequest, err)
return false
}
c.streamProtocol = gortsplib.StreamProtocolTcp
c.streamTracks[len(c.streamTracks)] = &clientTrack{
rtpPort: 0,
rtcpPort: 0,
}
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
"CSeq": cseq,
"Transport": gortsplib.HeaderValue{strings.Join([]string{
"RTP/AVP/TCP",
"unicast",
fmt.Sprintf("interleaved=%s", interleaved),
}, ";")},
"Session": gortsplib.HeaderValue{"12345678"},
},
})
return true
} else {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP, RTP/AVP/UDP or RTP/AVP/TCP) (%s)", req.Header["Transport"]))
return false
}
default:
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("client is in state '%s'", c.state))
return false
}
case gortsplib.PLAY:
if c.state != clientStatePrePlay {
c.writeResError(cseq, gortsplib.StatusBadRequest,
fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStatePrePlay))
return false
}
// path can end with a slash, remove it
path = strings.TrimSuffix(path, "/")
if path != c.path.name {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, path))
return false
}
if len(c.streamTracks) == 0 {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("no tracks have been setup"))
return false
}
// write response before setting state
// otherwise, in case of TCP connections, RTP packets could be sent
// before the response
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
"CSeq": cseq,
"Session": gortsplib.HeaderValue{"12345678"},
},
})
c.runPlay(path)
return false
case gortsplib.RECORD:
if c.state != clientStatePreRecord {
c.writeResError(cseq, gortsplib.StatusBadRequest,
fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStatePreRecord))
return false
}
// path can end with a slash, remove it
path = strings.TrimSuffix(path, "/")
if path != c.path.name {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, path))
return false
}
if len(c.streamTracks) != len(c.path.publisherSdpParsed.MediaDescriptions) {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("not all tracks have been setup"))
return false
}
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
"CSeq": cseq,
"Session": gortsplib.HeaderValue{"12345678"},
},
})
c.runRecord(path)
return false
case gortsplib.TEARDOWN:
// close connection silently
return false
default:
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("unhandled method '%s'", req.Method))
return false
}
}
func (c *client) runPlay(path string) {
confp := c.p.findConfForPathName(path)
if c.streamProtocol == gortsplib.StreamProtocolTcp {
c.events = make(chan clientEvent)
}
// start sending frames only after sending the response to the PLAY request
done := make(chan struct{})
c.p.events <- programEventClientPlay{done, c}
<-done
c.log("is receiving on path '%s', %d %s via %s", c.path.name, len(c.streamTracks), func() string {
if len(c.streamTracks) == 1 {
return "track"
}
return "tracks"
}(), c.streamProtocol)
var onReadCmd *exec.Cmd
if confp.RunOnRead != "" {
onReadCmd = exec.Command("/bin/sh", "-c", confp.RunOnRead)
onReadCmd.Env = append(os.Environ(),
"RTSP_SERVER_PATH="+path,
)
onReadCmd.Stdout = os.Stdout
onReadCmd.Stderr = os.Stderr
err := onReadCmd.Start()
if err != nil {
c.log("ERR: %s", err)
}
}
if c.streamProtocol == gortsplib.StreamProtocolUdp {
for {
req, err := c.conn.ReadRequest()
if err != nil {
if err != io.EOF {
c.log("ERR: %s", err)
}
break
}
ok := c.handleRequest(req)
if !ok {
break
}
}
done := make(chan struct{})
c.p.events <- programEventClientPlayStop{done, c}
<-done
} else {
readDone := make(chan error)
go func() {
frame := &gortsplib.InterleavedFrame{}
readBuf := make([]byte, clientTcpReadBufferSize)
for {
frame.Content = readBuf
frame.Content = frame.Content[:cap(frame.Content)]
recv, err := c.conn.ReadFrameOrRequest(frame, false)
if err != nil {
readDone <- err
break
}
switch recvt := recv.(type) {
case *gortsplib.InterleavedFrame:
// rtcp feedback is handled by gortsplib
case *gortsplib.Request:
ok := c.handleRequest(recvt)
if !ok {
readDone <- nil
break
}
}
}
}()
outer:
for {
select {
case err := <-readDone:
if err != nil && err != io.EOF {
c.log("ERR: %s", err)
}
break outer
case rawEvt := <-c.events:
switch evt := rawEvt.(type) {
case clientEventFrameTcp:
c.conn.WriteFrame(evt.frame)
}
}
}
go func() {
for range c.events {
}
}()
done := make(chan struct{})
c.p.events <- programEventClientPlayStop{done, c}
<-done
close(c.events)
}
if onReadCmd != nil {
onReadCmd.Process.Signal(os.Interrupt)
onReadCmd.Wait()
}
}
func (c *client) runRecord(path string) {
confp := c.p.findConfForPathName(path)
c.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(c.streamTracks))
for trackId := range c.streamTracks {
c.rtcpReceivers[trackId] = gortsplib.NewRtcpReceiver()
}
done := make(chan struct{})
c.p.events <- programEventClientRecord{done, c}
<-done
c.log("is publishing on path '%s', %d %s via %s", c.path.name, len(c.streamTracks), func() string {
if len(c.streamTracks) == 1 {
return "track"
}
return "tracks"
}(), c.streamProtocol)
var onPublishCmd *exec.Cmd
if confp.RunOnPublish != "" {
onPublishCmd = exec.Command("/bin/sh", "-c", confp.RunOnPublish)
onPublishCmd.Env = append(os.Environ(),
"RTSP_SERVER_PATH="+path,
)
onPublishCmd.Stdout = os.Stdout
onPublishCmd.Stderr = os.Stderr
err := onPublishCmd.Start()
if err != nil {
c.log("ERR: %s", err)
}
}
if c.streamProtocol == gortsplib.StreamProtocolUdp {
readDone := make(chan error)
go func() {
for {
req, err := c.conn.ReadRequest()
if err != nil {
readDone <- err
break
}
ok := c.handleRequest(req)
if !ok {
readDone <- nil
break
}
}
}()
checkStreamTicker := time.NewTicker(clientCheckStreamInterval)
receiverReportTicker := time.NewTicker(clientReceiverReportInterval)
outer2:
for {
select {
case err := <-readDone:
if err != nil && err != io.EOF {
c.log("ERR: %s", err)
}
break outer2
case <-checkStreamTicker.C:
for trackId := range c.streamTracks {
if time.Since(c.rtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.ReadTimeout {
c.log("ERR: stream is dead")
c.conn.NetConn().Close()
<-readDone
break outer2
}
}
case <-receiverReportTicker.C:
for trackId := range c.streamTracks {
frame := c.rtcpReceivers[trackId].Report()
c.p.serverRtcp.writeChan <- &udpAddrBufPair{
addr: &net.UDPAddr{
IP: c.ip(),
Zone: c.zone(),
Port: c.streamTracks[trackId].rtcpPort,
},
buf: frame,
}
}
}
}
checkStreamTicker.Stop()
receiverReportTicker.Stop()
} else {
frame := &gortsplib.InterleavedFrame{}
readBuf := newMultiBuffer(3, clientTcpReadBufferSize)
readDone := make(chan error)
go func() {
for {
frame.Content = readBuf.next()
frame.Content = frame.Content[:cap(frame.Content)]
recv, err := c.conn.ReadFrameOrRequest(frame, true)
if err != nil {
readDone <- err
break
}
switch recvt := recv.(type) {
case *gortsplib.InterleavedFrame:
if frame.TrackId >= len(c.streamTracks) {
c.log("ERR: invalid track id '%d'", frame.TrackId)
readDone <- nil
break
}
c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content)
c.p.events <- programEventClientFrameTcp{
c.path,
frame.TrackId,
frame.StreamType,
frame.Content,
}
case *gortsplib.Request:
ok := c.handleRequest(recvt)
if !ok {
readDone <- nil
break
}
}
}
}()
receiverReportTicker := time.NewTicker(clientReceiverReportInterval)
outer1:
for {
select {
case err := <-readDone:
if err != nil && err != io.EOF {
c.log("ERR: %s", err)
}
break outer1
case <-receiverReportTicker.C:
for trackId := range c.streamTracks {
frame := c.rtcpReceivers[trackId].Report()
c.conn.WriteFrame(&gortsplib.InterleavedFrame{
TrackId: trackId,
StreamType: gortsplib.StreamTypeRtcp,
Content: frame,
})
}
}
}
receiverReportTicker.Stop()
}
done = make(chan struct{})
c.p.events <- programEventClientRecordStop{done, c}
<-done
for trackId := range c.streamTracks {
c.rtcpReceivers[trackId].Close()
}
if onPublishCmd != nil {
onPublishCmd.Process.Signal(os.Interrupt)
onPublishCmd.Wait()
}
}