Browse Source

move buffer handling into gortsplib

pull/101/head
aler9 5 years ago
parent
commit
693aa9118d
  1. 35
      client.go
  2. 2
      go.mod
  3. 4
      go.sum
  4. 33
      proxy.go
  5. 10
      serverudp.go
  6. 25
      utils.go

35
client.go

@ -16,10 +16,6 @@ import (
const ( const (
clientCheckStreamInterval = 5 * time.Second clientCheckStreamInterval = 5 * time.Second
clientReceiverReportInterval = 10 * time.Second clientReceiverReportInterval = 10 * time.Second
clientTCPReadBufferSize = 128 * 1024
clientTCPWriteBufferSize = 128 * 1024
clientUDPReadBufferSize = 2048
clientUDPWriteBufferSize = 128 * 1024
) )
type clientDescribeReq struct { type clientDescribeReq struct {
@ -119,9 +115,10 @@ func newClient(p *program, nconn net.Conn) *client {
c := &client{ c := &client{
p: p, p: p,
conn: gortsplib.NewConnServer(gortsplib.ConnServerConf{ conn: gortsplib.NewConnServer(gortsplib.ConnServerConf{
Conn: nconn, Conn: nconn,
ReadTimeout: p.conf.ReadTimeout, ReadTimeout: p.conf.ReadTimeout,
WriteTimeout: p.conf.WriteTimeout, WriteTimeout: p.conf.WriteTimeout,
ReadBufferCount: 2,
}), }),
state: clientStateInitial, state: clientStateInitial,
streamTracks: make(map[int]*clientTrack), streamTracks: make(map[int]*clientTrack),
@ -946,14 +943,8 @@ func (c *client) runPlayTCP() {
readDone := make(chan error) readDone := make(chan error)
go func() { go func() {
frame := &gortsplib.InterleavedFrame{}
readBuf := make([]byte, clientTCPReadBufferSize)
for { for {
frame.Content = readBuf recv, err := c.conn.ReadFrameOrRequest(false)
frame.Content = frame.Content[:cap(frame.Content)]
recv, err := c.conn.ReadFrameOrRequest(frame, false)
if err != nil { if err != nil {
readDone <- err readDone <- err
break break
@ -1144,19 +1135,13 @@ func (c *client) runRecordUDP() {
} }
func (c *client) runRecordTCP() { func (c *client) runRecordTCP() {
frame := &gortsplib.InterleavedFrame{}
readBuf := newMultiBuffer(2, clientTCPReadBufferSize)
readRequest := make(chan readRequestPair) readRequest := make(chan readRequestPair)
defer close(readRequest) defer close(readRequest)
readDone := make(chan error) readDone := make(chan error)
go func() { go func() {
for { for {
frame.Content = readBuf.next() recv, err := c.conn.ReadFrameOrRequest(true)
frame.Content = frame.Content[:cap(frame.Content)]
recv, err := c.conn.ReadFrameOrRequest(frame, true)
if err != nil { if err != nil {
readDone <- err readDone <- err
break break
@ -1164,14 +1149,14 @@ func (c *client) runRecordTCP() {
switch recvt := recv.(type) { switch recvt := recv.(type) {
case *gortsplib.InterleavedFrame: case *gortsplib.InterleavedFrame:
if frame.TrackId >= len(c.streamTracks) { if recvt.TrackId >= len(c.streamTracks) {
readDone <- fmt.Errorf("invalid track id '%d'", frame.TrackId) readDone <- fmt.Errorf("invalid track id '%d'", recvt.TrackId)
break break
} }
c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) c.rtcpReceivers[recvt.TrackId].OnFrame(recvt.StreamType, recvt.Content)
c.p.readersMap.forwardFrame(c.path, frame.TrackId, frame.StreamType, frame.Content) c.p.readersMap.forwardFrame(c.path, recvt.TrackId, recvt.StreamType, recvt.Content)
case *gortsplib.Request: case *gortsplib.Request:
err := c.handleRequest(recvt) err := c.handleRequest(recvt)

2
go.mod

@ -5,7 +5,7 @@ go 1.12
require ( require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20200920093758-10469faa0777 github.com/aler9/gortsplib v0.0.0-20200920120711-81b5754b0b0c
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/stretchr/testify v1.6.1 github.com/stretchr/testify v1.6.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/alecthomas/kingpin.v2 v2.2.6

4
go.sum

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20200920093758-10469faa0777 h1:WpP46odBYEPXa1GLtGrf8W8gZGLohDJCJTYmlPJCo2w= github.com/aler9/gortsplib v0.0.0-20200920120711-81b5754b0b0c h1:tXQF5q+mlu5KEALSgHR+ReWfF+cS7h3oM4+nJuTBf7E=
github.com/aler9/gortsplib v0.0.0-20200920093758-10469faa0777/go.mod h1:IQy51zikcH4wQFNwYPHtC0+HTcPlahJcxcYiMqlCyiw= github.com/aler9/gortsplib v0.0.0-20200920120711-81b5754b0b0c/go.mod h1:IQy51zikcH4wQFNwYPHtC0+HTcPlahJcxcYiMqlCyiw=
github.com/aler9/sdp-dirty/v3 v3.0.0-20200919115950-f1abc664f625 h1:A3upkpYzceQTuBPvVleu1zd6R8jInhg5ifimSO7ku/o= github.com/aler9/sdp-dirty/v3 v3.0.0-20200919115950-f1abc664f625 h1:A3upkpYzceQTuBPvVleu1zd6R8jInhg5ifimSO7ku/o=
github.com/aler9/sdp-dirty/v3 v3.0.0-20200919115950-f1abc664f625/go.mod h1:5bO/aUQr9m3OasDatNNcVqKAgs7r5hgGXmszWHaC6mI= github.com/aler9/sdp-dirty/v3 v3.0.0-20200919115950-f1abc664f625/go.mod h1:5bO/aUQr9m3OasDatNNcVqKAgs7r5hgGXmszWHaC6mI=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=

33
proxy.go

@ -9,9 +9,7 @@ import (
) )
const ( const (
proxyRetryInterval = 5 * time.Second proxyRetryInterval = 5 * time.Second
proxyUDPReadBufferSize = 2048
proxyTCPReadBufferSize = 128 * 1024
) )
type proxyState int type proxyState int
@ -137,9 +135,10 @@ func (s *proxy) runInnerInner() bool {
dialDone := make(chan struct{}) dialDone := make(chan struct{})
go func() { go func() {
conn, err = gortsplib.NewConnClient(gortsplib.ConnClientConf{ conn, err = gortsplib.NewConnClient(gortsplib.ConnClientConf{
Host: s.pathConf.sourceUrl.Host, Host: s.pathConf.sourceUrl.Host,
ReadTimeout: s.p.conf.ReadTimeout, ReadTimeout: s.p.conf.ReadTimeout,
WriteTimeout: s.p.conf.WriteTimeout, WriteTimeout: s.p.conf.WriteTimeout,
ReadBufferCount: 2,
}) })
close(dialDone) close(dialDone)
}() }()
@ -216,17 +215,14 @@ func (s *proxy) runUDP(conn *gortsplib.ConnClient) bool {
go func(trackId int, rtpRead gortsplib.UDPReadFunc) { go func(trackId int, rtpRead gortsplib.UDPReadFunc) {
defer wg.Done() defer wg.Done()
multiBuf := newMultiBuffer(2, proxyUDPReadBufferSize)
for { for {
buf := multiBuf.next() buf, err := rtpRead()
n, err := rtpRead(buf)
if err != nil { if err != nil {
break break
} }
s.p.readersMap.forwardFrame(s.path, trackId, s.p.readersMap.forwardFrame(s.path, trackId,
gortsplib.StreamTypeRtp, buf[:n]) gortsplib.StreamTypeRtp, buf)
} }
}(trackId, rtpRead) }(trackId, rtpRead)
} }
@ -237,17 +233,14 @@ func (s *proxy) runUDP(conn *gortsplib.ConnClient) bool {
go func(trackId int, rtcpRead gortsplib.UDPReadFunc) { go func(trackId int, rtcpRead gortsplib.UDPReadFunc) {
defer wg.Done() defer wg.Done()
multiBuf := newMultiBuffer(2, proxyUDPReadBufferSize)
for { for {
buf := multiBuf.next() buf, err := rtcpRead()
n, err := rtcpRead(buf)
if err != nil { if err != nil {
break break
} }
s.p.readersMap.forwardFrame(s.path, trackId, s.p.readersMap.forwardFrame(s.path, trackId,
gortsplib.StreamTypeRtcp, buf[:n]) gortsplib.StreamTypeRtcp, buf)
} }
}(trackId, rtcpRead) }(trackId, rtcpRead)
} }
@ -302,16 +295,10 @@ func (s *proxy) runTCP(conn *gortsplib.ConnClient) bool {
s.p.proxyReady <- s s.p.proxyReady <- s
frame := &gortsplib.InterleavedFrame{}
multiBuf := newMultiBuffer(2, proxyTCPReadBufferSize)
tcpConnDone := make(chan error) tcpConnDone := make(chan error)
go func() { go func() {
for { for {
frame.Content = multiBuf.next() frame, err := conn.ReadFrame()
frame.Content = frame.Content[:cap(frame.Content)]
err := conn.ReadFrame(frame)
if err != nil { if err != nil {
tcpConnDone <- err tcpConnDone <- err
return return

10
serverudp.go

@ -8,6 +8,10 @@ import (
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
) )
const (
udpReadBufferSize = 2048
)
type udpBufAddrPair struct { type udpBufAddrPair struct {
buf []byte buf []byte
addr *net.UDPAddr addr *net.UDPAddr
@ -17,7 +21,7 @@ type serverUDP struct {
p *program p *program
pc *net.UDPConn pc *net.UDPConn
streamType gortsplib.StreamType streamType gortsplib.StreamType
readBuf *multiBuffer readBuf *gortsplib.MultiBuffer
writec chan udpBufAddrPair writec chan udpBufAddrPair
done chan struct{} done chan struct{}
@ -35,7 +39,7 @@ func newServerUDP(p *program, port int, streamType gortsplib.StreamType) (*serve
p: p, p: p,
pc: pc, pc: pc,
streamType: streamType, streamType: streamType,
readBuf: newMultiBuffer(2, clientUDPReadBufferSize), readBuf: gortsplib.NewMultiBuffer(2, udpReadBufferSize),
writec: make(chan udpBufAddrPair), writec: make(chan udpBufAddrPair),
done: make(chan struct{}), done: make(chan struct{}),
} }
@ -65,7 +69,7 @@ func (l *serverUDP) run() {
}() }()
for { for {
buf := l.readBuf.next() buf := l.readBuf.Next()
n, addr, err := l.pc.ReadFromUDP(buf) n, addr, err := l.pc.ReadFromUDP(buf)
if err != nil { if err != nil {
break break

25
utils.go

@ -51,31 +51,6 @@ func ipEqualOrInRange(ip net.IP, ips []interface{}) bool {
return false return false
} }
type multiBuffer struct {
buffers [][]byte
curBuf int
}
func newMultiBuffer(count int, size int) *multiBuffer {
buffers := make([][]byte, count)
for i := 0; i < count; i++ {
buffers[i] = make([]byte, size)
}
return &multiBuffer{
buffers: buffers,
}
}
func (mb *multiBuffer) next() []byte {
ret := mb.buffers[mb.curBuf]
mb.curBuf += 1
if mb.curBuf >= len(mb.buffers) {
mb.curBuf = 0
}
return ret
}
func splitPath(path string) (string, string, error) { func splitPath(path string) (string, string, error) {
pos := func() int { pos := func() int {
for i := len(path) - 1; i >= 0; i-- { for i := len(path) - 1; i >= 0; i-- {

Loading…
Cancel
Save