Browse Source

drastically improve performance when publishing or proxying streams

pull/97/head
aler9 6 years ago
parent
commit
f5936d67ed
  1. 56
      client.go
  2. 92
      main.go
  3. 23
      server-udp.go
  4. 15
      source.go
  5. 130
      utils.go

56
client.go

@ -44,50 +44,11 @@ type clientSetupPlayReq struct {
trackId int trackId int
} }
type clientFrameUDPReq struct {
addr *net.UDPAddr
streamType gortsplib.StreamType
buf []byte
}
type clientFrameTCPReq struct {
path *path
trackId int
streamType gortsplib.StreamType
buf []byte
}
type readRequestPair struct { type readRequestPair struct {
req *gortsplib.Request req *gortsplib.Request
res chan error res chan error
} }
type udpClient struct {
client *client
trackId int
streamType gortsplib.StreamType
}
type udpClientAddr struct {
ip [net.IPv6len]byte // use a fixed-size array to enable the equality operator
port int
}
func makeUDPClientAddr(ip net.IP, port int) udpClientAddr {
ret := udpClientAddr{
port: port,
}
if len(ip) == net.IPv4len {
copy(ret.ip[0:], []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}) // v4InV6Prefix
copy(ret.ip[12:], ip)
} else {
copy(ret.ip[:], ip)
}
return ret
}
type clientTrack struct { type clientTrack struct {
rtpPort int rtpPort int
rtcpPort int rtcpPort int
@ -182,17 +143,18 @@ func (c *client) close() {
switch c.state { switch c.state {
case clientStatePlay: case clientStatePlay:
atomic.AddInt64(&c.p.countReader, -1) atomic.AddInt64(&c.p.countReader, -1)
c.p.readersMap.remove(c)
case clientStateRecord: case clientStateRecord:
atomic.AddInt64(&c.p.countPublisher, -1) atomic.AddInt64(&c.p.countPublisher, -1)
if c.streamProtocol == gortsplib.StreamProtocolUDP { if c.streamProtocol == gortsplib.StreamProtocolUDP {
for _, track := range c.streamTracks { for _, track := range c.streamTracks {
key := makeUDPClientAddr(c.ip(), track.rtpPort) addr := makeUDPPublisherAddr(c.ip(), track.rtpPort)
delete(c.p.udpClientsByAddr, key) c.p.udpPublishersMap.remove(addr)
key = makeUDPClientAddr(c.ip(), track.rtcpPort) addr = makeUDPPublisherAddr(c.ip(), track.rtcpPort)
delete(c.p.udpClientsByAddr, key) c.p.udpPublishersMap.remove(addr)
} }
} }
@ -1225,12 +1187,8 @@ func (c *client) runRecordTCP() {
} }
c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content)
c.p.clientFrameTCP <- clientFrameTCPReq{
c.path, c.p.readersMap.forwardFrame(c.path, frame.TrackId, frame.StreamType, frame.Content)
frame.TrackId,
frame.StreamType,
frame.Content,
}
case *gortsplib.Request: case *gortsplib.Request:
err := c.handleRequest(recvt) err := c.handleRequest(recvt)

92
main.go

@ -42,7 +42,8 @@ type program struct {
serverRtcp *serverUDP serverRtcp *serverUDP
serverRtsp *serverTCP serverRtsp *serverTCP
clients map[*client]struct{} clients map[*client]struct{}
udpClientsByAddr map[udpClientAddr]*udpClient udpPublishersMap *udpPublishersMap
readersMap *readersMap
countClient int64 countClient int64
countPublisher int64 countPublisher int64
countReader int64 countReader int64
@ -55,11 +56,8 @@ type program struct {
clientSetupPlay chan clientSetupPlayReq clientSetupPlay chan clientSetupPlayReq
clientPlay chan *client clientPlay chan *client
clientRecord chan *client clientRecord chan *client
clientFrameUDP chan clientFrameUDPReq
clientFrameTCP chan clientFrameTCPReq
sourceReady chan *source sourceReady chan *source
sourceNotReady chan *source sourceNotReady chan *source
sourceFrame chan sourceFrameReq
terminate chan struct{} terminate chan struct{}
done chan struct{} done chan struct{}
} }
@ -87,7 +85,8 @@ func newProgram(args []string, stdin io.Reader) (*program, error) {
conf: conf, conf: conf,
paths: make(map[string]*path), paths: make(map[string]*path),
clients: make(map[*client]struct{}), clients: make(map[*client]struct{}),
udpClientsByAddr: make(map[udpClientAddr]*udpClient), udpPublishersMap: newUdpPublisherMap(),
readersMap: newReadersMap(),
metricsGather: make(chan metricsGatherReq), metricsGather: make(chan metricsGatherReq),
clientNew: make(chan net.Conn), clientNew: make(chan net.Conn),
clientClose: make(chan *client), clientClose: make(chan *client),
@ -96,11 +95,8 @@ func newProgram(args []string, stdin io.Reader) (*program, error) {
clientSetupPlay: make(chan clientSetupPlayReq), clientSetupPlay: make(chan clientSetupPlayReq),
clientPlay: make(chan *client), clientPlay: make(chan *client),
clientRecord: make(chan *client), clientRecord: make(chan *client),
clientFrameUDP: make(chan clientFrameUDPReq),
clientFrameTCP: make(chan clientFrameTCPReq),
sourceReady: make(chan *source), sourceReady: make(chan *source),
sourceNotReady: make(chan *source), sourceNotReady: make(chan *source),
sourceFrame: make(chan sourceFrameReq),
terminate: make(chan struct{}), terminate: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
} }
@ -280,6 +276,7 @@ outer:
case client := <-p.clientPlay: case client := <-p.clientPlay:
atomic.AddInt64(&p.countReader, 1) atomic.AddInt64(&p.countReader, 1)
client.state = clientStatePlay client.state = clientStatePlay
p.readersMap.add(client)
case client := <-p.clientRecord: case client := <-p.clientRecord:
atomic.AddInt64(&p.countPublisher, 1) atomic.AddInt64(&p.countPublisher, 1)
@ -287,44 +284,24 @@ outer:
if client.streamProtocol == gortsplib.StreamProtocolUDP { if client.streamProtocol == gortsplib.StreamProtocolUDP {
for trackId, track := range client.streamTracks { for trackId, track := range client.streamTracks {
key := makeUDPClientAddr(client.ip(), track.rtpPort) addr := makeUDPPublisherAddr(client.ip(), track.rtpPort)
p.udpClientsByAddr[key] = &udpClient{ p.udpPublishersMap.add(addr, &udpPublisher{
client: client, client: client,
trackId: trackId, trackId: trackId,
streamType: gortsplib.StreamTypeRtp, streamType: gortsplib.StreamTypeRtp,
} })
key = makeUDPClientAddr(client.ip(), track.rtcpPort) addr = makeUDPPublisherAddr(client.ip(), track.rtcpPort)
p.udpClientsByAddr[key] = &udpClient{ p.udpPublishersMap.add(addr, &udpPublisher{
client: client, client: client,
trackId: trackId, trackId: trackId,
streamType: gortsplib.StreamTypeRtcp, streamType: gortsplib.StreamTypeRtcp,
} })
} }
} }
client.path.onPublisherSetReady() client.path.onPublisherSetReady()
case req := <-p.clientFrameUDP:
pub, ok := p.udpClientsByAddr[makeUDPClientAddr(req.addr.IP, req.addr.Port)]
if !ok {
continue
}
// client sent RTP on RTCP port or vice-versa
if pub.streamType != req.streamType {
continue
}
atomic.StoreInt64(pub.client.udpLastFrameTimes[pub.trackId], time.Now().Unix())
pub.client.rtcpReceivers[pub.trackId].OnFrame(req.streamType, req.buf)
p.forwardFrame(pub.client.path, pub.trackId, req.streamType, req.buf)
case req := <-p.clientFrameTCP:
p.forwardFrame(req.path, req.trackId, req.streamType, req.buf)
case source := <-p.sourceReady: case source := <-p.sourceReady:
source.path.log("source ready") source.path.log("source ready")
source.path.onPublisherSetReady() source.path.onPublisherSetReady()
@ -333,9 +310,6 @@ outer:
source.path.log("source not ready") source.path.log("source not ready")
source.path.onPublisherSetNotReady() source.path.onPublisherSetNotReady()
case req := <-p.sourceFrame:
p.forwardFrame(req.source.path, req.trackId, req.streamType, req.buf)
case <-p.terminate: case <-p.terminate:
break outer break outer
} }
@ -362,11 +336,8 @@ outer:
case <-p.clientPlay: case <-p.clientPlay:
case <-p.clientRecord: case <-p.clientRecord:
case <-p.clientFrameUDP:
case <-p.clientFrameTCP:
case <-p.sourceReady: case <-p.sourceReady:
case <-p.sourceNotReady: case <-p.sourceNotReady:
case <-p.sourceFrame:
} }
} }
}() }()
@ -410,11 +381,8 @@ outer:
close(p.clientSetupPlay) close(p.clientSetupPlay)
close(p.clientPlay) close(p.clientPlay)
close(p.clientRecord) close(p.clientRecord)
close(p.clientFrameUDP)
close(p.clientFrameTCP)
close(p.sourceReady) close(p.sourceReady)
close(p.sourceNotReady) close(p.sourceNotReady)
close(p.sourceFrame)
close(p.done) close(p.done)
} }
@ -435,44 +403,6 @@ func (p *program) findConfForPathName(name string) *confPath {
return nil return nil
} }
func (p *program) forwardFrame(path *path, trackId int, streamType gortsplib.StreamType, frame []byte) {
for c := range p.clients {
if c.path != path ||
c.state != clientStatePlay {
continue
}
track, ok := c.streamTracks[trackId]
if !ok {
continue
}
if c.streamProtocol == gortsplib.StreamProtocolUDP {
if streamType == gortsplib.StreamTypeRtp {
p.serverRtp.write(frame, &net.UDPAddr{
IP: c.ip(),
Zone: c.zone(),
Port: track.rtpPort,
})
} else {
p.serverRtcp.write(frame, &net.UDPAddr{
IP: c.ip(),
Zone: c.zone(),
Port: track.rtcpPort,
})
}
} else {
c.tcpFrame <- &gortsplib.InterleavedFrame{
TrackId: trackId,
StreamType: streamType,
Content: frame,
}
}
}
}
func main() { func main() {
_, err := newProgram(os.Args[1:], os.Stdin) _, err := newProgram(os.Args[1:], os.Stdin)
if err != nil { if err != nil {

23
server-udp.go

@ -2,6 +2,7 @@ package main
import ( import (
"net" "net"
"sync/atomic"
"time" "time"
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
@ -70,11 +71,25 @@ func (l *serverUDP) run() {
break break
} }
l.p.clientFrameUDP <- clientFrameUDPReq{ pub := l.p.udpPublishersMap.get(makeUDPPublisherAddr(addr.IP, addr.Port))
addr, if pub == nil {
l.streamType, continue
buf[:n], }
// client sent RTP on RTCP port or vice-versa
if pub.streamType != l.streamType {
continue
} }
atomic.StoreInt64(pub.client.udpLastFrameTimes[pub.trackId], time.Now().Unix())
pub.client.rtcpReceivers[pub.trackId].OnFrame(l.streamType, buf[:n])
l.p.readersMap.forwardFrame(pub.client.path,
pub.trackId,
l.streamType,
buf[:n])
} }
close(l.writec) close(l.writec)

15
source.go

@ -14,13 +14,6 @@ const (
sourceTCPReadBufferSize = 128 * 1024 sourceTCPReadBufferSize = 128 * 1024
) )
type sourceFrameReq struct {
source *source
trackId int
streamType gortsplib.StreamType
buf []byte
}
type sourceState int type sourceState int
const ( const (
@ -241,7 +234,8 @@ func (s *source) runUDP(conn *gortsplib.ConnClient) bool {
break break
} }
s.p.sourceFrame <- sourceFrameReq{s, trackId, gortsplib.StreamTypeRtp, buf[:n]} s.p.readersMap.forwardFrame(s.path, trackId,
gortsplib.StreamTypeRtp, buf[:n])
} }
}(trackId, rtpRead) }(trackId, rtpRead)
} }
@ -261,7 +255,8 @@ func (s *source) runUDP(conn *gortsplib.ConnClient) bool {
break break
} }
s.p.sourceFrame <- sourceFrameReq{s, trackId, gortsplib.StreamTypeRtcp, buf[:n]} s.p.readersMap.forwardFrame(s.path, trackId,
gortsplib.StreamTypeRtcp, buf[:n])
} }
}(trackId, rtcpRead) }(trackId, rtcpRead)
} }
@ -331,7 +326,7 @@ func (s *source) runTCP(conn *gortsplib.ConnClient) bool {
return return
} }
s.p.sourceFrame <- sourceFrameReq{s, frame.TrackId, frame.StreamType, frame.Content} s.p.readersMap.forwardFrame(s.path, frame.TrackId, frame.StreamType, frame.Content)
} }
}() }()

130
utils.go

@ -8,6 +8,9 @@ import (
"regexp" "regexp"
"runtime" "runtime"
"strings" "strings"
"sync"
"github.com/aler9/gortsplib"
) )
func parseIpCidrList(in []string) ([]interface{}, error) { func parseIpCidrList(in []string) ([]interface{}, error) {
@ -170,3 +173,130 @@ func isBindError(err error) bool {
} }
return false return false
} }
type udpPublisherAddr struct {
ip [net.IPv6len]byte // use a fixed-size array to enable the equality operator
port int
}
func makeUDPPublisherAddr(ip net.IP, port int) udpPublisherAddr {
ret := udpPublisherAddr{
port: port,
}
if len(ip) == net.IPv4len {
copy(ret.ip[0:], []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}) // v4InV6Prefix
copy(ret.ip[12:], ip)
} else {
copy(ret.ip[:], ip)
}
return ret
}
type udpPublisher struct {
client *client
trackId int
streamType gortsplib.StreamType
}
type udpPublishersMap struct {
mutex sync.RWMutex
ma map[udpPublisherAddr]*udpPublisher
}
func newUdpPublisherMap() *udpPublishersMap {
return &udpPublishersMap{
ma: make(map[udpPublisherAddr]*udpPublisher),
}
}
func (m *udpPublishersMap) get(addr udpPublisherAddr) *udpPublisher {
m.mutex.RLock()
defer m.mutex.RUnlock()
el, ok := m.ma[addr]
if !ok {
return nil
}
return el
}
func (m *udpPublishersMap) add(addr udpPublisherAddr, pub *udpPublisher) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.ma[addr] = pub
}
func (m *udpPublishersMap) remove(addr udpPublisherAddr) {
m.mutex.Lock()
defer m.mutex.Unlock()
delete(m.ma, addr)
}
type readersMap struct {
mutex sync.RWMutex
ma map[*client]struct{}
}
func newReadersMap() *readersMap {
return &readersMap{
ma: make(map[*client]struct{}),
}
}
func (m *readersMap) add(reader *client) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.ma[reader] = struct{}{}
}
func (m *readersMap) remove(reader *client) {
m.mutex.Lock()
defer m.mutex.Unlock()
delete(m.ma, reader)
}
func (m *readersMap) forwardFrame(path *path, trackId int, streamType gortsplib.StreamType, frame []byte) {
m.mutex.RLock()
defer m.mutex.RUnlock()
for c := range m.ma {
if c.path != path {
continue
}
track, ok := c.streamTracks[trackId]
if !ok {
continue
}
if c.streamProtocol == gortsplib.StreamProtocolUDP {
if streamType == gortsplib.StreamTypeRtp {
c.p.serverRtp.write(frame, &net.UDPAddr{
IP: c.ip(),
Zone: c.zone(),
Port: track.rtpPort,
})
} else {
c.p.serverRtcp.write(frame, &net.UDPAddr{
IP: c.ip(),
Zone: c.zone(),
Port: track.rtcpPort,
})
}
} else {
c.tcpFrame <- &gortsplib.InterleavedFrame{
TrackId: trackId,
StreamType: streamType,
Content: frame,
}
}
}
}

Loading…
Cancel
Save