Browse Source

use a common object for double buffering

pull/31/head
aler9 6 years ago
parent
commit
c0d94929a6
  1. 32
      server-client.go
  2. 32
      server-udpl.go
  3. 16
      streamer-udpl.go
  4. 24
      streamer.go
  5. 24
      utils.go

32
server-client.go

@ -67,12 +67,8 @@ type serverClient struct {
streamTracks []*track streamTracks []*track
udpLastFrameTime time.Time udpLastFrameTime time.Time
udpCheckStreamTicker *time.Ticker udpCheckStreamTicker *time.Ticker
readBuf1 []byte readBuf *doubleBuffer
readBuf2 []byte writeBuf *doubleBuffer
readCurBuf bool
writeBuf1 []byte
writeBuf2 []byte
writeCurBuf bool
writeChan chan *gortsplib.InterleavedFrame writeChan chan *gortsplib.InterleavedFrame
done chan struct{} done chan struct{}
@ -87,10 +83,8 @@ func newServerClient(p *program, nconn net.Conn) *serverClient {
WriteTimeout: p.conf.WriteTimeout, WriteTimeout: p.conf.WriteTimeout,
}), }),
state: _CLIENT_STATE_STARTING, state: _CLIENT_STATE_STARTING,
readBuf1: make([]byte, 0, 512*1024), readBuf: newDoubleBuffer(512 * 1024),
readBuf2: make([]byte, 0, 512*1024), writeBuf: newDoubleBuffer(2048),
writeBuf1: make([]byte, 2048),
writeBuf2: make([]byte, 2048),
writeChan: make(chan *gortsplib.InterleavedFrame), writeChan: make(chan *gortsplib.InterleavedFrame),
done: make(chan struct{}), done: make(chan struct{}),
} }
@ -182,16 +176,9 @@ func (c *serverClient) close() {
} }
func (c *serverClient) writeFrame(channel uint8, inbuf []byte) { func (c *serverClient) writeFrame(channel uint8, inbuf []byte) {
var buf []byte buf := c.writeBuf.swap()
if !c.writeCurBuf {
buf = c.writeBuf1
} else {
buf = c.writeBuf2
}
buf = buf[:len(inbuf)] buf = buf[:len(inbuf)]
copy(buf, inbuf) copy(buf, inbuf)
c.writeCurBuf = !c.writeCurBuf
c.writeChan <- &gortsplib.InterleavedFrame{ c.writeChan <- &gortsplib.InterleavedFrame{
Channel: channel, Channel: channel,
@ -868,15 +855,8 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
if c.streamProtocol == _STREAM_PROTOCOL_TCP { if c.streamProtocol == _STREAM_PROTOCOL_TCP {
frame := &gortsplib.InterleavedFrame{} frame := &gortsplib.InterleavedFrame{}
for { for {
if !c.readCurBuf { frame.Content = c.readBuf.swap()
frame.Content = c.readBuf1
} else {
frame.Content = c.readBuf2
}
frame.Content = frame.Content[:cap(frame.Content)] frame.Content = frame.Content[:cap(frame.Content)]
c.readCurBuf = !c.readCurBuf
recv, err := c.conn.ReadInterleavedFrameOrRequest(frame) recv, err := c.conn.ReadInterleavedFrameOrRequest(frame)
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {

32
server-udpl.go

@ -14,12 +14,8 @@ type serverUdpListener struct {
p *program p *program
nconn *net.UDPConn nconn *net.UDPConn
trackFlowType trackFlowType trackFlowType trackFlowType
readBuf1 []byte readBuf *doubleBuffer
readBuf2 []byte writeBuf *doubleBuffer
readCurBuf bool
writeBuf1 []byte
writeBuf2 []byte
writeCurBuf bool
writeChan chan *udpAddrFramePair writeChan chan *udpAddrFramePair
done chan struct{} done chan struct{}
@ -37,10 +33,8 @@ func newServerUdpListener(p *program, port int, trackFlowType trackFlowType) (*s
p: p, p: p,
nconn: nconn, nconn: nconn,
trackFlowType: trackFlowType, trackFlowType: trackFlowType,
readBuf1: make([]byte, 2048), readBuf: newDoubleBuffer(2048),
readBuf2: make([]byte, 2048), writeBuf: newDoubleBuffer(2048),
writeBuf1: make([]byte, 2048),
writeBuf2: make([]byte, 2048),
writeChan: make(chan *udpAddrFramePair), writeChan: make(chan *udpAddrFramePair),
done: make(chan struct{}), done: make(chan struct{}),
} }
@ -68,14 +62,7 @@ func (l *serverUdpListener) run() {
}() }()
for { for {
var buf []byte buf := l.readBuf.swap()
if !l.readCurBuf {
buf = l.readBuf1
} else {
buf = l.readBuf2
}
l.readCurBuf = !l.readCurBuf
n, addr, err := l.nconn.ReadFromUDP(buf) n, addr, err := l.nconn.ReadFromUDP(buf)
if err != nil { if err != nil {
break break
@ -99,16 +86,9 @@ func (l *serverUdpListener) close() {
} }
func (l *serverUdpListener) write(addr *net.UDPAddr, inbuf []byte) { func (l *serverUdpListener) write(addr *net.UDPAddr, inbuf []byte) {
var buf []byte buf := l.writeBuf.swap()
if !l.writeCurBuf {
buf = l.writeBuf1
} else {
buf = l.writeBuf2
}
buf = buf[:len(inbuf)] buf = buf[:len(inbuf)]
copy(buf, inbuf) copy(buf, inbuf)
l.writeCurBuf = !l.writeCurBuf
l.writeChan <- &udpAddrFramePair{ l.writeChan <- &udpAddrFramePair{
addr: addr, addr: addr,

16
streamer-udpl.go

@ -14,9 +14,7 @@ type streamerUdpListener struct {
publisherPort int publisherPort int
nconn *net.UDPConn nconn *net.UDPConn
running bool running bool
readBuf1 []byte readBuf *doubleBuffer
readBuf2 []byte
readCurBuf bool
lastFrameTime time.Time lastFrameTime time.Time
done chan struct{} done chan struct{}
@ -38,8 +36,7 @@ func newStreamerUdpListener(p *program, port int, streamer *streamer,
trackFlowType: trackFlowType, trackFlowType: trackFlowType,
publisherIp: publisherIp, publisherIp: publisherIp,
nconn: nconn, nconn: nconn,
readBuf1: make([]byte, 2048), readBuf: newDoubleBuffer(2048),
readBuf2: make([]byte, 2048),
lastFrameTime: time.Now(), lastFrameTime: time.Now(),
done: make(chan struct{}), done: make(chan struct{}),
} }
@ -62,14 +59,7 @@ func (l *streamerUdpListener) start() {
func (l *streamerUdpListener) run() { func (l *streamerUdpListener) run() {
for { for {
var buf []byte buf := l.readBuf.swap()
if !l.readCurBuf {
buf = l.readBuf1
} else {
buf = l.readBuf2
}
l.readCurBuf = !l.readCurBuf
n, addr, err := l.nconn.ReadFromUDP(buf) n, addr, err := l.nconn.ReadFromUDP(buf)
if err != nil { if err != nil {
break break

24
streamer.go

@ -36,9 +36,7 @@ type streamer struct {
serverSdpText []byte serverSdpText []byte
serverSdpParsed *sdp.Message serverSdpParsed *sdp.Message
firstTime bool firstTime bool
readBuf1 []byte readBuf *doubleBuffer
readBuf2 []byte
readCurBuf bool
terminate chan struct{} terminate chan struct{}
done chan struct{} done chan struct{}
@ -84,8 +82,7 @@ func newStreamer(p *program, path string, source string, sourceProtocol string)
ur: ur, ur: ur,
proto: proto, proto: proto,
firstTime: true, firstTime: true,
readBuf1: make([]byte, 0, 512*1024), readBuf: newDoubleBuffer(512 * 1024),
readBuf2: make([]byte, 0, 512*1024),
terminate: make(chan struct{}), terminate: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
} }
@ -553,14 +550,8 @@ func (s *streamer) runTcp(conn *gortsplib.ConnClient) bool {
outer: outer:
for { for {
if !s.readCurBuf { frame.Content = s.readBuf.swap()
frame.Content = s.readBuf1
} else {
frame.Content = s.readBuf2
}
frame.Content = frame.Content[:cap(frame.Content)] frame.Content = frame.Content[:cap(frame.Content)]
s.readCurBuf = !s.readCurBuf
recv, err := conn.ReadInterleavedFrameOrResponse(frame) recv, err := conn.ReadInterleavedFrameOrResponse(frame)
if err != nil { if err != nil {
@ -590,15 +581,8 @@ outer:
chanConnError := make(chan struct{}) chanConnError := make(chan struct{})
go func() { go func() {
for { for {
if !s.readCurBuf { frame.Content = s.readBuf.swap()
frame.Content = s.readBuf1
} else {
frame.Content = s.readBuf2
}
frame.Content = frame.Content[:cap(frame.Content)] frame.Content = frame.Content[:cap(frame.Content)]
s.readCurBuf = !s.readCurBuf
err := conn.ReadInterleavedFrame(frame) err := conn.ReadInterleavedFrame(frame)
if err != nil { if err != nil {
s.log("ERR: %s", err) s.log("ERR: %s", err)

24
utils.go

@ -49,3 +49,27 @@ func trackFlowTypeToInterleavedChannel(id int, trackFlowType trackFlowType) uint
} }
return uint8((id * 2) + 1) return uint8((id * 2) + 1)
} }
type doubleBuffer struct {
buf1 []byte
buf2 []byte
curBuf bool
}
func newDoubleBuffer(size int) *doubleBuffer {
return &doubleBuffer{
buf1: make([]byte, size),
buf2: make([]byte, size),
}
}
func (db *doubleBuffer) swap() []byte {
var ret []byte
if !db.curBuf {
ret = db.buf1
} else {
ret = db.buf2
}
db.curBuf = !db.curBuf
return ret
}

Loading…
Cancel
Save