Browse Source

update gortsplib

pull/31/head
aler9 5 years ago
parent
commit
d6994c4e31
  1. 4
      conf.go
  2. 3
      go.mod
  3. 4
      go.sum
  4. 37
      main.go
  5. 128
      server-client.go
  6. 2
      streamer-udpl.go
  7. 72
      streamer.go
  8. 140
      utils.go

4
conf.go

@ -86,10 +86,10 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) {
for _, proto := range conf.Protocols { for _, proto := range conf.Protocols {
switch proto { switch proto {
case "udp": case "udp":
conf.protocolsParsed[_STREAM_PROTOCOL_UDP] = struct{}{} conf.protocolsParsed[streamProtocolUdp] = struct{}{}
case "tcp": case "tcp":
conf.protocolsParsed[_STREAM_PROTOCOL_TCP] = struct{}{} conf.protocolsParsed[streamProtocolTcp] = struct{}{}
default: default:
return nil, fmt.Errorf("unsupported protocol: %s", proto) return nil, fmt.Errorf("unsupported protocol: %s", proto)

3
go.mod

@ -5,8 +5,7 @@ go 1.13
require ( require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20200712204813-71824f42dca0 github.com/aler9/gortsplib v0.0.0-20200713074216-643dce1dd787
github.com/pion/rtcp v1.2.3
github.com/pion/sdp v1.3.0 github.com/pion/sdp v1.3.0
github.com/stretchr/testify v1.5.1 github.com/stretchr/testify v1.5.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/alecthomas/kingpin.v2 v2.2.6

4
go.sum

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20200712204813-71824f42dca0 h1:9Ph5Zl7JkTIEXkot6Q3Acag+9klW9cpwI9navzSE0gs= github.com/aler9/gortsplib v0.0.0-20200713074216-643dce1dd787 h1:6svRLsZW0bSOLSU/P3KjkLDJdU0KQjJkfc+Ttcd0veg=
github.com/aler9/gortsplib v0.0.0-20200712204813-71824f42dca0/go.mod h1:VsK6bzyxOh2ymYRX/U7ZfM4fEsXKXd1ylL73c2eNzUA= github.com/aler9/gortsplib v0.0.0-20200713074216-643dce1dd787/go.mod h1:17dcA4Qak5TLqgun8OR0wnSbFQIg4cvYVSf1nbCt+qU=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pion/rtcp v1.2.3 h1:2wrhKnqgSz91Q5nzYTO07mQXztYPtxL8a0XOss4rJqA= github.com/pion/rtcp v1.2.3 h1:2wrhKnqgSz91Q5nzYTO07mQXztYPtxL8a0XOss4rJqA=

37
main.go

@ -24,12 +24,12 @@ type track struct {
type streamProtocol int type streamProtocol int
const ( const (
_STREAM_PROTOCOL_UDP streamProtocol = iota streamProtocolUdp streamProtocol = iota
_STREAM_PROTOCOL_TCP streamProtocolTcp
) )
func (s streamProtocol) String() string { func (s streamProtocol) String() string {
if s == _STREAM_PROTOCOL_UDP { if s == streamProtocolUdp {
return "udp" return "udp"
} }
return "tcp" return "tcp"
@ -292,10 +292,10 @@ outer:
} }
switch evt.client.state { switch evt.client.state {
case _CLIENT_STATE_PLAY: case clientStatePlay:
p.receiverCount -= 1 p.receiverCount -= 1
case _CLIENT_STATE_RECORD: case clientStateRecord:
p.publisherCount -= 1 p.publisherCount -= 1
} }
@ -319,7 +319,7 @@ outer:
} }
evt.client.path = evt.path evt.client.path = evt.path
evt.client.state = _CLIENT_STATE_ANNOUNCE evt.client.state = clientStateAnnounce
p.publishers[evt.path] = evt.client p.publishers[evt.path] = evt.client
evt.res <- nil evt.res <- nil
@ -343,7 +343,7 @@ outer:
rtpPort: evt.rtpPort, rtpPort: evt.rtpPort,
rtcpPort: evt.rtcpPort, rtcpPort: evt.rtcpPort,
}) })
evt.client.state = _CLIENT_STATE_PRE_PLAY evt.client.state = clientStatePrePlay
evt.res <- nil evt.res <- nil
case programEventClientSetupRecord: case programEventClientSetupRecord:
@ -352,7 +352,7 @@ outer:
rtpPort: evt.rtpPort, rtpPort: evt.rtpPort,
rtcpPort: evt.rtcpPort, rtcpPort: evt.rtcpPort,
}) })
evt.client.state = _CLIENT_STATE_PRE_RECORD evt.client.state = clientStatePreRecord
evt.res <- nil evt.res <- nil
case programEventClientPlay1: case programEventClientPlay1:
@ -373,12 +373,12 @@ outer:
case programEventClientPlay2: case programEventClientPlay2:
p.receiverCount += 1 p.receiverCount += 1
evt.client.state = _CLIENT_STATE_PLAY evt.client.state = clientStatePlay
evt.res <- nil evt.res <- nil
case programEventClientRecord: case programEventClientRecord:
p.publisherCount += 1 p.publisherCount += 1
evt.client.state = _CLIENT_STATE_RECORD evt.client.state = clientStateRecord
evt.res <- nil evt.res <- nil
case programEventClientFrameUdp: case programEventClientFrameUdp:
@ -387,7 +387,7 @@ outer:
continue continue
} }
client.rtcpReceivers[trackId].onFrame(evt.streamType, evt.buf) client.RtcpReceivers[trackId].OnFrame(evt.streamType, evt.buf)
p.forwardFrame(client.path, trackId, evt.streamType, evt.buf) p.forwardFrame(client.path, trackId, evt.streamType, evt.buf)
case programEventClientFrameTcp: case programEventClientFrameTcp:
@ -476,8 +476,8 @@ func (p *program) findPublisher(addr *net.UDPAddr, streamType gortsplib.StreamTy
continue continue
} }
if cl.streamProtocol != _STREAM_PROTOCOL_UDP || if cl.streamProtocol != streamProtocolUdp ||
cl.state != _CLIENT_STATE_RECORD || cl.state != clientStateRecord ||
!cl.ip().Equal(addr.IP) { !cl.ip().Equal(addr.IP) {
continue continue
} }
@ -499,8 +499,8 @@ func (p *program) findPublisher(addr *net.UDPAddr, streamType gortsplib.StreamTy
func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.StreamType, frame []byte) { func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.StreamType, frame []byte) {
for client := range p.clients { for client := range p.clients {
if client.path == path && client.state == _CLIENT_STATE_PLAY { if client.path == path && client.state == clientStatePlay {
if client.streamProtocol == _STREAM_PROTOCOL_UDP { if client.streamProtocol == streamProtocolUdp {
if streamType == gortsplib.StreamTypeRtp { if streamType == gortsplib.StreamTypeRtp {
p.rtpl.write(&udpAddrBufPair{ p.rtpl.write(&udpAddrBufPair{
addr: &net.UDPAddr{ addr: &net.UDPAddr{
@ -522,16 +522,15 @@ func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.St
} }
} else { } else {
channel := gortsplib.ConvTrackIdAndStreamTypeToChannel(trackId, streamType)
buf := client.writeBuf.swap() buf := client.writeBuf.swap()
buf = buf[:len(frame)] buf = buf[:len(frame)]
copy(buf, frame) copy(buf, frame)
client.events <- serverClientEventFrameTcp{ client.events <- serverClientEventFrameTcp{
frame: &gortsplib.InterleavedFrame{ frame: &gortsplib.InterleavedFrame{
Channel: channel, TrackId: trackId,
Content: buf, StreamType: streamType,
Content: buf,
}, },
} }
} }

128
server-client.go

@ -14,8 +14,8 @@ import (
) )
const ( const (
_CLIENT_CHECK_STREAM_INTERVAL = 5 * time.Second clientCheckStreamInterval = 5 * time.Second
_CLIENT_RECEIVER_REPORT_INTERVAL = 10 * time.Second clientReceiverReportInterval = 10 * time.Second
) )
type serverClientEvent interface { type serverClientEvent interface {
@ -31,32 +31,32 @@ func (serverClientEventFrameTcp) isServerClientEvent() {}
type serverClientState int type serverClientState int
const ( const (
_CLIENT_STATE_STARTING serverClientState = iota clientStateStarting serverClientState = iota
_CLIENT_STATE_ANNOUNCE clientStateAnnounce
_CLIENT_STATE_PRE_PLAY clientStatePrePlay
_CLIENT_STATE_PLAY clientStatePlay
_CLIENT_STATE_PRE_RECORD clientStatePreRecord
_CLIENT_STATE_RECORD clientStateRecord
) )
func (cs serverClientState) String() string { func (cs serverClientState) String() string {
switch cs { switch cs {
case _CLIENT_STATE_STARTING: case clientStateStarting:
return "STARTING" return "STARTING"
case _CLIENT_STATE_ANNOUNCE: case clientStateAnnounce:
return "ANNOUNCE" return "ANNOUNCE"
case _CLIENT_STATE_PRE_PLAY: case clientStatePrePlay:
return "PRE_PLAY" return "PRE_PLAY"
case _CLIENT_STATE_PLAY: case clientStatePlay:
return "PLAY" return "PLAY"
case _CLIENT_STATE_PRE_RECORD: case clientStatePreRecord:
return "PRE_RECORD" return "PRE_RECORD"
case _CLIENT_STATE_RECORD: case clientStateRecord:
return "RECORD" return "RECORD"
} }
return "UNKNOWN" return "UNKNOWN"
@ -75,7 +75,7 @@ type serverClient struct {
streamSdpParsed *sdp.SessionDescription // only if publisher streamSdpParsed *sdp.SessionDescription // only if publisher
streamProtocol streamProtocol streamProtocol streamProtocol
streamTracks []*track streamTracks []*track
rtcpReceivers []*rtcpReceiver RtcpReceivers []*gortsplib.RtcpReceiver
readBuf *doubleBuffer readBuf *doubleBuffer
writeBuf *doubleBuffer writeBuf *doubleBuffer
@ -91,7 +91,7 @@ func newServerClient(p *program, nconn net.Conn) *serverClient {
ReadTimeout: p.conf.ReadTimeout, ReadTimeout: p.conf.ReadTimeout,
WriteTimeout: p.conf.WriteTimeout, WriteTimeout: p.conf.WriteTimeout,
}), }),
state: _CLIENT_STATE_STARTING, state: clientStateStarting,
readBuf: newDoubleBuffer(512 * 1024), readBuf: newDoubleBuffer(512 * 1024),
done: make(chan struct{}), done: make(chan struct{}),
} }
@ -113,7 +113,7 @@ func (c *serverClient) zone() string {
} }
func (c *serverClient) publisherIsReady() bool { func (c *serverClient) publisherIsReady() bool {
return c.state == _CLIENT_STATE_RECORD return c.state == clientStateRecord
} }
func (c *serverClient) publisherSdpText() []byte { func (c *serverClient) publisherSdpText() []byte {
@ -136,13 +136,13 @@ func (c *serverClient) run() {
outer: outer:
for { for {
switch c.state { switch c.state {
case _CLIENT_STATE_PLAY: case clientStatePlay:
ok := c.runPlay() ok := c.runPlay()
if !ok { if !ok {
break outer break outer
} }
case _CLIENT_STATE_RECORD: case clientStateRecord:
ok := c.runRecord() ok := c.runRecord()
if !ok { if !ok {
break outer break outer
@ -210,7 +210,7 @@ outer:
} }
func (c *serverClient) runPlay() bool { func (c *serverClient) runPlay() bool {
if c.streamProtocol == _STREAM_PROTOCOL_TCP { if c.streamProtocol == streamProtocolTcp {
readDone := make(chan error) readDone := make(chan error)
go func() { go func() {
buf := make([]byte, 2048) buf := make([]byte, 2048)
@ -276,7 +276,7 @@ func (c *serverClient) runPlay() bool {
} }
func (c *serverClient) runRecord() bool { func (c *serverClient) runRecord() bool {
if c.streamProtocol == _STREAM_PROTOCOL_TCP { if c.streamProtocol == streamProtocolTcp {
frame := &gortsplib.InterleavedFrame{} frame := &gortsplib.InterleavedFrame{}
readDone := make(chan error) readDone := make(chan error)
@ -292,18 +292,17 @@ func (c *serverClient) runRecord() bool {
switch recvt := recv.(type) { switch recvt := recv.(type) {
case *gortsplib.InterleavedFrame: case *gortsplib.InterleavedFrame:
trackId, streamType := gortsplib.ConvChannelToTrackIdAndStreamType(frame.Channel) if frame.TrackId >= len(c.streamTracks) {
if trackId >= len(c.streamTracks) { c.log("ERR: invalid track id '%d'", frame.TrackId)
c.log("ERR: invalid track id '%d'", trackId)
readDone <- nil readDone <- nil
break break
} }
c.rtcpReceivers[trackId].onFrame(streamType, frame.Content) c.RtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content)
c.p.events <- programEventClientFrameTcp{ c.p.events <- programEventClientFrameTcp{
c.path, c.path,
trackId, frame.TrackId,
streamType, frame.StreamType,
frame.Content, frame.Content,
} }
@ -317,8 +316,8 @@ func (c *serverClient) runRecord() bool {
} }
}() }()
checkStreamTicker := time.NewTicker(_CLIENT_CHECK_STREAM_INTERVAL) checkStreamTicker := time.NewTicker(clientCheckStreamInterval)
receiverReportTicker := time.NewTicker(_CLIENT_RECEIVER_REPORT_INTERVAL) receiverReportTicker := time.NewTicker(clientReceiverReportInterval)
outer1: outer1:
for { for {
@ -331,7 +330,7 @@ func (c *serverClient) runRecord() bool {
case <-checkStreamTicker.C: case <-checkStreamTicker.C:
for trackId := range c.streamTracks { for trackId := range c.streamTracks {
if time.Since(c.rtcpReceivers[trackId].lastFrameTime()) >= c.p.conf.StreamDeadAfter { if time.Since(c.RtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.StreamDeadAfter {
c.log("ERR: stream is dead") c.log("ERR: stream is dead")
c.conn.NetConn().Close() c.conn.NetConn().Close()
<-readDone <-readDone
@ -341,12 +340,11 @@ func (c *serverClient) runRecord() bool {
case <-receiverReportTicker.C: case <-receiverReportTicker.C:
for trackId := range c.streamTracks { for trackId := range c.streamTracks {
channel := gortsplib.ConvTrackIdAndStreamTypeToChannel(trackId, gortsplib.StreamTypeRtcp) frame := c.RtcpReceivers[trackId].Report()
frame := c.rtcpReceivers[trackId].report()
c.conn.WriteFrame(&gortsplib.InterleavedFrame{ c.conn.WriteFrame(&gortsplib.InterleavedFrame{
Channel: channel, TrackId: trackId,
Content: frame, StreamType: gortsplib.StreamTypeRtcp,
Content: frame,
}) })
} }
} }
@ -373,8 +371,8 @@ func (c *serverClient) runRecord() bool {
} }
}() }()
checkStreamTicker := time.NewTicker(_CLIENT_CHECK_STREAM_INTERVAL) checkStreamTicker := time.NewTicker(clientCheckStreamInterval)
receiverReportTicker := time.NewTicker(_CLIENT_RECEIVER_REPORT_INTERVAL) receiverReportTicker := time.NewTicker(clientReceiverReportInterval)
outer2: outer2:
for { for {
@ -387,7 +385,7 @@ func (c *serverClient) runRecord() bool {
case <-checkStreamTicker.C: case <-checkStreamTicker.C:
for trackId := range c.streamTracks { for trackId := range c.streamTracks {
if time.Since(c.rtcpReceivers[trackId].lastFrameTime()) >= c.p.conf.StreamDeadAfter { if time.Since(c.RtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.StreamDeadAfter {
c.log("ERR: stream is dead") c.log("ERR: stream is dead")
c.conn.NetConn().Close() c.conn.NetConn().Close()
<-readDone <-readDone
@ -397,7 +395,7 @@ func (c *serverClient) runRecord() bool {
case <-receiverReportTicker.C: case <-receiverReportTicker.C:
for trackId := range c.streamTracks { for trackId := range c.streamTracks {
frame := c.rtcpReceivers[trackId].report() frame := c.RtcpReceivers[trackId].Report()
c.p.rtcpl.writeChan <- &udpAddrBufPair{ c.p.rtcpl.writeChan <- &udpAddrBufPair{
addr: &net.UDPAddr{ addr: &net.UDPAddr{
IP: c.ip(), IP: c.ip(),
@ -419,7 +417,7 @@ func (c *serverClient) runRecord() bool {
<-done <-done
for trackId := range c.streamTracks { for trackId := range c.streamTracks {
c.rtcpReceivers[trackId].close() c.RtcpReceivers[trackId].Close()
} }
return false return false
@ -584,9 +582,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error {
return nil return nil
case gortsplib.DESCRIBE: case gortsplib.DESCRIBE:
if c.state != _CLIENT_STATE_STARTING { if c.state != clientStateStarting {
c.writeResError(req, gortsplib.StatusBadRequest, c.writeResError(req, gortsplib.StatusBadRequest,
fmt.Errorf("client is in state '%s' instead of '%s'", c.state, _CLIENT_STATE_STARTING)) fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateStarting))
return errClientTerminate return errClientTerminate
} }
@ -625,9 +623,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error {
return nil return nil
case gortsplib.ANNOUNCE: case gortsplib.ANNOUNCE:
if c.state != _CLIENT_STATE_STARTING { if c.state != clientStateStarting {
c.writeResError(req, gortsplib.StatusBadRequest, c.writeResError(req, gortsplib.StatusBadRequest,
fmt.Errorf("client is in state '%s' instead of '%s'", c.state, _CLIENT_STATE_STARTING)) fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateStarting))
return errClientTerminate return errClientTerminate
} }
@ -709,7 +707,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error {
switch c.state { switch c.state {
// play // play
case _CLIENT_STATE_STARTING, _CLIENT_STATE_PRE_PLAY: case clientStateStarting, clientStatePrePlay:
pconf := c.findConfForPath(path) pconf := c.findConfForPath(path)
if pconf == nil { if pconf == nil {
c.writeResError(req, gortsplib.StatusBadRequest, c.writeResError(req, gortsplib.StatusBadRequest,
@ -737,7 +735,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error {
} }
return false return false
}() { }() {
if _, ok := c.p.conf.protocolsParsed[_STREAM_PROTOCOL_UDP]; !ok { if _, ok := c.p.conf.protocolsParsed[streamProtocolUdp]; !ok {
c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled")) c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled"))
return errClientTerminate return errClientTerminate
} }
@ -753,13 +751,13 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error {
return errClientTerminate return errClientTerminate
} }
if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_UDP { if len(c.streamTracks) > 0 && c.streamProtocol != streamProtocolUdp {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols")) c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols"))
return errClientTerminate return errClientTerminate
} }
res := make(chan error) res := make(chan error)
c.p.events <- programEventClientSetupPlay{res, c, path, _STREAM_PROTOCOL_UDP, rtpPort, rtcpPort} c.p.events <- programEventClientSetupPlay{res, c, path, streamProtocolUdp, rtpPort, rtcpPort}
err = <-res err = <-res
if err != nil { if err != nil {
c.writeResError(req, gortsplib.StatusBadRequest, err) c.writeResError(req, gortsplib.StatusBadRequest, err)
@ -783,7 +781,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error {
// play via TCP // play via TCP
} else if _, ok := th["RTP/AVP/TCP"]; ok { } else if _, ok := th["RTP/AVP/TCP"]; ok {
if _, ok := c.p.conf.protocolsParsed[_STREAM_PROTOCOL_TCP]; !ok { if _, ok := c.p.conf.protocolsParsed[streamProtocolTcp]; !ok {
c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled")) c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled"))
return errClientTerminate return errClientTerminate
} }
@ -793,13 +791,13 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error {
return errClientTerminate return errClientTerminate
} }
if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_TCP { if len(c.streamTracks) > 0 && c.streamProtocol != streamProtocolTcp {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols")) c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols"))
return errClientTerminate return errClientTerminate
} }
res := make(chan error) res := make(chan error)
c.p.events <- programEventClientSetupPlay{res, c, path, _STREAM_PROTOCOL_TCP, 0, 0} c.p.events <- programEventClientSetupPlay{res, c, path, streamProtocolTcp, 0, 0}
err = <-res err = <-res
if err != nil { if err != nil {
c.writeResError(req, gortsplib.StatusBadRequest, err) c.writeResError(req, gortsplib.StatusBadRequest, err)
@ -828,7 +826,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error {
} }
// record // record
case _CLIENT_STATE_ANNOUNCE, _CLIENT_STATE_PRE_RECORD: case clientStateAnnounce, clientStatePreRecord:
if _, ok := th["mode=record"]; !ok { if _, ok := th["mode=record"]; !ok {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain mode=record")) c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain mode=record"))
return errClientTerminate return errClientTerminate
@ -852,7 +850,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error {
} }
return false return false
}() { }() {
if _, ok := c.p.conf.protocolsParsed[_STREAM_PROTOCOL_UDP]; !ok { if _, ok := c.p.conf.protocolsParsed[streamProtocolUdp]; !ok {
c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled")) c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled"))
return errClientTerminate return errClientTerminate
} }
@ -863,7 +861,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error {
return errClientTerminate return errClientTerminate
} }
if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_UDP { if len(c.streamTracks) > 0 && c.streamProtocol != streamProtocolUdp {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols")) c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols"))
return errClientTerminate return errClientTerminate
} }
@ -874,7 +872,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error {
} }
res := make(chan error) res := make(chan error)
c.p.events <- programEventClientSetupRecord{res, c, _STREAM_PROTOCOL_UDP, rtpPort, rtcpPort} c.p.events <- programEventClientSetupRecord{res, c, streamProtocolUdp, rtpPort, rtcpPort}
err := <-res err := <-res
if err != nil { if err != nil {
c.writeResError(req, gortsplib.StatusBadRequest, err) c.writeResError(req, gortsplib.StatusBadRequest, err)
@ -898,12 +896,12 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error {
// record via TCP // record via TCP
} else if _, ok := th["RTP/AVP/TCP"]; ok { } else if _, ok := th["RTP/AVP/TCP"]; ok {
if _, ok := c.p.conf.protocolsParsed[_STREAM_PROTOCOL_TCP]; !ok { if _, ok := c.p.conf.protocolsParsed[streamProtocolTcp]; !ok {
c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled")) c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled"))
return errClientTerminate return errClientTerminate
} }
if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_TCP { if len(c.streamTracks) > 0 && c.streamProtocol != streamProtocolTcp {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols")) c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols"))
return errClientTerminate return errClientTerminate
} }
@ -926,7 +924,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error {
} }
res := make(chan error) res := make(chan error)
c.p.events <- programEventClientSetupRecord{res, c, _STREAM_PROTOCOL_TCP, 0, 0} c.p.events <- programEventClientSetupRecord{res, c, streamProtocolTcp, 0, 0}
err := <-res err := <-res
if err != nil { if err != nil {
c.writeResError(req, gortsplib.StatusBadRequest, err) c.writeResError(req, gortsplib.StatusBadRequest, err)
@ -958,9 +956,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error {
} }
case gortsplib.PLAY: case gortsplib.PLAY:
if c.state != _CLIENT_STATE_PRE_PLAY { if c.state != clientStatePrePlay {
c.writeResError(req, gortsplib.StatusBadRequest, c.writeResError(req, gortsplib.StatusBadRequest,
fmt.Errorf("client is in state '%s' instead of '%s'", c.state, _CLIENT_STATE_PRE_PLAY)) fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStatePrePlay))
return errClientTerminate return errClientTerminate
} }
@ -989,7 +987,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error {
}, },
}) })
if c.streamProtocol == _STREAM_PROTOCOL_TCP { if c.streamProtocol == streamProtocolTcp {
c.writeBuf = newDoubleBuffer(2048) c.writeBuf = newDoubleBuffer(2048)
c.events = make(chan serverClientEvent) c.events = make(chan serverClientEvent)
} }
@ -1009,9 +1007,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error {
return errClientChangeRunMode return errClientChangeRunMode
case gortsplib.RECORD: case gortsplib.RECORD:
if c.state != _CLIENT_STATE_PRE_RECORD { if c.state != clientStatePreRecord {
c.writeResError(req, gortsplib.StatusBadRequest, c.writeResError(req, gortsplib.StatusBadRequest,
fmt.Errorf("client is in state '%s' instead of '%s'", c.state, _CLIENT_STATE_PRE_RECORD)) fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStatePreRecord))
return errClientTerminate return errClientTerminate
} }
@ -1033,9 +1031,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error {
}, },
}) })
c.rtcpReceivers = make([]*rtcpReceiver, len(c.streamTracks)) c.RtcpReceivers = make([]*gortsplib.RtcpReceiver, len(c.streamTracks))
for trackId := range c.streamTracks { for trackId := range c.streamTracks {
c.rtcpReceivers[trackId] = newRtcpReceiver() c.RtcpReceivers[trackId] = gortsplib.NewRtcpReceiver()
} }
res := make(chan error) res := make(chan error)

2
streamer-udpl.go

@ -80,7 +80,7 @@ func (l *streamerUdpListener) run() {
continue continue
} }
l.streamer.rtcpReceivers[l.trackId].onFrame(l.streamType, buf[:n]) l.streamer.RtcpReceivers[l.trackId].OnFrame(l.streamType, buf[:n])
l.p.events <- programEventStreamerFrame{l.streamer, l.trackId, l.streamType, buf[:n]} l.p.events <- programEventStreamerFrame{l.streamer, l.trackId, l.streamType, buf[:n]}
} }

72
streamer.go

@ -12,10 +12,10 @@ import (
) )
const ( const (
_STREAMER_RETRY_INTERVAL = 5 * time.Second streamerRetryInterval = 5 * time.Second
_STREAMER_CHECK_STREAM_INTERVAL = 5 * time.Second streamerCheckStreamInterval = 5 * time.Second
_STREAMER_KEEPALIVE_INTERVAL = 60 * time.Second streamerKeepaliveInterval = 60 * time.Second
_STREAMER_RECEIVER_REPORT_INTERVAL = 10 * time.Second streamerReceiverReportInterval = 10 * time.Second
) )
type streamerUdpListenerPair struct { type streamerUdpListenerPair struct {
@ -32,7 +32,7 @@ type streamer struct {
clientSdpParsed *sdp.SessionDescription clientSdpParsed *sdp.SessionDescription
serverSdpText []byte serverSdpText []byte
serverSdpParsed *sdp.SessionDescription serverSdpParsed *sdp.SessionDescription
rtcpReceivers []*rtcpReceiver RtcpReceivers []*gortsplib.RtcpReceiver
readBuf *doubleBuffer readBuf *doubleBuffer
terminate chan struct{} terminate chan struct{}
@ -62,10 +62,10 @@ func newStreamer(p *program, path string, source string, sourceProtocol string)
proto, err := func() (streamProtocol, error) { proto, err := func() (streamProtocol, error) {
switch sourceProtocol { switch sourceProtocol {
case "udp": case "udp":
return _STREAM_PROTOCOL_UDP, nil return streamProtocolUdp, nil
case "tcp": case "tcp":
return _STREAM_PROTOCOL_TCP, nil return streamProtocolTcp, nil
} }
return streamProtocol(0), fmt.Errorf("unsupported protocol '%s'", sourceProtocol) return streamProtocol(0), fmt.Errorf("unsupported protocol '%s'", sourceProtocol)
}() }()
@ -109,7 +109,7 @@ func (s *streamer) run() {
break break
} }
t := time.NewTimer(_STREAMER_RETRY_INTERVAL) t := time.NewTimer(streamerRetryInterval)
select { select {
case <-s.terminate: case <-s.terminate:
break break
@ -143,15 +143,11 @@ func (s *streamer) do() bool {
} }
defer nconn.Close() defer nconn.Close()
conn, err := gortsplib.NewConnClient(gortsplib.ConnClientConf{ conn := gortsplib.NewConnClient(gortsplib.ConnClientConf{
Conn: nconn, Conn: nconn,
ReadTimeout: s.p.conf.ReadTimeout, ReadTimeout: s.p.conf.ReadTimeout,
WriteTimeout: s.p.conf.WriteTimeout, WriteTimeout: s.p.conf.WriteTimeout,
}) })
if err != nil {
s.log("ERR: %s", err)
return true
}
_, err = conn.Options(s.u) _, err = conn.Options(s.u)
if err != nil { if err != nil {
@ -172,7 +168,7 @@ func (s *streamer) do() bool {
s.serverSdpText = serverSdpText s.serverSdpText = serverSdpText
s.serverSdpParsed = serverSdpParsed s.serverSdpParsed = serverSdpParsed
if s.proto == _STREAM_PROTOCOL_UDP { if s.proto == streamProtocolUdp {
return s.runUdp(conn) return s.runUdp(conn)
} else { } else {
return s.runTcp(conn) return s.runTcp(conn)
@ -244,9 +240,9 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool {
return true return true
} }
s.rtcpReceivers = make([]*rtcpReceiver, len(s.clientSdpParsed.MediaDescriptions)) s.RtcpReceivers = make([]*gortsplib.RtcpReceiver, len(s.clientSdpParsed.MediaDescriptions))
for trackId := range s.clientSdpParsed.MediaDescriptions { for trackId := range s.clientSdpParsed.MediaDescriptions {
s.rtcpReceivers[trackId] = newRtcpReceiver() s.RtcpReceivers[trackId] = gortsplib.NewRtcpReceiver()
} }
for _, pair := range streamerUdpListenerPairs { for _, pair := range streamerUdpListenerPairs {
@ -254,9 +250,9 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool {
pair.rtcpl.start() pair.rtcpl.start()
} }
sendKeepaliveTicker := time.NewTicker(_STREAMER_KEEPALIVE_INTERVAL) sendKeepaliveTicker := time.NewTicker(streamerKeepaliveInterval)
checkStreamTicker := time.NewTicker(_STREAMER_CHECK_STREAM_INTERVAL) checkStreamTicker := time.NewTicker(streamerCheckStreamInterval)
receiverReportTicker := time.NewTicker(_STREAMER_RECEIVER_REPORT_INTERVAL) receiverReportTicker := time.NewTicker(streamerReceiverReportInterval)
s.p.events <- programEventStreamerReady{s} s.p.events <- programEventStreamerReady{s}
@ -270,14 +266,7 @@ outer:
break outer break outer
case <-sendKeepaliveTicker.C: case <-sendKeepaliveTicker.C:
_, err = conn.Do(&gortsplib.Request{ _, err := conn.Options(s.u)
Method: gortsplib.OPTIONS,
Url: &url.URL{
Scheme: "rtsp",
Host: s.u.Host,
Path: "/",
},
})
if err != nil { if err != nil {
s.log("ERR: %s", err) s.log("ERR: %s", err)
ret = true ret = true
@ -286,7 +275,7 @@ outer:
case <-checkStreamTicker.C: case <-checkStreamTicker.C:
for trackId := range s.clientSdpParsed.MediaDescriptions { for trackId := range s.clientSdpParsed.MediaDescriptions {
if time.Since(s.rtcpReceivers[trackId].lastFrameTime()) >= s.p.conf.StreamDeadAfter { if time.Since(s.RtcpReceivers[trackId].LastFrameTime()) >= s.p.conf.StreamDeadAfter {
s.log("ERR: stream is dead") s.log("ERR: stream is dead")
ret = true ret = true
break outer break outer
@ -295,7 +284,7 @@ outer:
case <-receiverReportTicker.C: case <-receiverReportTicker.C:
for trackId := range s.clientSdpParsed.MediaDescriptions { for trackId := range s.clientSdpParsed.MediaDescriptions {
frame := s.rtcpReceivers[trackId].report() frame := s.RtcpReceivers[trackId].Report()
streamerUdpListenerPairs[trackId].rtcpl.writeChan <- &udpAddrBufPair{ streamerUdpListenerPairs[trackId].rtcpl.writeChan <- &udpAddrBufPair{
addr: &net.UDPAddr{ addr: &net.UDPAddr{
IP: conn.NetConn().RemoteAddr().(*net.TCPAddr).IP, IP: conn.NetConn().RemoteAddr().(*net.TCPAddr).IP,
@ -320,7 +309,7 @@ outer:
} }
for trackId := range s.clientSdpParsed.MediaDescriptions { for trackId := range s.clientSdpParsed.MediaDescriptions {
s.rtcpReceivers[trackId].close() s.RtcpReceivers[trackId].Close()
} }
return ret return ret
@ -341,9 +330,9 @@ func (s *streamer) runTcp(conn *gortsplib.ConnClient) bool {
return true return true
} }
s.rtcpReceivers = make([]*rtcpReceiver, len(s.clientSdpParsed.MediaDescriptions)) s.RtcpReceivers = make([]*gortsplib.RtcpReceiver, len(s.clientSdpParsed.MediaDescriptions))
for trackId := range s.clientSdpParsed.MediaDescriptions { for trackId := range s.clientSdpParsed.MediaDescriptions {
s.rtcpReceivers[trackId] = newRtcpReceiver() s.RtcpReceivers[trackId] = gortsplib.NewRtcpReceiver()
} }
s.p.events <- programEventStreamerReady{s} s.p.events <- programEventStreamerReady{s}
@ -363,16 +352,14 @@ func (s *streamer) runTcp(conn *gortsplib.ConnClient) bool {
break break
} }
trackId, streamType := gortsplib.ConvChannelToTrackIdAndStreamType(frame.Channel) s.RtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content)
s.p.events <- programEventStreamerFrame{s, frame.TrackId, frame.StreamType, frame.Content}
s.rtcpReceivers[trackId].onFrame(streamType, frame.Content)
s.p.events <- programEventStreamerFrame{s, trackId, streamType, frame.Content}
} }
}() }()
// a ticker to check the stream is not needed since there's already a deadline // a ticker to check the stream is not needed since there's already a deadline
// on the RTSP reads // on the RTSP reads
receiverReportTicker := time.NewTicker(_STREAMER_RECEIVER_REPORT_INTERVAL) receiverReportTicker := time.NewTicker(streamerReceiverReportInterval)
var ret bool var ret bool
@ -389,13 +376,12 @@ outer:
case <-receiverReportTicker.C: case <-receiverReportTicker.C:
for trackId := range s.clientSdpParsed.MediaDescriptions { for trackId := range s.clientSdpParsed.MediaDescriptions {
frame := s.rtcpReceivers[trackId].report() frame := s.RtcpReceivers[trackId].Report()
channel := gortsplib.ConvTrackIdAndStreamTypeToChannel(trackId, gortsplib.StreamTypeRtcp)
conn.WriteFrame(&gortsplib.InterleavedFrame{ conn.WriteFrame(&gortsplib.InterleavedFrame{
Channel: channel, TrackId: trackId,
Content: frame, StreamType: gortsplib.StreamTypeRtcp,
Content: frame,
}) })
} }
} }
@ -406,7 +392,7 @@ outer:
s.p.events <- programEventStreamerNotReady{s} s.p.events <- programEventStreamerNotReady{s}
for trackId := range s.clientSdpParsed.MediaDescriptions { for trackId := range s.clientSdpParsed.MediaDescriptions {
s.rtcpReceivers[trackId].close() s.RtcpReceivers[trackId].Close()
} }
return ret return ret

140
utils.go

@ -2,13 +2,9 @@ package main
import ( import (
"fmt" "fmt"
"math/rand"
"net" "net"
"strconv" "strconv"
"time"
"github.com/aler9/gortsplib"
"github.com/pion/rtcp"
"github.com/pion/sdp" "github.com/pion/sdp"
) )
@ -77,142 +73,6 @@ func (db *doubleBuffer) swap() []byte {
return ret return ret
} }
type rtcpReceiverEvent interface {
isRtpReceiverEvent()
}
type rtcpReceiverEventFrameRtp struct {
sequenceNumber uint16
}
func (rtcpReceiverEventFrameRtp) isRtpReceiverEvent() {}
type rtcpReceiverEventFrameRtcp struct {
ssrc uint32
ntpTimeMiddle uint32
}
func (rtcpReceiverEventFrameRtcp) isRtpReceiverEvent() {}
type rtcpReceiverEventLastFrameTime struct {
res chan time.Time
}
func (rtcpReceiverEventLastFrameTime) isRtpReceiverEvent() {}
type rtcpReceiverEventReport struct {
res chan []byte
}
func (rtcpReceiverEventReport) isRtpReceiverEvent() {}
type rtcpReceiverEventTerminate struct{}
func (rtcpReceiverEventTerminate) isRtpReceiverEvent() {}
type rtcpReceiver struct {
events chan rtcpReceiverEvent
done chan struct{}
}
func newRtcpReceiver() *rtcpReceiver {
rr := &rtcpReceiver{
events: make(chan rtcpReceiverEvent),
done: make(chan struct{}),
}
go rr.run()
return rr
}
func (rr *rtcpReceiver) run() {
lastFrameTime := time.Now()
publisherSSRC := uint32(0)
receiverSSRC := rand.Uint32()
sequenceNumberCycles := uint16(0)
lastSequenceNumber := uint16(0)
lastSenderReport := uint32(0)
outer:
for rawEvt := range rr.events {
switch evt := rawEvt.(type) {
case rtcpReceiverEventFrameRtp:
if evt.sequenceNumber < lastSequenceNumber {
sequenceNumberCycles += 1
}
lastSequenceNumber = evt.sequenceNumber
lastFrameTime = time.Now()
case rtcpReceiverEventFrameRtcp:
publisherSSRC = evt.ssrc
lastSenderReport = evt.ntpTimeMiddle
case rtcpReceiverEventLastFrameTime:
evt.res <- lastFrameTime
case rtcpReceiverEventReport:
rr := &rtcp.ReceiverReport{
SSRC: receiverSSRC,
Reports: []rtcp.ReceptionReport{
{
SSRC: publisherSSRC,
LastSequenceNumber: uint32(sequenceNumberCycles)<<8 | uint32(lastSequenceNumber),
LastSenderReport: lastSenderReport,
},
},
}
frame, _ := rr.Marshal()
evt.res <- frame
case rtcpReceiverEventTerminate:
break outer
}
}
close(rr.events)
close(rr.done)
}
func (rr *rtcpReceiver) close() {
rr.events <- rtcpReceiverEventTerminate{}
<-rr.done
}
func (rr *rtcpReceiver) onFrame(streamType gortsplib.StreamType, buf []byte) {
if streamType == gortsplib.StreamTypeRtp {
// extract sequence number of first frame
if len(buf) >= 3 {
sequenceNumber := uint16(uint16(buf[2])<<8 | uint16(buf[1]))
rr.events <- rtcpReceiverEventFrameRtp{sequenceNumber}
}
} else {
frames, err := rtcp.Unmarshal(buf)
if err == nil {
for _, frame := range frames {
if senderReport, ok := (frame).(*rtcp.SenderReport); ok {
rr.events <- rtcpReceiverEventFrameRtcp{
senderReport.SSRC,
uint32(senderReport.NTPTime >> 16),
}
}
}
}
}
}
func (rr *rtcpReceiver) lastFrameTime() time.Time {
res := make(chan time.Time)
rr.events <- rtcpReceiverEventLastFrameTime{res}
return <-res
}
func (rr *rtcpReceiver) report() []byte {
res := make(chan []byte)
rr.events <- rtcpReceiverEventReport{res}
return <-res
}
func sdpForServer(sin *sdp.SessionDescription) (*sdp.SessionDescription, []byte) { func sdpForServer(sin *sdp.SessionDescription) (*sdp.SessionDescription, []byte) {
sout := &sdp.SessionDescription{ sout := &sdp.SessionDescription{
SessionName: "Stream", SessionName: "Stream",

Loading…
Cancel
Save