Browse Source

implement rtcp receiver reports; fix #22

pull/31/head
aler9 5 years ago
parent
commit
03f020ec33
  1. 8
      Makefile
  2. 14
      conf.yml
  3. 3
      go.mod
  4. 4
      go.sum
  5. 62
      main.go
  6. 273
      server-client.go
  7. 3
      server-udpl.go
  8. 31
      streamer-udpl.go
  9. 231
      streamer.go
  10. 140
      utils.go

8
Makefile

@ -65,10 +65,18 @@ endef
export DOCKERFILE_RUN export DOCKERFILE_RUN
define CONFIG_RUN define CONFIG_RUN
#rtspPort: 8555
#rtpPort: 8002
#rtcpPort: 8003
paths: paths:
all: all:
readUser: test readUser: test
readPass: tast readPass: tast
proxied:
source: rtsp://192.168.10.1/unicast
sourceProtocol: udp
endef endef
export CONFIG_RUN export CONFIG_RUN

14
conf.yml

@ -1,20 +1,20 @@
# supported stream protocols (the handshake is always performed with TCP) # supported stream protocols (the handshake is always performed with TCP)
protocols: [udp, tcp] protocols: [udp, tcp]
# port of the TCP rtsp listener # port of the TCP RTSP listener
rtspPort: 8554 rtspPort: 8554
# port of the UDP rtp listener # port of the UDP RTP listener
rtpPort: 8000 rtpPort: 8000
# port of the UDP rtcp listener # port of the UDP RTCP listener
rtcpPort: 8001 rtcpPort: 8001
# timeout of read operations
readTimeout: 5s
# timeout of write operations
writeTimeout: 5s
# script to run when a client connects # script to run when a client connects
preScript: preScript:
# script to run when a client disconnects # script to run when a client disconnects
postScript: postScript:
# timeout of read operations
readTimeout: 5s
# timeout of write operations
writeTimeout: 5s
# enable pprof on port 9999 to monitor performance # enable pprof on port 9999 to monitor performance
pprof: false pprof: false

3
go.mod

@ -6,7 +6,8 @@ require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20200710091324-fb7d7b008e68 github.com/aler9/gortsplib v0.0.0-20200710091324-fb7d7b008e68
github.com/stretchr/testify v1.4.0 github.com/pion/rtcp v1.2.3
github.com/stretchr/testify v1.5.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/yaml.v2 v2.2.2 gopkg.in/yaml.v2 v2.2.2
gortc.io/sdp v0.18.2 gortc.io/sdp v0.18.2

4
go.sum

@ -6,6 +6,8 @@ github.com/aler9/gortsplib v0.0.0-20200710091324-fb7d7b008e68 h1:apyYugiG/luHl0X
github.com/aler9/gortsplib v0.0.0-20200710091324-fb7d7b008e68/go.mod h1:sL64nUkmrTVhlT/GCaxRXyI2Xk7m8XSdw5Uv8xKGPdc= github.com/aler9/gortsplib v0.0.0-20200710091324-fb7d7b008e68/go.mod h1:sL64nUkmrTVhlT/GCaxRXyI2Xk7m8XSdw5Uv8xKGPdc=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pion/rtcp v1.2.3 h1:2wrhKnqgSz91Q5nzYTO07mQXztYPtxL8a0XOss4rJqA=
github.com/pion/rtcp v1.2.3/go.mod h1:zGhIv0RPRF0Z1Wiij22pUt5W/c9fevqSzT4jje/oK7I=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@ -13,6 +15,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=

62
main.go

@ -11,6 +11,7 @@ import (
"regexp" "regexp"
"time" "time"
"github.com/aler9/gortsplib"
"gopkg.in/alecthomas/kingpin.v2" "gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
"gortc.io/sdp" "gortc.io/sdp"
@ -223,9 +224,9 @@ type publisher interface {
type program struct { type program struct {
conf *conf conf *conf
protocols map[streamProtocol]struct{} protocols map[streamProtocol]struct{}
tcpl *serverTcpListener rtspl *serverTcpListener
udplRtp *serverUdpListener rtpl *serverUdpListener
udplRtcp *serverUdpListener rtcpl *serverUdpListener
clients map[*serverClient]struct{} clients map[*serverClient]struct{}
streamers []*streamer streamers []*streamer
publishers map[string]publisher publishers map[string]publisher
@ -387,24 +388,24 @@ func newProgram(sargs []string, stdin io.Reader) (*program, error) {
http.DefaultServeMux = http.NewServeMux() http.DefaultServeMux = http.NewServeMux()
} }
p.udplRtp, err = newServerUdpListener(p, conf.RtpPort, _TRACK_FLOW_TYPE_RTP) p.rtpl, err = newServerUdpListener(p, conf.RtpPort, _TRACK_FLOW_TYPE_RTP)
if err != nil { if err != nil {
return nil, err return nil, err
} }
p.udplRtcp, err = newServerUdpListener(p, conf.RtcpPort, _TRACK_FLOW_TYPE_RTCP) p.rtcpl, err = newServerUdpListener(p, conf.RtcpPort, _TRACK_FLOW_TYPE_RTCP)
if err != nil { if err != nil {
return nil, err return nil, err
} }
p.tcpl, err = newServerTcpListener(p) p.rtspl, err = newServerTcpListener(p)
if err != nil { if err != nil {
return nil, err return nil, err
} }
go p.udplRtp.run() go p.rtpl.run()
go p.udplRtcp.run() go p.rtcpl.run()
go p.tcpl.run() go p.rtspl.run()
for _, s := range p.streamers { for _, s := range p.streamers {
go s.run() go s.run()
} }
@ -548,7 +549,7 @@ outer:
continue continue
} }
client.udpLastFrameTime = time.Now() client.rtcpReceivers[trackId].onFrame(evt.trackFlowType, evt.buf)
p.forwardFrame(client.path, trackId, evt.trackFlowType, evt.buf) p.forwardFrame(client.path, trackId, evt.trackFlowType, evt.buf)
case programEventClientFrameTcp: case programEventClientFrameTcp:
@ -613,9 +614,9 @@ outer:
s.close() s.close()
} }
p.tcpl.close() p.rtspl.close()
p.udplRtcp.close() p.rtcpl.close()
p.udplRtp.close() p.rtpl.close()
for c := range p.clients { for c := range p.clients {
c.close() c.close()
@ -659,31 +660,42 @@ func (p *program) findPublisher(addr *net.UDPAddr, trackFlowType trackFlowType)
} }
func (p *program) forwardFrame(path string, trackId int, trackFlowType trackFlowType, frame []byte) { func (p *program) forwardFrame(path string, trackId int, trackFlowType trackFlowType, frame []byte) {
for c := range p.clients { for client := range p.clients {
if c.path == path && c.state == _CLIENT_STATE_PLAY { if client.path == path && client.state == _CLIENT_STATE_PLAY {
if c.streamProtocol == _STREAM_PROTOCOL_UDP { if client.streamProtocol == _STREAM_PROTOCOL_UDP {
if trackFlowType == _TRACK_FLOW_TYPE_RTP { if trackFlowType == _TRACK_FLOW_TYPE_RTP {
p.udplRtp.write(&udpAddrBufPair{ p.rtpl.write(&udpAddrBufPair{
addr: &net.UDPAddr{ addr: &net.UDPAddr{
IP: c.ip(), IP: client.ip(),
Zone: c.zone(), Zone: client.zone(),
Port: c.streamTracks[trackId].rtpPort, Port: client.streamTracks[trackId].rtpPort,
}, },
buf: frame, buf: frame,
}) })
} else { } else {
p.udplRtcp.write(&udpAddrBufPair{ p.rtcpl.write(&udpAddrBufPair{
addr: &net.UDPAddr{ addr: &net.UDPAddr{
IP: c.ip(), IP: client.ip(),
Zone: c.zone(), Zone: client.zone(),
Port: c.streamTracks[trackId].rtcpPort, Port: client.streamTracks[trackId].rtcpPort,
}, },
buf: frame, buf: frame,
}) })
} }
} else { } else {
c.writeFrame(trackFlowTypeToInterleavedChannel(trackId, trackFlowType), frame) channel := trackFlowTypeToInterleavedChannel(trackId, trackFlowType)
buf := client.writeBuf.swap()
buf = buf[:len(frame)]
copy(buf, frame)
client.events <- serverClientEventFrameTcp{
frame: &gortsplib.InterleavedFrame{
Channel: channel,
Content: buf,
},
}
} }
} }
} }

273
server-client.go

@ -14,10 +14,21 @@ import (
) )
const ( const (
_UDP_CHECK_STREAM_INTERVAL = 5 * time.Second _CLIENT_CHECK_STREAM_INTERVAL = 5 * time.Second
_UDP_STREAM_DEAD_AFTER = 10 * time.Second _CLIENT_STREAM_DEAD_AFTER = 15 * time.Second
_CLIENT_RECEIVER_REPORT_INTERVAL = 10 * time.Second
) )
type serverClientEvent interface {
isServerClientEvent()
}
type serverClientEventFrameTcp struct {
frame *gortsplib.InterleavedFrame
}
func (serverClientEventFrameTcp) isServerClientEvent() {}
type serverClientState int type serverClientState int
const ( const (
@ -53,24 +64,24 @@ func (cs serverClientState) String() string {
} }
type serverClient struct { type serverClient struct {
p *program p *program
conn *gortsplib.ConnServer conn *gortsplib.ConnServer
state serverClientState state serverClientState
path string path string
authUser string authUser string
authPass string authPass string
authHelper *gortsplib.AuthServer authHelper *gortsplib.AuthServer
authFailures int authFailures int
streamSdpText []byte // only if publisher streamSdpText []byte // only if publisher
streamSdpParsed *sdp.Message // only if publisher streamSdpParsed *sdp.Message // only if publisher
streamProtocol streamProtocol streamProtocol streamProtocol
streamTracks []*track streamTracks []*track
udpLastFrameTime time.Time rtcpReceivers []*rtcpReceiver
readBuf *doubleBuffer readBuf *doubleBuffer
writeBuf *doubleBuffer writeBuf *doubleBuffer
writeChan chan *gortsplib.InterleavedFrame // only if state = _CLIENT_STATE_PLAY events chan serverClientEvent
done chan struct{} done chan struct{}
} }
func newServerClient(p *program, nconn net.Conn) *serverClient { func newServerClient(p *program, nconn net.Conn) *serverClient {
@ -201,31 +212,45 @@ outer:
func (c *serverClient) runPlay() bool { func (c *serverClient) runPlay() bool {
if c.streamProtocol == _STREAM_PROTOCOL_TCP { if c.streamProtocol == _STREAM_PROTOCOL_TCP {
writeDone := make(chan struct{}) readDone := make(chan error)
go func() { go func() {
defer close(writeDone) buf := make([]byte, 2048)
for frame := range c.writeChan { for {
c.conn.WriteInterleavedFrame(frame) _, err := c.conn.NetConn().Read(buf)
if err != nil {
readDone <- err
break
}
} }
}() }()
buf := make([]byte, 2048) outer:
for { for {
_, err := c.conn.NetConn().Read(buf) select {
if err != nil { case err := <-readDone:
if err != io.EOF { if err != io.EOF {
c.log("ERR: %s", err) c.log("ERR: %s", err)
} }
break break outer
case rawEvt := <-c.events:
switch evt := rawEvt.(type) {
case serverClientEventFrameTcp:
c.conn.WriteInterleavedFrame(evt.frame)
}
} }
} }
go func() {
for range c.events {
}
}()
done := make(chan struct{}) done := make(chan struct{})
c.p.events <- programEventClientClose{done, c} c.p.events <- programEventClientClose{done, c}
<-done <-done
close(c.writeChan) close(c.events)
<-writeDone
} else { } else {
for { for {
@ -255,83 +280,147 @@ func (c *serverClient) runRecord() bool {
if c.streamProtocol == _STREAM_PROTOCOL_TCP { if c.streamProtocol == _STREAM_PROTOCOL_TCP {
frame := &gortsplib.InterleavedFrame{} frame := &gortsplib.InterleavedFrame{}
outer: readDone := make(chan error)
for { go func() {
frame.Content = c.readBuf.swap() for {
frame.Content = frame.Content[:cap(frame.Content)] frame.Content = c.readBuf.swap()
recv, err := c.conn.ReadInterleavedFrameOrRequest(frame) frame.Content = frame.Content[:cap(frame.Content)]
if err != nil { recv, err := c.conn.ReadInterleavedFrameOrRequest(frame)
if err != io.EOF { if err != nil {
c.log("ERR: %s", err) readDone <- err
break
}
switch recvt := recv.(type) {
case *gortsplib.InterleavedFrame:
trackId, trackFlowType := interleavedChannelToTrackFlowType(frame.Channel)
if trackId >= len(c.streamTracks) {
c.log("ERR: invalid track id '%d'", trackId)
readDone <- nil
break
}
c.rtcpReceivers[trackId].onFrame(trackFlowType, frame.Content)
c.p.events <- programEventClientFrameTcp{
c.path,
trackId,
trackFlowType,
frame.Content,
}
case *gortsplib.Request:
err := c.handleRequest(recvt)
if err != nil {
readDone <- nil
break
}
} }
break outer
} }
}()
switch recvt := recv.(type) { checkStreamTicker := time.NewTicker(_CLIENT_CHECK_STREAM_INTERVAL)
case *gortsplib.InterleavedFrame: receiverReportTicker := time.NewTicker(_CLIENT_RECEIVER_REPORT_INTERVAL)
trackId, trackFlowType := interleavedChannelToTrackFlowType(frame.Channel)
if trackId >= len(c.streamTracks) { outer1:
c.log("ERR: invalid track id '%d'", trackId) for {
break outer select {
case err := <-readDone:
if err != nil && err != io.EOF {
c.log("ERR: %s", err)
} }
break outer1
c.p.events <- programEventClientFrameTcp{
c.path, case <-checkStreamTicker.C:
trackId, for trackId := range c.streamTracks {
trackFlowType, if time.Since(c.rtcpReceivers[trackId].lastFrameTime()) >= _CLIENT_STREAM_DEAD_AFTER {
frame.Content, c.log("ERR: stream is dead")
c.conn.NetConn().Close()
<-readDone
break outer1
}
} }
case *gortsplib.Request: case <-receiverReportTicker.C:
err := c.handleRequest(recvt) for trackId := range c.streamTracks {
if err != nil { channel := trackFlowTypeToInterleavedChannel(trackId, _TRACK_FLOW_TYPE_RTCP)
break outer
frame := c.rtcpReceivers[trackId].report()
c.conn.WriteInterleavedFrame(&gortsplib.InterleavedFrame{
Channel: channel,
Content: frame,
})
} }
} }
} }
done := make(chan struct{}) checkStreamTicker.Stop()
c.p.events <- programEventClientClose{done, c} receiverReportTicker.Stop()
<-done
} else { } else {
c.udpLastFrameTime = time.Now() readDone := make(chan error)
udpCheckStreamTicker := time.NewTicker(_UDP_CHECK_STREAM_INTERVAL)
udpCheckStreamDone := make(chan struct{})
go func() { go func() {
defer close(udpCheckStreamDone) for {
for range udpCheckStreamTicker.C { req, err := c.conn.ReadRequest()
if time.Since(c.udpLastFrameTime) >= _UDP_STREAM_DEAD_AFTER { if err != nil {
c.log("ERR: stream is dead") readDone <- err
c.conn.NetConn().Close() break
}
err = c.handleRequest(req)
if err != nil {
readDone <- nil // err is not needed
break break
} }
} }
}() }()
checkStreamTicker := time.NewTicker(_CLIENT_CHECK_STREAM_INTERVAL)
receiverReportTicker := time.NewTicker(_CLIENT_RECEIVER_REPORT_INTERVAL)
outer2:
for { for {
req, err := c.conn.ReadRequest() select {
if err != nil { case err := <-readDone:
if err != io.EOF { if err != nil && err != io.EOF {
c.log("ERR: %s", err) c.log("ERR: %s", err)
} }
break break outer2
}
case <-checkStreamTicker.C:
for trackId := range c.streamTracks {
if time.Since(c.rtcpReceivers[trackId].lastFrameTime()) >= _CLIENT_STREAM_DEAD_AFTER {
c.log("ERR: stream is dead")
c.conn.NetConn().Close()
<-readDone
break outer2
}
}
err = c.handleRequest(req) case <-receiverReportTicker.C:
if err != nil { for trackId := range c.streamTracks {
break frame := c.rtcpReceivers[trackId].report()
c.p.rtcpl.writeChan <- &udpAddrBufPair{
addr: &net.UDPAddr{
IP: c.ip(),
Zone: c.zone(),
Port: c.streamTracks[trackId].rtcpPort,
},
buf: frame,
}
}
} }
} }
done := make(chan struct{}) checkStreamTicker.Stop()
c.p.events <- programEventClientClose{done, c} receiverReportTicker.Stop()
<-done }
udpCheckStreamTicker.Stop() done := make(chan struct{})
<-udpCheckStreamDone c.p.events <- programEventClientClose{done, c}
<-done
for trackId := range c.streamTracks {
c.rtcpReceivers[trackId].close()
} }
return false return false
@ -342,17 +431,6 @@ func (c *serverClient) close() {
<-c.done <-c.done
} }
func (c *serverClient) writeFrame(channel uint8, inbuf []byte) {
buf := c.writeBuf.swap()
buf = buf[:len(inbuf)]
copy(buf, inbuf)
c.writeChan <- &gortsplib.InterleavedFrame{
Channel: channel,
Content: buf,
}
}
func (c *serverClient) writeResError(req *gortsplib.Request, code gortsplib.StatusCode, err error) { func (c *serverClient) writeResError(req *gortsplib.Request, code gortsplib.StatusCode, err error) {
c.log("ERR: %s", err) c.log("ERR: %s", err)
@ -532,7 +610,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error {
c.p.events <- programEventClientDescribe{path, res} c.p.events <- programEventClientDescribe{path, res}
sdp := <-res sdp := <-res
if sdp == nil { if sdp == nil {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("no one is streaming on path '%s'", path)) c.writeResError(req, gortsplib.StatusNotFound, fmt.Errorf("no one is publishing on path '%s'", path))
return errClientTerminate return errClientTerminate
} }
@ -906,8 +984,10 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error {
}, },
}) })
c.writeBuf = newDoubleBuffer(2048) if c.streamProtocol == _STREAM_PROTOCOL_TCP {
c.writeChan = make(chan *gortsplib.InterleavedFrame) c.writeBuf = newDoubleBuffer(2048)
c.events = make(chan serverClientEvent)
}
// set state // set state
res = make(chan error) res = make(chan error)
@ -948,6 +1028,11 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error {
}, },
}) })
c.rtcpReceivers = make([]*rtcpReceiver, len(c.streamTracks))
for trackId := range c.streamTracks {
c.rtcpReceivers[trackId] = newRtcpReceiver()
}
res := make(chan error) res := make(chan error)
c.p.events <- programEventClientRecord{res, c} c.p.events <- programEventClientRecord{res, c}
<-res <-res

3
server-udpl.go

@ -54,7 +54,9 @@ func (l *serverUdpListener) log(format string, args ...interface{}) {
} }
func (l *serverUdpListener) run() { func (l *serverUdpListener) run() {
writeDone := make(chan struct{})
go func() { go func() {
defer close(writeDone)
for w := range l.writeChan { for w := range l.writeChan {
l.nconn.SetWriteDeadline(time.Now().Add(l.p.conf.WriteTimeout)) l.nconn.SetWriteDeadline(time.Now().Add(l.p.conf.WriteTimeout))
l.nconn.WriteTo(w.buf, w.addr) l.nconn.WriteTo(w.buf, w.addr)
@ -76,6 +78,7 @@ func (l *serverUdpListener) run() {
} }
close(l.writeChan) close(l.writeChan)
<-writeDone
close(l.done) close(l.done)
} }

31
streamer-udpl.go

@ -16,7 +16,8 @@ type streamerUdpListener struct {
running bool running bool
readBuf *doubleBuffer readBuf *doubleBuffer
done chan struct{} writeChan chan *udpAddrBufPair
done chan struct{}
} }
func newStreamerUdpListener(p *program, port int, streamer *streamer, func newStreamerUdpListener(p *program, port int, streamer *streamer,
@ -36,6 +37,7 @@ func newStreamerUdpListener(p *program, port int, streamer *streamer,
publisherIp: publisherIp, publisherIp: publisherIp,
nconn: nconn, nconn: nconn,
readBuf: newDoubleBuffer(2048), readBuf: newDoubleBuffer(2048),
writeChan: make(chan *udpAddrBufPair),
done: make(chan struct{}), done: make(chan struct{}),
} }
@ -43,19 +45,28 @@ func newStreamerUdpListener(p *program, port int, streamer *streamer,
} }
func (l *streamerUdpListener) close() { func (l *streamerUdpListener) close() {
l.nconn.Close() l.nconn.Close() // close twice
if l.running {
<-l.done
}
} }
func (l *streamerUdpListener) start() { func (l *streamerUdpListener) start() {
l.running = true
go l.run() go l.run()
} }
func (l *streamerUdpListener) stop() {
l.nconn.Close()
<-l.done
}
func (l *streamerUdpListener) run() { func (l *streamerUdpListener) run() {
writeDone := make(chan struct{})
go func() {
defer close(writeDone)
for w := range l.writeChan {
l.nconn.SetWriteDeadline(time.Now().Add(l.p.conf.WriteTimeout))
l.nconn.WriteTo(w.buf, w.addr)
}
}()
for { for {
buf := l.readBuf.swap() buf := l.readBuf.swap()
n, addr, err := l.nconn.ReadFromUDP(buf) n, addr, err := l.nconn.ReadFromUDP(buf)
@ -67,10 +78,12 @@ func (l *streamerUdpListener) run() {
continue continue
} }
l.streamer.udpLastFrameTime = time.Now() l.streamer.rtcpReceivers[l.trackId].onFrame(l.trackFlowType, buf[:n])
l.p.events <- programEventStreamerFrame{l.streamer, l.trackId, l.trackFlowType, buf[:n]} l.p.events <- programEventStreamerFrame{l.streamer, l.trackId, l.trackFlowType, buf[:n]}
} }
close(l.writeChan)
<-writeDone
close(l.done) close(l.done)
} }

231
streamer.go

@ -14,30 +14,29 @@ import (
) )
const ( const (
_DIAL_TIMEOUT = 10 * time.Second _STREAMER_RETRY_INTERVAL = 5 * time.Second
_RETRY_INTERVAL = 5 * time.Second _STREAMER_CHECK_STREAM_INTERVAL = 5 * time.Second
_CHECK_STREAM_INTERVAL = 6 * time.Second _STREAMER_STREAM_DEAD_AFTER = 15 * time.Second
_STREAM_DEAD_AFTER = 5 * time.Second _STREAMER_KEEPALIVE_INTERVAL = 60 * time.Second
_KEEPALIVE_INTERVAL = 60 * time.Second _STREAMER_RECEIVER_REPORT_INTERVAL = 10 * time.Second
) )
type streamerUdpListenerPair struct { type streamerUdpListenerPair struct {
udplRtp *streamerUdpListener rtpl *streamerUdpListener
udplRtcp *streamerUdpListener rtcpl *streamerUdpListener
} }
type streamer struct { type streamer struct {
p *program p *program
path string path string
ur *url.URL ur *url.URL
proto streamProtocol proto streamProtocol
ready bool ready bool
clientSdpParsed *sdp.Message clientSdpParsed *sdp.Message
serverSdpText []byte serverSdpText []byte
serverSdpParsed *sdp.Message serverSdpParsed *sdp.Message
firstTime bool rtcpReceivers []*rtcpReceiver
udpLastFrameTime time.Time readBuf *doubleBuffer
readBuf *doubleBuffer
terminate chan struct{} terminate chan struct{}
done chan struct{} done chan struct{}
@ -82,7 +81,6 @@ func newStreamer(p *program, path string, source string, sourceProtocol string)
path: path, path: path,
ur: ur, ur: ur,
proto: proto, proto: proto,
firstTime: true,
readBuf: newDoubleBuffer(512 * 1024), readBuf: newDoubleBuffer(512 * 1024),
terminate: make(chan struct{}), terminate: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
@ -113,30 +111,26 @@ func (s *streamer) run() {
if !ok { if !ok {
break break
} }
}
close(s.done)
}
func (s *streamer) do() bool { t := time.NewTimer(_STREAMER_RETRY_INTERVAL)
if s.firstTime {
s.firstTime = false
} else {
t := time.NewTimer(_RETRY_INTERVAL)
select { select {
case <-s.terminate: case <-s.terminate:
return false break
case <-t.C: case <-t.C:
} }
} }
close(s.done)
}
func (s *streamer) do() bool {
s.log("initializing with protocol %s", s.proto) s.log("initializing with protocol %s", s.proto)
var nconn net.Conn var nconn net.Conn
var err error var err error
dialDone := make(chan struct{}) dialDone := make(chan struct{})
go func() { go func() {
nconn, err = net.DialTimeout("tcp", s.ur.Host, _DIAL_TIMEOUT) nconn, err = net.DialTimeout("tcp", s.ur.Host, s.p.conf.ReadTimeout)
close(dialDone) close(dialDone)
}() }()
@ -251,16 +245,16 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool {
defer func() { defer func() {
for _, pair := range streamerUdpListenerPairs { for _, pair := range streamerUdpListenerPairs {
pair.udplRtp.close() pair.rtpl.close()
pair.udplRtcp.close() pair.rtcpl.close()
} }
}() }()
for i, media := range s.clientSdpParsed.Medias { for i, media := range s.clientSdpParsed.Medias {
var rtpPort int var rtpPort int
var rtcpPort int var rtcpPort int
var udplRtp *streamerUdpListener var rtpl *streamerUdpListener
var udplRtcp *streamerUdpListener var rtcpl *streamerUdpListener
func() { func() {
for { for {
// choose two consecutive ports in range 65536-10000 // choose two consecutive ports in range 65536-10000
@ -269,16 +263,16 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool {
rtcpPort = rtpPort + 1 rtcpPort = rtpPort + 1
var err error var err error
udplRtp, err = newStreamerUdpListener(s.p, rtpPort, s, i, rtpl, err = newStreamerUdpListener(s.p, rtpPort, s, i,
_TRACK_FLOW_TYPE_RTP, publisherIp) _TRACK_FLOW_TYPE_RTP, publisherIp)
if err != nil { if err != nil {
continue continue
} }
udplRtcp, err = newStreamerUdpListener(s.p, rtcpPort, s, i, rtcpl, err = newStreamerUdpListener(s.p, rtcpPort, s, i,
_TRACK_FLOW_TYPE_RTCP, publisherIp) _TRACK_FLOW_TYPE_RTCP, publisherIp)
if err != nil { if err != nil {
udplRtp.close() rtpl.close()
continue continue
} }
@ -338,23 +332,23 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool {
}) })
if err != nil { if err != nil {
s.log("ERR: %s", err) s.log("ERR: %s", err)
udplRtp.close() rtpl.close()
udplRtcp.close() rtcpl.close()
return true return true
} }
if res.StatusCode != gortsplib.StatusOK { if res.StatusCode != gortsplib.StatusOK {
s.log("ERR: SETUP returned code %d (%s)", res.StatusCode, res.StatusMessage) s.log("ERR: SETUP returned code %d (%s)", res.StatusCode, res.StatusMessage)
udplRtp.close() rtpl.close()
udplRtcp.close() rtcpl.close()
return true return true
} }
tsRaw, ok := res.Header["Transport"] tsRaw, ok := res.Header["Transport"]
if !ok || len(tsRaw) != 1 { if !ok || len(tsRaw) != 1 {
s.log("ERR: transport header not provided") s.log("ERR: transport header not provided")
udplRtp.close() rtpl.close()
udplRtcp.close() rtcpl.close()
return true return true
} }
@ -362,17 +356,17 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool {
rtpServerPort, rtcpServerPort := th.GetPorts("server_port") rtpServerPort, rtcpServerPort := th.GetPorts("server_port")
if rtpServerPort == 0 { if rtpServerPort == 0 {
s.log("ERR: server ports not provided") s.log("ERR: server ports not provided")
udplRtp.close() rtpl.close()
udplRtcp.close() rtcpl.close()
return true return true
} }
udplRtp.publisherPort = rtpServerPort rtpl.publisherPort = rtpServerPort
udplRtcp.publisherPort = rtcpServerPort rtcpl.publisherPort = rtcpServerPort
streamerUdpListenerPairs = append(streamerUdpListenerPairs, streamerUdpListenerPair{ streamerUdpListenerPairs = append(streamerUdpListenerPairs, streamerUdpListenerPair{
udplRtp: udplRtp, rtpl: rtpl,
udplRtcp: udplRtcp, rtcpl: rtcpl,
}) })
} }
@ -395,30 +389,32 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool {
return true return true
} }
for _, pair := range streamerUdpListenerPairs { s.rtcpReceivers = make([]*rtcpReceiver, len(s.clientSdpParsed.Medias))
pair.udplRtp.start() for trackId := range s.clientSdpParsed.Medias {
pair.udplRtcp.start() s.rtcpReceivers[trackId] = newRtcpReceiver()
} }
tickerSendKeepalive := time.NewTicker(_KEEPALIVE_INTERVAL) for _, pair := range streamerUdpListenerPairs {
defer tickerSendKeepalive.Stop() pair.rtpl.start()
pair.rtcpl.start()
}
s.udpLastFrameTime = time.Now() sendKeepaliveTicker := time.NewTicker(_STREAMER_KEEPALIVE_INTERVAL)
tickerCheckStream := time.NewTicker(_CHECK_STREAM_INTERVAL) checkStreamTicker := time.NewTicker(_STREAMER_CHECK_STREAM_INTERVAL)
defer tickerCheckStream.Stop() receiverReportTicker := time.NewTicker(_STREAMER_RECEIVER_REPORT_INTERVAL)
s.p.events <- programEventStreamerReady{s} s.p.events <- programEventStreamerReady{s}
defer func() { var ret bool
s.p.events <- programEventStreamerNotReady{s}
}()
outer:
for { for {
select { select {
case <-s.terminate: case <-s.terminate:
return false ret = false
break outer
case <-tickerSendKeepalive.C: case <-sendKeepaliveTicker.C:
_, err = conn.WriteRequest(&gortsplib.Request{ _, err = conn.WriteRequest(&gortsplib.Request{
Method: gortsplib.OPTIONS, Method: gortsplib.OPTIONS,
Url: &url.URL{ Url: &url.URL{
@ -429,16 +425,50 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool {
}) })
if err != nil { if err != nil {
s.log("ERR: %s", err) s.log("ERR: %s", err)
return true ret = true
break outer
} }
case <-tickerCheckStream.C: case <-checkStreamTicker.C:
if time.Since(s.udpLastFrameTime) >= _STREAM_DEAD_AFTER { for trackId := range s.clientSdpParsed.Medias {
s.log("ERR: stream is dead") if time.Since(s.rtcpReceivers[trackId].lastFrameTime()) >= _STREAMER_STREAM_DEAD_AFTER {
return true s.log("ERR: stream is dead")
ret = true
break outer
}
}
case <-receiverReportTicker.C:
for trackId := range s.clientSdpParsed.Medias {
frame := s.rtcpReceivers[trackId].report()
streamerUdpListenerPairs[trackId].rtcpl.writeChan <- &udpAddrBufPair{
addr: &net.UDPAddr{
IP: conn.NetConn().RemoteAddr().(*net.TCPAddr).IP,
Zone: conn.NetConn().RemoteAddr().(*net.TCPAddr).Zone,
Port: streamerUdpListenerPairs[trackId].rtcpl.publisherPort,
},
buf: frame,
}
} }
} }
} }
sendKeepaliveTicker.Stop()
checkStreamTicker.Stop()
receiverReportTicker.Stop()
s.p.events <- programEventStreamerNotReady{s}
for _, pair := range streamerUdpListenerPairs {
pair.rtpl.stop()
pair.rtcpl.stop()
}
for trackId := range s.clientSdpParsed.Medias {
s.rtcpReceivers[trackId].close()
}
return ret
} }
func (s *streamer) runTcp(conn *gortsplib.ConnClient) bool { func (s *streamer) runTcp(conn *gortsplib.ConnClient) bool {
@ -536,7 +566,7 @@ func (s *streamer) runTcp(conn *gortsplib.ConnClient) bool {
frame := &gortsplib.InterleavedFrame{} frame := &gortsplib.InterleavedFrame{}
outer: outer1:
for { for {
frame.Content = s.readBuf.swap() frame.Content = s.readBuf.swap()
frame.Content = frame.Content[:cap(frame.Content)] frame.Content = frame.Content[:cap(frame.Content)]
@ -553,18 +583,19 @@ outer:
s.log("ERR: PLAY returned code %d (%s)", recvt.StatusCode, recvt.StatusMessage) s.log("ERR: PLAY returned code %d (%s)", recvt.StatusCode, recvt.StatusMessage)
return true return true
} }
break outer break outer1
case *gortsplib.InterleavedFrame: case *gortsplib.InterleavedFrame:
// ignore the frames sent before the response // ignore the frames sent before the response
} }
} }
s.p.events <- programEventStreamerReady{s} s.rtcpReceivers = make([]*rtcpReceiver, len(s.clientSdpParsed.Medias))
for trackId := range s.clientSdpParsed.Medias {
s.rtcpReceivers[trackId] = newRtcpReceiver()
}
defer func() { s.p.events <- programEventStreamerReady{s}
s.p.events <- programEventStreamerNotReady{s}
}()
chanConnError := make(chan struct{}) chanConnError := make(chan struct{})
go func() { go func() {
@ -580,16 +611,60 @@ outer:
trackId, trackFlowType := interleavedChannelToTrackFlowType(frame.Channel) trackId, trackFlowType := interleavedChannelToTrackFlowType(frame.Channel)
s.rtcpReceivers[trackId].onFrame(trackFlowType, frame.Content)
s.p.events <- programEventStreamerFrame{s, trackId, trackFlowType, frame.Content} s.p.events <- programEventStreamerFrame{s, trackId, trackFlowType, frame.Content}
} }
}() }()
select { checkStreamTicker := time.NewTicker(_STREAMER_CHECK_STREAM_INTERVAL)
case <-s.terminate: receiverReportTicker := time.NewTicker(_STREAMER_RECEIVER_REPORT_INTERVAL)
return false
case <-chanConnError: var ret bool
return true
outer2:
for {
select {
case <-s.terminate:
ret = false
break outer2
case <-chanConnError:
ret = true
break outer2
case <-checkStreamTicker.C:
for trackId := range s.clientSdpParsed.Medias {
if time.Since(s.rtcpReceivers[trackId].lastFrameTime()) >= _STREAMER_STREAM_DEAD_AFTER {
s.log("ERR: stream is dead")
ret = true
break outer2
}
}
case <-receiverReportTicker.C:
for trackId := range s.clientSdpParsed.Medias {
frame := s.rtcpReceivers[trackId].report()
channel := trackFlowTypeToInterleavedChannel(trackId, _TRACK_FLOW_TYPE_RTCP)
conn.WriteInterleavedFrame(&gortsplib.InterleavedFrame{
Channel: channel,
Content: frame,
})
}
}
} }
checkStreamTicker.Stop()
receiverReportTicker.Stop()
s.p.events <- programEventStreamerNotReady{s}
for trackId := range s.clientSdpParsed.Medias {
s.rtcpReceivers[trackId].close()
}
return ret
} }
func (s *streamer) close() { func (s *streamer) close() {

140
utils.go

@ -2,7 +2,11 @@ package main
import ( import (
"fmt" "fmt"
"math/rand"
"net" "net"
"time"
"github.com/pion/rtcp"
) )
func parseIpCidrList(in []string) ([]interface{}, error) { func parseIpCidrList(in []string) ([]interface{}, error) {
@ -90,3 +94,139 @@ func (db *doubleBuffer) swap() []byte {
db.curBuf = !db.curBuf db.curBuf = !db.curBuf
return ret return ret
} }
type rtcpReceiverEvent interface {
isRtpReceiverEvent()
}
type rtcpReceiverEventFrameRtp struct {
sequenceNumber uint16
}
func (rtcpReceiverEventFrameRtp) isRtpReceiverEvent() {}
type rtcpReceiverEventFrameRtcp struct {
ssrc uint32
ntpTimeMiddle uint32
}
func (rtcpReceiverEventFrameRtcp) isRtpReceiverEvent() {}
type rtcpReceiverEventLastFrameTime struct {
res chan time.Time
}
func (rtcpReceiverEventLastFrameTime) isRtpReceiverEvent() {}
type rtcpReceiverEventReport struct {
res chan []byte
}
func (rtcpReceiverEventReport) isRtpReceiverEvent() {}
type rtcpReceiverEventTerminate struct{}
func (rtcpReceiverEventTerminate) isRtpReceiverEvent() {}
type rtcpReceiver struct {
events chan rtcpReceiverEvent
done chan struct{}
}
func newRtcpReceiver() *rtcpReceiver {
rr := &rtcpReceiver{
events: make(chan rtcpReceiverEvent),
done: make(chan struct{}),
}
go rr.run()
return rr
}
func (rr *rtcpReceiver) run() {
lastFrameTime := time.Now()
publisherSSRC := uint32(0)
receiverSSRC := rand.Uint32()
sequenceNumberCycles := uint16(0)
lastSequenceNumber := uint16(0)
lastSenderReport := uint32(0)
outer:
for rawEvt := range rr.events {
switch evt := rawEvt.(type) {
case rtcpReceiverEventFrameRtp:
if evt.sequenceNumber < lastSequenceNumber {
sequenceNumberCycles += 1
}
lastSequenceNumber = evt.sequenceNumber
lastFrameTime = time.Now()
case rtcpReceiverEventFrameRtcp:
publisherSSRC = evt.ssrc
lastSenderReport = evt.ntpTimeMiddle
case rtcpReceiverEventLastFrameTime:
evt.res <- lastFrameTime
case rtcpReceiverEventReport:
rr := &rtcp.ReceiverReport{
SSRC: receiverSSRC,
Reports: []rtcp.ReceptionReport{
{
SSRC: publisherSSRC,
LastSequenceNumber: uint32(sequenceNumberCycles)<<8 | uint32(lastSequenceNumber),
LastSenderReport: lastSenderReport,
},
},
}
frame, _ := rr.Marshal()
evt.res <- frame
case rtcpReceiverEventTerminate:
break outer
}
}
close(rr.events)
close(rr.done)
}
func (rr *rtcpReceiver) close() {
rr.events <- rtcpReceiverEventTerminate{}
<-rr.done
}
func (rr *rtcpReceiver) onFrame(trackFlowType trackFlowType, buf []byte) {
if trackFlowType == _TRACK_FLOW_TYPE_RTP {
// extract sequence number of first frame
if len(buf) >= 3 {
sequenceNumber := uint16(uint16(buf[2])<<8 | uint16(buf[1]))
rr.events <- rtcpReceiverEventFrameRtp{sequenceNumber}
}
} else {
frames, err := rtcp.Unmarshal(buf)
if err == nil {
for _, frame := range frames {
if senderReport, ok := (frame).(*rtcp.SenderReport); ok {
rr.events <- rtcpReceiverEventFrameRtcp{
senderReport.SSRC,
uint32(senderReport.NTPTime >> 16),
}
}
}
}
}
}
func (rr *rtcpReceiver) lastFrameTime() time.Time {
res := make(chan time.Time)
rr.events <- rtcpReceiverEventLastFrameTime{res}
return <-res
}
func (rr *rtcpReceiver) report() []byte {
res := make(chan []byte)
rr.events <- rtcpReceiverEventReport{res}
return <-res
}

Loading…
Cancel
Save