diff --git a/Makefile b/Makefile index 5ca0a632..5d9781e4 100644 --- a/Makefile +++ b/Makefile @@ -69,7 +69,7 @@ define CONFIG_RUN #rtpPort: 8002 #rtcpPort: 8003 #metrics: yes -logDestinations: [stdout, file] +pprof: yes paths: all: diff --git a/client.go b/client.go index f49d372a..45afbdfa 100644 --- a/client.go +++ b/client.go @@ -118,8 +118,6 @@ type client struct { streamProtocol gortsplib.StreamProtocol streamTracks map[int]*clientTrack rtcpReceivers []*gortsplib.RtcpReceiver - readBuf *doubleBuffer - writeBuf *doubleBuffer describeRes chan describeRes events chan clientEvent // only if state = Play and gortsplib.StreamProtocol = TCP @@ -136,7 +134,6 @@ func newClient(p *program, nconn net.Conn) *client { }), state: clientStateInitial, streamTracks: make(map[int]*clientTrack), - readBuf: newDoubleBuffer(clientTcpReadBufferSize), done: make(chan struct{}), } @@ -838,7 +835,6 @@ func (c *client) runPlay(path string) { confp := c.p.findConfForPath(path) if c.streamProtocol == gortsplib.StreamProtocolTcp { - c.writeBuf = newDoubleBuffer(clientTcpWriteBufferSize) c.events = make(chan clientEvent) } @@ -1030,11 +1026,12 @@ func (c *client) runRecord(path string) { } else { frame := &gortsplib.InterleavedFrame{} + readBuf := newMultiBuffer(3, clientTcpReadBufferSize) readDone := make(chan error) go func() { for { - frame.Content = c.readBuf.swap() + frame.Content = readBuf.next() frame.Content = frame.Content[:cap(frame.Content)] recv, err := c.conn.ReadFrameOrRequest(frame) diff --git a/main.go b/main.go index ac76d898..0e36dace 100644 --- a/main.go +++ b/main.go @@ -621,15 +621,11 @@ func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.St } } else { - buf := c.writeBuf.swap() - buf = buf[:len(frame)] - copy(buf, frame) - c.events <- clientEventFrameTcp{ frame: &gortsplib.InterleavedFrame{ TrackId: trackId, StreamType: streamType, - Content: buf, + Content: frame, }, } } diff --git a/server-udp.go b/server-udp.go index 1c296255..ea0a19c9 100644 --- a/server-udp.go +++ b/server-udp.go @@ -16,8 +16,7 @@ type serverUdp struct { p *program nconn *net.UDPConn streamType gortsplib.StreamType - readBuf *doubleBuffer - writeBuf *doubleBuffer + readBuf *multiBuffer writeChan chan *udpAddrBufPair done chan struct{} @@ -35,8 +34,7 @@ func newServerUdp(p *program, port int, streamType gortsplib.StreamType) (*serve p: p, nconn: nconn, streamType: streamType, - readBuf: newDoubleBuffer(clientUdpReadBufferSize), - writeBuf: newDoubleBuffer(clientUdpWriteBufferSize), + readBuf: newMultiBuffer(3, clientUdpReadBufferSize), writeChan: make(chan *udpAddrBufPair), done: make(chan struct{}), } @@ -66,7 +64,7 @@ func (l *serverUdp) run() { }() for { - buf := l.readBuf.swap() + buf := l.readBuf.next() n, addr, err := l.nconn.ReadFromUDP(buf) if err != nil { break @@ -91,11 +89,5 @@ func (l *serverUdp) close() { } func (l *serverUdp) write(pair *udpAddrBufPair) { - // replace input buffer with write buffer - buf := l.writeBuf.swap() - buf = buf[:len(pair.buf)] - copy(buf, pair.buf) - pair.buf = buf - l.writeChan <- pair } diff --git a/source.go b/source.go index eab9f147..9cc69820 100644 --- a/source.go +++ b/source.go @@ -257,9 +257,9 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo go func(trackId int, l *gortsplib.ConnClientUdpListener) { defer wg.Done() - doubleBuf := newDoubleBuffer(sourceUdpReadBufferSize) + multiBuf := newMultiBuffer(3, sourceUdpReadBufferSize) for { - buf := doubleBuf.swap() + buf := multiBuf.next() n, err := l.Read(buf) if err != nil { @@ -274,9 +274,9 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo go func(trackId int, l *gortsplib.ConnClientUdpListener) { defer wg.Done() - doubleBuf := newDoubleBuffer(sourceUdpReadBufferSize) + multiBuf := newMultiBuffer(3, sourceUdpReadBufferSize) for { - buf := doubleBuf.swap() + buf := multiBuf.next() n, err := l.Read(buf) if err != nil { @@ -340,12 +340,12 @@ func (s *source) runTcp(terminate chan struct{}, conn *gortsplib.ConnClient) boo s.p.events <- programEventSourceReady{s} frame := &gortsplib.InterleavedFrame{} - doubleBuf := newDoubleBuffer(sourceTcpReadBufferSize) + multiBuf := newMultiBuffer(3, sourceTcpReadBufferSize) tcpConnDone := make(chan error) go func() { for { - frame.Content = doubleBuf.swap() + frame.Content = multiBuf.next() frame.Content = frame.Content[:cap(frame.Content)] err := conn.ReadFrame(frame) diff --git a/utils.go b/utils.go index 3ce1906b..15b76335 100644 --- a/utils.go +++ b/utils.go @@ -52,27 +52,28 @@ func ipEqualOrInRange(ip net.IP, ips []interface{}) bool { return false } -type doubleBuffer struct { - buf1 []byte - buf2 []byte - curBuf bool +type multiBuffer struct { + buffers [][]byte + curBuf int } -func newDoubleBuffer(size int) *doubleBuffer { - return &doubleBuffer{ - buf1: make([]byte, size), - buf2: make([]byte, size), +func newMultiBuffer(count int, size int) *multiBuffer { + buffers := make([][]byte, count) + for i := 0; i < count; i++ { + buffers[i] = make([]byte, size) + } + + return &multiBuffer{ + buffers: buffers, } } -func (db *doubleBuffer) swap() []byte { - var ret []byte - if !db.curBuf { - ret = db.buf1 - } else { - ret = db.buf2 +func (mb *multiBuffer) next() []byte { + ret := mb.buffers[mb.curBuf] + mb.curBuf += 1 + if mb.curBuf >= len(mb.buffers) { + mb.curBuf = 0 } - db.curBuf = !db.curBuf return ret }